swaminathanmanish commented on code in PR #12220:
URL: https://github.com/apache/pinot/pull/12220#discussion_r1446795582


##########
pinot-core/src/main/java/org/apache/pinot/core/segment/processing/mapper/SegmentMapper.java:
##########
@@ -141,28 +146,43 @@ private Map<String, GenericRowFileManager> doMap()
               
RecordReaderFactory.getRecordReader(recordReaderFileConfig._fileFormat, 
recordReaderFileConfig._dataFile,
                   recordReaderFileConfig._fieldsToRead, 
recordReaderFileConfig._recordReaderConfig);
           mapAndTransformRow(recordReader, reuse, observer, count, totalCount);
+          _recordReaderFileConfigs.get(i)._recordReader = recordReader;
         } finally {
-          if (recordReader != null) {
+          if (recordReader != null && !recordReader.hasNext()) {
             recordReader.close();
           }
         }
       } else {
+        if (!recordReader.hasNext()) {

Review Comment:
   Wouldn't this be covered by the above check where we close the reader as 
well ?



##########
pinot-core/src/main/java/org/apache/pinot/core/segment/processing/mapper/SegmentMapper.java:
##########
@@ -141,28 +146,43 @@ private Map<String, GenericRowFileManager> doMap()
               
RecordReaderFactory.getRecordReader(recordReaderFileConfig._fileFormat, 
recordReaderFileConfig._dataFile,
                   recordReaderFileConfig._fieldsToRead, 
recordReaderFileConfig._recordReaderConfig);
           mapAndTransformRow(recordReader, reuse, observer, count, totalCount);
+          _recordReaderFileConfigs.get(i)._recordReader = recordReader;
         } finally {
-          if (recordReader != null) {
+          if (recordReader != null && !recordReader.hasNext()) {

Review Comment:
   Can you put this in a public util inside RecordReaderFileConfig ? Looks like 
we do these types of checks in bunch of other places. 



##########
pinot-core/src/main/java/org/apache/pinot/core/segment/processing/mapper/SegmentMapper.java:
##########
@@ -129,10 +133,11 @@ public Map<String, GenericRowFileManager> map()
   private Map<String, GenericRowFileManager> doMap()
       throws Exception {
     Consumer<Object> observer = _processorConfig.getProgressObserver();
-    int totalCount = _recordReaderFileConfigs.size();
     int count = 1;
+    int totalCount = _recordReaderFileConfigs.size();
     GenericRow reuse = new GenericRow();
-    for (RecordReaderFileConfig recordReaderFileConfig : 
_recordReaderFileConfigs) {
+    for (int i = 0; i < _recordReaderFileConfigs.size(); i++) {
+      RecordReaderFileConfig recordReaderFileConfig = 
_recordReaderFileConfigs.get(i);
       RecordReader recordReader = recordReaderFileConfig._recordReader;

Review Comment:
   Can we create a public static method in RecordReaderFileConfig that takes 
care of creating recordReader and assigning it to the recordReader instance 
(for that particular RecordReaderFileConfig). Basically we can keep all the 
allocation, clean up contained within RecordReaderFileConfig, for readability. 



##########
pinot-core/src/main/java/org/apache/pinot/core/segment/processing/mapper/SegmentMapper.java:
##########
@@ -141,28 +146,43 @@ private Map<String, GenericRowFileManager> doMap()
               
RecordReaderFactory.getRecordReader(recordReaderFileConfig._fileFormat, 
recordReaderFileConfig._dataFile,
                   recordReaderFileConfig._fieldsToRead, 
recordReaderFileConfig._recordReaderConfig);
           mapAndTransformRow(recordReader, reuse, observer, count, totalCount);
+          _recordReaderFileConfigs.get(i)._recordReader = recordReader;
         } finally {
-          if (recordReader != null) {
+          if (recordReader != null && !recordReader.hasNext()) {
             recordReader.close();
           }
         }
       } else {
+        if (!recordReader.hasNext()) {
+          LOGGER.info("Skipping record reader as it is already processed at 
index: {}", i);
+          count++;
+          continue;
+        }
         mapAndTransformRow(recordReader, reuse, observer, count, totalCount);
+        _recordReaderFileConfigs.get(i)._recordReader = recordReader;
+      }
+      if (!_adaptiveSizeBasedWriter.canWrite()) {

Review Comment:
   Instead of checking for this again,  we can have mapAndTransformRow pass a 
return value to terminate this loop ?



##########
pinot-core/src/main/java/org/apache/pinot/core/segment/processing/mapper/SegmentMapper.java:
##########
@@ -141,28 +146,43 @@ private Map<String, GenericRowFileManager> doMap()
               
RecordReaderFactory.getRecordReader(recordReaderFileConfig._fileFormat, 
recordReaderFileConfig._dataFile,
                   recordReaderFileConfig._fieldsToRead, 
recordReaderFileConfig._recordReaderConfig);
           mapAndTransformRow(recordReader, reuse, observer, count, totalCount);
+          _recordReaderFileConfigs.get(i)._recordReader = recordReader;
         } finally {
-          if (recordReader != null) {
+          if (recordReader != null && !recordReader.hasNext()) {
             recordReader.close();
           }
         }
       } else {
+        if (!recordReader.hasNext()) {
+          LOGGER.info("Skipping record reader as it is already processed at 
index: {}", i);
+          count++;
+          continue;
+        }
         mapAndTransformRow(recordReader, reuse, observer, count, totalCount);
+        _recordReaderFileConfigs.get(i)._recordReader = recordReader;

Review Comment:
   Why is this needed? This is the case where RecordReaderFileConfig already 
has the recordReader passed by caller right (not created here) ?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org

Reply via email to