aishikbh commented on code in PR #12220:
URL: https://github.com/apache/pinot/pull/12220#discussion_r1449236651


##########
pinot-core/src/main/java/org/apache/pinot/core/segment/processing/mapper/SegmentMapper.java:
##########
@@ -141,28 +146,43 @@ private Map<String, GenericRowFileManager> doMap()
               RecordReaderFactory.getRecordReader(recordReaderFileConfig._fileFormat, recordReaderFileConfig._dataFile,
                   recordReaderFileConfig._fieldsToRead, recordReaderFileConfig._recordReaderConfig);
           mapAndTransformRow(recordReader, reuse, observer, count, totalCount);
+          _recordReaderFileConfigs.get(i)._recordReader = recordReader;
         } finally {
-          if (recordReader != null) {
+          if (recordReader != null && !recordReader.hasNext()) {
             recordReader.close();
           }
         }
       } else {
+        if (!recordReader.hasNext()) {
+          LOGGER.info("Skipping record reader as it is already processed at index: {}", i);
+          count++;
+          continue;
+        }
         mapAndTransformRow(recordReader, reuse, observer, count, totalCount);
+        _recordReaderFileConfigs.get(i)._recordReader = recordReader;
+      }
+      if (!_adaptiveSizeBasedWriter.canWrite()) {

Review Comment:
   Makes sense. I have added this logic.



##########
pinot-core/src/main/java/org/apache/pinot/core/segment/processing/mapper/SegmentMapper.java:
##########
@@ -141,28 +146,43 @@ private Map<String, GenericRowFileManager> doMap()
               RecordReaderFactory.getRecordReader(recordReaderFileConfig._fileFormat, recordReaderFileConfig._dataFile,
                   recordReaderFileConfig._fieldsToRead, recordReaderFileConfig._recordReaderConfig);
           mapAndTransformRow(recordReader, reuse, observer, count, totalCount);
+          _recordReaderFileConfigs.get(i)._recordReader = recordReader;
         } finally {
-          if (recordReader != null) {
+          if (recordReader != null && !recordReader.hasNext()) {
             recordReader.close();
           }
         }
       } else {
+        if (!recordReader.hasNext()) {
+          LOGGER.info("Skipping record reader as it is already processed at index: {}", i);
+          count++;
+          continue;
+        }
         mapAndTransformRow(recordReader, reuse, observer, count, totalCount);
+        _recordReaderFileConfigs.get(i)._recordReader = recordReader;

Review Comment:
   Removed this assignment in the current iteration; it is no longer needed.



##########
pinot-core/src/main/java/org/apache/pinot/core/segment/processing/mapper/SegmentMapper.java:
##########
@@ -129,10 +133,11 @@ public Map<String, GenericRowFileManager> map()
   private Map<String, GenericRowFileManager> doMap()
       throws Exception {
     Consumer<Object> observer = _processorConfig.getProgressObserver();
-    int totalCount = _recordReaderFileConfigs.size();
     int count = 1;
+    int totalCount = _recordReaderFileConfigs.size();
     GenericRow reuse = new GenericRow();
-    for (RecordReaderFileConfig recordReaderFileConfig : _recordReaderFileConfigs) {
+    for (int i = 0; i < _recordReaderFileConfigs.size(); i++) {
+      RecordReaderFileConfig recordReaderFileConfig = _recordReaderFileConfigs.get(i);
       RecordReader recordReader = recordReaderFileConfig._recordReader;

Review Comment:
   I have done that in the current iteration. Having RecordReaderFileConfig
   handle the record-reader-related work (allocation, cleanup, checks, etc.)
   makes sense.
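
   A minimal sketch of the idea, not the actual change in this PR:
   `ReaderHolder` is a hypothetical stand-in for RecordReaderFileConfig, and a
   plain `Iterator` stands in for RecordReader. The holder creates the reader
   lazily, keeps it open while records remain so that a later pass can resume,
   and releases it once it is exhausted.

```java
import java.util.Iterator;
import java.util.function.Supplier;

// Hypothetical stand-in for RecordReaderFileConfig (illustration only, not Pinot code).
final class ReaderHolder<T> implements AutoCloseable {
  private final Supplier<Iterator<T>> _readerFactory;
  private Iterator<T> _reader;
  private boolean _done;

  ReaderHolder(Supplier<Iterator<T>> readerFactory) {
    _readerFactory = readerFactory;
  }

  // Allocation: create the reader on first use and reuse it on later calls.
  // Returns null once the holder has been closed.
  Iterator<T> getReader() {
    if (_reader == null && !_done) {
      _reader = _readerFactory.get();
    }
    return _reader;
  }

  // Check: true once the reader has been released.
  boolean isDone() {
    return _done;
  }

  // Clean-up: release the reader only if it has no records left.
  void closeIfExhausted() {
    if (_reader != null && !_reader.hasNext()) {
      close();
    }
  }

  @Override
  public void close() {
    _reader = null;
    _done = true;
  }
}
```

   Under these assumptions, the mapper loop would only ask the holder for a
   reader, skip holders that report isDone(), and call closeIfExhausted()
   after each pass instead of tracking reader state itself.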



##########
pinot-core/src/main/java/org/apache/pinot/core/segment/processing/mapper/SegmentMapper.java:
##########
@@ -141,28 +146,43 @@ private Map<String, GenericRowFileManager> doMap()
               RecordReaderFactory.getRecordReader(recordReaderFileConfig._fileFormat, recordReaderFileConfig._dataFile,
                   recordReaderFileConfig._fieldsToRead, recordReaderFileConfig._recordReaderConfig);
           mapAndTransformRow(recordReader, reuse, observer, count, totalCount);
+          _recordReaderFileConfigs.get(i)._recordReader = recordReader;
         } finally {
-          if (recordReader != null) {
+          if (recordReader != null && !recordReader.hasNext()) {
             recordReader.close();
           }
         }
       } else {
+        if (!recordReader.hasNext()) {

Review Comment:
   I have changed the logic around this check.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

