aishikbh commented on code in PR #12220:
URL: https://github.com/apache/pinot/pull/12220#discussion_r1449031738


##########
pinot-core/src/main/java/org/apache/pinot/core/segment/processing/framework/SegmentProcessorFramework.java:
##########
@@ -142,24 +136,79 @@ public List<File> process()
 
   private List<File> doProcess()
       throws Exception {
-    // Map phase
-    LOGGER.info("Beginning map phase on {} record readers", 
_recordReaderFileConfigs.size());
-    SegmentMapper mapper = new SegmentMapper(_recordReaderFileConfigs, 
_customRecordTransformers,
-        _segmentProcessorConfig, _mapperOutputDir);
-    _partitionToFileManagerMap = mapper.map();
-
-    // Check for mapper output files
-    if (_partitionToFileManagerMap.isEmpty()) {
-      LOGGER.info("No partition generated from mapper phase, skipping the 
reducer phase");
-      return Collections.emptyList();
+    List<File> outputSegmentDirs = new ArrayList<>();
+    int numRecordReaders = _recordReaderFileConfigs.size();
+    int nextRecordReaderIndexToBeProcessed = 0;
+
+    while (nextRecordReaderIndexToBeProcessed < numRecordReaders) {
+      // Initialise the mapper.
+      SegmentMapper mapper =
+          new SegmentMapper(_recordReaderFileConfigs, 
_customRecordTransformers, _segmentProcessorConfig,
+              _mapperOutputDir);
+
+      // Map phase.
+      Map<String, GenericRowFileManager> partitionToFileManagerMap = 
doMap(mapper);
+
+      // Check for mapper output files, if no files are generated, skip the 
reducer phase and move on to the next
+      // iteration.
+      if (partitionToFileManagerMap.isEmpty()) {
+        nextRecordReaderIndexToBeProcessed = 
getNextRecordReaderIndexToBeProcessed(nextRecordReaderIndexToBeProcessed);
+        continue;
+      }
+
+      // Reduce phase.
+      doReduce(partitionToFileManagerMap);
+
+      // Segment creation phase. Add the created segments to the final list.
+      outputSegmentDirs.addAll(generateSegment(partitionToFileManagerMap));
+
+      // Update next record reader index to be processed.
+      nextRecordReaderIndexToBeProcessed = 
getNextRecordReaderIndexToBeProcessed(nextRecordReaderIndexToBeProcessed);
+    }
+    return outputSegmentDirs;
+  }
+
+  private int getNextRecordReaderIndexToBeProcessed(int currentRecordIndex) {
+    for (int i = currentRecordIndex; i < _recordReaderFileConfigs.size(); i++) 
{
+      RecordReader recordReader = 
_recordReaderFileConfigs.get(i)._recordReader;
+      if (recordReader == null || recordReader.hasNext()) {
+        return i;
+      }
     }
+    return _recordReaderFileConfigs.size();
+  }
 
-    // Reduce phase
-    LOGGER.info("Beginning reduce phase on partitions: {}", 
_partitionToFileManagerMap.keySet());
+  private Map<String, GenericRowFileManager> doMap(SegmentMapper mapper)
+      throws Exception {
+    Map<String, GenericRowFileManager> partitionToFileManagerMap = new 
HashMap<>();
+    try {
+      // Map phase
+      partitionToFileManagerMap = mapper.map();
+
+      // Check for mapper output files
+      if (partitionToFileManagerMap.isEmpty()) {

Review Comment:
   makes sense. Actually the whole block is not required along with the catch 
statement as we are cleaning up the file managers in map phase itself. changed 
accordingly.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org

Reply via email to