snleee commented on code in PR #12220:
URL: https://github.com/apache/pinot/pull/12220#discussion_r1454376640
##########
pinot-core/src/main/java/org/apache/pinot/core/segment/processing/mapper/SegmentMapper.java:
##########
@@ -129,40 +135,30 @@ public Map<String, GenericRowFileManager> map()
private Map<String, GenericRowFileManager> doMap()
throws Exception {
Consumer<Object> observer = _processorConfig.getProgressObserver();
- int totalCount = _recordReaderFileConfigs.size();
- int count = 1;
+ int count = _totalNumRecordReaders - _recordReaderFileConfigs.size() + 1;
GenericRow reuse = new GenericRow();
for (RecordReaderFileConfig recordReaderFileConfig : _recordReaderFileConfigs) {
- RecordReader recordReader = recordReaderFileConfig._recordReader;
- if (recordReader == null) {
- // We create and use the recordReader here.
- try {
- recordReader =
- RecordReaderFactory.getRecordReader(recordReaderFileConfig._fileFormat, recordReaderFileConfig._dataFile,
- recordReaderFileConfig._fieldsToRead, recordReaderFileConfig._recordReaderConfig);
- mapAndTransformRow(recordReader, reuse, observer, count, totalCount);
- } finally {
- if (recordReader != null) {
- recordReader.close();
- }
- }
- } else {
- mapAndTransformRow(recordReader, reuse, observer, count, totalCount);
+ RecordReader recordReader = recordReaderFileConfig.getRecordReader();
+ boolean shouldMapperTerminate = mapAndTransformRow(recordReader, reuse, observer, count, _totalNumRecordReaders);
+
+ // Terminate the map phase if intermediate file size has crossed the threshold.
+ if (shouldMapperTerminate) {
+ break;
}
+ recordReaderFileConfig.closeRecordReader();
count++;
}
for (GenericRowFileManager fileManager : _partitionToFileManagerMap.values()) {
fileManager.closeFileWriter();
}
-
return _partitionToFileManagerMap;
}
- private void mapAndTransformRow(RecordReader recordReader, GenericRow reuse,
+ private boolean mapAndTransformRow(RecordReader recordReader, GenericRow reuse,
Consumer<Object> observer, int count, int totalCount) throws Exception {
observer.accept(String.format("Doing map phase on data from RecordReader (%d out of %d)", count, totalCount));
- while (recordReader.hasNext()) {
+ while (recordReader.hasNext() && (_adaptiveSizeBasedWriter.canWrite())) {
Review Comment:
(nit) Drop the redundant parentheses around the second operand:
`recordReader.hasNext() && (_adaptiveSizeBasedWriter.canWrite())` ->
`recordReader.hasNext() && _adaptiveSizeBasedWriter.canWrite()`
##########
pinot-spi/src/main/java/org/apache/pinot/spi/data/readers/RecordReaderFileConfig.java:
##########
@@ -34,8 +34,11 @@ public class RecordReaderFileConfig {
public final File _dataFile;
public final Set<String> _fieldsToRead;
public final RecordReaderConfig _recordReaderConfig;
+ private final boolean _isDelegateReader;
// Record Readers created/passed from clients.
- public final RecordReader _recordReader;
+ public RecordReader _recordReader;
+ private boolean _isRecordReaderInitialized;
Review Comment:
Why do we need to track whether the record reader has been initialized/closed?
Is this only for testing? If so, please note that in a code comment.
##########
pinot-spi/src/main/java/org/apache/pinot/spi/data/readers/RecordReaderFileConfig.java:
##########
@@ -54,5 +60,41 @@ public RecordReaderFileConfig(RecordReader recordReader) {
_dataFile = null;
_fieldsToRead = null;
_recordReaderConfig = null;
+ _isDelegateReader = true;
Review Comment:
+1
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]