swaminathanmanish commented on code in PR #12220:
URL: https://github.com/apache/pinot/pull/12220#discussion_r1451818099


##########
pinot-core/src/main/java/org/apache/pinot/core/common/MinionConstants.java:
##########
@@ -104,6 +104,7 @@ public static abstract class MergeTask {
     // Segment config
     public static final String MAX_NUM_RECORDS_PER_TASK_KEY = "maxNumRecordsPerTask";
     public static final String MAX_NUM_RECORDS_PER_SEGMENT_KEY = "maxNumRecordsPerSegment";
+    public static final String INTERMEDIATE_FILE_SIZE_KEY = "intermediateFileSizeThresholdInBytes";

Review Comment:
   Could we name this SEGMENT_MAPPER_FILE_SIZE_IN_BYTES?



##########
pinot-spi/src/main/java/org/apache/pinot/spi/data/readers/RecordReaderFileConfig.java:
##########
@@ -34,8 +34,10 @@ public class RecordReaderFileConfig {
   public final File _dataFile;
   public final Set<String> _fieldsToRead;
   public final RecordReaderConfig _recordReaderConfig;
+  private final boolean _canModifyRecordReader;

Review Comment:
   Can you explain what this means? Maybe we can name this _isDelegateReader to indicate that the client has passed in its own recordReader?



##########
pinot-core/src/main/java/org/apache/pinot/core/segment/processing/framework/SegmentProcessorFramework.java:
##########
@@ -142,24 +136,62 @@ public List<File> process()
 
   private List<File> doProcess()
       throws Exception {
-    // Map phase
-    LOGGER.info("Beginning map phase on {} record readers", _recordReaderFileConfigs.size());
-    SegmentMapper mapper = new SegmentMapper(_recordReaderFileConfigs, _customRecordTransformers,
-        _segmentProcessorConfig, _mapperOutputDir);
-    _partitionToFileManagerMap = mapper.map();
-
-    // Check for mapper output files
-    if (_partitionToFileManagerMap.isEmpty()) {
-      LOGGER.info("No partition generated from mapper phase, skipping the reducer phase");
-      return Collections.emptyList();
+    List<File> outputSegmentDirs = new ArrayList<>();
+    int numRecordReaders = _recordReaderFileConfigs.size();
+    int nextRecordReaderIndexToBeProcessed = 0;
+
+    while (nextRecordReaderIndexToBeProcessed < numRecordReaders) {
+      // Initialise the mapper. Eliminate the record readers that have been processed in the previous iterations.
+      SegmentMapper mapper =
+          new SegmentMapper(_recordReaderFileConfigs.subList(nextRecordReaderIndexToBeProcessed, numRecordReaders),
+              _customRecordTransformers, _segmentProcessorConfig, _mapperOutputDir);
+
+      // Map phase.
+      Map<String, GenericRowFileManager> partitionToFileManagerMap = doMap(mapper);
+
+      // Check for mapper output files, if no files are generated, skip the reducer phase and move on to the next
+      // iteration.
+      if (partitionToFileManagerMap.isEmpty()) {
+        LOGGER.info("No mapper output files generated, skipping reduce phase");
+        nextRecordReaderIndexToBeProcessed = getNextRecordReaderIndexToBeProcessed(nextRecordReaderIndexToBeProcessed);
+        continue;
+      }
+
+      // Reduce phase.
+      doReduce(partitionToFileManagerMap);
+
+      // Segment creation phase. Add the created segments to the final list.
+      outputSegmentDirs.addAll(generateSegment(partitionToFileManagerMap));
+

Review Comment:
   Can we add a log.info on how many record readers were processed in this iteration, and maybe even the number of bytes processed (in SegmentMapper)?



##########
pinot-core/src/main/java/org/apache/pinot/core/segment/processing/framework/SegmentProcessorFramework.java:
##########
@@ -124,14 +124,8 @@ public List<File> process()
     try {
       return doProcess();
     } catch (Exception e) {
-      // Cleaning up file managers no matter they are from map phase or reduce phase. For those from reduce phase, the
-      // reducers should have cleaned up the corresponding file managers from map phase already.
-      if (_partitionToFileManagerMap != null) {
-        for (GenericRowFileManager fileManager : _partitionToFileManagerMap.values()) {
-          fileManager.cleanUp();
-        }
-      }
-      // Cleaning up output dir as processing has failed.
+      // Cleaning up output dir as processing has failed. file managers left from map or reduce phase will be cleaned

Review Comment:
   Do we have tests to verify that we clean up mapper files on failure?



##########
pinot-core/src/main/java/org/apache/pinot/core/segment/processing/framework/SegmentConfig.java:
##########
@@ -31,22 +31,28 @@
 @JsonIgnoreProperties(ignoreUnknown = true)
 public class SegmentConfig {
   public static final int DEFAULT_MAX_NUM_RECORDS_PER_SEGMENT = 5_000_000;
+  public static final long DEFAULT_INTERMEDIATE_FILE_SIZE_THRESHOLD = Long.MAX_VALUE;
 
   private final int _maxNumRecordsPerSegment;
   private final String _segmentNamePrefix;
   private final String _segmentNamePostfix;
   private final String _fixedSegmentName;
+  private final long _intermediateFileSizeThresholdInBytes;

Review Comment:
   segmentMapperFileSizeThresholdInBytes ?



##########
pinot-core/src/main/java/org/apache/pinot/core/segment/processing/mapper/SegmentMapper.java:
##########
@@ -107,62 +106,62 @@ public SegmentMapper(List<RecordReaderFileConfig> recordReaderFileConfigs,
     LOGGER.info("Initialized mapper with {} record readers, output dir: {}, timeHandler: {}, partitioners: {}",
         _recordReaderFileConfigs.size(), _mapperOutputDir, _timeHandler.getClass(),
         Arrays.stream(_partitioners).map(p -> p.getClass().toString()).collect(Collectors.joining(",")));
+
+    // initialize adaptive writer.
+    _adaptiveSizeBasedWriter =
+        new AdaptiveSizeBasedWriter(processorConfig.getSegmentConfig().getIntermediateFileSizeThreshold());
   }
 
   /**
    * Reads the input records and generates partitioned generic row files into the mapper output directory.
    * Records for each partition are put into a directory of the partition name within the mapper output directory.
    */
-  public Map<String, GenericRowFileManager> map()
+  public Map<String, GenericRowFileManager> map(int totalRecordReaderSize)

Review Comment:
   Is there a way to not change this interface? Looks like SegmentMapper is a public interface and we are breaking it. Why do we need the totalCount?
   



##########
pinot-core/src/main/java/org/apache/pinot/core/segment/processing/framework/SegmentConfig.java:
##########
@@ -31,22 +31,28 @@
 @JsonIgnoreProperties(ignoreUnknown = true)
 public class SegmentConfig {
   public static final int DEFAULT_MAX_NUM_RECORDS_PER_SEGMENT = 5_000_000;
+  public static final long DEFAULT_INTERMEDIATE_FILE_SIZE_THRESHOLD = Long.MAX_VALUE;

Review Comment:
   DEFAULT_SEGMENT_MAPPER_FILE_SIZE_IN_BYTES ?



##########
pinot-core/src/main/java/org/apache/pinot/core/segment/processing/framework/SegmentProcessorFramework.java:
##########
@@ -142,24 +136,62 @@ public List<File> process()
 
   private List<File> doProcess()
       throws Exception {
-    // Map phase
-    LOGGER.info("Beginning map phase on {} record readers", _recordReaderFileConfigs.size());
-    SegmentMapper mapper = new SegmentMapper(_recordReaderFileConfigs, _customRecordTransformers,
-        _segmentProcessorConfig, _mapperOutputDir);
-    _partitionToFileManagerMap = mapper.map();
-
-    // Check for mapper output files
-    if (_partitionToFileManagerMap.isEmpty()) {
-      LOGGER.info("No partition generated from mapper phase, skipping the reducer phase");
-      return Collections.emptyList();
+    List<File> outputSegmentDirs = new ArrayList<>();
+    int numRecordReaders = _recordReaderFileConfigs.size();
+    int nextRecordReaderIndexToBeProcessed = 0;
+
+    while (nextRecordReaderIndexToBeProcessed < numRecordReaders) {
+      // Initialise the mapper. Eliminate the record readers that have been processed in the previous iterations.
+      SegmentMapper mapper =
+          new SegmentMapper(_recordReaderFileConfigs.subList(nextRecordReaderIndexToBeProcessed, numRecordReaders),
+              _customRecordTransformers, _segmentProcessorConfig, _mapperOutputDir);
+
+      // Map phase.
+      Map<String, GenericRowFileManager> partitionToFileManagerMap = doMap(mapper);
+
+      // Check for mapper output files, if no files are generated, skip the reducer phase and move on to the next
+      // iteration.
+      if (partitionToFileManagerMap.isEmpty()) {
+        LOGGER.info("No mapper output files generated, skipping reduce phase");
+        nextRecordReaderIndexToBeProcessed = getNextRecordReaderIndexToBeProcessed(nextRecordReaderIndexToBeProcessed);
+        continue;
+      }
+
+      // Reduce phase.
+      doReduce(partitionToFileManagerMap);
+
+      // Segment creation phase. Add the created segments to the final list.
+      outputSegmentDirs.addAll(generateSegment(partitionToFileManagerMap));
+
+      // Update next record reader index to be processed.
+      nextRecordReaderIndexToBeProcessed = getNextRecordReaderIndexToBeProcessed(nextRecordReaderIndexToBeProcessed);
     }
+    return outputSegmentDirs;
+  }
 
-    // Reduce phase
-    LOGGER.info("Beginning reduce phase on partitions: {}", _partitionToFileManagerMap.keySet());
+  private int getNextRecordReaderIndexToBeProcessed(int currentRecordIndex) {
+    for (int i = currentRecordIndex; i < _recordReaderFileConfigs.size(); i++) {
+      RecordReaderFileConfig recordReaderFileConfig = _recordReaderFileConfigs.get(i);
+      if (!recordReaderFileConfig.isRecordReaderInitialized() || !recordReaderFileConfig.isRecordReaderDone()) {

Review Comment:
   There can be only one open reader (!recordReaderFileConfig.isRecordReaderDone()), right? That is, one that has been created and still hasNext(). If there's a way to add validation for that, it would be useful for catching issues early.
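   A rough sketch of what such a validation could look like. `ReaderState` is a hypothetical stand-in for RecordReaderFileConfig that only carries the two flags (initialized, done), and the class and method names are made up for illustration; this is not code from the PR.

```java
import java.util.List;

final class OpenReaderCheck {
  /** Hypothetical stand-in for RecordReaderFileConfig; only the two state flags matter here. */
  static final class ReaderState {
    final boolean initialized;
    final boolean done;

    ReaderState(boolean initialized, boolean done) {
      this.initialized = initialized;
      this.done = done;
    }
  }

  /** Counts readers that have been opened but not fully consumed. */
  static int countOpenReaders(List<ReaderState> readers) {
    int open = 0;
    for (ReaderState reader : readers) {
      if (reader.initialized && !reader.done) {
        open++;
      }
    }
    return open;
  }

  /** Fails fast if more than one reader is partially consumed. */
  static void validateAtMostOneOpenReader(List<ReaderState> readers) {
    int open = countOpenReaders(readers);
    if (open > 1) {
      throw new IllegalStateException("Expected at most one open record reader, found " + open);
    }
  }
}
```

   The check could run right before picking the next index, so a second half-consumed reader surfaces as an exception instead of silently skipped records.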



##########
pinot-core/src/test/java/org/apache/pinot/core/segment/processing/framework/SegmentProcessorFrameworkTest.java:
##########
@@ -581,6 +581,51 @@ public void testMultipleSegments()
     rewindRecordReaders(_multipleSegments);
   }
 
+  @Test

Review Comment:
   Could you cover the following scenarios?
   1. Default case where there's no limit.
   2. Multiple readers where there's a limit.
   3. Single reader where there's a limit.
   4. Delegate record readers vs. regular record readers (created in RecordReaderFileConfig), to ensure that we handle delegate readers correctly.
   5. Case where there's a limit and there's a failure, to make sure that things are cleaned up (e.g. intermediate mapper files). Not sure if we have a test for this, but it would be good to verify, at least manually.
   6. Ensure that we allocate/free readers and there's no leak.
   
   



##########
pinot-core/src/main/java/org/apache/pinot/core/segment/processing/mapper/SegmentMapper.java:
##########
@@ -141,28 +146,43 @@ private Map<String, GenericRowFileManager> doMap()
               RecordReaderFactory.getRecordReader(recordReaderFileConfig._fileFormat, recordReaderFileConfig._dataFile,
                   recordReaderFileConfig._fieldsToRead, recordReaderFileConfig._recordReaderConfig);
           mapAndTransformRow(recordReader, reuse, observer, count, totalCount);
+          _recordReaderFileConfigs.get(i)._recordReader = recordReader;
         } finally {
-          if (recordReader != null) {
+          if (recordReader != null && !recordReader.hasNext()) {
             recordReader.close();
           }
         }
       } else {
+        if (!recordReader.hasNext()) {
+          LOGGER.info("Skipping record reader as it is already processed at index: {}", i);
+          count++;
+          continue;
+        }
         mapAndTransformRow(recordReader, reuse, observer, count, totalCount);
+        _recordReaderFileConfigs.get(i)._recordReader = recordReader;
+      }
+      if (!_adaptiveSizeBasedWriter.canWrite()) {

Review Comment:
   Could you check whether you can keep this constraint logic where it's needed (in mapAndTransformRow) and add the corresponding logs there? The method can return a boolean to break this loop.
   
   In the log.info, please include how many readers we completed and how many are pending. Or you can add this log in SegmentProcessorFramework, where you have the overall counts.
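   Something along these lines is what I have in mind. This is only a sketch under assumptions: `BoundedMapperSketch` is a made-up class, the readers are plain `Iterator<String>` instead of RecordReader, and string length stands in for the bytes tracked by the adaptive writer.

```java
import java.util.Iterator;
import java.util.List;

final class BoundedMapperSketch {
  private final long maxBytes;
  private long bytesWritten;

  BoundedMapperSketch(long maxBytes) {
    this.maxBytes = maxBytes;
  }

  /**
   * Consumes rows from one reader. Returns true if the reader was drained,
   * false if the size budget was reached while rows were still pending.
   */
  boolean mapAndTransformRow(Iterator<String> reader) {
    while (reader.hasNext()) {
      if (bytesWritten >= maxBytes) {
        return false;
      }
      // String length is a stand-in for the serialized row size.
      bytesWritten += reader.next().length();
    }
    return true;
  }

  /** Returns the number of readers fully consumed before the budget stopped the loop. */
  int map(List<Iterator<String>> readers) {
    int completed = 0;
    for (Iterator<String> reader : readers) {
      if (!mapAndTransformRow(reader)) {
        break;
      }
      completed++;
    }
    return completed;
  }
}
```

   With the count returned from `map`, the caller can log completed readers and derive pending ones as `readers.size() - completed` without the mapper needing the overall total.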
   
   


