aishikbh commented on code in PR #12220: URL: https://github.com/apache/pinot/pull/12220#discussion_r1455727311
##########
pinot-core/src/main/java/org/apache/pinot/core/segment/processing/mapper/SegmentMapper.java:
##########
@@ -129,40 +135,30 @@ public Map<String, GenericRowFileManager> map()
   private Map<String, GenericRowFileManager> doMap()
       throws Exception {
     Consumer<Object> observer = _processorConfig.getProgressObserver();
-    int totalCount = _recordReaderFileConfigs.size();
-    int count = 1;
+    int count = _totalNumRecordReaders - _recordReaderFileConfigs.size() + 1;
     GenericRow reuse = new GenericRow();
     for (RecordReaderFileConfig recordReaderFileConfig : _recordReaderFileConfigs) {
-      RecordReader recordReader = recordReaderFileConfig._recordReader;
-      if (recordReader == null) {
-        // We create and use the recordReader here.
-        try {
-          recordReader =
-              RecordReaderFactory.getRecordReader(recordReaderFileConfig._fileFormat, recordReaderFileConfig._dataFile,
-                  recordReaderFileConfig._fieldsToRead, recordReaderFileConfig._recordReaderConfig);
-          mapAndTransformRow(recordReader, reuse, observer, count, totalCount);
-        } finally {
-          if (recordReader != null) {
-            recordReader.close();
-          }
-        }
-      } else {
-        mapAndTransformRow(recordReader, reuse, observer, count, totalCount);
+      RecordReader recordReader = recordReaderFileConfig.getRecordReader();
+      boolean shouldMapperTerminate = mapAndTransformRow(recordReader, reuse, observer, count, _totalNumRecordReaders);
+
+      // Terminate the map phase if intermediate file size has crossed the threshold.
+      if (shouldMapperTerminate) {

Review Comment:
   I have added the comment to explain it a bit better.
   The validation that all RecordReaders are closed applies only to non-delegate RecordReaders, because delegate RecordReaders are closed by the caller after `SegmentProcessorFramework` goes out of scope; see the examples [here](https://github.com/apache/pinot/blob/21f3d283d42e1b4f5b16f3ef84549eb34b8b7031/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/mergerollup/MergeRollupTaskExecutor.java#L108C5-L111C8) and [here](https://github.com/apache/pinot/blob/21f3d283d42e1b4f5b16f3ef84549eb34b8b7031/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/realtimetoofflinesegments/RealtimeToOfflineSegmentsTaskExecutor.java#L174C4-L177C8). I can add the validation for the non-delegate ones only.

   One question: isn't the fact that we always hit `closeRecordReader` in the map phase enough? I have also added a test to make sure that non-delegate RecordReaders are always closed.
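
For illustration only, here is a rough sketch of what a check limited to non-delegate readers could look like. The types and names below (`ReaderConfig`, `isDelegate`, `isClosed`, `ReaderCloseValidator`) are hypothetical stand-ins invented for this sketch; they are not Pinot's `RecordReaderFileConfig` API and not the code in this PR. The assumption is simply that a delegate reader is owned and closed by the caller, so the check skips it.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for a reader config; NOT Pinot's RecordReaderFileConfig.
final class ReaderConfig {
  private final String name;
  // Assumption: true when the caller supplied the reader and will close it itself.
  private final boolean delegate;
  private boolean closed;

  ReaderConfig(String name, boolean delegate) {
    this.name = name;
    this.delegate = delegate;
  }

  String name() {
    return name;
  }

  boolean isDelegate() {
    return delegate;
  }

  boolean isClosed() {
    return closed;
  }

  void close() {
    closed = true;
  }
}

// Hypothetical validator: reports only readers that the framework itself owns.
final class ReaderCloseValidator {
  private ReaderCloseValidator() {
  }

  // Returns the names of non-delegate readers that are still open.
  // Delegate readers are skipped because the caller closes them later.
  static List<String> findUnclosedOwnedReaders(List<ReaderConfig> configs) {
    List<String> unclosed = new ArrayList<>();
    for (ReaderConfig config : configs) {
      if (!config.isDelegate() && !config.isClosed()) {
        unclosed.add(config.name());
      }
    }
    return unclosed;
  }
}
```

With a check of this shape, an open delegate reader would never fail the validation, while an open non-delegate reader would be reported by name.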