swaminathanmanish commented on code in PR #12220: URL: https://github.com/apache/pinot/pull/12220#discussion_r1446795582
########## pinot-core/src/main/java/org/apache/pinot/core/segment/processing/mapper/SegmentMapper.java: ########## @@ -141,28 +146,43 @@ private Map<String, GenericRowFileManager> doMap() RecordReaderFactory.getRecordReader(recordReaderFileConfig._fileFormat, recordReaderFileConfig._dataFile, recordReaderFileConfig._fieldsToRead, recordReaderFileConfig._recordReaderConfig); mapAndTransformRow(recordReader, reuse, observer, count, totalCount); + _recordReaderFileConfigs.get(i)._recordReader = recordReader; } finally { - if (recordReader != null) { + if (recordReader != null && !recordReader.hasNext()) { recordReader.close(); } } } else { + if (!recordReader.hasNext()) { Review Comment: Wouldn't this already be covered by the check above, where we also close the reader? ########## pinot-core/src/main/java/org/apache/pinot/core/segment/processing/mapper/SegmentMapper.java: ########## @@ -141,28 +146,43 @@ private Map<String, GenericRowFileManager> doMap() RecordReaderFactory.getRecordReader(recordReaderFileConfig._fileFormat, recordReaderFileConfig._dataFile, recordReaderFileConfig._fieldsToRead, recordReaderFileConfig._recordReaderConfig); mapAndTransformRow(recordReader, reuse, observer, count, totalCount); + _recordReaderFileConfigs.get(i)._recordReader = recordReader; } finally { - if (recordReader != null) { + if (recordReader != null && !recordReader.hasNext()) { Review Comment: Can you put this in a public util inside RecordReaderFileConfig? It looks like we do these kinds of checks in a bunch of other places. 
########## pinot-core/src/main/java/org/apache/pinot/core/segment/processing/mapper/SegmentMapper.java: ########## @@ -129,10 +133,11 @@ public Map<String, GenericRowFileManager> map() private Map<String, GenericRowFileManager> doMap() throws Exception { Consumer<Object> observer = _processorConfig.getProgressObserver(); - int totalCount = _recordReaderFileConfigs.size(); int count = 1; + int totalCount = _recordReaderFileConfigs.size(); GenericRow reuse = new GenericRow(); - for (RecordReaderFileConfig recordReaderFileConfig : _recordReaderFileConfigs) { + for (int i = 0; i < _recordReaderFileConfigs.size(); i++) { + RecordReaderFileConfig recordReaderFileConfig = _recordReaderFileConfigs.get(i); RecordReader recordReader = recordReaderFileConfig._recordReader; Review Comment: Can we create a public static method in RecordReaderFileConfig that takes care of creating the recordReader and assigning it to the recordReader field of that particular RecordReaderFileConfig? That way, all the allocation and cleanup stay contained within RecordReaderFileConfig, which improves readability. 
########## pinot-core/src/main/java/org/apache/pinot/core/segment/processing/mapper/SegmentMapper.java: ########## @@ -141,28 +146,43 @@ private Map<String, GenericRowFileManager> doMap() RecordReaderFactory.getRecordReader(recordReaderFileConfig._fileFormat, recordReaderFileConfig._dataFile, recordReaderFileConfig._fieldsToRead, recordReaderFileConfig._recordReaderConfig); mapAndTransformRow(recordReader, reuse, observer, count, totalCount); + _recordReaderFileConfigs.get(i)._recordReader = recordReader; } finally { - if (recordReader != null) { + if (recordReader != null && !recordReader.hasNext()) { recordReader.close(); } } } else { + if (!recordReader.hasNext()) { + LOGGER.info("Skipping record reader as it is already processed at index: {}", i); + count++; + continue; + } mapAndTransformRow(recordReader, reuse, observer, count, totalCount); + _recordReaderFileConfigs.get(i)._recordReader = recordReader; + } + if (!_adaptiveSizeBasedWriter.canWrite()) { Review Comment: Instead of checking this again, could mapAndTransformRow return a value that tells us to terminate this loop? 
########## pinot-core/src/main/java/org/apache/pinot/core/segment/processing/mapper/SegmentMapper.java: ########## @@ -141,28 +146,43 @@ private Map<String, GenericRowFileManager> doMap() RecordReaderFactory.getRecordReader(recordReaderFileConfig._fileFormat, recordReaderFileConfig._dataFile, recordReaderFileConfig._fieldsToRead, recordReaderFileConfig._recordReaderConfig); mapAndTransformRow(recordReader, reuse, observer, count, totalCount); + _recordReaderFileConfigs.get(i)._recordReader = recordReader; } finally { - if (recordReader != null) { + if (recordReader != null && !recordReader.hasNext()) { recordReader.close(); } } } else { + if (!recordReader.hasNext()) { + LOGGER.info("Skipping record reader as it is already processed at index: {}", i); + count++; + continue; + } mapAndTransformRow(recordReader, reuse, observer, count, totalCount); + _recordReaderFileConfigs.get(i)._recordReader = recordReader; Review Comment: Why is this needed? In this branch, the RecordReaderFileConfig already holds a recordReader that was passed in by the caller (not created here), right? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org