This is an automated email from the ASF dual-hosted git repository.

snlee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 91ffcc759f Reduce Disk Footprint for Segment Processor Framework to 
Avoid Out of Disk Issues (#12220)
91ffcc759f is described below

commit 91ffcc759f4f178cbfb6b9a1541069734b4c0031
Author: aishikbh <ais...@startree.ai>
AuthorDate: Wed Jan 24 11:17:29 2024 +0530

    Reduce Disk Footprint for Segment Processor Framework to Avoid Out of Disk 
Issues (#12220)
    
    * modularized map, reduce and segment generation phase.
    
    * added interfaces and classes
    
    * added statefulrecordreaderfileconfig instead of recordreaderfileconfig
    
    * save Progress
    
    * major changes, working tests.
    
    * more changes
    
    * made the number of bytes to be configurable
    
    * added global sequence id to segments since it is modularised out, made 
the intermediate file size configurable through segmentsConfig.
    
    * remove logic for global segment IDs.
    
    * used recordreaderconfig instead of statefulrecordreaderconfig, changed 
looping conditions, made recordreader of recordreaderfileconfig modifiable.
    
    * added proper sequence ids for segments.
    
    * ingestion working without errors. Should be good in sunny day scenarios.
    
    * remove typo from testing
    
    * replace _partitionToFilemanagerMap with local variable as the mapping is 
happening in multiple iterations. Change exception handling for mapper.
    
    * added precondition check for intermediateFileSizeThresholdInBytes and 
inferred observer from segmentsConfig
    
    * fix typo and add comment.
    
    * removed redundant method.
    
    * config related changes, fix unit tests.
    
    * remove redundant flag.
    
    * replaced size based constraint checker with size based constraint writer.
    
    * consolidated all the constraint checks into one place.
    
    * add test to validate segmentprocessorframework
    
    * decoupled recordreader closing logic from segment mapper
    
    * addressing comments related to segmentprocessorframework.
    
    * delegated initialisation and check for recordreader being done with all 
the records to recordreaderfileconfig.
    
    * simplify logic for termination of map phase.
    
    * change variable names for better readability.
    
    * Added tests and minor changes in logic
    
    - Added further tests for SegmentProcessorFramework.
    - Removed redundant checks for SegmentProcessorFramework.
    - Detected an edge case and added an additional statement in
      SegmentMapper.
    
    * isolated the termination logs to mapAndTransformRow.
    
    * Keep SegmentMapper public interface unchanged
    
    - Pass the total count of RecordReaders through SegmentMapper
      Constructor instead of map() arguments.
    
    * addressing review comments.
    
    * cleaned up logging logic.
    
    * Changed Logs
    
    * kept the logging for default scenario same as before
    * conditionally logged the progress between iterations in case the
      feature is enabled.
    
    * modify tests
    
    - PR #12290 touched dimBaseballTeams.csv, on which the tests depend. 
Modified the test to reflect that.
    - Changed import statements (addressed comment).
---
 .../apache/pinot/core/common/MinionConstants.java  |   1 +
 .../processing/framework/SegmentConfig.java        |  30 ++-
 .../framework/SegmentProcessorFramework.java       | 139 ++++++++++----
 .../genericrow/AdaptiveConstraintsWriter.java      |  33 ++++
 .../genericrow/AdaptiveSizeBasedWriter.java        |  51 ++++++
 .../segment/processing/genericrow/FileWriter.java  |  30 +++
 .../genericrow/GenericRowFileWriter.java           |  11 +-
 .../segment/processing/mapper/SegmentMapper.java   |  70 ++++---
 .../framework/SegmentProcessorFrameworkTest.java   | 203 +++++++++++++++++++++
 .../pinot/plugin/minion/tasks/MergeTaskUtils.java  |   4 +
 .../plugin/minion/tasks/MergeTaskUtilsTest.java    |   6 +-
 .../spi/data/readers/RecordReaderFileConfig.java   |  51 +++++-
 12 files changed, 561 insertions(+), 68 deletions(-)

diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/common/MinionConstants.java 
b/pinot-core/src/main/java/org/apache/pinot/core/common/MinionConstants.java
index 8e4afd465a..4193984559 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/common/MinionConstants.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/common/MinionConstants.java
@@ -104,6 +104,7 @@ public class MinionConstants {
     // Segment config
     public static final String MAX_NUM_RECORDS_PER_TASK_KEY = 
"maxNumRecordsPerTask";
     public static final String MAX_NUM_RECORDS_PER_SEGMENT_KEY = 
"maxNumRecordsPerSegment";
+    public static final String SEGMENT_MAPPER_FILE_SIZE_IN_BYTES = 
"segmentMapperFileSizeThresholdInBytes";
     public static final String MAX_NUM_PARALLEL_BUCKETS = 
"maxNumParallelBuckets";
     public static final String SEGMENT_NAME_PREFIX_KEY = "segmentNamePrefix";
     public static final String SEGMENT_NAME_POSTFIX_KEY = "segmentNamePostfix";
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/framework/SegmentConfig.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/framework/SegmentConfig.java
index b3302fafc0..95191b658b 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/framework/SegmentConfig.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/framework/SegmentConfig.java
@@ -31,22 +31,28 @@ import javax.annotation.Nullable;
 @JsonIgnoreProperties(ignoreUnknown = true)
 public class SegmentConfig {
   public static final int DEFAULT_MAX_NUM_RECORDS_PER_SEGMENT = 5_000_000;
+  public static final long DEFAULT_SEGMENT_MAPPER_FILE_SIZE_IN_BYTES = 
Long.MAX_VALUE;
 
   private final int _maxNumRecordsPerSegment;
   private final String _segmentNamePrefix;
   private final String _segmentNamePostfix;
   private final String _fixedSegmentName;
+  private final long _segmentMapperFileSizeThresholdInBytes;
 
   @JsonCreator
   private SegmentConfig(@JsonProperty(value = "maxNumRecordsPerSegment", 
required = true) int maxNumRecordsPerSegment,
       @JsonProperty("segmentNamePrefix") @Nullable String segmentNamePrefix,
       @JsonProperty("segmentNamePostfix") @Nullable String segmentNamePostfix,
-      @JsonProperty("fixedSegmentName") @Nullable String fixedSegmentName) {
+      @JsonProperty("fixedSegmentName") @Nullable String fixedSegmentName,
+      @JsonProperty(value = "segmentMapperFileSizeThresholdInBytes", required 
= true)
+      long segmentMapperFileSizeThresholdInBytes) {
     Preconditions.checkState(maxNumRecordsPerSegment > 0, "Max num records per 
segment must be > 0");
+    Preconditions.checkState(segmentMapperFileSizeThresholdInBytes > 0, 
"Intermediate file size threshold must be > 0");
     _maxNumRecordsPerSegment = maxNumRecordsPerSegment;
     _segmentNamePrefix = segmentNamePrefix;
     _segmentNamePostfix = segmentNamePostfix;
     _fixedSegmentName = fixedSegmentName;
+    _segmentMapperFileSizeThresholdInBytes = 
segmentMapperFileSizeThresholdInBytes;
   }
 
   /**
@@ -71,15 +77,21 @@ public class SegmentConfig {
     return _fixedSegmentName;
   }
 
+  public long getIntermediateFileSizeThreshold() {
+    return _segmentMapperFileSizeThresholdInBytes;
+  }
+
   /**
    * Builder for SegmentConfig
    */
   public static class Builder {
     private int _maxNumRecordsPerSegment = DEFAULT_MAX_NUM_RECORDS_PER_SEGMENT;
+    private long _segmentMapperFileSizeThresholdInBytes = 
DEFAULT_SEGMENT_MAPPER_FILE_SIZE_IN_BYTES;
     private String _segmentNamePrefix;
     private String _segmentNamePostfix;
     private String _fixedSegmentName;
 
+
     public Builder setMaxNumRecordsPerSegment(int maxNumRecordsPerSegment) {
       _maxNumRecordsPerSegment = maxNumRecordsPerSegment;
       return this;
@@ -99,17 +111,25 @@ public class SegmentConfig {
       _fixedSegmentName = fixedSegmentName;
       return this;
     }
+    public Builder setIntermediateFileSizeThreshold(long 
segmentMapperFileSizeThresholdInBytes) {
+      _segmentMapperFileSizeThresholdInBytes = 
segmentMapperFileSizeThresholdInBytes;
+      return this;
+    }
 
     public SegmentConfig build() {
       Preconditions.checkState(_maxNumRecordsPerSegment > 0, "Max num records 
per segment must be > 0");
-      return new SegmentConfig(_maxNumRecordsPerSegment, _segmentNamePrefix, 
_segmentNamePostfix, _fixedSegmentName);
+      Preconditions.checkState(_segmentMapperFileSizeThresholdInBytes > 0,
+          "Intermediate file size threshold must be > 0");
+      return new SegmentConfig(_maxNumRecordsPerSegment, _segmentNamePrefix, 
_segmentNamePostfix, _fixedSegmentName,
+          _segmentMapperFileSizeThresholdInBytes);
     }
   }
 
   @Override
   public String toString() {
-    return "SegmentConfig{" + "_maxNumRecordsPerSegment=" + 
_maxNumRecordsPerSegment + ", _segmentNamePrefix='"
-        + _segmentNamePrefix + '\'' + ", _segmentNamePostfix='" + 
_segmentNamePostfix + '\'' + ", _fixedSegmentName='"
-        + _fixedSegmentName + '\'' + '}';
+    return "SegmentConfig{" + "_maxNumRecordsPerSegment=" + 
_maxNumRecordsPerSegment
+        + ", _segmentMapperFileSizeThresholdInBytes=" + 
_segmentMapperFileSizeThresholdInBytes
+        + ", _segmentNamePrefix='" + _segmentNamePrefix + '\'' + ", 
_segmentNamePostfix='" + _segmentNamePostfix + '\''
+        + ", _fixedSegmentName='" + _fixedSegmentName + '\'' + '}';
   }
 }
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/framework/SegmentProcessorFramework.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/framework/SegmentProcessorFramework.java
index fb5a08ff56..868583d0d3 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/framework/SegmentProcessorFramework.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/framework/SegmentProcessorFramework.java
@@ -66,8 +66,8 @@ public class SegmentProcessorFramework {
   private final File _mapperOutputDir;
   private final File _reducerOutputDir;
   private final File _segmentsOutputDir;
-  private Map<String, GenericRowFileManager> _partitionToFileManagerMap;
   private final SegmentNumRowProvider _segmentNumRowProvider;
+  private int _segmentSequenceId = 0;
 
   /**
    * Initializes the SegmentProcessorFramework with record readers, config and 
working directory. We will now rely on
@@ -124,14 +124,8 @@ public class SegmentProcessorFramework {
     try {
       return doProcess();
     } catch (Exception e) {
-      // Cleaning up file managers no matter they are from map phase or reduce 
phase. For those from reduce phase, the
-      // reducers should have cleaned up the corresponding file managers from 
map phase already.
-      if (_partitionToFileManagerMap != null) {
-        for (GenericRowFileManager fileManager : 
_partitionToFileManagerMap.values()) {
-          fileManager.cleanUp();
-        }
-      }
-      // Cleaning up output dir as processing has failed.
+      // Cleaning up output dir as processing has failed. file managers left 
from map or reduce phase will be cleaned
+      // up in the respective phases.
       FileUtils.deleteQuietly(_segmentsOutputDir);
       throw e;
     } finally {
@@ -142,24 +136,103 @@ public class SegmentProcessorFramework {
 
   private List<File> doProcess()
       throws Exception {
-    // Map phase
-    LOGGER.info("Beginning map phase on {} record readers", 
_recordReaderFileConfigs.size());
-    SegmentMapper mapper = new SegmentMapper(_recordReaderFileConfigs, 
_customRecordTransformers,
-        _segmentProcessorConfig, _mapperOutputDir);
-    _partitionToFileManagerMap = mapper.map();
-
-    // Check for mapper output files
-    if (_partitionToFileManagerMap.isEmpty()) {
-      LOGGER.info("No partition generated from mapper phase, skipping the 
reducer phase");
-      return Collections.emptyList();
+    List<File> outputSegmentDirs = new ArrayList<>();
+    int numRecordReaders = _recordReaderFileConfigs.size();
+    int nextRecordReaderIndexToBeProcessed = 0;
+    int iterationCount = 1;
+    Consumer<Object> observer = _segmentProcessorConfig.getProgressObserver();
+    boolean isMapperOutputSizeThresholdEnabled =
+        
_segmentProcessorConfig.getSegmentConfig().getIntermediateFileSizeThreshold() 
!= Long.MAX_VALUE;
+
+    while (nextRecordReaderIndexToBeProcessed < numRecordReaders) {
+      // Initialise the mapper. Eliminate the record readers that have been 
processed in the previous iterations.
+      SegmentMapper mapper =
+          new 
SegmentMapper(_recordReaderFileConfigs.subList(nextRecordReaderIndexToBeProcessed,
 numRecordReaders),
+              _customRecordTransformers, _segmentProcessorConfig, 
_mapperOutputDir);
+
+      // Log start of iteration details only if intermediate file size 
threshold is set.
+      if (isMapperOutputSizeThresholdEnabled) {
+        String logMessage =
+            String.format("Starting iteration %d with %d record readers. 
Starting index = %d, end index = %d",
+                iterationCount,
+                
_recordReaderFileConfigs.subList(nextRecordReaderIndexToBeProcessed, 
numRecordReaders).size(),
+                nextRecordReaderIndexToBeProcessed + 1, numRecordReaders);
+        LOGGER.info(logMessage);
+        observer.accept(logMessage);
+      }
+
+      // Map phase.
+      long mapStartTimeInMs = System.currentTimeMillis();
+      Map<String, GenericRowFileManager> partitionToFileManagerMap = 
mapper.map();
+
+      // Log the time taken to map.
+      LOGGER.info("Finished iteration {} in {}ms", iterationCount, 
System.currentTimeMillis() - mapStartTimeInMs);
+
+      // Check for mapper output files, if no files are generated, skip the 
reducer phase and move on to the next
+      // iteration.
+      if (partitionToFileManagerMap.isEmpty()) {
+        LOGGER.info("No mapper output files generated, skipping reduce phase");
+        nextRecordReaderIndexToBeProcessed = 
getNextRecordReaderIndexToBeProcessed(nextRecordReaderIndexToBeProcessed);
+        continue;
+      }
+
+      // Reduce phase.
+      doReduce(partitionToFileManagerMap);
+
+      // Segment creation phase. Add the created segments to the final list.
+      outputSegmentDirs.addAll(generateSegment(partitionToFileManagerMap));
+
+      // Store the starting index of the record readers that were processed in 
this iteration for logging purposes.
+      int startingProcessedRecordReaderIndex = 
nextRecordReaderIndexToBeProcessed;
+
+      // Update next record reader index to be processed.
+      nextRecordReaderIndexToBeProcessed = 
getNextRecordReaderIndexToBeProcessed(nextRecordReaderIndexToBeProcessed);
+
+      // Log the details between iteration only if intermediate file size 
threshold is set.
+      if (isMapperOutputSizeThresholdEnabled) {
+        // Take care of logging the proper RecordReader index in case of the 
last iteration.
+        int boundaryIndexToLog =
+            nextRecordReaderIndexToBeProcessed == numRecordReaders ? 
nextRecordReaderIndexToBeProcessed
+                : nextRecordReaderIndexToBeProcessed + 1;
+
+        // We are sure that the last RecordReader is completely processed in 
the last iteration else it may or may not
+        // have completed processing. Log it accordingly.
+        String logMessage;
+        if (nextRecordReaderIndexToBeProcessed == numRecordReaders) {
+          logMessage = String.format("Finished processing all of %d 
RecordReaders", numRecordReaders);
+        } else {
+          logMessage = String.format(
+              "Finished processing RecordReaders %d to %d (RecordReader %d 
might be partially processed) out of %d in "
+                  + "iteration %d", startingProcessedRecordReaderIndex + 1, 
boundaryIndexToLog,
+              nextRecordReaderIndexToBeProcessed + 1, numRecordReaders, 
iterationCount);
+        }
+
+        observer.accept(logMessage);
+        LOGGER.info(logMessage);
+      }
+
+      iterationCount++;
     }
+    return outputSegmentDirs;
+  }
 
-    // Reduce phase
-    LOGGER.info("Beginning reduce phase on partitions: {}", 
_partitionToFileManagerMap.keySet());
+  private int getNextRecordReaderIndexToBeProcessed(int currentRecordIndex) {
+    for (int i = currentRecordIndex; i < _recordReaderFileConfigs.size(); i++) 
{
+      RecordReaderFileConfig recordReaderFileConfig = 
_recordReaderFileConfigs.get(i);
+      if (!recordReaderFileConfig.isRecordReaderDone()) {
+        return i;
+      }
+    }
+    return _recordReaderFileConfigs.size();
+  }
+
+  private void doReduce(Map<String, GenericRowFileManager> 
partitionToFileManagerMap)
+      throws Exception {
+    LOGGER.info("Beginning reduce phase on partitions: {}", 
partitionToFileManagerMap.keySet());
     Consumer<Object> observer = _segmentProcessorConfig.getProgressObserver();
-    int totalCount = _partitionToFileManagerMap.keySet().size();
+    int totalCount = partitionToFileManagerMap.keySet().size();
     int count = 1;
-    for (Map.Entry<String, GenericRowFileManager> entry : 
_partitionToFileManagerMap.entrySet()) {
+    for (Map.Entry<String, GenericRowFileManager> entry : 
partitionToFileManagerMap.entrySet()) {
       String partitionId = entry.getKey();
       observer.accept(
           String.format("Doing reduce phase on data from partition: %s (%d out 
of %d)", partitionId, count++,
@@ -168,9 +241,11 @@ public class SegmentProcessorFramework {
       Reducer reducer = ReducerFactory.getReducer(partitionId, fileManager, 
_segmentProcessorConfig, _reducerOutputDir);
       entry.setValue(reducer.reduce());
     }
+  }
 
-    // Segment creation phase
-    LOGGER.info("Beginning segment creation phase on partitions: {}", 
_partitionToFileManagerMap.keySet());
+  private List<File> generateSegment(Map<String, GenericRowFileManager> 
partitionToFileManagerMap)
+      throws Exception {
+    LOGGER.info("Beginning segment creation phase on partitions: {}", 
partitionToFileManagerMap.keySet());
     List<File> outputSegmentDirs = new ArrayList<>();
     TableConfig tableConfig = _segmentProcessorConfig.getTableConfig();
     Schema schema = _segmentProcessorConfig.getSchema();
@@ -179,6 +254,7 @@ public class SegmentProcessorFramework {
     String fixedSegmentName = 
_segmentProcessorConfig.getSegmentConfig().getFixedSegmentName();
     SegmentGeneratorConfig generatorConfig = new 
SegmentGeneratorConfig(tableConfig, schema);
     generatorConfig.setOutDir(_segmentsOutputDir.getPath());
+    Consumer<Object> observer = _segmentProcessorConfig.getProgressObserver();
 
     if (tableConfig.getIndexingConfig().getSegmentNameGeneratorType() != null) 
{
       generatorConfig.setSegmentNameGenerator(
@@ -190,8 +266,7 @@ public class SegmentProcessorFramework {
       generatorConfig.setSegmentNamePostfix(segmentNamePostfix);
     }
 
-    int sequenceId = 0;
-    for (Map.Entry<String, GenericRowFileManager> entry : 
_partitionToFileManagerMap.entrySet()) {
+    for (Map.Entry<String, GenericRowFileManager> entry : 
partitionToFileManagerMap.entrySet()) {
       String partitionId = entry.getKey();
       GenericRowFileManager fileManager = entry.getValue();
       try {
@@ -202,15 +277,15 @@ public class SegmentProcessorFramework {
             numSortFields);
         GenericRowFileRecordReader recordReader = fileReader.getRecordReader();
         int maxNumRecordsPerSegment;
-        for (int startRowId = 0; startRowId < numRows; startRowId += 
maxNumRecordsPerSegment, sequenceId++) {
+        for (int startRowId = 0; startRowId < numRows; startRowId += 
maxNumRecordsPerSegment, _segmentSequenceId++) {
           maxNumRecordsPerSegment = _segmentNumRowProvider.getNumRows();
           int endRowId = Math.min(startRowId + maxNumRecordsPerSegment, 
numRows);
-          LOGGER.info("Start creating segment of sequenceId: {} with row 
range: {} to {}", sequenceId, startRowId,
-              endRowId);
+          LOGGER.info("Start creating segment of sequenceId: {} with row 
range: {} to {}", _segmentSequenceId,
+              startRowId, endRowId);
           observer.accept(String.format(
               "Creating segment of sequentId: %d with data from partition: %s 
and row range: [%d, %d) out of [0, %d)",
-              sequenceId, partitionId, startRowId, endRowId, numRows));
-          generatorConfig.setSequenceId(sequenceId);
+              _segmentSequenceId, partitionId, startRowId, endRowId, numRows));
+          generatorConfig.setSequenceId(_segmentSequenceId);
           GenericRowFileRecordReader recordReaderForRange = 
recordReader.getRecordReaderForRange(startRowId, endRowId);
           SegmentIndexCreationDriverImpl driver = new 
SegmentIndexCreationDriverImpl();
           driver.init(generatorConfig, new 
RecordReaderSegmentCreationDataSource(recordReaderForRange),
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/genericrow/AdaptiveConstraintsWriter.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/genericrow/AdaptiveConstraintsWriter.java
new file mode 100644
index 0000000000..d0a3daffa0
--- /dev/null
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/genericrow/AdaptiveConstraintsWriter.java
@@ -0,0 +1,33 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.segment.processing.genericrow;
+
+import java.io.IOException;
+
+
+/**
+ * Interface for a writer which can track constraints. This will be used by 
SegmentProcessorFramework.
+ * */
+
+public interface AdaptiveConstraintsWriter<W, D> {
+  boolean canWrite();
+
+  void write(W writer, D dataUnit)
+      throws IOException;
+}
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/genericrow/AdaptiveSizeBasedWriter.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/genericrow/AdaptiveSizeBasedWriter.java
new file mode 100644
index 0000000000..541bd14e26
--- /dev/null
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/genericrow/AdaptiveSizeBasedWriter.java
@@ -0,0 +1,51 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.segment.processing.genericrow;
+
+import java.io.IOException;
+import org.apache.pinot.spi.data.readers.GenericRow;
+
+
+public class AdaptiveSizeBasedWriter implements 
AdaptiveConstraintsWriter<GenericRowFileWriter, GenericRow> {
+
+  private final long _bytesLimit;
+  private long _numBytesWritten;
+
+  public AdaptiveSizeBasedWriter(long bytesLimit) {
+    _bytesLimit = bytesLimit;
+    _numBytesWritten = 0;
+  }
+
+  public long getBytesLimit() {
+    return _bytesLimit;
+  }
+  public long getNumBytesWritten() {
+    return _numBytesWritten;
+  }
+
+  @Override
+  public boolean canWrite() {
+    return _numBytesWritten < _bytesLimit;
+  }
+
+  @Override
+  public void write(GenericRowFileWriter writer, GenericRow row) throws 
IOException {
+    _numBytesWritten += writer.writeData(row);
+  }
+}
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/genericrow/FileWriter.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/genericrow/FileWriter.java
new file mode 100644
index 0000000000..8dd0a7d0a1
--- /dev/null
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/genericrow/FileWriter.java
@@ -0,0 +1,30 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.segment.processing.genericrow;
+
+import java.io.IOException;
+
+/**
+ * Abstraction for writing data units to a file.
+ * */
+
+public interface FileWriter<T> {
+  void close() throws IOException;
+  long writeData(T dataUnit) throws IOException;
+}
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/genericrow/GenericRowFileWriter.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/genericrow/GenericRowFileWriter.java
index 171488f963..281838a0b5 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/genericrow/GenericRowFileWriter.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/genericrow/GenericRowFileWriter.java
@@ -38,7 +38,7 @@ import org.apache.pinot.spi.data.readers.GenericRow;
  *
  * TODO: Consider using ByteBuffer instead of OutputStream.
  */
-public class GenericRowFileWriter implements Closeable {
+public class GenericRowFileWriter implements Closeable, FileWriter<GenericRow> 
{
   private final DataOutputStream _offsetStream;
   private final BufferedOutputStream _dataStream;
   private final GenericRowSerializer _serializer;
@@ -63,6 +63,15 @@ public class GenericRowFileWriter implements Closeable {
     _nextOffset += bytes.length;
   }
 
+  public long writeData(GenericRow genericRow)
+      throws IOException {
+    _offsetStream.writeLong(_nextOffset);
+    byte[] bytes = _serializer.serialize(genericRow);
+    _dataStream.write(bytes);
+    _nextOffset += bytes.length;
+    return bytes.length;
+  }
+
   @Override
   public void close()
       throws IOException {
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/mapper/SegmentMapper.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/mapper/SegmentMapper.java
index b883d775cc..407cbd4dcd 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/mapper/SegmentMapper.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/mapper/SegmentMapper.java
@@ -30,7 +30,9 @@ import java.util.stream.Collectors;
 import org.apache.commons.io.FileUtils;
 import org.apache.commons.lang3.tuple.Pair;
 import 
org.apache.pinot.core.segment.processing.framework.SegmentProcessorConfig;
+import 
org.apache.pinot.core.segment.processing.genericrow.AdaptiveSizeBasedWriter;
 import 
org.apache.pinot.core.segment.processing.genericrow.GenericRowFileManager;
+import 
org.apache.pinot.core.segment.processing.genericrow.GenericRowFileWriter;
 import org.apache.pinot.core.segment.processing.partitioner.Partitioner;
 import org.apache.pinot.core.segment.processing.partitioner.PartitionerConfig;
 import org.apache.pinot.core.segment.processing.partitioner.PartitionerFactory;
@@ -45,7 +47,6 @@ import org.apache.pinot.spi.data.FieldSpec;
 import org.apache.pinot.spi.data.Schema;
 import org.apache.pinot.spi.data.readers.GenericRow;
 import org.apache.pinot.spi.data.readers.RecordReader;
-import org.apache.pinot.spi.data.readers.RecordReaderFactory;
 import org.apache.pinot.spi.data.readers.RecordReaderFileConfig;
 import org.apache.pinot.spi.utils.StringUtil;
 import org.slf4j.Logger;
@@ -62,22 +63,20 @@ import org.slf4j.LoggerFactory;
  */
 public class SegmentMapper {
   private static final Logger LOGGER = 
LoggerFactory.getLogger(SegmentMapper.class);
-
-  private List<RecordReaderFileConfig> _recordReaderFileConfigs;
-  private List<RecordTransformer> _customRecordTransformers;
   private final SegmentProcessorConfig _processorConfig;
   private final File _mapperOutputDir;
-
   private final List<FieldSpec> _fieldSpecs;
   private final boolean _includeNullFields;
   private final int _numSortFields;
-
   private final CompositeTransformer _recordTransformer;
   private final TimeHandler _timeHandler;
   private final Partitioner[] _partitioners;
   private final String[] _partitionsBuffer;
   // NOTE: Use TreeMap so that the order is deterministic
   private final Map<String, GenericRowFileManager> _partitionToFileManagerMap 
= new TreeMap<>();
+  private AdaptiveSizeBasedWriter _adaptiveSizeBasedWriter;
+  private List<RecordReaderFileConfig> _recordReaderFileConfigs;
+  private List<RecordTransformer> _customRecordTransformers;
 
   public SegmentMapper(List<RecordReaderFileConfig> recordReaderFileConfigs,
       List<RecordTransformer> customRecordTransformers, SegmentProcessorConfig 
processorConfig, File mapperOutputDir) {
@@ -107,6 +106,10 @@ public class SegmentMapper {
     LOGGER.info("Initialized mapper with {} record readers, output dir: {}, 
timeHandler: {}, partitioners: {}",
         _recordReaderFileConfigs.size(), _mapperOutputDir, 
_timeHandler.getClass(),
         Arrays.stream(_partitioners).map(p -> 
p.getClass().toString()).collect(Collectors.joining(",")));
+
+    // initialize adaptive writer.
+    _adaptiveSizeBasedWriter =
+        new 
AdaptiveSizeBasedWriter(processorConfig.getSegmentConfig().getIntermediateFileSizeThreshold());
   }
 
   /**
@@ -118,7 +121,7 @@ public class SegmentMapper {
     try {
       return doMap();
     } catch (Exception e) {
-      // Cleaning up resources created by the mapper, leaving others to the 
caller like the input _recordReaders.
+      // Cleaning up resources created by the mapper.
       for (GenericRowFileManager fileManager : 
_partitionToFileManagerMap.values()) {
         fileManager.cleanUp();
       }
@@ -129,40 +132,39 @@ public class SegmentMapper {
   private Map<String, GenericRowFileManager> doMap()
       throws Exception {
     Consumer<Object> observer = _processorConfig.getProgressObserver();
-    int totalCount = _recordReaderFileConfigs.size();
     int count = 1;
+    int totalNumRecordReaders = _recordReaderFileConfigs.size();
     GenericRow reuse = new GenericRow();
     for (RecordReaderFileConfig recordReaderFileConfig : 
_recordReaderFileConfigs) {
-      RecordReader recordReader = recordReaderFileConfig._recordReader;
-      if (recordReader == null) {
-        // We create and use the recordReader here.
-        try {
-          recordReader =
-              
RecordReaderFactory.getRecordReader(recordReaderFileConfig._fileFormat, 
recordReaderFileConfig._dataFile,
-                  recordReaderFileConfig._fieldsToRead, 
recordReaderFileConfig._recordReaderConfig);
-          mapAndTransformRow(recordReader, reuse, observer, count, totalCount);
-        } finally {
-          if (recordReader != null) {
-            recordReader.close();
-          }
-        }
-      } else {
-        mapAndTransformRow(recordReader, reuse, observer, count, totalCount);
+      RecordReader recordReader = recordReaderFileConfig.getRecordReader();
+
+      // Mapper can terminate midway through reading a file if the intermediate 
file size has crossed the configured
+      // threshold. Map phase will continue in the next iteration right where 
we are leaving off in the current
+      // iteration.
+      boolean shouldMapperTerminate =
+          !completeMapAndTransformRow(recordReader, reuse, observer, count, 
totalNumRecordReaders);
+
+      // Terminate the map phase if intermediate file size has crossed the 
threshold.
+      if (shouldMapperTerminate) {
+        break;
       }
+      recordReaderFileConfig.closeRecordReader();
       count++;
     }
 
     for (GenericRowFileManager fileManager : 
_partitionToFileManagerMap.values()) {
       fileManager.closeFileWriter();
     }
-
     return _partitionToFileManagerMap;
   }
 
-  private void mapAndTransformRow(RecordReader recordReader, GenericRow reuse,
+
+//   Returns true if the map phase can continue, false if it should terminate 
based on the configured threshold for
+//   intermediate file size during map phase.
+  private boolean completeMapAndTransformRow(RecordReader recordReader, 
GenericRow reuse,
       Consumer<Object> observer, int count, int totalCount) throws Exception {
     observer.accept(String.format("Doing map phase on data from RecordReader 
(%d out of %d)", count, totalCount));
-    while (recordReader.hasNext()) {
+    while (recordReader.hasNext() && (_adaptiveSizeBasedWriter.canWrite())) {
       reuse = recordReader.next(reuse);
 
       // TODO: Add ComplexTypeTransformer here. Currently it is not idempotent 
so cannot add it
@@ -183,6 +185,16 @@ public class SegmentMapper {
       }
       reuse.clear();
     }
+    if (recordReader.hasNext() && !_adaptiveSizeBasedWriter.canWrite()) {
+      String logMessage = String.format(
+          "Stopping record readers at index: %d out of %d passed to mapper as 
size limit reached, bytes written = %d,"
+              + " bytes " + "limit = %d", count, totalCount, 
_adaptiveSizeBasedWriter.getNumBytesWritten(),
+          _adaptiveSizeBasedWriter.getBytesLimit());
+      observer.accept(logMessage);
+      LOGGER.info(logMessage);
+      return false;
+    }
+    return true;
   }
 
   private void writeRecord(GenericRow row)
@@ -210,6 +222,10 @@ public class SegmentMapper {
       _partitionToFileManagerMap.put(partition, fileManager);
     }
 
-    fileManager.getFileWriter().write(row);
+    // Get the file writer.
+    GenericRowFileWriter fileWriter = fileManager.getFileWriter();
+
+    // Write the row.
+    _adaptiveSizeBasedWriter.write(fileWriter, row);
   }
 }
diff --git 
a/pinot-core/src/test/java/org/apache/pinot/core/segment/processing/framework/SegmentProcessorFrameworkTest.java
 
b/pinot-core/src/test/java/org/apache/pinot/core/segment/processing/framework/SegmentProcessorFrameworkTest.java
index 8b3e150a1b..0ec9261d92 100644
--- 
a/pinot-core/src/test/java/org/apache/pinot/core/segment/processing/framework/SegmentProcessorFrameworkTest.java
+++ 
b/pinot-core/src/test/java/org/apache/pinot/core/segment/processing/framework/SegmentProcessorFrameworkTest.java
@@ -56,8 +56,10 @@ import org.testng.annotations.BeforeClass;
 import org.testng.annotations.Test;
 
 import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertFalse;
 import static org.testng.Assert.assertNotNull;
 import static org.testng.Assert.assertNull;
+import static org.testng.Assert.assertThrows;
 import static org.testng.Assert.assertTrue;
 
 
@@ -581,6 +583,207 @@ public class SegmentProcessorFrameworkTest {
     rewindRecordReaders(_multipleSegments);
   }
 
+  @Test
+  public void testConfigurableMapperOutputSize()
+      throws Exception {
+    File workingDir = new File(TEMP_DIR, "configurable_mapper_test_output");
+    FileUtils.forceMkdir(workingDir);
+    int expectedTotalDocsCount = 10;
+
+    // Test 1 :  Default case i.e. no limit to mapper output file size (single 
record reader).
+
+    SegmentProcessorConfig config =
+        new 
SegmentProcessorConfig.Builder().setTableConfig(_tableConfig).setSchema(_schema).build();
+    SegmentProcessorFramework framework = new 
SegmentProcessorFramework(_singleSegment, config, workingDir);
+    List<File> outputSegments = framework.process();
+    assertEquals(outputSegments.size(), 1);
+    String[] outputDirs = workingDir.list();
+    assertTrue(outputDirs != null && outputDirs.length == 1, 
Arrays.toString(outputDirs));
+    SegmentMetadata segmentMetadata = new 
SegmentMetadataImpl(outputSegments.get(0));
+    assertEquals(segmentMetadata.getTotalDocs(), expectedTotalDocsCount);
+    assertEquals(segmentMetadata.getName(), 
"myTable_1597719600000_1597892400000_0");
+    FileUtils.cleanDirectory(workingDir);
+    rewindRecordReaders(_singleSegment);
+
+    // Test 2 :  Default case i.e. no limit to mapper output file size 
(multiple record readers).
+    config = new 
SegmentProcessorConfig.Builder().setTableConfig(_tableConfig).setSchema(_schema).build();
+    framework = new SegmentProcessorFramework(_multipleSegments, config, 
workingDir);
+    outputSegments = framework.process();
+    assertEquals(outputSegments.size(), 1);
+    outputDirs = workingDir.list();
+    assertTrue(outputDirs != null && outputDirs.length == 1, 
Arrays.toString(outputDirs));
+    segmentMetadata = new SegmentMetadataImpl(outputSegments.get(0));
+    assertEquals(segmentMetadata.getTotalDocs(), expectedTotalDocsCount);
+    assertEquals(segmentMetadata.getName(), 
"myTable_1597719600000_1597892400000_0");
+    FileUtils.cleanDirectory(workingDir);
+    rewindRecordReaders(_multipleSegments);
+
+    // Test 3 :  Test mapper with threshold output size (single record reader).
+
+    // Create a segmentConfig with intermediate mapper output size threshold 
set to the number of bytes in each row
+    // from the data. In this way, we can test if each row is written to a 
separate segment.
+    SegmentConfig segmentConfig =
+        new 
SegmentConfig.Builder().setIntermediateFileSizeThreshold(16).setSegmentNamePrefix("testPrefix")
+            .setSegmentNamePostfix("testPostfix").build();
+    config = new 
SegmentProcessorConfig.Builder().setSegmentConfig(segmentConfig).setTableConfig(_tableConfig)
+        .setSchema(_schema).build();
+    framework = new SegmentProcessorFramework(_singleSegment, config, 
workingDir);
+    outputSegments = framework.process();
+    assertEquals(outputSegments.size(), expectedTotalDocsCount);
+    outputDirs = workingDir.list();
+    assertTrue(outputDirs != null && outputDirs.length == 1, 
Arrays.toString(outputDirs));
+
+    // Verify that each segment has only one row, and the segment name is 
correct.
+
+    for (int i = 0; i < expectedTotalDocsCount; i++) {
+      segmentMetadata = new SegmentMetadataImpl(outputSegments.get(i));
+      assertEquals(segmentMetadata.getTotalDocs(), 1);
+      
assertTrue(segmentMetadata.getName().matches("testPrefix_.*_testPostfix_" + i));
+    }
+    FileUtils.cleanDirectory(workingDir);
+    rewindRecordReaders(_singleSegment);
+
+    // Test 4 :  Test mapper with threshold output size (multiple record 
readers).
+
+    // Create a segmentConfig with intermediate mapper output size threshold 
set to the number of bytes in each row
+    // from the data. In this way, we can test if each row is written to a 
separate segment.
+    segmentConfig = new 
SegmentConfig.Builder().setIntermediateFileSizeThreshold(16).setSegmentNamePrefix("testPrefix")
+        .setSegmentNamePostfix("testPostfix").build();
+    config = new 
SegmentProcessorConfig.Builder().setSegmentConfig(segmentConfig).setTableConfig(_tableConfig)
+        .setSchema(_schema).build();
+    framework = new SegmentProcessorFramework(_multipleSegments, config, 
workingDir);
+    outputSegments = framework.process();
+    assertEquals(outputSegments.size(), expectedTotalDocsCount);
+    outputDirs = workingDir.list();
+    assertTrue(outputDirs != null && outputDirs.length == 1, 
Arrays.toString(outputDirs));
+
+    // Verify that each segment has only one row, and the segment name is 
correct.
+
+    for (int i = 0; i < expectedTotalDocsCount; i++) {
+      segmentMetadata = new SegmentMetadataImpl(outputSegments.get(i));
+      assertEquals(segmentMetadata.getTotalDocs(), 1);
+      
assertTrue(segmentMetadata.getName().matches("testPrefix_.*_testPostfix_" + i));
+    }
+    FileUtils.cleanDirectory(workingDir);
+    rewindRecordReaders(_multipleSegments);
+
+    // Test 5 :  Test with injected failure in mapper to verify output 
directory is cleaned up.
+
+    List<RecordReader> testList = new ArrayList<>(_multipleSegments);
+    testList.set(1, null);
+    segmentConfig = new 
SegmentConfig.Builder().setIntermediateFileSizeThreshold(16).setSegmentNamePrefix("testPrefix")
+        .setSegmentNamePostfix("testPostfix").build();
+    config = new 
SegmentProcessorConfig.Builder().setSegmentConfig(segmentConfig).setTableConfig(_tableConfig)
+        .setSchema(_schema).build();
+    SegmentProcessorFramework failureTest = new 
SegmentProcessorFramework(testList, config, workingDir);
+    assertThrows(NullPointerException.class, failureTest::process);
+    assertTrue(FileUtils.isEmptyDirectory(workingDir));
+    rewindRecordReaders(_multipleSegments);
+
+    // Test 6: RecordReader should be closed when recordReader is created 
inside RecordReaderFileConfig (without mapper
+    // output size threshold configured).
+
+    ClassLoader classLoader = getClass().getClassLoader();
+    URL resource = classLoader.getResource("data/dimBaseballTeams.csv");
+    RecordReaderFileConfig recordReaderFileConfig =
+        new RecordReaderFileConfig(FileFormat.CSV, new File(resource.toURI()), 
null, null);
+    TableConfig tableConfig =
+        new 
TableConfigBuilder(TableType.OFFLINE).setTableName("myTable").setTimeColumnName("time").build();
+    Schema schema =
+        new 
Schema.SchemaBuilder().setSchemaName("mySchema").addSingleValueDimension("teamId",
 DataType.STRING, "")
+            .addSingleValueDimension("teamName", DataType.STRING, "")
+            .addDateTime("time", DataType.LONG, "1:MILLISECONDS:EPOCH", 
"1:MILLISECONDS").build();
+
+    config = new 
SegmentProcessorConfig.Builder().setTableConfig(tableConfig).setSchema(schema).build();
+
+    SegmentProcessorFramework frameworkWithRecordReaderFileConfig =
+        new SegmentProcessorFramework(config, workingDir, 
ImmutableList.of(recordReaderFileConfig),
+            Collections.emptyList(), null);
+    outputSegments = frameworkWithRecordReaderFileConfig.process();
+
+    // Verify the number of segments created and the total docs.
+    assertEquals(outputSegments.size(), 1);
+    ImmutableSegment segment = 
ImmutableSegmentLoader.load(outputSegments.get(0), ReadMode.mmap);
+    segmentMetadata = segment.getSegmentMetadata();
+    assertEquals(segmentMetadata.getTotalDocs(), 52);
+
+    // Verify that the record reader is closed from RecordReaderFileConfig.
+    
assertTrue(recordReaderFileConfig.isRecordReaderClosedFromRecordReaderFileConfig());
+    FileUtils.cleanDirectory(workingDir);
+
+    // Test 7: RecordReader should not be closed when recordReader is passed 
to RecordReaderFileConfig. (Without
+    // mapper output size threshold configured)
+
+    RecordReader recordReader = recordReaderFileConfig.getRecordReader();
+    recordReader.rewind();
+
+    // Pass the recordReader to RecordReaderFileConfig.
+    recordReaderFileConfig = new RecordReaderFileConfig(recordReader);
+    SegmentProcessorFramework frameworkWithDelegateRecordReader =
+        new SegmentProcessorFramework(config, workingDir, 
ImmutableList.of(recordReaderFileConfig),
+            Collections.emptyList(), null);
+    outputSegments = frameworkWithDelegateRecordReader.process();
+
+    // Verify the number of segments created and the total docs.
+    assertEquals(outputSegments.size(), 1);
+    segment = ImmutableSegmentLoader.load(outputSegments.get(0), 
ReadMode.mmap);
+    segmentMetadata = segment.getSegmentMetadata();
+    assertEquals(segmentMetadata.getTotalDocs(), 52);
+
+    // Verify that the record reader is not closed from RecordReaderFileConfig.
+    
assertFalse(recordReaderFileConfig.isRecordReaderClosedFromRecordReaderFileConfig());
+    FileUtils.cleanDirectory(workingDir);
+
+    // Test 8: RecordReader should be closed when recordReader is created 
inside RecordReaderFileConfig (With mapper
+    // output size threshold configured).
+
+    expectedTotalDocsCount = 52;
+    recordReaderFileConfig = new RecordReaderFileConfig(FileFormat.CSV, new 
File(resource.toURI()), null, null);
+
+    segmentConfig = new 
SegmentConfig.Builder().setIntermediateFileSizeThreshold(19).setSegmentNamePrefix("testPrefix")
+        .setSegmentNamePostfix("testPostfix").build();
+    config = new 
SegmentProcessorConfig.Builder().setSegmentConfig(segmentConfig).setTableConfig(tableConfig)
+        .setSchema(schema).build();
+
+    frameworkWithRecordReaderFileConfig =
+        new SegmentProcessorFramework(config, workingDir, 
ImmutableList.of(recordReaderFileConfig),
+            Collections.emptyList(), null);
+    outputSegments = frameworkWithRecordReaderFileConfig.process();
+
+    // Verify that each segment has only one row.
+    for (int i = 0; i < expectedTotalDocsCount; i++) {
+      segmentMetadata = new SegmentMetadataImpl(outputSegments.get(i));
+      assertEquals(segmentMetadata.getTotalDocs(), 1);
+    }
+
+    // Verify that the record reader is closed from RecordReaderFileConfig.
+    
assertTrue(recordReaderFileConfig.isRecordReaderClosedFromRecordReaderFileConfig());
+    FileUtils.cleanDirectory(workingDir);
+
+    // Test 9: RecordReader should not be closed when recordReader is passed 
to RecordReaderFileConfig (With mapper
+    // output size threshold configured).
+
+    recordReader = recordReaderFileConfig.getRecordReader();
+    recordReader.rewind();
+
+    // Pass the recordReader to RecordReaderFileConfig.
+    recordReaderFileConfig = new RecordReaderFileConfig(recordReader);
+    frameworkWithDelegateRecordReader =
+        new SegmentProcessorFramework(config, workingDir, 
ImmutableList.of(recordReaderFileConfig),
+            Collections.emptyList(), null);
+    outputSegments = frameworkWithDelegateRecordReader.process();
+
+    // Verify that each segment has only one row.
+    for (int i = 0; i < expectedTotalDocsCount; i++) {
+      segmentMetadata = new SegmentMetadataImpl(outputSegments.get(i));
+      assertEquals(segmentMetadata.getTotalDocs(), 1);
+    }
+
+    // Verify that the record reader is not closed from RecordReaderFileConfig.
+    
assertFalse(recordReaderFileConfig.isRecordReaderClosedFromRecordReaderFileConfig());
+    FileUtils.cleanDirectory(workingDir);
+  }
+
   @Test
   public void testMultiValue()
       throws Exception {
diff --git 
a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/MergeTaskUtils.java
 
b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/MergeTaskUtils.java
index 26c9e59e39..233dfd7f03 100644
--- 
a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/MergeTaskUtils.java
+++ 
b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/MergeTaskUtils.java
@@ -147,6 +147,10 @@ public class MergeTaskUtils {
     if (maxNumRecordsPerSegment != null) {
       
segmentConfigBuilder.setMaxNumRecordsPerSegment(Integer.parseInt(maxNumRecordsPerSegment));
     }
+    String segmentMapperFileSizeThreshold = 
taskConfig.get(MergeTask.SEGMENT_MAPPER_FILE_SIZE_IN_BYTES);
+    if (segmentMapperFileSizeThreshold != null) {
+      
segmentConfigBuilder.setIntermediateFileSizeThreshold(Long.parseLong(segmentMapperFileSizeThreshold));
+    }
     
segmentConfigBuilder.setSegmentNamePrefix(taskConfig.get(MergeTask.SEGMENT_NAME_PREFIX_KEY));
     
segmentConfigBuilder.setSegmentNamePostfix(taskConfig.get(MergeTask.SEGMENT_NAME_POSTFIX_KEY));
     
segmentConfigBuilder.setFixedSegmentName(taskConfig.get(MergeTask.FIXED_SEGMENT_NAME_KEY));
diff --git 
a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/test/java/org/apache/pinot/plugin/minion/tasks/MergeTaskUtilsTest.java
 
b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/test/java/org/apache/pinot/plugin/minion/tasks/MergeTaskUtilsTest.java
index bc6b897211..731607784b 100644
--- 
a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/test/java/org/apache/pinot/plugin/minion/tasks/MergeTaskUtilsTest.java
+++ 
b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/test/java/org/apache/pinot/plugin/minion/tasks/MergeTaskUtilsTest.java
@@ -178,15 +178,17 @@ public class MergeTaskUtilsTest {
     taskConfig.put(MergeTask.SEGMENT_NAME_PREFIX_KEY, "myPrefix");
     taskConfig.put(MergeTask.SEGMENT_NAME_POSTFIX_KEY, "myPostfix");
     taskConfig.put(MergeTask.FIXED_SEGMENT_NAME_KEY, "mySegment");
+    taskConfig.put(MergeTask.SEGMENT_MAPPER_FILE_SIZE_IN_BYTES, "1000000000");
     SegmentConfig segmentConfig = MergeTaskUtils.getSegmentConfig(taskConfig);
     assertEquals(segmentConfig.getMaxNumRecordsPerSegment(), 10000);
     assertEquals(segmentConfig.getSegmentNamePrefix(), "myPrefix");
     assertEquals(segmentConfig.getSegmentNamePostfix(), "myPostfix");
     assertEquals(segmentConfig.getSegmentNamePostfix(), "myPostfix");
     assertEquals(segmentConfig.getFixedSegmentName(), "mySegment");
+    assertEquals(segmentConfig.getIntermediateFileSizeThreshold(), 
1000000000L);
     assertEquals(segmentConfig.toString(),
-        "SegmentConfig{_maxNumRecordsPerSegment=10000, 
_segmentNamePrefix='myPrefix', "
-            + "_segmentNamePostfix='myPostfix', 
_fixedSegmentName='mySegment'}");
+        "SegmentConfig{_maxNumRecordsPerSegment=10000, 
_segmentMapperFileSizeThresholdInBytes=1000000000, "
+            + "_segmentNamePrefix='myPrefix', _segmentNamePostfix='myPostfix', 
_fixedSegmentName='mySegment'}");
 
     segmentConfig = MergeTaskUtils.getSegmentConfig(Collections.emptyMap());
     assertEquals(segmentConfig.getMaxNumRecordsPerSegment(), 
SegmentConfig.DEFAULT_MAX_NUM_RECORDS_PER_SEGMENT);
diff --git 
a/pinot-spi/src/main/java/org/apache/pinot/spi/data/readers/RecordReaderFileConfig.java
 
b/pinot-spi/src/main/java/org/apache/pinot/spi/data/readers/RecordReaderFileConfig.java
index 8a2ebb9e16..51e4ed0cfb 100644
--- 
a/pinot-spi/src/main/java/org/apache/pinot/spi/data/readers/RecordReaderFileConfig.java
+++ 
b/pinot-spi/src/main/java/org/apache/pinot/spi/data/readers/RecordReaderFileConfig.java
@@ -34,8 +34,13 @@ public class RecordReaderFileConfig {
   public final File _dataFile;
   public final Set<String> _fieldsToRead;
   public final RecordReaderConfig _recordReaderConfig;
+  private final boolean _isDelegateReader;
   // Record Readers created/passed from clients.
-  public final RecordReader _recordReader;
+  public RecordReader _recordReader;
+  // Track if RecordReaderFileConfig initialized the RecordReader to aid 
in closing the RecordReader.
+  private boolean _isRecordReaderInitialized;
+  // Track if RecordReaderFileConfig closed the RecordReader for testing 
purposes.
+  private boolean _isRecordReaderClosed;
 
   // Pass in the info needed to initialize the reader
   public RecordReaderFileConfig(FileFormat fileFormat, File dataFile, 
Set<String> fieldsToRead,
@@ -45,6 +50,11 @@ public class RecordReaderFileConfig {
     _fieldsToRead = fieldsToRead;
     _recordReaderConfig = recordReaderConfig;
     _recordReader = null;
+    // This is not a delegate RecordReader i.e. RecordReaderFileConfig owns 
the RecordReader, so it should be closed
+    // by RecordReaderFileConfig as well.
+    _isDelegateReader = false;
+    _isRecordReaderInitialized = false;
+    _isRecordReaderClosed = false;
   }
 
   // Pass in the reader instance directly
@@ -54,5 +64,44 @@ public class RecordReaderFileConfig {
     _dataFile = null;
     _fieldsToRead = null;
     _recordReaderConfig = null;
+    // This is a delegate RecordReader i.e. RecordReader instance has been 
passed to RecordReaderFileConfig instead
+    // of the configs. It means RecordReaderFileConfig does not own the 
RecordReader, so it should not be closed by
+    // RecordReaderFileConfig either. The responsibility of closing the 
RecordReader lies with the caller.
+    _isDelegateReader = true;
+    _isRecordReaderInitialized = true;
+    _isRecordReaderClosed = false;
+  }
+
+  // Return the RecordReader instance. Initialize the RecordReader if not 
already initialized.
+  public RecordReader getRecordReader()
+      throws Exception {
+    if (!_isRecordReaderInitialized) {
+      _recordReader = RecordReaderFactory.getRecordReader(_fileFormat, 
_dataFile, _fieldsToRead, _recordReaderConfig);
+      _isRecordReaderInitialized = true;
+    }
+    return _recordReader;
+  }
+
+  // Close the RecordReader instance if RecordReaderFileConfig initialized it.
+  public void closeRecordReader()
+      throws Exception {
+    // If RecordReaderFileConfig did not create the RecordReader, then it 
should not close it.
+    if (_isRecordReaderInitialized && !_isDelegateReader) {
+      _recordReader.close();
+      _isRecordReaderClosed = true;
+    }
+  }
+
+  // Return true if RecordReader is done processing.
+  public boolean isRecordReaderDone() {
+    if (_isRecordReaderInitialized) {
+      return !_recordReader.hasNext();
+    }
+    return false;
+  }
+
+  // For testing purposes only.
+  public boolean isRecordReaderClosedFromRecordReaderFileConfig() {
+    return _isRecordReaderClosed;
   }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org

Reply via email to