This is an automated email from the ASF dual-hosted git repository.

jlli pushed a commit to branch segment-creation-in-one-pass
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git

commit 7bcaa01d55070b36a1f3219e0cf33de8724e7084
Author: Jack Li(Analytics Engineering) <j...@jlli-mn1.linkedin.biz>
AuthorDate: Fri Jan 22 13:04:21 2021 -0800

    Support data ingestion for offline segments in one pass
---
 .../readers/IntermediateSegmentRecordReader.java   |  78 +++++
 .../generator/SegmentGeneratorConfig.java          |   9 +
 .../indexsegment/mutable/IntermediateSegment.java  | 373 +++++++++++++++++++++
 .../RealtimeIndexOffHeapMemoryManager.java         |   8 +-
 ...tatistics.java => MutableColumnStatistics.java} |   4 +-
 ....java => MutableNoDictionaryColStatistics.java} |   4 +-
 .../stats/RealtimeSegmentStatsContainer.java       |   4 +-
 ...termediateSegmentSegmentCreationDataSource.java |  56 ++++
 .../creator/IntermediateSegmentStatsContainer.java |  52 +++
 .../creator/impl/SegmentColumnarIndexCreator.java  |   2 +-
 .../impl/SegmentIndexCreationDriverImpl.java       |  11 +-
 .../core/segment/creator/impl/V1Constants.java     |   2 +-
 .../index/column/IntermediateIndexContainer.java   | 134 ++++++++
 .../core/segment/index/column/NumValuesInfo.java   |  41 +++
 .../segment/index/loader/IndexLoadingConfig.java   |   3 +
 .../segment/index/metadata/ColumnMetadata.java     |   2 +-
 .../core/indexsegment/IntermediateSegmentTest.java | 222 ++++++++++++
 17 files changed, 992 insertions(+), 13 deletions(-)
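
    For reviewers, a minimal sketch of the one-pass flow this patch enables, mirroring
    IntermediateSegmentTest below. The surrounding variables (tableConfig, schema, outputDir,
    segmentName, inputReader) are assumptions set up by the caller and are not part of the patch:

        // 1. Ingest the input once into the in-memory intermediate segment.
        SegmentGeneratorConfig config = new SegmentGeneratorConfig(tableConfig, schema);
        config.setOutDir(outputDir);
        config.setSegmentName(segmentName);
        config.setIntermediateSegmentRecordReader(true);  // route the driver through the new data source

        IntermediateSegment intermediateSegment = new IntermediateSegment(config);
        GenericRow reuse = new GenericRow();
        while (inputReader.hasNext()) {
          reuse.clear();
          reuse = inputReader.next(reuse);
          intermediateSegment.index(reuse, null);
        }

        // 2. Build the immutable offline segment directly from the intermediate segment.
        IntermediateSegmentRecordReader intermediateReader = new IntermediateSegmentRecordReader(intermediateSegment);
        SegmentIndexCreationDriverImpl driver = new SegmentIndexCreationDriverImpl();
        driver.init(config, intermediateReader);
        driver.build();

        // 3. Release the intermediate segment's buffers once the offline segment is built.
        intermediateSegment.destroy();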

diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/readers/IntermediateSegmentRecordReader.java b/pinot-core/src/main/java/org/apache/pinot/core/data/readers/IntermediateSegmentRecordReader.java
new file mode 100644
index 0000000..0c2daaf
--- /dev/null
+++ b/pinot-core/src/main/java/org/apache/pinot/core/data/readers/IntermediateSegmentRecordReader.java
@@ -0,0 +1,78 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.data.readers;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Set;
+import javax.annotation.Nullable;
+import org.apache.pinot.core.indexsegment.mutable.IntermediateSegment;
+import org.apache.pinot.spi.data.readers.GenericRow;
+import org.apache.pinot.spi.data.readers.RecordReader;
+import org.apache.pinot.spi.data.readers.RecordReaderConfig;
+
+
+public class IntermediateSegmentRecordReader implements RecordReader {
+  private final IntermediateSegment _intermediateSegment;
+  private final int _numDocs;
+
+  private int _nextDocId = 0;
+
+  public IntermediateSegmentRecordReader(IntermediateSegment intermediateSegment) {
+    _intermediateSegment = intermediateSegment;
+    _numDocs = intermediateSegment.getNumDocsIndexed();
+  }
+
+  @Override
+  public void init(File dataFile, Set<String> fieldsToRead, @Nullable RecordReaderConfig recordReaderConfig)
+      throws IOException {
+  }
+
+  @Override
+  public boolean hasNext() {
+    return _nextDocId < _numDocs;
+  }
+
+  @Override
+  public GenericRow next()
+      throws IOException {
+    return next(new GenericRow());
+  }
+
+  @Override
+  public GenericRow next(GenericRow reuse)
+      throws IOException {
+    return _intermediateSegment.getRecord(_nextDocId++, reuse);
+  }
+
+  @Override
+  public void rewind()
+      throws IOException {
+    _nextDocId = 0;
+  }
+
+  @Override
+  public void close()
+      throws IOException {
+  }
+
+  public IntermediateSegment getIntermediateSegment() {
+    return _intermediateSegment;
+  }
+}
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/indexsegment/generator/SegmentGeneratorConfig.java b/pinot-core/src/main/java/org/apache/pinot/core/indexsegment/generator/SegmentGeneratorConfig.java
index c630a15..a65c312 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/indexsegment/generator/SegmentGeneratorConfig.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/indexsegment/generator/SegmentGeneratorConfig.java
@@ -102,6 +102,7 @@ public class SegmentGeneratorConfig implements Serializable {
   private boolean _onHeap = false;
   private boolean _skipTimeValueCheck = false;
   private boolean _nullHandlingEnabled = false;
+  private boolean _isIntermediateSegmentRecordReader = false;
 
   // constructed from FieldConfig
   private Map<String, Map<String, String>> _columnProperties = new HashMap<>();
@@ -650,4 +651,12 @@ public class SegmentGeneratorConfig implements Serializable {
   public void setNullHandlingEnabled(boolean nullHandlingEnabled) {
     _nullHandlingEnabled = nullHandlingEnabled;
   }
+
+  public boolean isIntermediateSegmentRecordReader() {
+    return _isIntermediateSegmentRecordReader;
+  }
+
+  public void setIntermediateSegmentRecordReader(boolean intermediateSegmentRecordReader) {
+    _isIntermediateSegmentRecordReader = intermediateSegmentRecordReader;
+  }
 }
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/indexsegment/mutable/IntermediateSegment.java b/pinot-core/src/main/java/org/apache/pinot/core/indexsegment/mutable/IntermediateSegment.java
new file mode 100644
index 0000000..056a05b
--- /dev/null
+++ b/pinot-core/src/main/java/org/apache/pinot/core/indexsegment/mutable/IntermediateSegment.java
@@ -0,0 +1,373 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.indexsegment.mutable;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import javax.annotation.Nullable;
+import org.apache.pinot.core.common.DataSource;
+import org.apache.pinot.core.data.partition.PartitionFunction;
+import org.apache.pinot.core.data.partition.PartitionFunctionFactory;
+import org.apache.pinot.core.indexsegment.generator.SegmentGeneratorConfig;
+import org.apache.pinot.core.io.readerwriter.PinotDataBufferMemoryManager;
+import org.apache.pinot.core.io.writer.impl.DirectMemoryManager;
+import org.apache.pinot.core.io.writer.impl.MmapMemoryManager;
+import org.apache.pinot.core.realtime.impl.dictionary.MutableDictionaryFactory;
+import org.apache.pinot.core.realtime.impl.forward.FixedByteMVMutableForwardIndex;
+import org.apache.pinot.core.realtime.impl.forward.FixedByteSVMutableForwardIndex;
+import org.apache.pinot.core.segment.creator.impl.V1Constants;
+import org.apache.pinot.core.segment.index.column.IntermediateIndexContainer;
+import org.apache.pinot.core.segment.index.column.NumValuesInfo;
+import org.apache.pinot.core.segment.index.loader.IndexLoadingConfig;
+import org.apache.pinot.core.segment.index.metadata.SegmentMetadata;
+import org.apache.pinot.core.segment.index.readers.MutableDictionary;
+import org.apache.pinot.core.segment.index.readers.MutableForwardIndex;
+import org.apache.pinot.core.segment.index.readers.ValidDocIndexReader;
+import org.apache.pinot.core.startree.v2.StarTreeV2;
+import org.apache.pinot.spi.config.table.ColumnPartitionConfig;
+import org.apache.pinot.spi.config.table.SegmentPartitionConfig;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.data.FieldSpec;
+import org.apache.pinot.spi.data.Schema;
+import org.apache.pinot.spi.data.readers.GenericRow;
+import org.apache.pinot.spi.stream.RowMetadata;
+import org.apache.pinot.spi.utils.ByteArray;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+public class IntermediateSegment implements MutableSegment {
+  private static final Logger LOGGER = LoggerFactory.getLogger(IntermediateSegment.class);
+
+  private static final int MAX_MULTI_VALUES_PER_ROW = 1000;
+  private static final int DEFAULT_CAPACITY = 100_000;
+  private static final int DEFAULT_EST_AVG_COL_SIZE = 32;
+  private static final int DEFAULT_EST_CARDINALITY = 5000;
+
+  private static final String STATS_FILE_NAME = "segment-stats.ser";
+
+  private final SegmentGeneratorConfig _segmentGeneratorConfig;
+  private final Schema _schema;
+  private final TableConfig _tableConfig;
+  private final String _segmentName;
+  private final PartitionFunction _partitionFunction;
+  private final String _partitionColumn;
+  private final Map<String, IntermediateIndexContainer> _indexContainerMap = new HashMap<>();
+  private final PinotDataBufferMemoryManager _memoryManager;
+
+  private final int _capacity = DEFAULT_CAPACITY;
+  private volatile int _numDocsIndexed = 0;
+
+  public IntermediateSegment(SegmentGeneratorConfig segmentGeneratorConfig) {
+    _segmentGeneratorConfig = segmentGeneratorConfig;
+    _schema = segmentGeneratorConfig.getSchema();
+    _tableConfig = segmentGeneratorConfig.getTableConfig();
+    _segmentName = _segmentGeneratorConfig.getSegmentName();
+
+    Collection<FieldSpec> allFieldSpecs = _schema.getAllFieldSpecs();
+    List<FieldSpec> physicalFieldSpecs = new ArrayList<>(allFieldSpecs.size());
+    physicalFieldSpecs.addAll(allFieldSpecs);
+    Collection<FieldSpec> physicalFieldSpecs1 = Collections.unmodifiableCollection(physicalFieldSpecs);
+
+    SegmentPartitionConfig segmentPartitionConfig = segmentGeneratorConfig.getSegmentPartitionConfig();
+    if (segmentPartitionConfig != null) {
+      Map<String, ColumnPartitionConfig> segmentPartitionConfigColumnPartitionMap =
+          segmentPartitionConfig.getColumnPartitionMap();
+      _partitionColumn = segmentPartitionConfigColumnPartitionMap.keySet().iterator().next();
+      _partitionFunction = PartitionFunctionFactory
+          .getPartitionFunction(segmentPartitionConfig.getFunctionName(_partitionColumn),
+              segmentPartitionConfig.getNumPartitions(_partitionColumn));
+    } else {
+      _partitionColumn = null;
+      _partitionFunction = null;
+    }
+
+    IndexLoadingConfig indexLoadingConfig = new IndexLoadingConfig(null, _tableConfig);
+    boolean offHeap = indexLoadingConfig.isRealtimeOffHeapAllocation();
+    boolean directOffHeap = indexLoadingConfig.isDirectRealtimeOffHeapAllocation();
+    if (offHeap && !directOffHeap) {
+      _memoryManager = new MmapMemoryManager(null, _segmentName, null);
+    } else {
+      _memoryManager = new DirectMemoryManager(_segmentName, null);
+    }
+
+    // Initialize for each column
+    for (FieldSpec fieldSpec : physicalFieldSpecs1) {
+      String column = fieldSpec.getName();
+
+      // Partition info
+      PartitionFunction partitionFunction = null;
+      Set<Integer> partitions = null;
+      if (column.equals(_partitionColumn)) {
+        partitionFunction = _partitionFunction;
+        partitions = new HashSet<>();
+        partitions.add(segmentGeneratorConfig.getSequenceId());
+      }
+
+      FieldSpec.DataType dataType = fieldSpec.getDataType();
+      boolean isFixedWidthColumn = dataType.isFixedWidth();
+      MutableForwardIndex forwardIndex;
+      MutableDictionary dictionary;
+
+      int dictionaryColumnSize;
+      if (isFixedWidthColumn) {
+        dictionaryColumnSize = dataType.size();
+      } else {
+        dictionaryColumnSize = DEFAULT_EST_AVG_COL_SIZE;
+      }
+      // NOTE: preserve 10% buffer for cardinality to reduce the chance of re-sizing the dictionary
+      int estimatedCardinality = (int) (DEFAULT_EST_CARDINALITY * 1.1);
+      String dictionaryAllocationContext =
+          buildAllocationContext(_segmentName, column, V1Constants.Dict.FILE_EXTENSION);
+      dictionary = MutableDictionaryFactory
+          .getMutableDictionary(dataType, offHeap, _memoryManager, dictionaryColumnSize,
+              Math.min(estimatedCardinality, _capacity), dictionaryAllocationContext);
+
+      if (fieldSpec.isSingleValueField()) {
+        // Single-value dictionary-encoded forward index
+        String allocationContext =
+            buildAllocationContext(_segmentName, column, V1Constants.Indexes.UNSORTED_SV_FORWARD_INDEX_FILE_EXTENSION);
+        forwardIndex = new FixedByteSVMutableForwardIndex(true, FieldSpec.DataType.INT, _capacity, _memoryManager,
+            allocationContext);
+      } else {
+        // Multi-value dictionary-encoded forward index
+        String allocationContext =
+            buildAllocationContext(_segmentName, column, V1Constants.Indexes.UNSORTED_MV_FORWARD_INDEX_FILE_EXTENSION);
+        // TODO: Start with a smaller capacity on FixedByteMVForwardIndexReaderWriter and let it expand
+        forwardIndex = new FixedByteMVMutableForwardIndex(MAX_MULTI_VALUES_PER_ROW,
+            indexLoadingConfig.getRealtimeAvgMultiValueCount(), _capacity, Integer.BYTES, _memoryManager,
+            allocationContext);
+      }
+
+      _indexContainerMap.put(column,
+          new IntermediateIndexContainer(fieldSpec, partitionFunction, partitions, new NumValuesInfo(), forwardIndex,
+              dictionary));
+    }
+  }
+
+  @Override
+  public boolean index(GenericRow row, @Nullable RowMetadata rowMetadata)
+      throws IOException {
+    updateDictionary(row);
+    addNewRow(row);
+    _numDocsIndexed++;
+    return true;
+  }
+
+  @Override
+  public int getNumDocsIndexed() {
+    return _numDocsIndexed;
+  }
+
+  @Override
+  public String getSegmentName() {
+    return _segmentName;
+  }
+
+  @Override
+  public SegmentMetadata getSegmentMetadata() {
+    return null;
+  }
+
+  @Override
+  public Set<String> getColumnNames() {
+    return _schema.getColumnNames();
+  }
+
+  @Override
+  public Set<String> getPhysicalColumnNames() {
+    return _schema.getPhysicalColumnNames();
+  }
+
+  @Override
+  public DataSource getDataSource(String columnName) {
+    return _indexContainerMap.get(columnName).toDataSource(_numDocsIndexed);
+  }
+
+  @Override
+  public List<StarTreeV2> getStarTrees() {
+    return null;
+  }
+
+  @Nullable
+  @Override
+  public ValidDocIndexReader getValidDocIndex() {
+    return null;
+  }
+
+  @Override
+  public GenericRow getRecord(int docId, GenericRow reuse) {
+    for (Map.Entry<String, IntermediateIndexContainer> entry : _indexContainerMap.entrySet()) {
+      String column = entry.getKey();
+      IntermediateIndexContainer indexContainer = entry.getValue();
+      Object value = getValue(docId, indexContainer.getForwardIndex(), indexContainer.getDictionary(),
+          indexContainer.getNumValuesInfo().getMaxNumValuesPerMVEntry());
+      reuse.putValue(column, value);
+    }
+    return reuse;
+  }
+
+  @Override
+  public void destroy() {
+    String segmentName = getSegmentName();
+    LOGGER.info("Trying to destroy segment : {}", segmentName);
+    for (Map.Entry<String, IntermediateIndexContainer> entry : 
_indexContainerMap.entrySet()) {
+      try {
+        entry.getValue().close();
+      } catch (IOException e) {
+        LOGGER.error("Failed to close indexes for column: {}. Continuing with 
error.", entry.getKey(), e);
+      }
+    }
+  }
+
+  private void updateDictionary(GenericRow row) {
+    for (Map.Entry<String, IntermediateIndexContainer> entry : _indexContainerMap.entrySet()) {
+      String column = entry.getKey();
+      IntermediateIndexContainer indexContainer = entry.getValue();
+      Object value = row.getValue(column);
+      MutableDictionary dictionary = indexContainer.getDictionary();
+      if (dictionary != null) {
+        if (indexContainer.getFieldSpec().isSingleValueField()) {
+          indexContainer.setDictId(dictionary.index(value));
+        } else {
+          indexContainer.setDictIds(dictionary.index((Object[]) value));
+        }
+
+        // Update min/max value from dictionary
+        indexContainer.setMinValue(dictionary.getMinVal());
+        indexContainer.setMaxValue(dictionary.getMaxVal());
+      }
+    }
+  }
+
+  private void addNewRow(GenericRow row)
+      throws IOException {
+    int docId = _numDocsIndexed;
+    for (Map.Entry<String, IntermediateIndexContainer> entry : _indexContainerMap.entrySet()) {
+      String column = entry.getKey();
+      IntermediateIndexContainer indexContainer = entry.getValue();
+      Object value = row.getValue(column);
+      FieldSpec fieldSpec = indexContainer.getFieldSpec();
+      if (fieldSpec.isSingleValueField()) {
+        // Update numValues info
+        indexContainer.getNumValuesInfo().updateSVEntry();
+
+        // Update indexes
+        MutableForwardIndex forwardIndex = indexContainer.getForwardIndex();
+        int dictId = indexContainer.getDictId();
+        if (dictId >= 0) {
+          // Dictionary-encoded single-value column
+
+          // Update forward index
+          forwardIndex.setDictId(docId, dictId);
+        } else {
+          // Single-value column with raw index
+
+          // Update forward index
+          FieldSpec.DataType dataType = fieldSpec.getDataType();
+          switch (dataType) {
+            case INT:
+              forwardIndex.setInt(docId, (Integer) value);
+              break;
+            case LONG:
+              forwardIndex.setLong(docId, (Long) value);
+              break;
+            case FLOAT:
+              forwardIndex.setFloat(docId, (Float) value);
+              break;
+            case DOUBLE:
+              forwardIndex.setDouble(docId, (Double) value);
+              break;
+            case STRING:
+              forwardIndex.setString(docId, (String) value);
+              break;
+            case BYTES:
+              forwardIndex.setBytes(docId, (byte[]) value);
+              break;
+            default:
+              throw new UnsupportedOperationException(
+                  "Unsupported data type: " + dataType + " for no-dictionary 
column: " + column);
+          }
+
+          // Update min/max value from raw value
+          // NOTE: Skip updating min/max value for aggregated metrics because the value will change over time.
+          if (fieldSpec.getFieldType() != FieldSpec.FieldType.METRIC) {
+            Comparable comparable;
+            if (dataType == FieldSpec.DataType.BYTES) {
+              comparable = new ByteArray((byte[]) value);
+            } else {
+              comparable = (Comparable) value;
+            }
+            if (indexContainer.getMinValue() == null) {
+              indexContainer.setMinValue(comparable);
+              indexContainer.setMaxValue(comparable);
+            } else {
+              if (comparable.compareTo(indexContainer.getMinValue()) < 0) {
+                indexContainer.setMinValue(comparable);
+              }
+              if (comparable.compareTo(indexContainer.getMaxValue()) > 0) {
+                indexContainer.setMaxValue(comparable);
+              }
+            }
+          }
+        }
+      } else {
+        // Multi-value column (always dictionary-encoded)
+        int[] dictIds = indexContainer.getDictIds();
+
+        // Update numValues info
+        indexContainer.getNumValuesInfo().updateMVEntry(dictIds.length);
+
+        // Update forward index
+        indexContainer.getForwardIndex().setDictIdMV(docId, dictIds);
+      }
+    }
+  }
+
+  private String buildAllocationContext(String segmentName, String columnName, String indexType) {
+    return segmentName + ":" + columnName + indexType;
+  }
+
+  /**
+   * Helper method to read the value for the given document id.
+   */
+  private static Object getValue(int docId, MutableForwardIndex forwardIndex, MutableDictionary dictionary,
+      int maxNumMultiValues) {
+    // Dictionary based
+    if (forwardIndex.isSingleValue()) {
+      int dictId = forwardIndex.getDictId(docId);
+      return dictionary.get(dictId);
+    } else {
+      int[] dictIds = new int[maxNumMultiValues];
+      int numValues = forwardIndex.getDictIdMV(docId, dictIds);
+      Object[] value = new Object[numValues];
+      for (int i = 0; i < numValues; i++) {
+        value[i] = dictionary.get(dictIds[i]);
+      }
+      return value;
+    }
+  }
+}
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/io/readerwriter/RealtimeIndexOffHeapMemoryManager.java b/pinot-core/src/main/java/org/apache/pinot/core/io/readerwriter/RealtimeIndexOffHeapMemoryManager.java
index 2687ad8..e3612f1 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/io/readerwriter/RealtimeIndexOffHeapMemoryManager.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/io/readerwriter/RealtimeIndexOffHeapMemoryManager.java
@@ -79,7 +79,9 @@ public abstract class RealtimeIndexOffHeapMemoryManager implements PinotDataBuff
     PinotDataBuffer buffer = allocateInternal(size, allocationContext);
     _totalAllocatedBytes += size;
     _buffers.add(buffer);
-    _serverMetrics.addValueToTableGauge(_tableName, ServerGauge.REALTIME_OFFHEAP_MEMORY_USED, size);
+    if (_serverMetrics != null) {
+      _serverMetrics.addValueToTableGauge(_tableName, ServerGauge.REALTIME_OFFHEAP_MEMORY_USED, size);
+    }
     return buffer;
   }
 
@@ -104,7 +106,9 @@ public abstract class RealtimeIndexOffHeapMemoryManager implements PinotDataBuff
     for (PinotDataBuffer buffer : _buffers) {
       buffer.close();
     }
-    _serverMetrics.addValueToTableGauge(_tableName, ServerGauge.REALTIME_OFFHEAP_MEMORY_USED, -_totalAllocatedBytes);
+    if (_serverMetrics != null) {
+      _serverMetrics.addValueToTableGauge(_tableName, ServerGauge.REALTIME_OFFHEAP_MEMORY_USED, -_totalAllocatedBytes);
+    }
     doClose();
     _buffers.clear();
     _totalAllocatedBytes = 0;
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/realtime/converter/stats/RealtimeColumnStatistics.java b/pinot-core/src/main/java/org/apache/pinot/core/realtime/converter/stats/MutableColumnStatistics.java
similarity index 97%
rename from pinot-core/src/main/java/org/apache/pinot/core/realtime/converter/stats/RealtimeColumnStatistics.java
rename to pinot-core/src/main/java/org/apache/pinot/core/realtime/converter/stats/MutableColumnStatistics.java
index 8c4c93a..8b76bda 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/realtime/converter/stats/RealtimeColumnStatistics.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/realtime/converter/stats/MutableColumnStatistics.java
@@ -33,7 +33,7 @@ import org.apache.pinot.spi.data.FieldSpec;
  *
  * TODO: Gather more info on the fly to avoid scanning the segment
  */
-public class RealtimeColumnStatistics implements ColumnStatistics {
+public class MutableColumnStatistics implements ColumnStatistics {
   private final DataSource _dataSource;
   private final int[] _sortedDocIdIterationOrder;
 
@@ -41,7 +41,7 @@ public class RealtimeColumnStatistics implements ColumnStatistics {
   //       dictionary.
   private final Dictionary _dictionary;
 
-  public RealtimeColumnStatistics(DataSource dataSource, int[] sortedDocIdIterationOrder) {
+  public MutableColumnStatistics(DataSource dataSource, int[] sortedDocIdIterationOrder) {
     _dataSource = dataSource;
     _sortedDocIdIterationOrder = sortedDocIdIterationOrder;
     _dictionary = dataSource.getDictionary();
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/realtime/converter/stats/RealtimeNoDictionaryColStatistics.java b/pinot-core/src/main/java/org/apache/pinot/core/realtime/converter/stats/MutableNoDictionaryColStatistics.java
similarity index 95%
rename from pinot-core/src/main/java/org/apache/pinot/core/realtime/converter/stats/RealtimeNoDictionaryColStatistics.java
rename to pinot-core/src/main/java/org/apache/pinot/core/realtime/converter/stats/MutableNoDictionaryColStatistics.java
index d639b17..109884d 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/realtime/converter/stats/RealtimeNoDictionaryColStatistics.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/realtime/converter/stats/MutableNoDictionaryColStatistics.java
@@ -28,11 +28,11 @@ import org.apache.pinot.core.segment.creator.ColumnStatistics;
 import static org.apache.pinot.core.common.Constants.UNKNOWN_CARDINALITY;
 
 
-public class RealtimeNoDictionaryColStatistics implements ColumnStatistics {
+public class MutableNoDictionaryColStatistics implements ColumnStatistics {
   private final DataSourceMetadata _dataSourceMetadata;
   private final MutableForwardIndex _forwardIndex;
 
-  public RealtimeNoDictionaryColStatistics(DataSource dataSource) {
+  public MutableNoDictionaryColStatistics(DataSource dataSource) {
     _dataSourceMetadata = dataSource.getDataSourceMetadata();
     _forwardIndex = (MutableForwardIndex) dataSource.getForwardIndex();
   }
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/realtime/converter/stats/RealtimeSegmentStatsContainer.java b/pinot-core/src/main/java/org/apache/pinot/core/realtime/converter/stats/RealtimeSegmentStatsContainer.java
index bedf50c..d9b382d 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/realtime/converter/stats/RealtimeSegmentStatsContainer.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/realtime/converter/stats/RealtimeSegmentStatsContainer.java
@@ -42,10 +42,10 @@ public class RealtimeSegmentStatsContainer implements SegmentPreIndexStatsContai
     for (String columnName : realtimeSegment.getPhysicalColumnNames()) {
       DataSource dataSource = realtimeSegment.getDataSource(columnName);
       if (dataSource.getDictionary() != null) {
-        _columnStatisticsMap.put(columnName, new RealtimeColumnStatistics(realtimeSegment.getDataSource(columnName),
+        _columnStatisticsMap.put(columnName, new MutableColumnStatistics(realtimeSegment.getDataSource(columnName),
             realtimeSegmentRecordReader.getSortedDocIdIterationOrder()));
       } else {
-        _columnStatisticsMap.put(columnName, new RealtimeNoDictionaryColStatistics(dataSource));
+        _columnStatisticsMap.put(columnName, new MutableNoDictionaryColStatistics(dataSource));
       }
     }
   }
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/IntermediateSegmentSegmentCreationDataSource.java b/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/IntermediateSegmentSegmentCreationDataSource.java
new file mode 100644
index 0000000..7a27ed5
--- /dev/null
+++ b/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/IntermediateSegmentSegmentCreationDataSource.java
@@ -0,0 +1,56 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.segment.creator;
+
+import org.apache.pinot.common.Utils;
+import org.apache.pinot.core.data.readers.IntermediateSegmentRecordReader;
+import org.apache.pinot.core.indexsegment.mutable.IntermediateSegment;
+import org.apache.pinot.spi.data.readers.RecordReader;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+public class IntermediateSegmentSegmentCreationDataSource implements SegmentCreationDataSource {
+  private static final Logger LOGGER = LoggerFactory.getLogger(IntermediateSegmentSegmentCreationDataSource.class);
+  private final IntermediateSegment _intermediateSegment;
+  private final IntermediateSegmentRecordReader _intermediateSegmentRecordReader;
+
+
+  public IntermediateSegmentSegmentCreationDataSource(IntermediateSegmentRecordReader intermediateSegmentRecordReader) {
+    _intermediateSegmentRecordReader = intermediateSegmentRecordReader;
+    _intermediateSegment = _intermediateSegmentRecordReader.getIntermediateSegment();
+  }
+
+  @Override
+  public SegmentPreIndexStatsContainer gatherStats(StatsCollectorConfig statsCollectorConfig) {
+
+    return new IntermediateSegmentStatsContainer(_intermediateSegment);
+  }
+
+  @Override
+  public RecordReader getRecordReader() {
+    try {
+      _intermediateSegmentRecordReader.rewind();
+    } catch (Exception e) {
+      LOGGER.error("Caught exception while rewinding record reader", e);
+      Utils.rethrowException(e);
+    }
+    return _intermediateSegmentRecordReader;
+  }
+}
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/IntermediateSegmentStatsContainer.java b/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/IntermediateSegmentStatsContainer.java
new file mode 100644
index 0000000..3e29d52
--- /dev/null
+++ b/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/IntermediateSegmentStatsContainer.java
@@ -0,0 +1,52 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.segment.creator;
+
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.pinot.core.common.DataSource;
+import org.apache.pinot.core.indexsegment.mutable.IntermediateSegment;
+import org.apache.pinot.core.realtime.converter.stats.MutableColumnStatistics;
+
+
+public class IntermediateSegmentStatsContainer implements SegmentPreIndexStatsContainer {
+  private final IntermediateSegment _intermediateSegment;
+  private final Map<String, ColumnStatistics> _columnStatisticsMap = new HashMap<>();
+
+  public IntermediateSegmentStatsContainer(IntermediateSegment intermediateSegment) {
+    _intermediateSegment = intermediateSegment;
+
+    // Create all column statistics
+    for (String columnName : intermediateSegment.getPhysicalColumnNames()) {
+      DataSource dataSource = intermediateSegment.getDataSource(columnName);
+      // Always use dictionary for intermediate segment stats
+      _columnStatisticsMap.put(columnName, new MutableColumnStatistics(dataSource, null));
+    }
+  }
+
+  @Override
+  public ColumnStatistics getColumnProfileFor(String column) {
+    return _columnStatisticsMap.get(column);
+  }
+
+  @Override
+  public int getTotalDocCount() {
+    return _intermediateSegment.getNumDocsIndexed();
+  }
+}
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/impl/SegmentColumnarIndexCreator.java b/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/impl/SegmentColumnarIndexCreator.java
index d2b4e52..3c79af4 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/impl/SegmentColumnarIndexCreator.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/impl/SegmentColumnarIndexCreator.java
@@ -602,7 +602,7 @@ public class SegmentColumnarIndexCreator implements SegmentCreator {
     properties.setProperty(getKeyFor(column, HAS_FST_INDEX), String.valueOf(hasFSTIndex));
     properties.setProperty(getKeyFor(column, HAS_JSON_INDEX), String.valueOf(hasJsonIndex));
     properties.setProperty(getKeyFor(column, IS_SINGLE_VALUED), String.valueOf(fieldSpec.isSingleValueField()));
-    properties.setProperty(getKeyFor(column, MAX_MULTI_VALUE_ELEMTS),
+    properties.setProperty(getKeyFor(column, MAX_MULTI_VALUE_ELEMENTS),
         String.valueOf(columnIndexCreationInfo.getMaxNumberOfMultiValueElements()));
     properties.setProperty(getKeyFor(column, TOTAL_NUMBER_OF_ENTRIES),
         String.valueOf(columnIndexCreationInfo.getTotalNumberOfEntries()));
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/impl/SegmentIndexCreationDriverImpl.java b/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/impl/SegmentIndexCreationDriverImpl.java
index 3078dc7..1efabfc 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/impl/SegmentIndexCreationDriverImpl.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/impl/SegmentIndexCreationDriverImpl.java
@@ -32,6 +32,7 @@ import java.util.Set;
 import java.util.UUID;
 import org.apache.commons.collections.CollectionUtils;
 import org.apache.commons.io.FileUtils;
+import org.apache.pinot.core.data.readers.IntermediateSegmentRecordReader;
 import org.apache.pinot.core.data.readers.PinotSegmentRecordReader;
 import org.apache.pinot.core.data.recordtransformer.CompositeTransformer;
 import org.apache.pinot.core.data.recordtransformer.RecordTransformer;
@@ -39,6 +40,7 @@ import org.apache.pinot.core.indexsegment.generator.SegmentGeneratorConfig;
 import org.apache.pinot.core.indexsegment.generator.SegmentVersion;
 import org.apache.pinot.core.segment.creator.ColumnIndexCreationInfo;
 import org.apache.pinot.core.segment.creator.ColumnStatistics;
+import org.apache.pinot.core.segment.creator.IntermediateSegmentSegmentCreationDataSource;
 import org.apache.pinot.core.segment.creator.RecordReaderSegmentCreationDataSource;
 import org.apache.pinot.core.segment.creator.SegmentCreationDataSource;
 import org.apache.pinot.core.segment.creator.SegmentCreator;
@@ -136,8 +138,13 @@ public class SegmentIndexCreationDriverImpl implements SegmentIndexCreationDrive
 
   public void init(SegmentGeneratorConfig config, RecordReader recordReader)
       throws Exception {
-    init(config, new RecordReaderSegmentCreationDataSource(recordReader),
-        CompositeTransformer.getDefaultTransformer(config.getTableConfig(), config.getSchema()));
+    SegmentCreationDataSource dataSource;
+    if (config.isIntermediateSegmentRecordReader()) {
+      dataSource = new IntermediateSegmentSegmentCreationDataSource((IntermediateSegmentRecordReader) recordReader);
+    } else {
+      dataSource = new RecordReaderSegmentCreationDataSource(recordReader);
+    }
+    init(config, dataSource, CompositeTransformer.getDefaultTransformer(config.getTableConfig(), config.getSchema()));
   }
 
   public void init(SegmentGeneratorConfig config, SegmentCreationDataSource dataSource,
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/impl/V1Constants.java b/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/impl/V1Constants.java
index b22ceac..82d4ae1 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/impl/V1Constants.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/impl/V1Constants.java
@@ -82,7 +82,7 @@ public class V1Constants {
       public static final String HAS_FST_INDEX = "hasFSTIndex";
       public static final String HAS_JSON_INDEX = "hasJsonIndex";
       public static final String IS_SINGLE_VALUED = "isSingleValues";
-      public static final String MAX_MULTI_VALUE_ELEMTS = "maxNumberOfMultiValues";
+      public static final String MAX_MULTI_VALUE_ELEMENTS = "maxNumberOfMultiValues";
       public static final String TOTAL_NUMBER_OF_ENTRIES = "totalNumberOfEntries";
       public static final String IS_AUTO_GENERATED = "isAutoGenerated";
       public static final String DEFAULT_NULL_VALUE = "defaultNullValue";
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/segment/index/column/IntermediateIndexContainer.java b/pinot-core/src/main/java/org/apache/pinot/core/segment/index/column/IntermediateIndexContainer.java
new file mode 100644
index 0000000..7c8263f
--- /dev/null
+++ b/pinot-core/src/main/java/org/apache/pinot/core/segment/index/column/IntermediateIndexContainer.java
@@ -0,0 +1,134 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.segment.index.column;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.Set;
+import javax.annotation.Nullable;
+import org.apache.pinot.core.common.DataSource;
+import org.apache.pinot.core.data.partition.PartitionFunction;
+import org.apache.pinot.core.segment.index.datasource.MutableDataSource;
+import org.apache.pinot.core.segment.index.readers.MutableDictionary;
+import org.apache.pinot.core.segment.index.readers.MutableForwardIndex;
+import org.apache.pinot.spi.data.FieldSpec;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+public class IntermediateIndexContainer implements Closeable {
+  private static final Logger LOGGER = LoggerFactory.getLogger(IntermediateIndexContainer.class);
+
+  final FieldSpec _fieldSpec;
+  final PartitionFunction _partitionFunction;
+  final Set<Integer> _partitions;
+  final NumValuesInfo _numValuesInfo;
+  final MutableForwardIndex _forwardIndex;
+  final MutableDictionary _dictionary;
+
+  volatile Comparable _minValue;
+  volatile Comparable _maxValue;
+
+  // Hold the dictionary id for the latest record
+  int _dictId = Integer.MIN_VALUE;
+  int[] _dictIds;
+
+  public IntermediateIndexContainer(FieldSpec fieldSpec, @Nullable PartitionFunction partitionFunction,
+      @Nullable Set<Integer> partitions, NumValuesInfo numValuesInfo, MutableForwardIndex forwardIndex,
+      MutableDictionary dictionary) {
+    _fieldSpec = fieldSpec;
+    _partitionFunction = partitionFunction;
+    _partitions = partitions;
+    _numValuesInfo = numValuesInfo;
+    _forwardIndex = forwardIndex;
+    _dictionary = dictionary;
+  }
+
+  public DataSource toDataSource(int numDocsIndexed) {
+    return new MutableDataSource(_fieldSpec, numDocsIndexed, _numValuesInfo._numValues,
+        _numValuesInfo._maxNumValuesPerMVEntry, _partitionFunction, _partitions, _minValue, _maxValue, _forwardIndex,
+        _dictionary, null, null, null, false, null, null, null);
+  }
+
+  @Override
+  public void close()
+      throws IOException {
+    String column = _fieldSpec.getName();
+    try {
+      _forwardIndex.close();
+    } catch (Exception e) {
+      LOGGER.error("Caught exception while closing forward index for column: 
{}, continuing with error", column, e);
+    }
+    if (_dictionary != null) {
+      try {
+        _dictionary.close();
+      } catch (Exception e) {
+        LOGGER.error("Caught exception while closing dictionary for column: 
{}, continuing with error", column, e);
+      }
+    }
+  }
+
+  public FieldSpec getFieldSpec() {
+    return _fieldSpec;
+  }
+
+  public NumValuesInfo getNumValuesInfo() {
+    return _numValuesInfo;
+  }
+
+  public MutableForwardIndex getForwardIndex() {
+    return _forwardIndex;
+  }
+
+  public MutableDictionary getDictionary() {
+    return _dictionary;
+  }
+
+  public int getDictId() {
+    return _dictId;
+  }
+
+  public void setDictId(int dictId) {
+    _dictId = dictId;
+  }
+
+  public int[] getDictIds() {
+    return _dictIds;
+  }
+
+  public void setDictIds(int[] dictIds) {
+    _dictIds = dictIds;
+  }
+
+  public Comparable getMinValue() {
+    return _minValue;
+  }
+
+  public void setMinValue(Comparable minValue) {
+    _minValue = minValue;
+  }
+
+  public Comparable getMaxValue() {
+    return _maxValue;
+  }
+
+  public void setMaxValue(Comparable maxValue) {
+    _maxValue = maxValue;
+  }
+}
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/segment/index/column/NumValuesInfo.java b/pinot-core/src/main/java/org/apache/pinot/core/segment/index/column/NumValuesInfo.java
new file mode 100644
index 0000000..6992c8f
--- /dev/null
+++ b/pinot-core/src/main/java/org/apache/pinot/core/segment/index/column/NumValuesInfo.java
@@ -0,0 +1,41 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.segment.index.column;
+
+public class NumValuesInfo {
+  volatile int _numValues = 0;
+  volatile int _maxNumValuesPerMVEntry = 0;
+
+  public void updateSVEntry() {
+    _numValues++;
+  }
+
+  public void updateMVEntry(int numValuesInMVEntry) {
+    _numValues += numValuesInMVEntry;
+    _maxNumValuesPerMVEntry = Math.max(_maxNumValuesPerMVEntry, numValuesInMVEntry);
+  }
+
+  public int getNumValues() {
+    return _numValues;
+  }
+
+  public int getMaxNumValuesPerMVEntry() {
+    return _maxNumValuesPerMVEntry;
+  }
+}
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/segment/index/loader/IndexLoadingConfig.java b/pinot-core/src/main/java/org/apache/pinot/core/segment/index/loader/IndexLoadingConfig.java
index e307aaf..6056f6a 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/segment/index/loader/IndexLoadingConfig.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/segment/index/loader/IndexLoadingConfig.java
@@ -194,6 +194,9 @@ public class IndexLoadingConfig {
   }
 
   private void extractFromInstanceConfig(InstanceDataManagerConfig instanceDataManagerConfig) {
+    if (instanceDataManagerConfig == null) {
+      return;
+    }
     ReadMode instanceReadMode = instanceDataManagerConfig.getReadMode();
     if (instanceReadMode != null) {
       _readMode = instanceReadMode;
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/segment/index/metadata/ColumnMetadata.java b/pinot-core/src/main/java/org/apache/pinot/core/segment/index/metadata/ColumnMetadata.java
index 43990ab..b67b0fc 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/segment/index/metadata/ColumnMetadata.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/segment/index/metadata/ColumnMetadata.java
@@ -100,7 +100,7 @@ public class ColumnMetadata {
     builder.setHasInvertedIndex(config.getBoolean(getKeyFor(column, HAS_INVERTED_INDEX)));
     builder.setHasFSTIndex(config.getBoolean(getKeyFor(column, HAS_INVERTED_INDEX), false));
     builder.setSingleValue(config.getBoolean(getKeyFor(column, IS_SINGLE_VALUED)));
-    builder.setMaxNumberOfMultiValues(config.getInt(getKeyFor(column, MAX_MULTI_VALUE_ELEMTS)));
+    builder.setMaxNumberOfMultiValues(config.getInt(getKeyFor(column, MAX_MULTI_VALUE_ELEMENTS)));
     builder.setTotalNumberOfEntries(config.getInt(getKeyFor(column, TOTAL_NUMBER_OF_ENTRIES)));
     builder.setAutoGenerated(config.getBoolean(getKeyFor(column, IS_AUTO_GENERATED), false));
     builder.setDefaultNullValueString(config.getString(getKeyFor(column, DEFAULT_NULL_VALUE), null));
diff --git a/pinot-core/src/test/java/org/apache/pinot/core/indexsegment/IntermediateSegmentTest.java b/pinot-core/src/test/java/org/apache/pinot/core/indexsegment/IntermediateSegmentTest.java
new file mode 100644
index 0000000..1220beb
--- /dev/null
+++ b/pinot-core/src/test/java/org/apache/pinot/core/indexsegment/IntermediateSegmentTest.java
@@ -0,0 +1,222 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.indexsegment;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.URL;
+import java.util.Arrays;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import org.apache.commons.io.FileUtils;
+import org.apache.pinot.common.segment.ReadMode;
+import org.apache.pinot.core.common.DataSource;
+import org.apache.pinot.core.data.readers.IntermediateSegmentRecordReader;
+import org.apache.pinot.core.indexsegment.generator.SegmentGeneratorConfig;
+import org.apache.pinot.core.indexsegment.immutable.ImmutableSegmentLoader;
+import org.apache.pinot.core.indexsegment.mutable.IntermediateSegment;
+import org.apache.pinot.core.segment.creator.SegmentIndexCreationDriver;
+import org.apache.pinot.core.segment.creator.impl.SegmentIndexCreationDriverImpl;
+import org.apache.pinot.core.segment.index.readers.Dictionary;
+import org.apache.pinot.core.segment.index.readers.InvertedIndexReader;
+import org.apache.pinot.plugin.inputformat.avro.AvroRecordReader;
+import org.apache.pinot.segments.v1.creator.SegmentTestUtils;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.config.table.TableType;
+import org.apache.pinot.spi.data.FieldSpec;
+import org.apache.pinot.spi.data.Schema;
+import org.apache.pinot.spi.data.TimeGranularitySpec;
+import org.apache.pinot.spi.data.readers.GenericRow;
+import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.DataProvider;
+import org.testng.annotations.Test;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNotNull;
+
+
+public class IntermediateSegmentTest {
+  private static final String AVRO_DATA_SV = "data/test_data-sv.avro";
+  private static final String AVRO_DATA_MV = "data/test_data-mv.avro";
+  private static final String SEGMENT_NAME = "testSegmentName";
+  private static final File INDEX_DIR = new File(FileUtils.getTempDirectory(), "IntermediateSegmentTest");
+
+  @BeforeMethod
+  public void setUp()
+      throws Exception {
+    FileUtils.deleteQuietly(INDEX_DIR);
+  }
+
+  @AfterMethod
+  public void tearDown() {
+    FileUtils.deleteQuietly(INDEX_DIR);
+  }
+
+  @DataProvider(name = "segmentCreationTestCases")
+  private static Object[][] createSegmentCreationTestCases()
+      throws IOException {
+    return new Object[][]{{AVRO_DATA_SV, createSchema(AVRO_DATA_SV), createTableConfig(AVRO_DATA_SV)},
+        {AVRO_DATA_MV, createSchema(AVRO_DATA_MV), createTableConfig(AVRO_DATA_MV)}};
+  }
+
+  @Test(dataProvider = "segmentCreationTestCases")
+  public void testOfflineSegmentCreationFromDifferentWays(String inputFile, Schema schema, TableConfig tableConfig)
+      throws Exception {
+    // Get resource file path.
+    URL resource = getClass().getClassLoader().getResource(inputFile);
+    assertNotNull(resource);
+    String filePath = resource.getFile();
+
+    // Create the segment generator config.
+    SegmentGeneratorConfig segmentGeneratorConfig = new SegmentGeneratorConfig(tableConfig, schema);
+    segmentGeneratorConfig.setInputFilePath(filePath);
+    segmentGeneratorConfig.setTableName("testTable");
+    segmentGeneratorConfig.setSegmentName(SEGMENT_NAME);
+    segmentGeneratorConfig.setOutDir(INDEX_DIR.getAbsolutePath());
+    segmentGeneratorConfig.setSkipTimeValueCheck(true);
+    segmentGeneratorConfig.setInvertedIndexCreationColumns(Arrays.asList("column6", "column7"));
+
+    IndexSegment segmentFromIntermediateSegment = buildSegmentFromIntermediateSegment(segmentGeneratorConfig);
+    IndexSegment segmentFromAvroRecordReader = buildSegmentFromAvroRecordReader(segmentGeneratorConfig);
+
+    assertNotNull(segmentFromIntermediateSegment);
+    assertNotNull(segmentFromAvroRecordReader);
+    assertEquals(segmentFromIntermediateSegment.getColumnNames(), segmentFromAvroRecordReader.getColumnNames());
+    Set<String> physicalColumnsFromIntermediateSegment = segmentFromIntermediateSegment.getPhysicalColumnNames();
+    Set<String> physicalColumnsFromAvroSegment = segmentFromAvroRecordReader.getPhysicalColumnNames();
+    assertEquals(physicalColumnsFromIntermediateSegment, physicalColumnsFromAvroSegment);
+
+    // Comparison for every column
+    for (String column : physicalColumnsFromIntermediateSegment) {
+      DataSource dataSourceFromIntermediateSegment = segmentFromIntermediateSegment.getDataSource(column);
+      DataSource dataSourceFromAvroRecordReader = segmentFromAvroRecordReader.getDataSource(column);
+
+      // Comparison for dictionaries.
+      Dictionary actualDictionary = dataSourceFromIntermediateSegment.getDictionary();
+      Dictionary expectedDictionary = dataSourceFromAvroRecordReader.getDictionary();
+      assertEquals(actualDictionary.getMinVal(), expectedDictionary.getMinVal());
+      assertEquals(actualDictionary.getMaxVal(), expectedDictionary.getMaxVal());
+      assertEquals(actualDictionary.getValueType(), expectedDictionary.getValueType());
+      assertEquals(actualDictionary.length(), expectedDictionary.length());
+      int dictionaryLength = actualDictionary.length();
+      for (int i = 0; i < dictionaryLength; i++) {
+        assertEquals(actualDictionary.get(i), expectedDictionary.get(i));
+      }
+
+      // Comparison for inverted index
+      InvertedIndexReader actualInvertedIndexReader = dataSourceFromIntermediateSegment.getInvertedIndex();
+      InvertedIndexReader expectedInvertedIndexReader = dataSourceFromAvroRecordReader.getInvertedIndex();
+      if (actualInvertedIndexReader != null) {
+        for (int j = 0; j < dictionaryLength; j++) {
+          assertEquals(actualInvertedIndexReader.getDocIds(j), expectedInvertedIndexReader.getDocIds(j));
+        }
+      }
+    }
+  }
+
+  private IndexSegment buildSegmentFromIntermediateSegment(SegmentGeneratorConfig segmentGeneratorConfig)
+      throws Exception {
+    // Set intermediate segment record reader.
+    segmentGeneratorConfig.setIntermediateSegmentRecordReader(true);
+    String segmentName = SEGMENT_NAME + "_from_intermediate_segment";
+    segmentGeneratorConfig.setSegmentName(segmentName);
+
+    IntermediateSegment intermediateSegment = new IntermediateSegment(segmentGeneratorConfig);
+
+    // Ingest data.
+    ingestDataToIntermediateSegment(segmentGeneratorConfig, intermediateSegment);
+    IntermediateSegmentRecordReader intermediateSegmentRecordReader =
+        new IntermediateSegmentRecordReader(intermediateSegment);
+
+    // Build the segment from intermediate segment.
+    SegmentIndexCreationDriverImpl driver = new SegmentIndexCreationDriverImpl();
+    driver.init(segmentGeneratorConfig, intermediateSegmentRecordReader);
+    driver.build();
+
+    // Destroy intermediate segment after the segment creation.
+    intermediateSegment.destroy();
+
+    return ImmutableSegmentLoader.load(new File(INDEX_DIR, segmentName), ReadMode.heap);
+  }
+
+  private IndexSegment buildSegmentFromAvroRecordReader(SegmentGeneratorConfig segmentGeneratorConfig)
+      throws Exception {
+    // Reset default value to use avro record reader
+    segmentGeneratorConfig.setIntermediateSegmentRecordReader(false);
+    String segmentName = SEGMENT_NAME + "_from_avro_reader";
+    segmentGeneratorConfig.setSegmentName(segmentName);
+
+    // Build the index segment.
+    SegmentIndexCreationDriver driver = new SegmentIndexCreationDriverImpl();
+    driver.init(segmentGeneratorConfig);
+    driver.build();
+
+    return ImmutableSegmentLoader.load(new File(INDEX_DIR, segmentName), ReadMode.heap);
+  }
+
+  private void ingestDataToIntermediateSegment(SegmentGeneratorConfig segmentGeneratorConfig,
+      IntermediateSegment intermediateSegment)
+      throws IOException {
+    AvroRecordReader avroRecordReader = new AvroRecordReader();
+    avroRecordReader.init(new File(segmentGeneratorConfig.getInputFilePath()), null, null);
+
+    GenericRow genericRow = new GenericRow();
+    while (avroRecordReader.hasNext()) {
+      genericRow.clear();
+      genericRow = avroRecordReader.next(genericRow);
+      intermediateSegment.index(genericRow, null);
+    }
+  }
+
+  private static Schema createSchema(String inputFile)
+      throws IOException {
+    Schema schema;
+    if (AVRO_DATA_SV.equals(inputFile)) {
+      schema = new Schema.SchemaBuilder().setSchemaName("testTable").addMetric("column1", FieldSpec.DataType.INT)
+          .addMetric("column3", FieldSpec.DataType.INT).addSingleValueDimension("column5", FieldSpec.DataType.STRING)
+          .addSingleValueDimension("column6", FieldSpec.DataType.INT)
+          .addSingleValueDimension("column7", FieldSpec.DataType.INT)
+          .addSingleValueDimension("column9", FieldSpec.DataType.INT)
+          .addSingleValueDimension("column11", FieldSpec.DataType.STRING)
+          .addSingleValueDimension("column12", FieldSpec.DataType.STRING).addMetric("column17", FieldSpec.DataType.INT)
+          .addMetric("column18", FieldSpec.DataType.INT)
+          .addTime(new TimeGranularitySpec(FieldSpec.DataType.INT, TimeUnit.DAYS, "daysSinceEpoch"), null).build();
+    } else {
+      URL resource = IntermediateSegmentTest.class.getClassLoader().getResource(inputFile);
+      assertNotNull(resource);
+      String filePath = resource.getFile();
+      schema = SegmentTestUtils.extractSchemaFromAvroWithoutTime(new File(filePath));
+    }
+    return schema;
+  }
+
+  private static TableConfig createTableConfig(String inputFile) {
+    TableConfig tableConfig;
+    if (AVRO_DATA_SV.equals(inputFile)) {
+      tableConfig =
+          new TableConfigBuilder(TableType.OFFLINE).setTableName("testTable").setTimeColumnName("daysSinceEpoch")
+              .setInvertedIndexColumns(Arrays.asList("column6", "column7", "column11", "column17", "column18")).build();
+    } else {
+      tableConfig = new TableConfigBuilder(TableType.OFFLINE).setTableName("testTable").build();
+    }
+    return tableConfig;
+  }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org
