aishikbh commented on code in PR #12220:
URL: https://github.com/apache/pinot/pull/12220#discussion_r1455676485


##########
pinot-core/src/main/java/org/apache/pinot/core/segment/processing/mapper/SegmentMapper.java:
##########
@@ -129,40 +135,30 @@ public Map<String, GenericRowFileManager> map()
   private Map<String, GenericRowFileManager> doMap()
       throws Exception {
     Consumer<Object> observer = _processorConfig.getProgressObserver();
-    int totalCount = _recordReaderFileConfigs.size();
-    int count = 1;
+    int count = _totalNumRecordReaders - _recordReaderFileConfigs.size() + 1;
     GenericRow reuse = new GenericRow();
     for (RecordReaderFileConfig recordReaderFileConfig : _recordReaderFileConfigs) {
-      RecordReader recordReader = recordReaderFileConfig._recordReader;
-      if (recordReader == null) {
-        // We create and use the recordReader here.
-        try {
-          recordReader =
-              RecordReaderFactory.getRecordReader(recordReaderFileConfig._fileFormat, recordReaderFileConfig._dataFile,
-                  recordReaderFileConfig._fieldsToRead, recordReaderFileConfig._recordReaderConfig);
-          mapAndTransformRow(recordReader, reuse, observer, count, totalCount);
-        } finally {
-          if (recordReader != null) {
-            recordReader.close();
-          }
-        }
-      } else {
-        mapAndTransformRow(recordReader, reuse, observer, count, totalCount);
+      RecordReader recordReader = recordReaderFileConfig.getRecordReader();
+      boolean shouldMapperTerminate = mapAndTransformRow(recordReader, reuse, observer, count, _totalNumRecordReaders);
+
+      // Terminate the map phase if intermediate file size has crossed the threshold.
+      if (shouldMapperTerminate) {
+        break;
       }
+      recordReaderFileConfig.closeRecordReader();
       count++;
     }
 
     for (GenericRowFileManager fileManager : _partitionToFileManagerMap.values()) {
       fileManager.closeFileWriter();
     }
-
     return _partitionToFileManagerMap;
   }
 
-  private void mapAndTransformRow(RecordReader recordReader, GenericRow reuse,
+  private boolean mapAndTransformRow(RecordReader recordReader, GenericRow reuse,

Review Comment:
   Added comments to clarify the logic. Also renamed the method and documented
what it does.
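
For anyone skimming the hunk above, here is a rough, standalone sketch of the pattern it introduces: stop mapping once a size threshold is crossed, leave the interrupted reader pending so a later pass can resume from it, and derive the progress counter from how many readers are still pending. This is not the Pinot code. `ResumableMapSketch`, `startCount`, `mapOnce`, and the use of string length as a stand-in for intermediate file size are all hypothetical names and simplifications chosen for illustration.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

// Hypothetical illustration only; none of these names exist in Pinot.
class ResumableMapSketch {

  // 1-based index of the first reader still to be processed, mirroring
  // "count = total - remaining + 1" from the diff.
  static int startCount(int totalReaders, int remainingReaders) {
    return totalReaders - remainingReaders + 1;
  }

  // Drains rows from the pending readers into 'out' until 'maxBytes' is reached.
  // Returns the readers that still have to be processed; a reader that was
  // interrupted mid-way stays at the head of the returned list.
  static List<Iterator<String>> mapOnce(List<Iterator<String>> pending, long maxBytes, List<String> out) {
    long written = 0;
    List<Iterator<String>> remaining = new ArrayList<>(pending);
    for (Iterator<String> reader : pending) {
      while (reader.hasNext()) {
        if (written >= maxBytes) {
          // Threshold crossed: stop early and keep this reader for the next pass.
          return remaining;
        }
        String row = reader.next();
        out.add(row);
        written += row.length(); // stand-in for bytes written to the intermediate file
      }
      // Reader fully consumed: drop it from the pending list.
      remaining.remove(0);
    }
    return remaining;
  }
}
```

Calling `mapOnce` again with the returned list continues from the interrupted reader, and `startCount(total, remaining.size())` gives the progress index to report at the start of that pass.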



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


