aishikbh commented on code in PR #12220: URL: https://github.com/apache/pinot/pull/12220#discussion_r1449236651
########## pinot-core/src/main/java/org/apache/pinot/core/segment/processing/mapper/SegmentMapper.java: ########## @@ -141,28 +146,43 @@ private Map<String, GenericRowFileManager> doMap() RecordReaderFactory.getRecordReader(recordReaderFileConfig._fileFormat, recordReaderFileConfig._dataFile, recordReaderFileConfig._fieldsToRead, recordReaderFileConfig._recordReaderConfig); mapAndTransformRow(recordReader, reuse, observer, count, totalCount); + _recordReaderFileConfigs.get(i)._recordReader = recordReader; } finally { - if (recordReader != null) { + if (recordReader != null && !recordReader.hasNext()) { recordReader.close(); } } } else { + if (!recordReader.hasNext()) { + LOGGER.info("Skipping record reader as it is already processed at index: {}", i); + count++; + continue; + } mapAndTransformRow(recordReader, reuse, observer, count, totalCount); + _recordReaderFileConfigs.get(i)._recordReader = recordReader; + } + if (!_adaptiveSizeBasedWriter.canWrite()) { Review Comment: makes sense. I have added this logic. 
########## pinot-core/src/main/java/org/apache/pinot/core/segment/processing/mapper/SegmentMapper.java: ########## @@ -141,28 +146,43 @@ private Map<String, GenericRowFileManager> doMap() RecordReaderFactory.getRecordReader(recordReaderFileConfig._fileFormat, recordReaderFileConfig._dataFile, recordReaderFileConfig._fieldsToRead, recordReaderFileConfig._recordReaderConfig); mapAndTransformRow(recordReader, reuse, observer, count, totalCount); + _recordReaderFileConfigs.get(i)._recordReader = recordReader; } finally { - if (recordReader != null) { + if (recordReader != null && !recordReader.hasNext()) { recordReader.close(); } } } else { + if (!recordReader.hasNext()) { + LOGGER.info("Skipping record reader as it is already processed at index: {}", i); + count++; + continue; + } mapAndTransformRow(recordReader, reuse, observer, count, totalCount); + _recordReaderFileConfigs.get(i)._recordReader = recordReader; Review Comment: Removed it in the current iteration. We do not need this. ########## pinot-core/src/main/java/org/apache/pinot/core/segment/processing/mapper/SegmentMapper.java: ########## @@ -129,10 +133,11 @@ public Map<String, GenericRowFileManager> map() private Map<String, GenericRowFileManager> doMap() throws Exception { Consumer<Object> observer = _processorConfig.getProgressObserver(); - int totalCount = _recordReaderFileConfigs.size(); int count = 1; + int totalCount = _recordReaderFileConfigs.size(); GenericRow reuse = new GenericRow(); - for (RecordReaderFileConfig recordReaderFileConfig : _recordReaderFileConfigs) { + for (int i = 0; i < _recordReaderFileConfigs.size(); i++) { + RecordReaderFileConfig recordReaderFileConfig = _recordReaderFileConfigs.get(i); RecordReader recordReader = recordReaderFileConfig._recordReader; Review Comment: I have done that in the current iteration. Having RecordReaderFileConfig handle record-reader-related work (allocation, cleanup, checks, etc.) makes sense. 
########## pinot-core/src/main/java/org/apache/pinot/core/segment/processing/mapper/SegmentMapper.java: ########## @@ -141,28 +146,43 @@ private Map<String, GenericRowFileManager> doMap() RecordReaderFactory.getRecordReader(recordReaderFileConfig._fileFormat, recordReaderFileConfig._dataFile, recordReaderFileConfig._fieldsToRead, recordReaderFileConfig._recordReaderConfig); mapAndTransformRow(recordReader, reuse, observer, count, totalCount); + _recordReaderFileConfigs.get(i)._recordReader = recordReader; } finally { - if (recordReader != null) { + if (recordReader != null && !recordReader.hasNext()) { recordReader.close(); } } } else { + if (!recordReader.hasNext()) { Review Comment: Changed the logic around this. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org