aishikbh commented on code in PR #12220: URL: https://github.com/apache/pinot/pull/12220#discussion_r1455683353
########## pinot-core/src/main/java/org/apache/pinot/core/segment/processing/mapper/SegmentMapper.java: ########## @@ -62,29 +63,30 @@ */ public class SegmentMapper { private static final Logger LOGGER = LoggerFactory.getLogger(SegmentMapper.class); - - private List<RecordReaderFileConfig> _recordReaderFileConfigs; - private List<RecordTransformer> _customRecordTransformers; private final SegmentProcessorConfig _processorConfig; private final File _mapperOutputDir; - private final List<FieldSpec> _fieldSpecs; private final boolean _includeNullFields; private final int _numSortFields; - private final CompositeTransformer _recordTransformer; private final TimeHandler _timeHandler; private final Partitioner[] _partitioners; private final String[] _partitionsBuffer; // NOTE: Use TreeMap so that the order is deterministic private final Map<String, GenericRowFileManager> _partitionToFileManagerMap = new TreeMap<>(); + private AdaptiveSizeBasedWriter _adaptiveSizeBasedWriter; + private List<RecordReaderFileConfig> _recordReaderFileConfigs; + private List<RecordTransformer> _customRecordTransformers; + private final int _totalNumRecordReaders; Review Comment: Updated the logging with your suggestions in mind. In addition to the points you raised, I have also added a log in `SegmentProcessorFramework` that reports the overall progress across all RecordReaders. 
########## pinot-core/src/main/java/org/apache/pinot/core/segment/processing/mapper/SegmentMapper.java: ########## @@ -129,40 +135,30 @@ public Map<String, GenericRowFileManager> map() private Map<String, GenericRowFileManager> doMap() throws Exception { Consumer<Object> observer = _processorConfig.getProgressObserver(); - int totalCount = _recordReaderFileConfigs.size(); - int count = 1; + int count = _totalNumRecordReaders - _recordReaderFileConfigs.size() + 1; GenericRow reuse = new GenericRow(); for (RecordReaderFileConfig recordReaderFileConfig : _recordReaderFileConfigs) { - RecordReader recordReader = recordReaderFileConfig._recordReader; - if (recordReader == null) { - // We create and use the recordReader here. - try { - recordReader = - RecordReaderFactory.getRecordReader(recordReaderFileConfig._fileFormat, recordReaderFileConfig._dataFile, - recordReaderFileConfig._fieldsToRead, recordReaderFileConfig._recordReaderConfig); - mapAndTransformRow(recordReader, reuse, observer, count, totalCount); - } finally { - if (recordReader != null) { - recordReader.close(); - } - } - } else { - mapAndTransformRow(recordReader, reuse, observer, count, totalCount); + RecordReader recordReader = recordReaderFileConfig.getRecordReader(); + boolean shouldMapperTerminate = mapAndTransformRow(recordReader, reuse, observer, count, _totalNumRecordReaders); + + // Terminate the map phase if intermediate file size has crossed the threshold. 
+ if (shouldMapperTerminate) { + break; } + recordReaderFileConfig.closeRecordReader(); count++; } for (GenericRowFileManager fileManager : _partitionToFileManagerMap.values()) { fileManager.closeFileWriter(); } - return _partitionToFileManagerMap; } - private void mapAndTransformRow(RecordReader recordReader, GenericRow reuse, + private boolean mapAndTransformRow(RecordReader recordReader, GenericRow reuse, Consumer<Object> observer, int count, int totalCount) throws Exception { observer.accept(String.format("Doing map phase on data from RecordReader (%d out of %d)", count, totalCount)); - while (recordReader.hasNext()) { + while (recordReader.hasNext() && (_adaptiveSizeBasedWriter.canWrite())) { Review Comment: changed. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org