swaminathanmanish commented on code in PR #12220:
URL: https://github.com/apache/pinot/pull/12220#discussion_r1454284211


##########
pinot-core/src/main/java/org/apache/pinot/core/segment/processing/mapper/SegmentMapper.java:
##########
@@ -129,40 +135,30 @@ public Map<String, GenericRowFileManager> map()
   private Map<String, GenericRowFileManager> doMap()
       throws Exception {
     Consumer<Object> observer = _processorConfig.getProgressObserver();
-    int totalCount = _recordReaderFileConfigs.size();
-    int count = 1;
+    int count = _totalNumRecordReaders - _recordReaderFileConfigs.size() + 1;
     GenericRow reuse = new GenericRow();
     for (RecordReaderFileConfig recordReaderFileConfig : _recordReaderFileConfigs) {
-      RecordReader recordReader = recordReaderFileConfig._recordReader;
-      if (recordReader == null) {
-        // We create and use the recordReader here.
-        try {
-          recordReader =
-              RecordReaderFactory.getRecordReader(recordReaderFileConfig._fileFormat, recordReaderFileConfig._dataFile,
-                  recordReaderFileConfig._fieldsToRead, recordReaderFileConfig._recordReaderConfig);
-          mapAndTransformRow(recordReader, reuse, observer, count, totalCount);
-        } finally {
-          if (recordReader != null) {
-            recordReader.close();
-          }
-        }
-      } else {
-        mapAndTransformRow(recordReader, reuse, observer, count, totalCount);
+      RecordReader recordReader = recordReaderFileConfig.getRecordReader();
+      boolean shouldMapperTerminate = mapAndTransformRow(recordReader, reuse, observer, count, _totalNumRecordReaders);
+
+      // Terminate the map phase if intermediate file size has crossed the threshold.
+      if (shouldMapperTerminate) {
+        break;
       }
+      recordReaderFileConfig.closeRecordReader();
       count++;
     }
 
     for (GenericRowFileManager fileManager : _partitionToFileManagerMap.values()) {
       fileManager.closeFileWriter();
     }
-
     return _partitionToFileManagerMap;
   }
 
-  private void mapAndTransformRow(RecordReader recordReader, GenericRow reuse,
+  private boolean mapAndTransformRow(RecordReader recordReader, GenericRow reuse,

Review Comment:
   Could you add documentation/comments on what the return value means here, 
   and why we can stop without finishing a reader?
   I would also rename this method to something like 
   completeMapAndTransformForReader(), which returns true if it has completed 
   the reader and false if not, to make it clear that we can return without 
   completing a reader.
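
   A minimal sketch of such a return contract, using plain Java stand-ins 
   rather than the Pinot classes. `ReaderCompletionSketch`, the 
   `Iterator<String>` reader, and the byte-count threshold are all 
   hypothetical; the actual stop condition in the PR may differ.

```java
import java.util.Iterator;

// Hypothetical stand-in for the mapper; not the actual SegmentMapper code.
public class ReaderCompletionSketch {
  private final long _sizeThresholdBytes;
  private long _bytesWritten = 0;

  public ReaderCompletionSketch(long sizeThresholdBytes) {
    _sizeThresholdBytes = sizeThresholdBytes;
  }

  /**
   * Maps rows from the given reader until it is exhausted or the size threshold is reached.
   *
   * @return true if the reader was fully consumed; false if we stopped early because the
   *         threshold was reached, in which case the reader still has unread rows and
   *         should be resumed (not closed) by the caller.
   */
  public boolean completeMapAndTransformForReader(Iterator<String> reader) {
    while (reader.hasNext()) {
      if (_bytesWritten >= _sizeThresholdBytes) {
        return false;
      }
      // Assumption: the row length stands in for bytes written to the intermediate file.
      _bytesWritten += reader.next().length();
    }
    return true;
  }
}
```

   With this shape the caller closes the reader and moves on only when the 
   method returns true, and breaks out of the loop otherwise.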



##########
pinot-core/src/main/java/org/apache/pinot/core/segment/processing/framework/SegmentProcessorFramework.java:
##########
@@ -142,24 +136,62 @@ public List<File> process()
 
   private List<File> doProcess()
       throws Exception {
-    // Map phase
-    LOGGER.info("Beginning map phase on {} record readers", _recordReaderFileConfigs.size());
-    SegmentMapper mapper = new SegmentMapper(_recordReaderFileConfigs, _customRecordTransformers,
-        _segmentProcessorConfig, _mapperOutputDir);
-    _partitionToFileManagerMap = mapper.map();
-
-    // Check for mapper output files
-    if (_partitionToFileManagerMap.isEmpty()) {
-      LOGGER.info("No partition generated from mapper phase, skipping the reducer phase");
-      return Collections.emptyList();
+    List<File> outputSegmentDirs = new ArrayList<>();
+    int numRecordReaders = _recordReaderFileConfigs.size();
+    int nextRecordReaderIndexToBeProcessed = 0;
+
+    while (nextRecordReaderIndexToBeProcessed < numRecordReaders) {
+      // Initialise the mapper. Eliminate the record readers that have been processed in the previous iterations.
+      SegmentMapper mapper =
+          new SegmentMapper(_recordReaderFileConfigs.subList(nextRecordReaderIndexToBeProcessed, numRecordReaders),
+              _customRecordTransformers, _segmentProcessorConfig, _mapperOutputDir, numRecordReaders);
+
+      // Map phase.
+      Map<String, GenericRowFileManager> partitionToFileManagerMap = doMap(mapper);
+
+      // Check for mapper output files, if no files are generated, skip the reducer phase and move on to the next
+      // iteration.
+      if (partitionToFileManagerMap.isEmpty()) {
+        LOGGER.info("No mapper output files generated, skipping reduce phase");
+        nextRecordReaderIndexToBeProcessed = getNextRecordReaderIndexToBeProcessed(nextRecordReaderIndexToBeProcessed);
+        continue;
+      }
+
+      // Reduce phase.
+      doReduce(partitionToFileManagerMap);
+
+      // Segment creation phase. Add the created segments to the final list.
+      outputSegmentDirs.addAll(generateSegment(partitionToFileManagerMap));
+
+      // Update next record reader index to be processed.
+      nextRecordReaderIndexToBeProcessed = getNextRecordReaderIndexToBeProcessed(nextRecordReaderIndexToBeProcessed);
     }
+    return outputSegmentDirs;
+  }
 
-    // Reduce phase
-    LOGGER.info("Beginning reduce phase on partitions: {}", _partitionToFileManagerMap.keySet());
+  private int getNextRecordReaderIndexToBeProcessed(int currentRecordIndex) {
+    for (int i = currentRecordIndex; i < _recordReaderFileConfigs.size(); i++) {
+      RecordReaderFileConfig recordReaderFileConfig = _recordReaderFileConfigs.get(i);
+      if (!recordReaderFileConfig.isRecordReaderDone()) {
+        return i;
+      }
+    }
+    return _recordReaderFileConfigs.size();
+  }
+
+  private Map<String, GenericRowFileManager> doMap(SegmentMapper mapper)
+      throws Exception {
+    // Map phase

Review Comment:
   Do we need a separate method for calling map()? Maybe we can record the 
   overall time taken by map() and log it at info level here.
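
   A rough sketch of the timing idea, under stated assumptions: 
   `MapTimingSketch`, `runMapPhase`, and the `Supplier` standing in for 
   `mapper.map()` are hypothetical, and `System.out` stands in for the 
   class logger. Inlined at the call site, the body would be the same three 
   steps: take a start time, call map(), log the elapsed time.

```java
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

// Hypothetical sketch; mapCall stands in for mapper.map().
public class MapTimingSketch {
  public static Map<String, String> runMapPhase(Supplier<Map<String, String>> mapCall) {
    long startNs = System.nanoTime();
    Map<String, String> partitionToFiles = mapCall.get();
    long elapsedMs = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNs);
    // In the real code this would be an info-level logger call rather than stdout.
    System.out.println("Finished map phase in " + elapsedMs + " ms, partitions: " + partitionToFiles.keySet());
    return partitionToFiles;
  }
}
```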



##########
pinot-core/src/main/java/org/apache/pinot/core/segment/processing/mapper/SegmentMapper.java:
##########
@@ -62,29 +63,30 @@
  */
 public class SegmentMapper {
   private static final Logger LOGGER = LoggerFactory.getLogger(SegmentMapper.class);
-
-  private List<RecordReaderFileConfig> _recordReaderFileConfigs;
-  private List<RecordTransformer> _customRecordTransformers;
   private final SegmentProcessorConfig _processorConfig;
   private final File _mapperOutputDir;
-
   private final List<FieldSpec> _fieldSpecs;
   private final boolean _includeNullFields;
   private final int _numSortFields;
-
   private final CompositeTransformer _recordTransformer;
   private final TimeHandler _timeHandler;
   private final Partitioner[] _partitioners;
   private final String[] _partitionsBuffer;
   // NOTE: Use TreeMap so that the order is deterministic
   private final Map<String, GenericRowFileManager> _partitionToFileManagerMap = new TreeMap<>();
+  private AdaptiveSizeBasedWriter _adaptiveSizeBasedWriter;
+  private List<RecordReaderFileConfig> _recordReaderFileConfigs;
+  private List<RecordTransformer> _customRecordTransformers;
+  private final int _totalNumRecordReaders;

Review Comment:
   This looks a little weird :). Can we consider the following, to keep this 
   logic simple?
   
   We can add 2 types of logs:
   1) SegmentProcessorFramework logs how many readers it passes to the mapper 
   in every iteration.
   2) The mapper logs how many readers it completed out of the total it got 
   from SegmentProcessorFramework, and where it paused. Basically, we don't 
   change any of the mapper code for logging.
   
   Mainly we want to know how the mappers are progressing and whether they 
   are stuck.
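
   To illustrate the two kinds of log lines, here is a small simulation, 
   not Pinot code. `ProgressLogSketch`, the per-iteration row budget, and 
   the message wording are all hypothetical; row counts are assumed to be 
   non-negative.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical simulation of the framework/mapper loop; returns the log lines it would emit.
public class ProgressLogSketch {
  public static List<String> run(int[] readerRows, int rowBudgetPerIteration) {
    if (rowBudgetPerIteration <= 0) {
      throw new IllegalArgumentException("rowBudgetPerIteration must be positive");
    }
    int[] remaining = readerRows.clone();
    List<String> logs = new ArrayList<>();
    int next = 0;
    int iteration = 1;
    while (next < remaining.length) {
      // Log type 1: the framework reports how many readers it hands to the mapper.
      int received = remaining.length - next;
      logs.add("framework: iteration " + iteration + ", passing " + received + " readers to mapper");

      int budget = rowBudgetPerIteration;
      int completed = 0;
      int i = next;
      while (i < remaining.length && budget > 0) {
        int consumed = Math.min(budget, remaining[i]);
        remaining[i] -= consumed;
        budget -= consumed;
        if (remaining[i] == 0) {
          completed++;
          i++;
        }
      }

      // Log type 2: the mapper reports how many of those readers it finished and where it paused.
      logs.add("mapper: completed " + completed + " of " + received + " readers"
          + (i < remaining.length ? ", paused at reader index " + i : ", all readers done"));
      next = i;
      iteration++;
    }
    return logs;
  }
}
```

   Note that the mapper only counts against the readers it received in this 
   iteration, so it does not need to know the overall total.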
   
   



##########
pinot-spi/src/main/java/org/apache/pinot/spi/data/readers/RecordReaderFileConfig.java:
##########
@@ -54,5 +60,41 @@ public RecordReaderFileConfig(RecordReader recordReader) {
     _dataFile = null;
     _fieldsToRead = null;
     _recordReaderConfig = null;
+    _isDelegateReader = true;

Review Comment:
   Could you add a comment explaining what a delegate reader is (and what 
   that means for ownership of the reader)?
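
   A sketch of the kind of comment meant here, assuming "delegate" means 
   the reader instance was passed in by the caller; that is one reading of 
   this constructor, not something the diff states. `ReaderOwnershipSketch` 
   and its factory methods are hypothetical, and `java.io.Closeable` stands 
   in for RecordReader.

```java
import java.io.Closeable;
import java.io.IOException;
import java.io.UncheckedIOException;

// Hypothetical holder illustrating the ownership rule; not the actual RecordReaderFileConfig.
public class ReaderOwnershipSketch {
  private final Closeable _reader;

  // True when the reader was supplied by the caller. The caller keeps ownership and is
  // responsible for closing it; this class must never close a delegate reader.
  // False when this class created the reader itself and therefore must close it.
  private final boolean _isDelegateReader;

  private ReaderOwnershipSketch(Closeable reader, boolean isDelegateReader) {
    _reader = reader;
    _isDelegateReader = isDelegateReader;
  }

  public static ReaderOwnershipSketch delegating(Closeable callerOwnedReader) {
    return new ReaderOwnershipSketch(callerOwnedReader, true);
  }

  public static ReaderOwnershipSketch owning(Closeable createdReader) {
    return new ReaderOwnershipSketch(createdReader, false);
  }

  /** Closes the reader only if this class owns it. */
  public void closeReader() {
    if (_isDelegateReader) {
      return;
    }
    try {
      _reader.close();
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }
}
```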



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

