This is an automated email from the ASF dual-hosted git repository.

kishoreg pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new aed1307  Add MV raw forward index and MV `BYTES` data type (#7595)
aed1307 is described below

commit aed13072dac0d8dae29056fac77f0f457be7adba
Author: Richard Startin <rich...@startree.ai>
AuthorDate: Fri Oct 22 15:50:36 2021 +0100

    Add MV raw forward index and MV `BYTES` data type (#7595)
    
    * Initial code for MultiValue forward Index
    
    * Wiring in the segment creation driver Impl
    
    * cleanup
    
    * finish off adding BYTES_ARRAY type
    
    * use less memory and fewer passes during encoding
    
    * reduce memory requirement for forwardindexwriter
    
    * track size in bytes of largest row so chunks can be sized to accommodate it
    
    * remove TODOs
    
    * force derivation of number of docs for raw MV columns
    
    * specify character encoding
    
    * leave changes to integration tests to MV TEXT index implementation
    
    * fix javadoc
    
    * don't use StringUtils
    
    * fix formatting after rebase
    
    * fix javadoc formatting again
    
    * use zstd's compress bound
    
    Co-authored-by: kishoreg <g.kish...@gmail.com>
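
    For context, a minimal sketch (not part of the patch) of what the change
    enables at ingestion time; the column name "payloads" is illustrative, the
    calls are the ones added below:

        // a multi-value BYTES dimension now maps to BYTES_ARRAY instead of
        // throwing IllegalStateException
        FieldSpec spec = new DimensionFieldSpec("payloads", FieldSpec.DataType.BYTES, /* singleValue */ false);
        PinotDataType type = PinotDataType.getPinotDataTypeForIngestion(spec); // BYTES_ARRAY
        // byte[][] values pass through toBytesArray unchanged
        byte[][] values = {{0x01}, {0x02, 0x03}};
        byte[][] converted = PinotDataType.BYTES_ARRAY.toBytesArray(values);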
---
 .../org/apache/pinot/common/utils/DataSchema.java  |   9 +-
 .../apache/pinot/common/utils/PinotDataType.java   |  37 +++-
 .../apache/pinot/common/utils/DataSchemaTest.java  |  19 +-
 .../pinot/common/utils/PinotDataTypeTest.java      |   6 +-
 .../pinot/core/minion/RawIndexConverter.java       |   2 +-
 .../local/io/compression/LZ4Compressor.java        |   5 +
 .../io/compression/PassThroughCompressor.java      |   5 +
 .../local/io/compression/SnappyCompressor.java     |   5 +
 .../local/io/compression/ZstandardCompressor.java  |   5 +
 .../writer/impl/BaseChunkSVForwardIndexWriter.java |  60 ++++---
 .../impl/FixedByteChunkSVForwardIndexWriter.java   |   4 +-
 .../impl/VarByteChunkSVForwardIndexWriter.java     |  77 ++++++--
 .../creator/impl/SegmentColumnarIndexCreator.java  | 192 +++++++++++++++++---
 .../fwd/MultiValueFixedByteRawIndexCreator.java    | 181 +++++++++++++++++++
 .../impl/fwd/MultiValueVarByteRawIndexCreator.java | 122 +++++++++++++
 .../stats/AbstractColumnStatisticsCollector.java   |   5 +
 .../stats/BytesColumnPredIndexStatsCollector.java  |  44 +++--
 .../stats/StringColumnPreIndexStatsCollector.java  |  10 ++
 .../forward/VarByteChunkMVForwardIndexReader.java  | 193 +++++++++++++++++++++
 .../local/segment/store/FilePerIndexDirectory.java |   6 +-
 .../MultiValueVarByteRawIndexCreatorTest.java      | 141 +++++++++++++++
 .../segment/index/creator/RawIndexCreatorTest.java | 135 +++++++++++---
 .../org/apache/pinot/segment/spi/V1Constants.java  |   1 +
 .../segment/spi/compression/ChunkCompressor.java   |   2 +
 .../spi/creator/ColumnIndexCreationInfo.java       |   4 +
 .../segment/spi/creator/ColumnStatistics.java      |   7 +
 .../spi/index/creator/ForwardIndexCreator.java     |   9 +
 .../spi/index/reader/ForwardIndexReader.java       |  19 ++
 .../converter/DictionaryToRawIndexConverter.java   |   2 +-
 29 files changed, 1187 insertions(+), 120 deletions(-)

diff --git a/pinot-common/src/main/java/org/apache/pinot/common/utils/DataSchema.java b/pinot-common/src/main/java/org/apache/pinot/common/utils/DataSchema.java
index 6b61cfc..37fb392 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/utils/DataSchema.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/utils/DataSchema.java
@@ -255,12 +255,13 @@ public class DataSchema {
     DOUBLE_ARRAY,
     BOOLEAN_ARRAY /* Stored as INT_ARRAY */,
     TIMESTAMP_ARRAY /* Stored as LONG_ARRAY */,
+    BYTES_ARRAY,
     STRING_ARRAY;
 
     private static final EnumSet<ColumnDataType> NUMERIC_TYPES = EnumSet.of(INT, LONG, FLOAT, DOUBLE);
     private static final EnumSet<ColumnDataType> INTEGRAL_TYPES = EnumSet.of(INT, LONG);
     private static final EnumSet<ColumnDataType> ARRAY_TYPES = EnumSet.of(INT_ARRAY, LONG_ARRAY, FLOAT_ARRAY,
-        DOUBLE_ARRAY, STRING_ARRAY, BOOLEAN_ARRAY, TIMESTAMP_ARRAY);
+        DOUBLE_ARRAY, STRING_ARRAY, BOOLEAN_ARRAY, TIMESTAMP_ARRAY, BYTES_ARRAY);
     private static final EnumSet<ColumnDataType> NUMERIC_ARRAY_TYPES = EnumSet.of(INT_ARRAY, LONG_ARRAY, FLOAT_ARRAY,
         DOUBLE_ARRAY);
     private static final EnumSet<ColumnDataType> INTEGRAL_ARRAY_TYPES = EnumSet.of(INT_ARRAY, LONG_ARRAY);
@@ -368,6 +369,8 @@ public class DataSchema {
           return toBooleanArray(value);
         case TIMESTAMP_ARRAY:
           return toTimestampArray(value);
+        case BYTES_ARRAY:
+          return (byte[][]) value;
         default:
           throw new IllegalStateException(String.format("Cannot convert: '%s' 
to type: %s", value, this));
       }
@@ -424,6 +427,8 @@ public class DataSchema {
           return toBooleanArray(value);
         case TIMESTAMP_ARRAY:
           return formatTimestampArray(value);
+        case BYTES_ARRAY:
+          return (byte[][]) value;
         default:
           throw new IllegalStateException(String.format("Cannot convert and 
format: '%s' to type: %s", value, this));
       }
@@ -541,6 +546,8 @@ public class DataSchema {
           return BOOLEAN_ARRAY;
         case TIMESTAMP:
           return TIMESTAMP_ARRAY;
+        case BYTES:
+          return BYTES_ARRAY;
         default:
           throw new IllegalStateException("Unsupported data type: " + 
dataType);
       }
diff --git a/pinot-common/src/main/java/org/apache/pinot/common/utils/PinotDataType.java b/pinot-common/src/main/java/org/apache/pinot/common/utils/PinotDataType.java
index f64bf4a..5352820 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/utils/PinotDataType.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/utils/PinotDataType.java
@@ -784,6 +784,13 @@ public enum PinotDataType {
     }
   },
 
+  BYTES_ARRAY {
+    @Override
+    public byte[][] convert(Object value, PinotDataType sourceType) {
+      return sourceType.toBytesArray(value);
+    }
+  },
+
   OBJECT_ARRAY;
 
   /**
@@ -1034,6 +1041,24 @@ public enum PinotDataType {
     }
   }
 
+  public byte[][] toBytesArray(Object value) {
+    if (value instanceof byte[][]) {
+      return (byte[][]) value;
+    }
+    if (isSingleValue()) {
+      return new byte[][]{toBytes(value)};
+    } else {
+      Object[] valueArray = toObjectArray(value);
+      int length = valueArray.length;
+      byte[][] bytesArray = new byte[length][];
+      PinotDataType singleValueType = getSingleValueType();
+      for (int i = 0; i < length; i++) {
+        bytesArray[i] = singleValueType.toBytes(valueArray[i]);
+      }
+      return bytesArray;
+    }
+  }
+
   private static Object[] toObjectArray(Object array) {
     Class<?> componentType = array.getClass().getComponentType();
     if (componentType.isPrimitive()) {
@@ -1132,6 +1157,8 @@ public enum PinotDataType {
         return DOUBLE;
       case STRING_ARRAY:
         return STRING;
+      case BYTES_ARRAY:
+        return BYTES;
       case OBJECT_ARRAY:
         return OBJECT;
       case BOOLEAN_ARRAY:
@@ -1205,6 +1232,9 @@ public enum PinotDataType {
     if (cls == Short.class) {
       return SHORT_ARRAY;
     }
+    if (cls == byte[].class) {
+      return BYTES_ARRAY;
+    }
     if (cls == Boolean.class) {
       return BOOLEAN_ARRAY;
     }
@@ -1233,7 +1263,6 @@ public enum PinotDataType {
   /**
   * Returns the {@link PinotDataType} for the given {@link FieldSpec} for data ingestion purpose. Returns object array
    * type for multi-valued types.
-   * TODO: Add MV support for BYTES
    */
   public static PinotDataType getPinotDataTypeForIngestion(FieldSpec fieldSpec) {
     DataType dataType = fieldSpec.getDataType();
@@ -1259,11 +1288,7 @@ public enum PinotDataType {
       case STRING:
         return fieldSpec.isSingleValueField() ? STRING : STRING_ARRAY;
       case BYTES:
-        if (fieldSpec.isSingleValueField()) {
-          return BYTES;
-        } else {
-          throw new IllegalStateException("There is no multi-value type for 
BYTES");
-        }
+        return fieldSpec.isSingleValueField() ? BYTES : BYTES_ARRAY;
       default:
         throw new UnsupportedOperationException(
             "Unsupported data type: " + dataType + " in field: " + 
fieldSpec.getName());
diff --git a/pinot-common/src/test/java/org/apache/pinot/common/utils/DataSchemaTest.java b/pinot-common/src/test/java/org/apache/pinot/common/utils/DataSchemaTest.java
index 421e2ea..04355b8 100644
--- a/pinot-common/src/test/java/org/apache/pinot/common/utils/DataSchemaTest.java
+++ b/pinot-common/src/test/java/org/apache/pinot/common/utils/DataSchemaTest.java
@@ -29,18 +29,18 @@ import static org.apache.pinot.common.utils.DataSchema.ColumnDataType.*;
 public class DataSchemaTest {
   private static final String[] COLUMN_NAMES = {
       "int", "long", "float", "double", "string", "object", "int_array", "long_array", "float_array", "double_array",
-      "string_array", "boolean_array", "timestamp_array"
+      "string_array", "boolean_array", "timestamp_array", "bytes_array"
   };
   private static final int NUM_COLUMNS = COLUMN_NAMES.length;
   private static final DataSchema.ColumnDataType[] COLUMN_DATA_TYPES =
       {INT, LONG, FLOAT, DOUBLE, STRING, OBJECT, INT_ARRAY, LONG_ARRAY, FLOAT_ARRAY, DOUBLE_ARRAY, STRING_ARRAY,
-      BOOLEAN_ARRAY, TIMESTAMP_ARRAY};
+      BOOLEAN_ARRAY, TIMESTAMP_ARRAY, BYTES_ARRAY};
   private static final DataSchema.ColumnDataType[] COMPATIBLE_COLUMN_DATA_TYPES =
       {LONG, FLOAT, DOUBLE, INT, STRING, OBJECT, LONG_ARRAY, FLOAT_ARRAY, DOUBLE_ARRAY, INT_ARRAY, STRING_ARRAY,
-       BOOLEAN_ARRAY, TIMESTAMP_ARRAY};
+       BOOLEAN_ARRAY, TIMESTAMP_ARRAY, BYTES_ARRAY};
   private static final DataSchema.ColumnDataType[] UPGRADED_COLUMN_DATA_TYPES = {
       LONG, DOUBLE, DOUBLE, DOUBLE, STRING, OBJECT, LONG_ARRAY, DOUBLE_ARRAY, DOUBLE_ARRAY, DOUBLE_ARRAY, STRING_ARRAY,
-      BOOLEAN_ARRAY, TIMESTAMP_ARRAY
+      BOOLEAN_ARRAY, TIMESTAMP_ARRAY, BYTES_ARRAY
   };
 
   @Test
@@ -92,7 +92,7 @@ public class DataSchemaTest {
     Assert.assertEquals(dataSchema.toString(),
         "[int(INT),long(LONG),float(FLOAT),double(DOUBLE),string(STRING),object(OBJECT),int_array(INT_ARRAY),"
             + "long_array(LONG_ARRAY),float_array(FLOAT_ARRAY),double_array(DOUBLE_ARRAY),string_array(STRING_ARRAY),"
-            + "boolean_array(BOOLEAN_ARRAY),timestamp_array(TIMESTAMP_ARRAY)]");
+            + "boolean_array(BOOLEAN_ARRAY),timestamp_array(TIMESTAMP_ARRAY),bytes_array(BYTES_ARRAY)]");
   }
 
   @Test
@@ -107,6 +107,7 @@ public class DataSchemaTest {
       Assert.assertFalse(columnDataType.isCompatible(STRING));
       Assert.assertFalse(columnDataType.isCompatible(DOUBLE_ARRAY));
       Assert.assertFalse(columnDataType.isCompatible(STRING_ARRAY));
+      Assert.assertFalse(columnDataType.isCompatible(BYTES_ARRAY));
     }
 
     for (DataSchema.ColumnDataType columnDataType : new DataSchema.ColumnDataType[]{FLOAT, DOUBLE}) {
@@ -119,6 +120,7 @@ public class DataSchemaTest {
       Assert.assertFalse(columnDataType.isCompatible(STRING));
       Assert.assertFalse(columnDataType.isCompatible(LONG_ARRAY));
       Assert.assertFalse(columnDataType.isCompatible(STRING_ARRAY));
+      Assert.assertFalse(columnDataType.isCompatible(BYTES_ARRAY));
     }
 
     Assert.assertFalse(STRING.isNumber());
@@ -130,6 +132,7 @@ public class DataSchemaTest {
     Assert.assertTrue(STRING.isCompatible(STRING));
     Assert.assertFalse(STRING.isCompatible(DOUBLE_ARRAY));
     Assert.assertFalse(STRING.isCompatible(STRING_ARRAY));
+    Assert.assertFalse(STRING.isCompatible(BYTES_ARRAY));
 
     Assert.assertFalse(OBJECT.isNumber());
     Assert.assertFalse(OBJECT.isWholeNumber());
@@ -140,6 +143,7 @@ public class DataSchemaTest {
     Assert.assertFalse(OBJECT.isCompatible(STRING));
     Assert.assertFalse(OBJECT.isCompatible(DOUBLE_ARRAY));
     Assert.assertFalse(OBJECT.isCompatible(STRING_ARRAY));
+    Assert.assertFalse(OBJECT.isCompatible(BYTES_ARRAY));
     Assert.assertTrue(OBJECT.isCompatible(OBJECT));
 
     for (DataSchema.ColumnDataType columnDataType : new DataSchema.ColumnDataType[]{INT_ARRAY, LONG_ARRAY}) {
@@ -152,6 +156,7 @@ public class DataSchemaTest {
       Assert.assertFalse(columnDataType.isCompatible(STRING));
       Assert.assertTrue(columnDataType.isCompatible(DOUBLE_ARRAY));
       Assert.assertFalse(columnDataType.isCompatible(STRING_ARRAY));
+      Assert.assertFalse(columnDataType.isCompatible(BYTES_ARRAY));
     }
 
     for (DataSchema.ColumnDataType columnDataType : new DataSchema.ColumnDataType[]{FLOAT_ARRAY, DOUBLE_ARRAY}) {
@@ -164,10 +169,11 @@ public class DataSchemaTest {
       Assert.assertFalse(columnDataType.isCompatible(STRING));
       Assert.assertTrue(columnDataType.isCompatible(LONG_ARRAY));
       Assert.assertFalse(columnDataType.isCompatible(STRING_ARRAY));
+      Assert.assertFalse(columnDataType.isCompatible(BYTES_ARRAY));
     }
 
     for (DataSchema.ColumnDataType columnDataType : new DataSchema.ColumnDataType[]{STRING_ARRAY, BOOLEAN_ARRAY,
-        TIMESTAMP_ARRAY}) {
+        TIMESTAMP_ARRAY, BYTES_ARRAY}) {
       Assert.assertFalse(columnDataType.isNumber());
       Assert.assertFalse(columnDataType.isWholeNumber());
       Assert.assertTrue(columnDataType.isArray());
@@ -192,5 +198,6 @@ public class DataSchemaTest {
     Assert.assertEquals(fromDataType(FieldSpec.DataType.STRING, false), STRING_ARRAY);
     Assert.assertEquals(fromDataType(FieldSpec.DataType.BOOLEAN, false), BOOLEAN_ARRAY);
     Assert.assertEquals(fromDataType(FieldSpec.DataType.TIMESTAMP, false), TIMESTAMP_ARRAY);
+    Assert.assertEquals(fromDataType(FieldSpec.DataType.BYTES, false), BYTES_ARRAY);
   }
 }
diff --git a/pinot-common/src/test/java/org/apache/pinot/common/utils/PinotDataTypeTest.java b/pinot-common/src/test/java/org/apache/pinot/common/utils/PinotDataTypeTest.java
index 649bb9e..1ddb766 100644
--- a/pinot-common/src/test/java/org/apache/pinot/common/utils/PinotDataTypeTest.java
+++ b/pinot-common/src/test/java/org/apache/pinot/common/utils/PinotDataTypeTest.java
@@ -25,6 +25,7 @@ import java.util.Map;
 import org.testng.annotations.DataProvider;
 import org.testng.annotations.Test;
 
+import static java.nio.charset.StandardCharsets.UTF_8;
 import static org.apache.pinot.common.utils.PinotDataType.*;
 import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertFalse;
@@ -130,7 +131,9 @@ public class PinotDataTypeTest {
         {LONG_ARRAY, TIMESTAMP_ARRAY, new long[] {1000000L, 2000000L},
             new Timestamp[] { new Timestamp(1000000L), new Timestamp(2000000L) }},
         {TIMESTAMP_ARRAY, TIMESTAMP_ARRAY, new Timestamp[] { new Timestamp(1000000L), new Timestamp(2000000L) },
-        new Timestamp[] { new Timestamp(1000000L), new Timestamp(2000000L) }}
+        new Timestamp[] { new Timestamp(1000000L), new Timestamp(2000000L) }},
+        {BYTES_ARRAY, BYTES_ARRAY, new byte[][] { "foo".getBytes(UTF_8), "bar".getBytes(UTF_8) },
+            new byte[][] { "foo".getBytes(UTF_8), "bar".getBytes(UTF_8) }}
     };
   }
 
@@ -257,6 +260,7 @@ public class PinotDataTypeTest {
     testCases.put(String.class, STRING_ARRAY);
     testCases.put(Boolean.class, BOOLEAN_ARRAY);
     testCases.put(Timestamp.class, TIMESTAMP_ARRAY);
+    testCases.put(byte[].class, BYTES_ARRAY);
 
     for (Map.Entry<Class<?>, PinotDataType> tc : testCases.entrySet()) {
       assertEquals(getMultiValueType(tc.getKey()), tc.getValue());
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/minion/RawIndexConverter.java b/pinot-core/src/main/java/org/apache/pinot/core/minion/RawIndexConverter.java
index f1c7fbb..a3b2e24 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/minion/RawIndexConverter.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/minion/RawIndexConverter.java
@@ -207,7 +207,7 @@ public class RawIndexConverter {
     int numDocs = _originalSegmentMetadata.getTotalDocs();
     int lengthOfLongestEntry = _originalSegmentMetadata.getColumnMetadataFor(columnName).getColumnMaxLength();
     try (ForwardIndexCreator rawIndexCreator = SegmentColumnarIndexCreator
-        .getRawIndexCreatorForColumn(_convertedIndexDir, ChunkCompressionType.SNAPPY, columnName, storedType, numDocs,
+        .getRawIndexCreatorForSVColumn(_convertedIndexDir, ChunkCompressionType.SNAPPY, columnName, storedType, numDocs,
             lengthOfLongestEntry, false, BaseChunkSVForwardIndexWriter.DEFAULT_VERSION);
         ForwardIndexReaderContext readerContext = reader.createContext()) {
       switch (storedType) {
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/io/compression/LZ4Compressor.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/io/compression/LZ4Compressor.java
index e0198b1..bc9de7a 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/io/compression/LZ4Compressor.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/io/compression/LZ4Compressor.java
@@ -48,4 +48,9 @@ public class LZ4Compressor implements ChunkCompressor {
     outCompressed.flip();
     return outCompressed.limit();
   }
+
+  @Override
+  public int maxCompressedSize(int uncompressedSize) {
+    return _lz4Factory.fastCompressor().maxCompressedLength(uncompressedSize);
+  }
 }
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/io/compression/PassThroughCompressor.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/io/compression/PassThroughCompressor.java
index 30c69c8..b7d876b 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/io/compression/PassThroughCompressor.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/io/compression/PassThroughCompressor.java
@@ -39,4 +39,9 @@ public class PassThroughCompressor implements ChunkCompressor {
     outCompressed.flip();
     return outCompressed.limit();
   }
+
+  @Override
+  public int maxCompressedSize(int uncompressedSize) {
+    return uncompressedSize;
+  }
 }
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/io/compression/SnappyCompressor.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/io/compression/SnappyCompressor.java
index e183db7..0b87afe 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/io/compression/SnappyCompressor.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/io/compression/SnappyCompressor.java
@@ -34,4 +34,9 @@ public class SnappyCompressor implements ChunkCompressor {
       throws IOException {
     return Snappy.compress(inDecompressed, outCompressed);
   }
+
+  @Override
+  public int maxCompressedSize(int uncompressedSize) {
+    return Snappy.maxCompressedLength(uncompressedSize);
+  }
 }
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/io/compression/ZstandardCompressor.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/io/compression/ZstandardCompressor.java
index 33c607c..931f969 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/io/compression/ZstandardCompressor.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/io/compression/ZstandardCompressor.java
@@ -40,4 +40,9 @@ public class ZstandardCompressor implements ChunkCompressor {
     outCompressed.flip();
     return compressedSize;
   }
+
+  @Override
+  public int maxCompressedSize(int uncompressedSize) {
+    return (int) Zstd.compressBound(uncompressedSize);
+  }
 }
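
    All four compressors now report a worst-case output size, so writers can size
    buffers (or file mappings) up front. A hedged sketch of the intended contract,
    with an illustrative input size:

        ChunkCompressor compressor = ChunkCompressorFactory.getCompressor(ChunkCompressionType.ZSTANDARD);
        ByteBuffer in = ByteBuffer.allocateDirect(1 << 20); // a chunk, filled then flipped before compressing
        in.flip();
        int bound = compressor.maxCompressedSize(in.limit());  // worst case for this input size
        ByteBuffer out = ByteBuffer.allocateDirect(bound);     // guaranteed large enough
        int written = compressor.compress(in, out);            // written <= bound; may throw IOException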
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/io/writer/impl/BaseChunkSVForwardIndexWriter.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/io/writer/impl/BaseChunkSVForwardIndexWriter.java
index 5be72b0..1e92e1f 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/io/writer/impl/BaseChunkSVForwardIndexWriter.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/io/writer/impl/BaseChunkSVForwardIndexWriter.java
@@ -21,14 +21,16 @@ package org.apache.pinot.segment.local.io.writer.impl;
 import com.google.common.base.Preconditions;
 import java.io.Closeable;
 import java.io.File;
-import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.io.RandomAccessFile;
 import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
 import java.nio.channels.FileChannel;
 import org.apache.pinot.segment.local.io.compression.ChunkCompressorFactory;
 import org.apache.pinot.segment.spi.compression.ChunkCompressionType;
 import org.apache.pinot.segment.spi.compression.ChunkCompressor;
+import org.apache.pinot.segment.spi.memory.CleanerUtil;
+import org.apache.pinot.segment.spi.memory.PinotDataBuffer;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -45,10 +47,10 @@ public abstract class BaseChunkSVForwardIndexWriter implements Closeable {
   private static final int FILE_HEADER_ENTRY_CHUNK_OFFSET_SIZE_V1V2 = Integer.BYTES;
   private static final int FILE_HEADER_ENTRY_CHUNK_OFFSET_SIZE_V3 = Long.BYTES;
 
-  protected final FileChannel _dataFile;
-  protected ByteBuffer _header;
+  private final File _file;
+  private final FileChannel _dataChannel;
+  private final ByteBuffer _header;
   protected final ByteBuffer _chunkBuffer;
-  protected final ByteBuffer _compressedBuffer;
   protected final ChunkCompressor _chunkCompressor;
 
   protected int _chunkSize;
@@ -66,19 +68,21 @@ public abstract class BaseChunkSVForwardIndexWriter implements Closeable {
    * @param chunkSize Size of chunk
    * @param sizeOfEntry Size of entry (in bytes), max size for variable byte implementation.
    * @param version version of File
-   * @throws FileNotFoundException
+   * @throws IOException if the file isn't found or can't be mapped
    */
   protected BaseChunkSVForwardIndexWriter(File file, ChunkCompressionType compressionType, int totalDocs,
       int numDocsPerChunk, int chunkSize, int sizeOfEntry, int version)
-      throws FileNotFoundException {
+      throws IOException {
     Preconditions.checkArgument(version == DEFAULT_VERSION || version == CURRENT_VERSION);
+    _file = file;
+    _headerEntryChunkOffsetSize = getHeaderEntryChunkOffsetSize(version);
+    _dataOffset = headerSize(totalDocs, numDocsPerChunk, _headerEntryChunkOffsetSize);
     _chunkSize = chunkSize;
     _chunkCompressor = ChunkCompressorFactory.getCompressor(compressionType);
-    _headerEntryChunkOffsetSize = getHeaderEntryChunkOffsetSize(version);
-    _dataOffset = writeHeader(compressionType, totalDocs, numDocsPerChunk, sizeOfEntry, version);
     _chunkBuffer = ByteBuffer.allocateDirect(chunkSize);
-    _compressedBuffer = ByteBuffer.allocateDirect(chunkSize * 2);
-    _dataFile = new RandomAccessFile(file, "rw").getChannel();
+    _dataChannel = new RandomAccessFile(file, "rw").getChannel();
+    _header = _dataChannel.map(FileChannel.MapMode.READ_WRITE, 0, _dataOffset);
+    writeHeader(compressionType, totalDocs, numDocsPerChunk, sizeOfEntry, version);
   }
 
   public static int getHeaderEntryChunkOffsetSize(int version) {
@@ -102,10 +106,13 @@ public abstract class BaseChunkSVForwardIndexWriter implements Closeable {
       writeChunk();
     }
 
-    // Write the header and close the file.
-    _header.flip();
-    _dataFile.write(_header, 0);
-    _dataFile.close();
+    if (CleanerUtil.UNMAP_SUPPORTED) {
+      CleanerUtil.getCleaner().freeBuffer(_header);
+    }
+
+    // we will have overmapped by (maxCompressedSize - actualCompressedSize) for the most recent chunk
+    _dataChannel.truncate(_dataOffset);
+    _dataChannel.close();
   }
 
   /**
@@ -116,14 +123,10 @@ public abstract class BaseChunkSVForwardIndexWriter implements Closeable {
    * @param numDocsPerChunk Number of documents per chunk
    * @param sizeOfEntry Size of each entry
    * @param version Version of file
-   * @return Size of header
    */
-  private int writeHeader(ChunkCompressionType compressionType, int totalDocs, int numDocsPerChunk, int sizeOfEntry,
+  private void writeHeader(ChunkCompressionType compressionType, int totalDocs, int numDocsPerChunk, int sizeOfEntry,
       int version) {
     int numChunks = (totalDocs + numDocsPerChunk - 1) / numDocsPerChunk;
-    int headerSize = (7 * Integer.BYTES) + (numChunks * _headerEntryChunkOffsetSize);
-
-    _header = ByteBuffer.allocateDirect(headerSize);
 
     int offset = 0;
     _header.putInt(version);
@@ -151,8 +154,11 @@ public abstract class BaseChunkSVForwardIndexWriter implements Closeable {
       int dataHeaderStart = offset + Integer.BYTES;
       _header.putInt(dataHeaderStart);
     }
+  }
 
-    return headerSize;
+  private static int headerSize(int totalDocs, int numDocsPerChunk, int headerEntryChunkOffsetSize) {
+    int numChunks = (totalDocs + numDocsPerChunk - 1) / numDocsPerChunk;
+    return (7 * Integer.BYTES) + (numChunks * headerEntryChunkOffsetSize);
   }
 
   /**
@@ -166,13 +172,15 @@ public abstract class BaseChunkSVForwardIndexWriter implements Closeable {
    *
    */
   protected void writeChunk() {
-    int sizeToWrite;
+    int sizeWritten;
     _chunkBuffer.flip();
 
-    try {
-      sizeToWrite = _chunkCompressor.compress(_chunkBuffer, _compressedBuffer);
-      _dataFile.write(_compressedBuffer, _dataOffset);
-      _compressedBuffer.clear();
+    int maxCompressedSize = _chunkCompressor.maxCompressedSize(_chunkBuffer.limit());
+    // compress directly into the mapped output rather than keeping a large buffer to compress into
+    try (PinotDataBuffer compressedBuffer = PinotDataBuffer.mapFile(_file, false, _dataOffset,
+        maxCompressedSize, ByteOrder.BIG_ENDIAN, "forward index chunk")) {
+      ByteBuffer view = compressedBuffer.toDirectByteBuffer(0, maxCompressedSize);
+      sizeWritten = _chunkCompressor.compress(_chunkBuffer, view);
     } catch (IOException e) {
       LOGGER.error("Exception caught while compressing/writing data chunk", e);
       throw new RuntimeException(e);
@@ -184,7 +192,7 @@ public abstract class BaseChunkSVForwardIndexWriter implements Closeable {
       _header.putLong(_dataOffset);
     }
 
-    _dataOffset += sizeToWrite;
+    _dataOffset += sizeWritten;
 
     _chunkBuffer.clear();
   }
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/io/writer/impl/FixedByteChunkSVForwardIndexWriter.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/io/writer/impl/FixedByteChunkSVForwardIndexWriter.java
index 359c48e..8d9ad7e 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/io/writer/impl/FixedByteChunkSVForwardIndexWriter.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/io/writer/impl/FixedByteChunkSVForwardIndexWriter.java
@@ -20,6 +20,7 @@ package org.apache.pinot.segment.local.io.writer.impl;
 
 import java.io.File;
 import java.io.FileNotFoundException;
+import java.io.IOException;
 import javax.annotation.concurrent.NotThreadSafe;
 import org.apache.pinot.segment.spi.compression.ChunkCompressionType;
 
@@ -65,10 +66,11 @@ public class FixedByteChunkSVForwardIndexWriter extends BaseChunkSVForwardIndexW
    * @param sizeOfEntry Size of entry (in bytes)
    * @param writerVersion writer format version
    * @throws FileNotFoundException Throws {@link FileNotFoundException} if the specified file is not found.
+   * @throws IOException Throws {@link IOException} if there are any errors mapping the underlying ByteBuffer.
    */
   public FixedByteChunkSVForwardIndexWriter(File file, ChunkCompressionType compressionType, int totalDocs,
       int numDocsPerChunk, int sizeOfEntry, int writerVersion)
-      throws FileNotFoundException {
+      throws IOException {
     super(file, compressionType, totalDocs, numDocsPerChunk, (sizeOfEntry * numDocsPerChunk), sizeOfEntry,
         writerVersion);
     _chunkDataOffset = 0;
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/io/writer/impl/VarByteChunkSVForwardIndexWriter.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/io/writer/impl/VarByteChunkSVForwardIndexWriter.java
index c06e528..fed1200 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/io/writer/impl/VarByteChunkSVForwardIndexWriter.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/io/writer/impl/VarByteChunkSVForwardIndexWriter.java
@@ -54,6 +54,7 @@ import static java.nio.charset.StandardCharsets.UTF_8;
  */
 @NotThreadSafe
 public class VarByteChunkSVForwardIndexWriter extends BaseChunkSVForwardIndexWriter {
+
   public static final int CHUNK_HEADER_ENTRY_ROW_OFFSET_SIZE = Integer.BYTES;
 
   private final int _chunkHeaderSize;
@@ -69,11 +70,13 @@ public class VarByteChunkSVForwardIndexWriter extends BaseChunkSVForwardIndexWri
    * @param numDocsPerChunk Number of documents per chunk.
    * @param lengthOfLongestEntry Length of longest entry (in bytes)
    * @param writerVersion writer format version
-   * @throws FileNotFoundException Throws {@link FileNotFoundException} if the specified file is not found.
+   * @throws IOException Throws {@link IOException} if the specified file is
+   *     not found or the underlying buffer cannot be mapped.
    */
-  public VarByteChunkSVForwardIndexWriter(File file, ChunkCompressionType compressionType, int totalDocs,
+  public VarByteChunkSVForwardIndexWriter(File file, ChunkCompressionType compressionType,
+      int totalDocs,
       int numDocsPerChunk, int lengthOfLongestEntry, int writerVersion)
-      throws FileNotFoundException {
+      throws IOException {
     super(file, compressionType, totalDocs, numDocsPerChunk,
         numDocsPerChunk * (CHUNK_HEADER_ENTRY_ROW_OFFSET_SIZE + lengthOfLongestEntry),
         // chunkSize
@@ -96,25 +99,66 @@ public class VarByteChunkSVForwardIndexWriter extends BaseChunkSVForwardIndexWri
     _chunkBuffer.put(value);
     _chunkDataOffSet += value.length;
 
-    // If buffer filled, then compress and write to file.
-    if (_chunkHeaderOffset == _chunkHeaderSize) {
-      writeChunk();
+    writeChunkIfNecessary();
+  }
+
+  // Note: some duplication is tolerated between these overloads for the sake of memory efficiency
+
+  public void putStrings(String[] values) {
+    // the entire String[] will be encoded as a single string, write the header here
+    _chunkBuffer.putInt(_chunkHeaderOffset, _chunkDataOffSet);
+    _chunkHeaderOffset += CHUNK_HEADER_ENTRY_ROW_OFFSET_SIZE;
+    // write all the strings into the data buffer as if it's a single string,
+    // but with its own embedded header so offsets to strings within the body
+    // can be located
+    int headerPosition = _chunkDataOffSet;
+    int headerSize = Integer.BYTES + Integer.BYTES * values.length;
+    int bodyPosition = headerPosition + headerSize;
+    _chunkBuffer.position(bodyPosition);
+    int bodySize = 0;
+    for (int i = 0, h = headerPosition + Integer.BYTES; i < values.length; i++, h += Integer.BYTES) {
+      byte[] utf8 = values[i].getBytes(UTF_8);
+      _chunkBuffer.putInt(h, utf8.length);
+      _chunkBuffer.put(utf8);
+      bodySize += utf8.length;
     }
+    _chunkDataOffSet += headerSize + bodySize;
+    // go back to write the number of strings embedded in the big string
+    _chunkBuffer.putInt(headerPosition, values.length);
+
+    writeChunkIfNecessary();
   }
 
-  @Override
-  public void close()
-      throws IOException {
+  public void putByteArrays(byte[][] values) {
+    // the entire byte[][] will be encoded as a single byte[], write the header here
+    _chunkBuffer.putInt(_chunkHeaderOffset, _chunkDataOffSet);
+    _chunkHeaderOffset += CHUNK_HEADER_ENTRY_ROW_OFFSET_SIZE;
+    // write all the byte[]s into the data buffer as if it's a single byte[],
+    // but with its own embedded header so offsets to byte[]s within the body
+    // can be located
+    int headerPosition = _chunkDataOffSet;
+    int headerSize = Integer.BYTES + Integer.BYTES * values.length;
+    int bodyPosition = headerPosition + headerSize;
+    _chunkBuffer.position(bodyPosition);
+    int bodySize = 0;
+    for (int i = 0, h = headerPosition + Integer.BYTES; i < values.length; i++, h += Integer.BYTES) {
+      byte[] value = values[i];
+      _chunkBuffer.putInt(h, value.length);
+      _chunkBuffer.put(value);
+      bodySize += value.length;
+    }
+    _chunkDataOffSet += headerSize + bodySize;
+    // go back to write the number of byte[]s embedded in the big byte[]
+    _chunkBuffer.putInt(headerPosition, values.length);
+
+    writeChunkIfNecessary();
+  }
 
-    // Write the chunk if it is non-empty.
-    if (_chunkBuffer.position() > 0) {
+  private void writeChunkIfNecessary() {
+    // If buffer filled, then compress and write to file.
+    if (_chunkHeaderOffset == _chunkHeaderSize) {
       writeChunk();
     }
-
-    // Write the header and close the file.
-    _header.flip();
-    _dataFile.write(_header, 0);
-    _dataFile.close();
   }
 
   /**
@@ -125,7 +169,6 @@ public class VarByteChunkSVForwardIndexWriter extends BaseChunkSVForwardIndexWri
    *   <li> Updates the header with the current chunks offset. </li>
    *   <li> Clears up the buffers, so that they can be reused. </li>
    * </ul>
-   *
    */
   protected void writeChunk() {
     // For partially filled chunks, we still need to clear the offsets for remaining rows, as we reuse this buffer.
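
    To make the embedded header concrete, a worked example (not part of the patch)
    of the bytes putStrings(new String[]{"foo", "bar"}) appends to the chunk
    buffer, using 4-byte big-endian ints:

        [2][3][3][f o o][b a r]
         |  |  |  \-- UTF-8 payloads, back to back --/
         |  |  +-- length of "bar"
         |  +-- length of "foo"
         +-- number of elements

    putByteArrays produces the same layout with raw byte[] payloads, so a reader
    can recover element boundaries from the count and the length array alone.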
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/SegmentColumnarIndexCreator.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/SegmentColumnarIndexCreator.java
index 2829e8e..dcc0ea2 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/SegmentColumnarIndexCreator.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/SegmentColumnarIndexCreator.java
@@ -23,6 +23,7 @@ import com.google.common.base.Preconditions;
 import com.google.common.collect.Iterables;
 import java.io.File;
 import java.io.IOException;
+import java.util.Arrays;
 import java.util.Collection;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -34,7 +35,9 @@ import org.apache.commons.configuration.PropertiesConfiguration;
 import org.apache.pinot.common.utils.FileUtils;
 import org.apache.pinot.segment.local.io.util.PinotDataBitSet;
 import org.apache.pinot.segment.local.io.writer.impl.BaseChunkSVForwardIndexWriter;
+import org.apache.pinot.segment.local.segment.creator.impl.fwd.MultiValueFixedByteRawIndexCreator;
 import org.apache.pinot.segment.local.segment.creator.impl.fwd.MultiValueUnsortedForwardIndexCreator;
+import org.apache.pinot.segment.local.segment.creator.impl.fwd.MultiValueVarByteRawIndexCreator;
 import org.apache.pinot.segment.local.segment.creator.impl.fwd.SingleValueFixedByteRawIndexCreator;
 import org.apache.pinot.segment.local.segment.creator.impl.fwd.SingleValueSortedForwardIndexCreator;
 import org.apache.pinot.segment.local.segment.creator.impl.fwd.SingleValueUnsortedForwardIndexCreator;
@@ -228,10 +231,6 @@ public class SegmentColumnarIndexCreator implements SegmentCreator {
         }
       } else {
         // Create raw index
-
-        // TODO: add support to multi-value column and inverted index
-        Preconditions.checkState(fieldSpec.isSingleValueField(), "Cannot create raw index for multi-value column: %s",
-            columnName);
         Preconditions.checkState(!invertedIndexColumns.contains(columnName),
             "Cannot create inverted index for raw index column: %s", columnName);
 
@@ -241,9 +240,16 @@ public class SegmentColumnarIndexCreator implements SegmentCreator {
         boolean deriveNumDocsPerChunk =
             shouldDeriveNumDocsPerChunk(columnName, segmentCreationSpec.getColumnProperties());
         int writerVersion = rawIndexWriterVersion(columnName, segmentCreationSpec.getColumnProperties());
-        _forwardIndexCreatorMap.put(columnName,
-            getRawIndexCreatorForColumn(_indexDir, compressionType, columnName, storedType, _totalDocs,
-                indexCreationInfo.getLengthOfLongestEntry(), deriveNumDocsPerChunk, writerVersion));
+        if (fieldSpec.isSingleValueField()) {
+          _forwardIndexCreatorMap.put(columnName,
+              getRawIndexCreatorForSVColumn(_indexDir, compressionType, columnName, storedType, _totalDocs,
+                  indexCreationInfo.getLengthOfLongestEntry(), deriveNumDocsPerChunk, writerVersion));
+        } else {
+          _forwardIndexCreatorMap.put(columnName,
+              getRawIndexCreatorForMVColumn(_indexDir, compressionType, columnName, storedType, _totalDocs,
+                  indexCreationInfo.getMaxNumberOfMultiValueElements(), deriveNumDocsPerChunk, writerVersion,
+                  indexCreationInfo.getMaxRowLengthInBytes()));
+        }
       }
 
       if (textIndexColumns.contains(columnName)) {
@@ -366,10 +372,6 @@ public class SegmentColumnarIndexCreator implements SegmentCreator {
     String column = spec.getName();
     if (config.getRawIndexCreationColumns().contains(column) || config.getRawIndexCompressionType()
         .containsKey(column)) {
-      if (!spec.isSingleValueField()) {
-        throw new RuntimeException(
-            "Creation of indices without dictionaries is supported for single valued columns only.");
-      }
       return false;
     }
     return info.isCreateDictionary();
@@ -387,16 +389,19 @@ public class SegmentColumnarIndexCreator implements SegmentCreator {
         throw new RuntimeException("Null value for column:" + columnName);
       }
 
-      boolean isSingleValue = _schema.getFieldSpecFor(columnName).isSingleValueField();
+      FieldSpec fieldSpec = _schema.getFieldSpecFor(columnName);
+
+      // get the dictionary creator; null if the column is not dictionary-encoded
       SegmentDictionaryCreator dictionaryCreator = _dictionaryCreatorMap.get(columnName);
 
-      if (isSingleValue) {
-        // SV column
-        // text-index enabled SV column
-        TextIndexCreator textIndexCreator = _textIndexCreatorMap.get(columnName);
-        if (textIndexCreator != null) {
-          textIndexCreator.add((String) columnValueToIndex);
-        }
+      // text-index
+      TextIndexCreator textIndexCreator = _textIndexCreatorMap.get(columnName);
+      if (textIndexCreator != null) {
+        textIndexCreator.add((String) columnValueToIndex);
+      }
+
+      if (fieldSpec.isSingleValueField()) {
+        // Single Value column
         JsonIndexCreator jsonIndexCreator = _jsonIndexCreatorMap.get(columnName);
         if (jsonIndexCreator != null) {
           jsonIndexCreator.add((String) columnValueToIndex);
@@ -452,12 +457,107 @@ public class SegmentColumnarIndexCreator implements SegmentCreator {
           }
         }
       } else {
-        // MV column (always dictionary encoded)
-        int[] dictIds = dictionaryCreator.indexOfMV(columnValueToIndex);
-        forwardIndexCreator.putDictIdMV(dictIds);
-        DictionaryBasedInvertedIndexCreator invertedIndexCreator = _invertedIndexCreatorMap.get(columnName);
-        if (invertedIndexCreator != null) {
-          invertedIndexCreator.add(dictIds, dictIds.length);
+        if (dictionaryCreator != null) {
+          // dictionary encoded
+          int[] dictIds = dictionaryCreator.indexOfMV(columnValueToIndex);
+          forwardIndexCreator.putDictIdMV(dictIds);
+          DictionaryBasedInvertedIndexCreator invertedIndexCreator = _invertedIndexCreatorMap
+              .get(columnName);
+          if (invertedIndexCreator != null) {
+            invertedIndexCreator.add(dictIds, dictIds.length);
+          }
+        } else {
+          // for text index on raw columns, check the config to determine if actual raw value should
+          // be stored or not
+          if (textIndexCreator != null && !shouldStoreRawValueForTextIndex(columnName)) {
+            Object value = _columnProperties.get(columnName)
+                .get(FieldConfig.TEXT_INDEX_RAW_VALUE);
+            if (value == null) {
+              value = FieldConfig.TEXT_INDEX_DEFAULT_RAW_VALUE;
+            }
+            if (forwardIndexCreator.getValueType().getStoredType() == DataType.STRING) {
+              value = String.valueOf(value);
+              int length = ((String[]) columnValueToIndex).length;
+              columnValueToIndex = new String[length];
+              Arrays.fill((String[]) columnValueToIndex, value);
+            } else if (forwardIndexCreator.getValueType().getStoredType() == DataType.BYTES) {
+              int length = ((byte[][]) columnValueToIndex).length;
+              columnValueToIndex = new byte[length][];
+              Arrays.fill((byte[][]) columnValueToIndex, String.valueOf(value).getBytes());
+            } else {
+              throw new RuntimeException("Text Index is only supported for STRING and BYTES stored type");
+            }
+          }
+          switch (forwardIndexCreator.getValueType()) {
+            case INT:
+              if (columnValueToIndex instanceof int[]) {
+                forwardIndexCreator.putIntMV((int[]) columnValueToIndex);
+              } else if (columnValueToIndex instanceof Object[]) {
+                int[] array = new int[((Object[]) columnValueToIndex).length];
+                for (int i = 0; i < array.length; i++) {
+                  array[i] = (Integer) ((Object[]) columnValueToIndex)[i];
+                }
+                forwardIndexCreator.putIntMV(array);
+              }
+              break;
+            case LONG:
+              if (columnValueToIndex instanceof long[]) {
+                forwardIndexCreator.putLongMV((long[]) columnValueToIndex);
+              } else if (columnValueToIndex instanceof Object[]) {
+                long[] array = new long[((Object[]) columnValueToIndex).length];
+                for (int i = 0; i < array.length; i++) {
+                  array[i] = (Long) ((Object[]) columnValueToIndex)[i];
+                }
+                forwardIndexCreator.putLongMV(array);
+              }
+              break;
+            case FLOAT:
+              if (columnValueToIndex instanceof float[]) {
+                forwardIndexCreator.putFloatMV((float[]) columnValueToIndex);
+              } else if (columnValueToIndex instanceof Object[]) {
+                float[] array = new float[((Object[]) columnValueToIndex).length];
+                for (int i = 0; i < array.length; i++) {
+                  array[i] = (Float) ((Object[]) columnValueToIndex)[i];
+                }
+                forwardIndexCreator.putFloatMV(array);
+              }
+              break;
+            case DOUBLE:
+              if (columnValueToIndex instanceof double[]) {
+                forwardIndexCreator.putDoubleMV((double[]) columnValueToIndex);
+              } else if (columnValueToIndex instanceof Object[]) {
+                double[] array = new double[((Object[]) columnValueToIndex).length];
+                for (int i = 0; i < array.length; i++) {
+                  array[i] = (Double) ((Object[]) columnValueToIndex)[i];
+                }
+                forwardIndexCreator.putDoubleMV(array);
+              }
+              break;
+            case STRING:
+              if (columnValueToIndex instanceof String[]) {
+                forwardIndexCreator.putStringMV((String[]) columnValueToIndex);
+              } else if (columnValueToIndex instanceof Object[]) {
+                String[] array = new String[((Object[]) columnValueToIndex).length];
+                for (int i = 0; i < array.length; i++) {
+                  array[i] = (String) ((Object[]) columnValueToIndex)[i];
+                }
+                forwardIndexCreator.putStringMV(array);
+              }
+              break;
+            case BYTES:
+              if (columnValueToIndex instanceof byte[][]) {
+                forwardIndexCreator.putBytesMV((byte[][]) columnValueToIndex);
+              } else if (columnValueToIndex instanceof Object[]) {
+                byte[][] array = new byte[((Object[]) columnValueToIndex).length][];
+                for (int i = 0; i < array.length; i++) {
+                  array[i] = (byte[]) ((Object[]) columnValueToIndex)[i];
+                }
+                forwardIndexCreator.putBytesMV(array);
+              }
+              break;
+            default:
+              throw new IllegalStateException();
+          }
         }
       }
 
@@ -734,10 +834,11 @@ public class SegmentColumnarIndexCreator implements SegmentCreator {
    * @param deriveNumDocsPerChunk true if varbyte writer should auto-derive the number of rows per chunk
    * @param writerVersion version to use for the raw index writer
    * @return raw index creator
-   * @throws IOException
    */
-  public static ForwardIndexCreator getRawIndexCreatorForColumn(File file, ChunkCompressionType compressionType,
-      String column, DataType dataType, int totalDocs, int lengthOfLongestEntry, boolean deriveNumDocsPerChunk,
+  public static ForwardIndexCreator getRawIndexCreatorForSVColumn(File file,
+      ChunkCompressionType compressionType,
+      String column, DataType dataType, int totalDocs, int lengthOfLongestEntry,
+      boolean deriveNumDocsPerChunk,
       int writerVersion)
       throws IOException {
     switch (dataType.getStoredType()) {
@@ -756,6 +857,41 @@ public class SegmentColumnarIndexCreator implements SegmentCreator {
     }
   }
 
+  /**
+   * Helper method to build the raw index creator for the column.
+   * Assumes that the column to be indexed is multi-valued.
+   *
+   * @param file Output index file
+   * @param column Column name
+   * @param totalDocs Total number of documents to index
+   * @param maxNumberOfMultiValueElements maximum number of elements in any row
+   * @param deriveNumDocsPerChunk true if varbyte writer should auto-derive the number of rows
+   *     per chunk
+   * @param writerVersion version to use for the raw index writer
+   * @param maxRowLengthInBytes the length of the longest row in bytes
+   * @return raw index creator
+   */
+  public static ForwardIndexCreator getRawIndexCreatorForMVColumn(File file, ChunkCompressionType compressionType,
+      String column, DataType dataType, final int totalDocs,
+      final int maxNumberOfMultiValueElements, boolean deriveNumDocsPerChunk, int writerVersion,
+      int maxRowLengthInBytes)
+      throws IOException {
+    switch (dataType.getStoredType()) {
+      case INT:
+      case LONG:
+      case FLOAT:
+      case DOUBLE:
+        return new MultiValueFixedByteRawIndexCreator(file, compressionType, column, totalDocs, dataType,
+            dataType.getStoredType().size(), maxNumberOfMultiValueElements, deriveNumDocsPerChunk, writerVersion);
+      case STRING:
+      case BYTES:
+        return new MultiValueVarByteRawIndexCreator(file, compressionType, column, totalDocs, dataType, writerVersion,
+            maxRowLengthInBytes);
+      default:
+        throw new UnsupportedOperationException(
+            "Data type not supported for raw indexing: " + dataType);
+    }
+  }
+
   @Override
   public void close()
       throws IOException {
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/fwd/MultiValueFixedByteRawIndexCreator.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/fwd/MultiValueFixedByteRawIndexCreator.java
new file mode 100644
index 0000000..572c793
--- /dev/null
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/fwd/MultiValueFixedByteRawIndexCreator.java
@@ -0,0 +1,181 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.segment.local.segment.creator.impl.fwd;
+
+import com.google.common.annotations.VisibleForTesting;
+import java.io.File;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import org.apache.commons.io.FileUtils;
+import org.apache.pinot.segment.local.io.writer.impl.BaseChunkSVForwardIndexWriter;
+import org.apache.pinot.segment.local.io.writer.impl.VarByteChunkSVForwardIndexWriter;
+import org.apache.pinot.segment.spi.V1Constants.Indexes;
+import org.apache.pinot.segment.spi.compression.ChunkCompressionType;
+import org.apache.pinot.segment.spi.index.creator.ForwardIndexCreator;
+import org.apache.pinot.spi.data.FieldSpec.DataType;
+
+
+/**
+ * Forward index creator for raw (non-dictionary-encoded) multi-value columns of fixed length
+ * data types (INT, LONG, FLOAT, DOUBLE).
+ */
+public class MultiValueFixedByteRawIndexCreator implements ForwardIndexCreator {
+
+  private static final int DEFAULT_NUM_DOCS_PER_CHUNK = 1000;
+  private static final int TARGET_MAX_CHUNK_SIZE = 1024 * 1024;
+
+  private final VarByteChunkSVForwardIndexWriter _indexWriter;
+  private final DataType _valueType;
+
+  /**
+   * Create a multi-value fixed-byte raw index creator for the given column
+   *
+   * @param baseIndexDir Index directory
+   * @param compressionType Type of compression to use
+   * @param column Name of column to index
+   * @param totalDocs Total number of documents to index
+   * @param valueType Type of the values
+   * @param maxLengthOfEachEntry length of longest entry (in bytes)
+   * @param maxNumberOfMultiValueElements maximum number of elements in any row
+   */
+  public MultiValueFixedByteRawIndexCreator(File baseIndexDir, ChunkCompressionType compressionType,
+      String column,
+      int totalDocs, DataType valueType, final int maxLengthOfEachEntry,
+      final int maxNumberOfMultiValueElements)
+      throws IOException {
+    this(baseIndexDir, compressionType, column, totalDocs, valueType, maxLengthOfEachEntry,
+        maxNumberOfMultiValueElements, false,
+        BaseChunkSVForwardIndexWriter.DEFAULT_VERSION);
+  }
+
+  /**
+   * Create a multi-value fixed-byte raw index creator for the given column
+   *
+   * @param baseIndexDir Index directory
+   * @param compressionType Type of compression to use
+   * @param column Name of column to index
+   * @param totalDocs Total number of documents to index
+   * @param valueType Type of the values
+   * @param maxLengthOfEachEntry length of longest entry (in bytes)
+   * @param maxNumberOfMultiValueElements maximum number of elements in any row
+   * @param deriveNumDocsPerChunk true if writer should auto-derive the number of rows per chunk
+   * @param writerVersion writer format version
+   */
+  public MultiValueFixedByteRawIndexCreator(File baseIndexDir, ChunkCompressionType compressionType,
+      String column, int totalDocs, DataType valueType, final int maxLengthOfEachEntry,
+      final int maxNumberOfMultiValueElements, boolean deriveNumDocsPerChunk,
+      int writerVersion)
+      throws IOException {
+    File file = new File(baseIndexDir,
+        column + Indexes.RAW_MV_FORWARD_INDEX_FILE_EXTENSION);
+    FileUtils.deleteQuietly(file);
+    int totalMaxLength = maxNumberOfMultiValueElements * maxLengthOfEachEntry;
+    int numDocsPerChunk =
+        deriveNumDocsPerChunk ? getNumDocsPerChunk(totalMaxLength) : DEFAULT_NUM_DOCS_PER_CHUNK;
+    _indexWriter = new VarByteChunkSVForwardIndexWriter(file, compressionType, totalDocs,
+        numDocsPerChunk, totalMaxLength, writerVersion);
+    _valueType = valueType;
+  }
+
+  @VisibleForTesting
+  public static int getNumDocsPerChunk(int lengthOfLongestEntry) {
+    int overheadPerEntry =
+        lengthOfLongestEntry + VarByteChunkSVForwardIndexWriter.CHUNK_HEADER_ENTRY_ROW_OFFSET_SIZE;
+    return Math.max(TARGET_MAX_CHUNK_SIZE / overheadPerEntry, 1);
+  }
+
+  @Override
+  public boolean isDictionaryEncoded() {
+    return false;
+  }
+
+  @Override
+  public boolean isSingleValue() {
+    return false;
+  }
+
+  @Override
+  public DataType getValueType() {
+    return _valueType;
+  }
+
+  @Override
+  public void putIntMV(final int[] values) {
+
+    byte[] bytes = new byte[Integer.BYTES
+        + values.length * Integer.BYTES]; //numValues, bytes required to store the content
+    ByteBuffer byteBuffer = ByteBuffer.wrap(bytes);
+    //write the length
+    byteBuffer.putInt(values.length);
+    //write the content of each element
+    for (final int value : values) {
+      byteBuffer.putInt(value);
+    }
+    _indexWriter.putBytes(bytes);
+  }
+
+  @Override
+  public void putLongMV(final long[] values) {
+
+    byte[] bytes = new byte[Integer.BYTES
+        + values.length * Long.BYTES]; //numValues, bytes required to store the content
+    ByteBuffer byteBuffer = ByteBuffer.wrap(bytes);
+    //write the length
+    byteBuffer.putInt(values.length);
+    //write the content of each element
+    for (final long value : values) {
+      byteBuffer.putLong(value);
+    }
+    _indexWriter.putBytes(bytes);
+  }
+
+  @Override
+  public void putFloatMV(final float[] values) {
+
+    byte[] bytes = new byte[Integer.BYTES
+        + values.length * Float.BYTES]; //numValues, bytes required to store the content
+    ByteBuffer byteBuffer = ByteBuffer.wrap(bytes);
+    //write the length
+    byteBuffer.putInt(values.length);
+    //write the content of each element
+    for (final float value : values) {
+      byteBuffer.putFloat(value);
+    }
+    _indexWriter.putBytes(bytes);
+  }
+
+  @Override
+  public void putDoubleMV(final double[] values) {
+
+    byte[] bytes = new byte[Integer.BYTES
+        + values.length * Double.BYTES]; //numValues, bytes required to store the content
+    ByteBuffer byteBuffer = ByteBuffer.wrap(bytes);
+    //write the length
+    byteBuffer.putInt(values.length);
+    //write the content of each element
+    for (final double value : values) {
+      byteBuffer.putDouble(value);
+    }
+    _indexWriter.putBytes(bytes);
+  }
+
+  @Override
+  public void close()
+      throws IOException {
+    _indexWriter.close();
+  }
+}
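
    A minimal usage sketch for the creator above (indexDir and the literal
    arguments are illustrative only):

        try (MultiValueFixedByteRawIndexCreator creator = new MultiValueFixedByteRawIndexCreator(
            indexDir, ChunkCompressionType.LZ4, "intCol", /* totalDocs */ 2, DataType.INT,
            /* maxLengthOfEachEntry */ Integer.BYTES, /* maxNumberOfMultiValueElements */ 3)) {
          creator.putIntMV(new int[]{1, 2, 3}); // row 0: stored as [3][1][2][3]
          creator.putIntMV(new int[]{4});       // row 1: stored as [1][4]
        }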
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/fwd/MultiValueVarByteRawIndexCreator.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/fwd/MultiValueVarByteRawIndexCreator.java
new file mode 100644
index 0000000..5d5b3cf
--- /dev/null
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/fwd/MultiValueVarByteRawIndexCreator.java
@@ -0,0 +1,122 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.segment.local.segment.creator.impl.fwd;
+
+import java.io.File;
+import java.io.IOException;
+import org.apache.pinot.segment.local.io.writer.impl.BaseChunkSVForwardIndexWriter;
+import org.apache.pinot.segment.local.io.writer.impl.VarByteChunkSVForwardIndexWriter;
+import org.apache.pinot.segment.spi.V1Constants.Indexes;
+import org.apache.pinot.segment.spi.compression.ChunkCompressionType;
+import org.apache.pinot.segment.spi.index.creator.ForwardIndexCreator;
+import org.apache.pinot.spi.data.FieldSpec.DataType;
+
+
+/**
+ * Forward index creator for raw (non-dictionary-encoded) multi-value columns of variable length
+ * data types (STRING, BYTES).
+ */
+public class MultiValueVarByteRawIndexCreator implements ForwardIndexCreator {
+
+  private static final int TARGET_MAX_CHUNK_SIZE = 1024 * 1024;
+
+  private final VarByteChunkSVForwardIndexWriter _indexWriter;
+  private final DataType _valueType;
+
+  /**
+   * Create a var-byte raw index creator for the given column
+   *
+   * @param baseIndexDir Index directory
+   * @param compressionType Type of compression to use
+   * @param column Name of column to index
+   * @param totalDocs Total number of documents to index
+   * @param valueType Type of the values
+   * @param maxRowLengthInBytes the length in bytes of the largest row
+   */
+  public MultiValueVarByteRawIndexCreator(File baseIndexDir, ChunkCompressionType compressionType,
+      String column, int totalDocs, DataType valueType, int maxRowLengthInBytes)
+      throws IOException {
+    this(baseIndexDir, compressionType, column, totalDocs, valueType,
+        BaseChunkSVForwardIndexWriter.DEFAULT_VERSION, maxRowLengthInBytes);
+  }
+
+  /**
+   * Create a var-byte raw index creator for the given column
+   *
+   * @param baseIndexDir Index directory
+   * @param compressionType Type of compression to use
+   * @param column Name of column to index
+   * @param totalDocs Total number of documents to index
+   * @param valueType Type of the values
+   * @param writerVersion writer format version
+   * @param maxRowLengthInBytes the size in bytes of the largest row; the chunk size cannot be smaller than this
+   */
+  public MultiValueVarByteRawIndexCreator(File baseIndexDir, ChunkCompressionType compressionType,
+      String column, int totalDocs, DataType valueType, int writerVersion, int maxRowLengthInBytes)
+      throws IOException {
+    //we will prepend the actual content with numElements and a length array containing the length of each element
+    int totalMaxLength = Integer.BYTES + Math.max(maxRowLengthInBytes, TARGET_MAX_CHUNK_SIZE);
+    File file = new File(baseIndexDir,
+        column + Indexes.RAW_MV_FORWARD_INDEX_FILE_EXTENSION);
+    int numDocsPerChunk = getNumDocsPerChunk(totalMaxLength);
+    _indexWriter = new VarByteChunkSVForwardIndexWriter(file, compressionType, totalDocs,
+        numDocsPerChunk, totalMaxLength, writerVersion);
+    _valueType = valueType;
+  }
+
+  private static int getNumDocsPerChunk(int lengthOfLongestEntry) {
+    int overheadPerEntry =
+        lengthOfLongestEntry + VarByteChunkSVForwardIndexWriter.CHUNK_HEADER_ENTRY_ROW_OFFSET_SIZE;
+    return Math.max(TARGET_MAX_CHUNK_SIZE / overheadPerEntry, 1);
+  }
+
+  @Override
+  public boolean isDictionaryEncoded() {
+    return false;
+  }
+
+  @Override
+  public boolean isSingleValue() {
+    return false;
+  }
+
+  @Override
+  public DataType getValueType() {
+    return _valueType;
+  }
+
+  @Override
+  public void putStringMV(final String[] values) {
+    _indexWriter.putStrings(values);
+  }
+
+  @Override
+  public void putBytesMV(final byte[][] values) {
+    _indexWriter.putByteArrays(values);
+  }
+
+  @Override
+  public void close()
+      throws IOException {
+    _indexWriter.close();
+  }
+}
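
The var-byte creator delegates row encoding to VarByteChunkSVForwardIndexWriter.putStrings/putByteArrays which, per the constructor comment, prepend the element count and a per-element length array to the concatenated payload. A hedged sketch of that layout (encodeStringRow is illustrative only; requires java.nio.ByteBuffer and java.nio.charset.StandardCharsets):

    // Illustrative encoding of one MV row: [numValues][len0..lenN-1][payload bytes].
    static byte[] encodeStringRow(String[] values) {
      byte[][] utf8 = new byte[values.length][];
      int payloadSize = 0;
      for (int i = 0; i < values.length; i++) {
        utf8[i] = values[i].getBytes(StandardCharsets.UTF_8);
        payloadSize += utf8[i].length;
      }
      ByteBuffer buf = ByteBuffer.allocate(Integer.BYTES * (values.length + 1) + payloadSize);
      buf.putInt(values.length);    // numValues
      for (byte[] v : utf8) {
        buf.putInt(v.length);       // length array, one int per element
      }
      for (byte[] v : utf8) {
        buf.put(v);                 // concatenated payload
      }
      return buf.array();
    }

This is the layout the MV reader added below unwinds: it reads numValues, computes the content offset as (numValues + 1) * Integer.BYTES, and walks the length array to slice out each element.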
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/stats/AbstractColumnStatisticsCollector.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/stats/AbstractColumnStatisticsCollector.java
index 284bf69..6407b55 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/stats/AbstractColumnStatisticsCollector.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/stats/AbstractColumnStatisticsCollector.java
@@ -47,6 +47,7 @@ public abstract class AbstractColumnStatisticsCollector implements ColumnStatist
 
   protected int _totalNumberOfEntries = 0;
   protected int _maxNumberOfMultiValues = 0;
+  protected int _maxLengthOfMultiValues = 0;
   private PartitionFunction _partitionFunction;
   private final int _numPartitions;
   private final Set<Integer> _partitions;
@@ -72,6 +73,10 @@ public abstract class AbstractColumnStatisticsCollector implements ColumnStatist
     return _maxNumberOfMultiValues;
   }
 
+  public int getMaxLengthOfMultiValues() {
+    return _maxLengthOfMultiValues;
+  }
+
   void addressSorted(Object entry) {
     if (_isSorted) {
       if (_previousValue != null) {
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/stats/BytesColumnPredIndexStatsCollector.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/stats/BytesColumnPredIndexStatsCollector.java
index a0cfd66..411238d 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/stats/BytesColumnPredIndexStatsCollector.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/stats/BytesColumnPredIndexStatsCollector.java
@@ -33,6 +33,7 @@ public class BytesColumnPredIndexStatsCollector extends AbstractColumnStatistics
 
   private int _minLength = Integer.MAX_VALUE;
   private int _maxLength = 0;
+  private int _maxRowLength = 0;
   private ByteArray[] _sortedValues;
   private boolean _sealed = false;
 
@@ -42,16 +43,32 @@ public class BytesColumnPredIndexStatsCollector extends AbstractColumnStatistics
 
   @Override
   public void collect(Object entry) {
-    ByteArray value = new ByteArray((byte[]) entry);
-    addressSorted(value);
-    updatePartition(value);
-    _values.add(value);
-
-    int length = value.length();
-    _minLength = Math.min(_minLength, length);
-    _maxLength = Math.max(_maxLength, length);
-
-    _totalNumberOfEntries++;
+    if (entry instanceof Object[]) {
+      Object[] values = (Object[]) entry;
+      int rowLength = 0;
+      for (Object obj : values) {
+        ByteArray value = new ByteArray((byte[]) obj);
+        _values.add(value);
+        int length = value.length();
+        _minLength = Math.min(_minLength, length);
+        _maxLength = Math.max(_maxLength, length);
+        rowLength += length;
+      }
+      _maxNumberOfMultiValues = Math.max(_maxNumberOfMultiValues, values.length);
+      _maxRowLength = Math.max(_maxRowLength, rowLength);
+      updateTotalNumberOfEntries(values);
+    } else {
+      ByteArray value = new ByteArray((byte[]) entry);
+      addressSorted(value);
+      updatePartition(value);
+      _values.add(value);
+
+      int length = value.length();
+      _minLength = Math.min(_minLength, length);
+      _maxLength = Math.max(_maxLength, length);
+      _maxRowLength = _maxLength;
+      _totalNumberOfEntries++;
+    }
   }
 
   @Override
@@ -92,6 +109,11 @@ public class BytesColumnPredIndexStatsCollector extends AbstractColumnStatistics
   }
 
   @Override
+  public int getMaxRowLengthInBytes() {
+    return _maxRowLength;
+  }
+
+  @Override
   public int getCardinality() {
     if (_sealed) {
       return _sortedValues.length;
@@ -106,7 +128,7 @@ public class BytesColumnPredIndexStatsCollector extends AbstractColumnStatistics
 
   @Override
   public void seal() {
-    _sortedValues = _values.toArray(new ByteArray[_values.size()]);
+    _sortedValues = _values.toArray(new ByteArray[0]);
     Arrays.sort(_sortedValues);
     _sealed = true;
   }
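
The new statistic is simply the sum of element sizes within a row, maximised over all rows; the raw MV creators use it as a floor when sizing chunks. A standalone illustration of the bookkeeping the collectors above perform incrementally (maxRowLengthInBytes here is a free-standing helper for exposition, not the collector method):

    // Equivalent batch computation of the per-row length statistic.
    static int maxRowLengthInBytes(byte[][][] rows) {
      int max = 0;
      for (byte[][] row : rows) {
        int rowLength = 0;
        for (byte[] value : row) {
          rowLength += value.length; // sum of element sizes within one row
        }
        max = Math.max(max, rowLength);
      }
      return max;
    }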
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/stats/StringColumnPreIndexStatsCollector.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/stats/StringColumnPreIndexStatsCollector.java
index 068a11e..1350677 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/stats/StringColumnPreIndexStatsCollector.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/stats/StringColumnPreIndexStatsCollector.java
@@ -31,6 +31,7 @@ public class StringColumnPreIndexStatsCollector extends AbstractColumnStatistics
 
   private int _minLength = Integer.MAX_VALUE;
   private int _maxLength = 0;
+  private int _maxRowLength = 0;
   private String[] _sortedValues;
   private boolean _sealed = false;
 
@@ -42,6 +43,7 @@ public class StringColumnPreIndexStatsCollector extends AbstractColumnStatistics
   public void collect(Object entry) {
     if (entry instanceof Object[]) {
       Object[] values = (Object[]) entry;
+      int rowLength = 0;
       for (Object obj : values) {
         String value = (String) obj;
         _values.add(value);
@@ -49,9 +51,11 @@ public class StringColumnPreIndexStatsCollector extends AbstractColumnStatistics
         int length = value.getBytes(UTF_8).length;
         _minLength = Math.min(_minLength, length);
         _maxLength = Math.max(_maxLength, length);
+        rowLength += length;
       }
 
       _maxNumberOfMultiValues = Math.max(_maxNumberOfMultiValues, values.length);
+      _maxRowLength = Math.max(_maxRowLength, rowLength);
       updateTotalNumberOfEntries(values);
     } else {
       String value = (String) entry;
@@ -62,6 +66,7 @@ public class StringColumnPreIndexStatsCollector extends AbstractColumnStatistics
       int valueLength = value.getBytes(UTF_8).length;
       _minLength = Math.min(_minLength, valueLength);
       _maxLength = Math.max(_maxLength, valueLength);
+      _maxRowLength = _maxLength;
 
       _totalNumberOfEntries++;
     }
@@ -100,6 +105,11 @@ public class StringColumnPreIndexStatsCollector extends AbstractColumnStatistics
   }
 
   @Override
+  public int getMaxRowLengthInBytes() {
+    return _maxRowLength;
+  }
+
+  @Override
   public int getCardinality() {
     if (_sealed) {
       return _sortedValues.length;
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/forward/VarByteChunkMVForwardIndexReader.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/forward/VarByteChunkMVForwardIndexReader.java
new file mode 100644
index 0000000..1844957
--- /dev/null
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/forward/VarByteChunkMVForwardIndexReader.java
@@ -0,0 +1,193 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.segment.local.segment.index.readers.forward;
+
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
+import javax.annotation.Nullable;
+import org.apache.pinot.segment.local.io.writer.impl.VarByteChunkSVForwardIndexWriter;
+import org.apache.pinot.segment.spi.memory.PinotDataBuffer;
+import org.apache.pinot.spi.data.FieldSpec.DataType;
+
+/**
+ * Chunk-based multi-value raw (non-dictionary-encoded) forward index reader for values of
+ * variable-length data types (STRING, BYTES).
+ * <p>For data layout, please refer to the documentation for {@link VarByteChunkSVForwardIndexWriter}
+ */
+public final class VarByteChunkMVForwardIndexReader extends BaseChunkSVForwardIndexReader {
+
+  private static final int ROW_OFFSET_SIZE = VarByteChunkSVForwardIndexWriter.CHUNK_HEADER_ENTRY_ROW_OFFSET_SIZE;
+
+  private final int _maxChunkSize;
+
+  public VarByteChunkMVForwardIndexReader(PinotDataBuffer dataBuffer, DataType valueType) {
+    super(dataBuffer, valueType);
+    _maxChunkSize = _numDocsPerChunk * (ROW_OFFSET_SIZE + _lengthOfLongestEntry);
+  }
+
+  @Nullable
+  @Override
+  public ChunkReaderContext createContext() {
+    if (_isCompressed) {
+      return new ChunkReaderContext(_maxChunkSize);
+    } else {
+      return null;
+    }
+  }
+
+  @Override
+  public int getStringMV(final int docId, final String[] valueBuffer,
+      final ChunkReaderContext context) {
+    byte[] compressedBytes;
+    if (_isCompressed) {
+      compressedBytes = getBytesCompressed(docId, context);
+    } else {
+      compressedBytes = getBytesUncompressed(docId);
+    }
+    ByteBuffer byteBuffer = ByteBuffer.wrap(compressedBytes);
+    int numValues = byteBuffer.getInt();
+    int contentOffset = (numValues + 1) * Integer.BYTES;
+    for (int i = 0; i < numValues; i++) {
+      int length = byteBuffer.getInt((i + 1) * Integer.BYTES);
+      byte[] bytes = new byte[length];
+      byteBuffer.position(contentOffset);
+      byteBuffer.get(bytes, 0, length);
+      valueBuffer[i] = new String(bytes, StandardCharsets.UTF_8);
+      contentOffset += length;
+    }
+    return numValues;
+  }
+
+  @Override
+  public int getBytesMV(final int docId, final byte[][] valueBuffer,
+      final ChunkReaderContext context) {
+    byte[] compressedBytes;
+    if (_isCompressed) {
+      compressedBytes = getBytesCompressed(docId, context);
+    } else {
+      compressedBytes = getBytesUncompressed(docId);
+    }
+    ByteBuffer byteBuffer = ByteBuffer.wrap(compressedBytes);
+    int numValues = byteBuffer.getInt();
+    int contentOffset = (numValues + 1) * Integer.BYTES;
+    for (int i = 0; i < numValues; i++) {
+      int length = byteBuffer.getInt((i + 1) * Integer.BYTES);
+      byte[] bytes = new byte[length];
+      byteBuffer.position(contentOffset);
+      byteBuffer.get(bytes, 0, length);
+      valueBuffer[i] = bytes;
+      contentOffset += length;
+    }
+    return numValues;
+  }
+
+  @Override
+  public byte[] getBytes(int docId, ChunkReaderContext context) {
+    if (_isCompressed) {
+      return getBytesCompressed(docId, context);
+    } else {
+      return getBytesUncompressed(docId);
+    }
+  }
+
+  /**
+   * Helper method to read BYTES value from the compressed index.
+   */
+  private byte[] getBytesCompressed(int docId, ChunkReaderContext context) {
+    int chunkRowId = docId % _numDocsPerChunk;
+    ByteBuffer chunkBuffer = getChunkBuffer(docId, context);
+
+    // These offsets are offset in the chunk buffer
+    int valueStartOffset = chunkBuffer.getInt(chunkRowId * ROW_OFFSET_SIZE);
+    int valueEndOffset = getValueEndOffset(chunkRowId, chunkBuffer);
+
+    byte[] bytes = new byte[valueEndOffset - valueStartOffset];
+    chunkBuffer.position(valueStartOffset);
+    chunkBuffer.get(bytes);
+    return bytes;
+  }
+
+  /**
+   * Helper method to read BYTES value from the uncompressed index.
+   */
+  private byte[] getBytesUncompressed(int docId) {
+    int chunkId = docId / _numDocsPerChunk;
+    int chunkRowId = docId % _numDocsPerChunk;
+
+    // These offsets are offset in the data buffer
+    long chunkStartOffset = getChunkPosition(chunkId);
+    long valueStartOffset =
+        chunkStartOffset + _dataBuffer.getInt(chunkStartOffset + (long) chunkRowId * ROW_OFFSET_SIZE);
+    long valueEndOffset = getValueEndOffset(chunkId, chunkRowId, chunkStartOffset);
+
+    byte[] bytes = new byte[(int) (valueEndOffset - valueStartOffset)];
+    _dataBuffer.copyTo(valueStartOffset, bytes);
+    return bytes;
+  }
+
+  /**
+   * Helper method to compute the end offset of the value in the chunk buffer.
+   */
+  private int getValueEndOffset(int rowId, ByteBuffer chunkBuffer) {
+    if (rowId == _numDocsPerChunk - 1) {
+      // Last row in the chunk
+      return chunkBuffer.limit();
+    } else {
+      int valueEndOffset = chunkBuffer.getInt((rowId + 1) * ROW_OFFSET_SIZE);
+      if (valueEndOffset == 0) {
+        // Last row in the last chunk (chunk is incomplete, which stores 0 as the offset for the absent rows)
+        return chunkBuffer.limit();
+      } else {
+        return valueEndOffset;
+      }
+    }
+  }
+
+  /**
+   * Helper method to compute the end offset of the value in the data buffer.
+   */
+  private long getValueEndOffset(int chunkId, int chunkRowId, long chunkStartOffset) {
+    if (chunkId == _numChunks - 1) {
+      // Last chunk
+      if (chunkRowId == _numDocsPerChunk - 1) {
+        // Last row in the last chunk
+        return _dataBuffer.size();
+      } else {
+        int valueEndOffsetInChunk = _dataBuffer
+            .getInt(chunkStartOffset + (long) (chunkRowId + 1) * ROW_OFFSET_SIZE);
+        if (valueEndOffsetInChunk == 0) {
+          // Last row in the last chunk (chunk is incomplete, which stores 0 as the offset for the absent rows)
+          return _dataBuffer.size();
+        } else {
+          return chunkStartOffset + valueEndOffsetInChunk;
+        }
+      }
+    } else {
+      if (chunkRowId == _numDocsPerChunk - 1) {
+        // Last row in the chunk
+        return getChunkPosition(chunkId + 1);
+      } else {
+        return chunkStartOffset + _dataBuffer
+            .getInt(chunkStartOffset + (long) (chunkRowId + 1) * ROW_OFFSET_SIZE);
+      }
+    }
+  }
+}
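
Usage of the new reader mirrors the SV chunk readers. A hedged sketch, where buffer is assumed to map the column's .mv.raw.fwd file and maxValuesPerRow/numDocs are assumed to come from column metadata:

    try (VarByteChunkMVForwardIndexReader reader =
             new VarByteChunkMVForwardIndexReader(buffer, DataType.STRING);
         BaseChunkSVForwardIndexReader.ChunkReaderContext ctx = reader.createContext()) {
      String[] row = new String[maxValuesPerRow];
      for (int docId = 0; docId < numDocs; docId++) {
        int numValues = reader.getStringMV(docId, row, ctx);
        // row[0..numValues) now holds the values for docId
      }
    }

Note that createContext() returns null for uncompressed data, which try-with-resources tolerates.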
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/store/FilePerIndexDirectory.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/store/FilePerIndexDirectory.java
index aba55ca..15b02d6 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/store/FilePerIndexDirectory.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/store/FilePerIndexDirectory.java
@@ -167,7 +167,11 @@ class FilePerIndexDirectory extends ColumnIndexDirectory {
             fileExtension = V1Constants.Indexes.UNSORTED_SV_FORWARD_INDEX_FILE_EXTENSION;
           }
         } else {
-          fileExtension = V1Constants.Indexes.UNSORTED_MV_FORWARD_INDEX_FILE_EXTENSION;
+          if (!columnMetadata.hasDictionary()) {
+            fileExtension = V1Constants.Indexes.RAW_MV_FORWARD_INDEX_FILE_EXTENSION;
+          } else {
+            fileExtension = V1Constants.Indexes.UNSORTED_MV_FORWARD_INDEX_FILE_EXTENSION;
+          }
         }
         break;
       case INVERTED_INDEX:
diff --git a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/creator/MultiValueVarByteRawIndexCreatorTest.java b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/creator/MultiValueVarByteRawIndexCreatorTest.java
new file mode 100644
index 0000000..a1f6e2c
--- /dev/null
+++ b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/creator/MultiValueVarByteRawIndexCreatorTest.java
@@ -0,0 +1,141 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.segment.local.segment.index.creator;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.ByteOrder;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Random;
+import org.apache.commons.io.FileUtils;
+import org.apache.pinot.segment.local.segment.creator.impl.fwd.MultiValueVarByteRawIndexCreator;
+import org.apache.pinot.segment.local.segment.index.readers.forward.BaseChunkSVForwardIndexReader.ChunkReaderContext;
+import org.apache.pinot.segment.local.segment.index.readers.forward.VarByteChunkMVForwardIndexReader;
+import org.apache.pinot.segment.spi.V1Constants.Indexes;
+import org.apache.pinot.segment.spi.compression.ChunkCompressionType;
+import org.apache.pinot.segment.spi.memory.PinotDataBuffer;
+import org.apache.pinot.spi.data.FieldSpec.DataType;
+import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+public class MultiValueVarByteRawIndexCreatorTest {
+
+  private static final String OUTPUT_DIR =
+      System.getProperty("java.io.tmpdir") + File.separator + "mvVarRawTest";
+
+  @BeforeClass
+  public void setup() throws Exception {
+    FileUtils.forceMkdir(new File(OUTPUT_DIR));
+  }
+
+  /**
+   * Clean up after test
+   */
+  @AfterClass
+  public void cleanup() {
+    FileUtils.deleteQuietly(new File(OUTPUT_DIR));
+  }
+
+  @Test
+  public void testMVString() throws IOException {
+    String column = "testCol";
+    int numDocs = 1000;
+    int maxElements = 50;
+    int maxTotalLength = 500;
+    File file = new File(OUTPUT_DIR, column + Indexes.RAW_MV_FORWARD_INDEX_FILE_EXTENSION);
+    file.delete();
+    MultiValueVarByteRawIndexCreator creator = new MultiValueVarByteRawIndexCreator(
+        new File(OUTPUT_DIR), ChunkCompressionType.SNAPPY, column, numDocs, DataType.STRING, maxTotalLength);
+    List<String[]> inputs = new ArrayList<>();
+    Random random = new Random();
+    for (int i = 0; i < numDocs; i++) {
+      int length = random.nextInt(10);
+      String[] values = new String[length];
+      for (int j = 0; j < length; j++) {
+        char[] value = new char[length];
+        Arrays.fill(value, 'a');
+        values[j] = new String(value);
+      }
+      inputs.add(values);
+      creator.putStringMV(values);
+    }
+    creator.close();
+
+    //read
+    final PinotDataBuffer buffer = PinotDataBuffer
+        .mapFile(file, true, 0, file.length(), ByteOrder.BIG_ENDIAN, "");
+    VarByteChunkMVForwardIndexReader reader = new VarByteChunkMVForwardIndexReader(buffer,
+        DataType.STRING);
+    final ChunkReaderContext context = reader.createContext();
+    String[] values = new String[maxElements];
+    for (int i = 0; i < numDocs; i++) {
+      int length = reader.getStringMV(i, values, context);
+      String[] readValue = Arrays.copyOf(values, length);
+      Assert.assertEquals(readValue, inputs.get(i));
+    }
+  }
+
+  @Test
+  public void testMVBytes() throws IOException {
+    String column = "testCol";
+    int numDocs = 1000;
+    int maxElements = 50;
+    int maxTotalLength = 500;
+    File file = new File(OUTPUT_DIR, column + Indexes.RAW_MV_FORWARD_INDEX_FILE_EXTENSION);
+    file.delete();
+    MultiValueVarByteRawIndexCreator creator = new MultiValueVarByteRawIndexCreator(
+        new File(OUTPUT_DIR), ChunkCompressionType.SNAPPY, column, numDocs, DataType.BYTES,
+        maxTotalLength);
+    List<byte[][]> inputs = new ArrayList<>();
+    Random random = new Random();
+    for (int i = 0; i < numDocs; i++) {
+      int length = random.nextInt(10);
+      byte[][] values = new byte[length][];
+      for (int j = 0; j < length; j++) {
+        char[] value = new char[length];
+        Arrays.fill(value, 'a');
+        values[j] = new String(value).getBytes();
+      }
+      inputs.add(values);
+      creator.putBytesMV(values);
+    }
+    creator.close();
+
+    //read
+    final PinotDataBuffer buffer = PinotDataBuffer
+        .mapFile(file, true, 0, file.length(), ByteOrder.BIG_ENDIAN, "");
+    VarByteChunkMVForwardIndexReader reader = new VarByteChunkMVForwardIndexReader(buffer,
+        DataType.BYTES);
+    final ChunkReaderContext context = reader.createContext();
+    byte[][] values = new byte[maxElements][];
+    for (int i = 0; i < numDocs; i++) {
+      int length = reader.getBytesMV(i, values, context);
+      byte[][] readValue = Arrays.copyOf(values, length);
+      for (int j = 0; j < length; j++) {
+        Assert.assertTrue(Arrays.equals(inputs.get(i)[j], readValue[j]));
+      }
+    }
+  }
+}
diff --git a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/creator/RawIndexCreatorTest.java b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/creator/RawIndexCreatorTest.java
index 9f515e8..d8cafdc 100644
--- a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/creator/RawIndexCreatorTest.java
+++ b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/creator/RawIndexCreatorTest.java
@@ -21,6 +21,7 @@ package org.apache.pinot.segment.local.segment.index.creator;
 import java.io.File;
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -32,6 +33,7 @@ import org.apache.pinot.segment.local.loader.LocalSegmentDirectoryLoader;
 import org.apache.pinot.segment.local.segment.creator.impl.SegmentIndexCreationDriverImpl;
 import org.apache.pinot.segment.local.segment.index.readers.forward.BaseChunkSVForwardIndexReader;
 import org.apache.pinot.segment.local.segment.index.readers.forward.FixedByteChunkSVForwardIndexReader;
+import org.apache.pinot.segment.local.segment.index.readers.forward.VarByteChunkMVForwardIndexReader;
 import org.apache.pinot.segment.local.segment.index.readers.forward.VarByteChunkSVForwardIndexReader;
 import org.apache.pinot.segment.local.segment.readers.GenericRowRecordReader;
 import org.apache.pinot.segment.spi.creator.SegmentGeneratorConfig;
@@ -60,6 +62,7 @@ import org.testng.annotations.Test;
  * Class for testing Raw index creators.
  */
 public class RawIndexCreatorTest {
+
   private static final int NUM_ROWS = 10009;
   private static final int MAX_STRING_LENGTH = 101;
 
@@ -71,6 +74,12 @@ public class RawIndexCreatorTest {
   private static final String FLOAT_COLUMN = "floatColumn";
   private static final String DOUBLE_COLUMN = "doubleColumn";
   private static final String STRING_COLUMN = "stringColumn";
+  private static final String INT_MV_COLUMN = "intMVColumn";
+  private static final String LONG_MV_COLUMN = "longMVColumn";
+  private static final String FLOAT_MV_COLUMN = "floatMVColumn";
+  private static final String DOUBLE_MV_COLUMN = "doubleMVColumn";
+  private static final String STRING_MV_COLUMN = "stringMVColumn";
+  private static final String BYTES_MV_COLUMN = "bytesMVColumn";
 
   Random _random;
   private RecordReader _recordReader;
@@ -79,8 +88,6 @@ public class RawIndexCreatorTest {
 
   /**
    * Setup to build a segment with raw indexes (no-dictionary) of various data types.
-   *
-   * @throws Exception
    */
   @BeforeClass
   public void setup()
@@ -91,8 +98,15 @@ public class RawIndexCreatorTest {
     schema.addField(new DimensionFieldSpec(FLOAT_COLUMN, DataType.FLOAT, true));
     schema.addField(new DimensionFieldSpec(DOUBLE_COLUMN, DataType.DOUBLE, true));
     schema.addField(new DimensionFieldSpec(STRING_COLUMN, DataType.STRING, true));
+    schema.addField(new DimensionFieldSpec(INT_MV_COLUMN, DataType.INT, false));
+    schema.addField(new DimensionFieldSpec(LONG_MV_COLUMN, DataType.LONG, false));
+    schema.addField(new DimensionFieldSpec(FLOAT_MV_COLUMN, DataType.FLOAT, false));
+    schema.addField(new DimensionFieldSpec(DOUBLE_MV_COLUMN, DataType.DOUBLE, false));
+    schema.addField(new DimensionFieldSpec(STRING_MV_COLUMN, DataType.STRING, false));
+    schema.addField(new DimensionFieldSpec(BYTES_MV_COLUMN, DataType.BYTES, false));
 
-    TableConfig tableConfig = new TableConfigBuilder(TableType.OFFLINE).setTableName("test").build();
+    TableConfig tableConfig = new TableConfigBuilder(TableType.OFFLINE).setTableName("test")
+        .build();
 
     _random = new Random(System.nanoTime());
     _recordReader = buildIndex(tableConfig, schema);
@@ -109,7 +123,6 @@ public class RawIndexCreatorTest {
   /**
    * Test for int raw index creator.
    * Compares values read from the raw index against expected value.
-   * @throws Exception
    */
   @Test
   public void testIntRawIndexCreator()
@@ -120,7 +133,6 @@ public class RawIndexCreatorTest {
   /**
    * Test for long raw index creator.
    * Compares values read from the raw index against expected value.
-   * @throws Exception
    */
   @Test
   public void testLongRawIndexCreator()
@@ -131,7 +143,6 @@ public class RawIndexCreatorTest {
   /**
    * Test for float raw index creator.
    * Compares values read from the raw index against expected value.
-   * @throws Exception
    */
   @Test
   public void testFloatRawIndexCreator()
@@ -142,7 +153,6 @@ public class RawIndexCreatorTest {
   /**
    * Test for double raw index creator.
    * Compares values read from the raw index against expected value.
-   * @throws Exception
    */
   @Test
   public void testDoubleRawIndexCreator()
@@ -153,19 +163,21 @@ public class RawIndexCreatorTest {
   /**
    * Test for string raw index creator.
    * Compares values read from the raw index against expected value.
-   * @throws Exception
    */
   @Test
   public void testStringRawIndexCreator()
       throws Exception {
     PinotDataBuffer indexBuffer = getIndexBufferForColumn(STRING_COLUMN);
-    try (VarByteChunkSVForwardIndexReader rawIndexReader = new VarByteChunkSVForwardIndexReader(indexBuffer,
+    try (VarByteChunkSVForwardIndexReader rawIndexReader = new VarByteChunkSVForwardIndexReader(
+        indexBuffer,
         DataType.STRING);
-        BaseChunkSVForwardIndexReader.ChunkReaderContext readerContext = rawIndexReader.createContext()) {
+        BaseChunkSVForwardIndexReader.ChunkReaderContext readerContext = rawIndexReader
+            .createContext()) {
       _recordReader.rewind();
       for (int row = 0; row < NUM_ROWS; row++) {
         GenericRow expectedRow = _recordReader.next();
-        Assert.assertEquals(rawIndexReader.getString(row, readerContext), expectedRow.getValue(STRING_COLUMN));
+        Assert.assertEquals(rawIndexReader.getString(row, readerContext),
+            expectedRow.getValue(STRING_COLUMN));
       }
     }
   }
@@ -175,17 +187,88 @@ public class RawIndexCreatorTest {
    *
    * @param column Column for which to perform the test
    * @param dataType Data type of the column
-   * @throws Exception
    */
   private void testFixedLengthRawIndexCreator(String column, DataType dataType)
       throws Exception {
     PinotDataBuffer indexBuffer = getIndexBufferForColumn(column);
-    try (FixedByteChunkSVForwardIndexReader rawIndexReader = new FixedByteChunkSVForwardIndexReader(indexBuffer,
-        dataType); BaseChunkSVForwardIndexReader.ChunkReaderContext readerContext = rawIndexReader.createContext()) {
+    try (FixedByteChunkSVForwardIndexReader rawIndexReader = new FixedByteChunkSVForwardIndexReader(
+        indexBuffer,
+        dataType); BaseChunkSVForwardIndexReader.ChunkReaderContext readerContext = rawIndexReader
+        .createContext()) {
+      _recordReader.rewind();
+      for (int row = 0; row < NUM_ROWS; row++) {
+        GenericRow expectedRow = _recordReader.next();
+        Assert.assertEquals(readValueFromIndex(rawIndexReader, readerContext, row),
+            expectedRow.getValue(column));
+      }
+    }
+  }
+
+  /**
+   * Test for multi-value string raw index creator.
+   * Compares values read from the raw index against expected value.
+   */
+  @Test
+  public void testStringMVRawIndexCreator()
+      throws Exception {
+    PinotDataBuffer indexBuffer = getIndexBufferForColumn(STRING_MV_COLUMN);
+    try (VarByteChunkMVForwardIndexReader rawIndexReader = new VarByteChunkMVForwardIndexReader(
+        indexBuffer,
+        DataType.STRING);
+        BaseChunkSVForwardIndexReader.ChunkReaderContext readerContext = rawIndexReader
+            .createContext()) {
       _recordReader.rewind();
+      int maxNumberOfMultiValues = _segmentDirectory.getSegmentMetadata()
+          .getColumnMetadataFor(STRING_MV_COLUMN).getMaxNumberOfMultiValues();
+      final String[] valueBuffer = new String[maxNumberOfMultiValues];
       for (int row = 0; row < NUM_ROWS; row++) {
         GenericRow expectedRow = _recordReader.next();
-        Assert.assertEquals(readValueFromIndex(rawIndexReader, readerContext, row), expectedRow.getValue(column));
+
+        int length = rawIndexReader.getStringMV(row, valueBuffer, readerContext);
+        String[] readValue = Arrays.copyOf(valueBuffer, length);
+        Object[] writtenValue = (Object[]) expectedRow.getValue(STRING_MV_COLUMN);
+        if (writtenValue == null || writtenValue.length == 0) {
+          writtenValue = new String[]{FieldSpec.DEFAULT_DIMENSION_NULL_VALUE_OF_STRING};
+        }
+        for (int i = 0; i < length; i++) {
+          Object expected = writtenValue[i];
+          Object actual = readValue[i];
+          Assert.assertEquals(expected, actual);
+        }
+      }
+    }
+  }
+
+  /**
+   * Test for multi-value bytes raw index creator.
+   * Compares values read from the raw index against expected value.
+   */
+  @Test
+  public void testBytesMVRawIndexCreator()
+      throws Exception {
+    PinotDataBuffer indexBuffer = getIndexBufferForColumn(BYTES_MV_COLUMN);
+    try (VarByteChunkMVForwardIndexReader rawIndexReader = new VarByteChunkMVForwardIndexReader(
+        indexBuffer, DataType.BYTES);
+        BaseChunkSVForwardIndexReader.ChunkReaderContext readerContext = rawIndexReader
+            .createContext()) {
+      _recordReader.rewind();
+      int maxNumberOfMultiValues = _segmentDirectory.getSegmentMetadata()
+          .getColumnMetadataFor(BYTES_MV_COLUMN).getMaxNumberOfMultiValues();
+      final byte[][] valueBuffer = new byte[maxNumberOfMultiValues][];
+      for (int row = 0; row < NUM_ROWS; row++) {
+        GenericRow expectedRow = _recordReader.next();
+
+        int length = rawIndexReader.getBytesMV(row, valueBuffer, readerContext);
+        byte[][] readValue = Arrays.copyOf(valueBuffer, length);
+        Object[] writtenValue = (Object[]) expectedRow.getValue(BYTES_MV_COLUMN);
+        if (writtenValue == null || writtenValue.length == 0) {
+          writtenValue = new byte[][]{FieldSpec.DEFAULT_DIMENSION_NULL_VALUE_OF_BYTES};
+        }
+        for (int i = 0; i < length; i++) {
+          Object expected = writtenValue[i];
+          Object actual = readValue[i];
+          Assert.assertTrue(Arrays.equals((byte[]) expected, (byte[]) actual));
+        }
       }
     }
   }
@@ -205,7 +288,6 @@ public class RawIndexCreatorTest {
    * Helper method to build a segment containing a single valued string column with RAW (no-dictionary) index.
    *
    * @return Array of string values for the rows in the generated index.
-   * @throws Exception
    */
   private RecordReader buildIndex(TableConfig tableConfig, Schema schema)
       throws Exception {
@@ -221,9 +303,17 @@ public class RawIndexCreatorTest {
 
       for (FieldSpec fieldSpec : schema.getAllFieldSpecs()) {
         Object value;
-
-        value = getRandomValue(_random, fieldSpec.getDataType());
-        map.put(fieldSpec.getName(), value);
+        if (fieldSpec.isSingleValueField()) {
+          value = getRandomValue(_random, fieldSpec.getDataType());
+          map.put(fieldSpec.getName(), value);
+        } else {
+          int length = _random.nextInt(50);
+          Object[] values = new Object[length];
+          for (int j = 0; j < length; j++) {
+            values[j] = getRandomValue(_random, fieldSpec.getDataType());
+          }
+          map.put(fieldSpec.getName(), values);
+        }
       }
 
       GenericRow genericRow = new GenericRow();
@@ -263,8 +353,13 @@ public class RawIndexCreatorTest {
       case STRING:
         return StringUtil
             .sanitizeStringValue(RandomStringUtils.random(random.nextInt(MAX_STRING_LENGTH)), Integer.MAX_VALUE);
+      case BYTES:
+        return StringUtil
+            .sanitizeStringValue(RandomStringUtils.random(random.nextInt(MAX_STRING_LENGTH)), Integer.MAX_VALUE)
+            .getBytes();
       default:
-        throw new UnsupportedOperationException("Unsupported data type for random value generator: " + dataType);
+        throw new UnsupportedOperationException(
+            "Unsupported data type for random value generator: " + dataType);
     }
   }
 
diff --git a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/V1Constants.java b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/V1Constants.java
index 1d1a8a5..744f0bc 100644
--- a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/V1Constants.java
+++ b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/V1Constants.java
@@ -39,6 +39,7 @@ public class V1Constants {
     public static final String UNSORTED_SV_FORWARD_INDEX_FILE_EXTENSION = ".sv.unsorted.fwd";
     public static final String SORTED_SV_FORWARD_INDEX_FILE_EXTENSION = ".sv.sorted.fwd";
     public static final String RAW_SV_FORWARD_INDEX_FILE_EXTENSION = ".sv.raw.fwd";
+    public static final String RAW_MV_FORWARD_INDEX_FILE_EXTENSION = ".mv.raw.fwd";
     public static final String UNSORTED_MV_FORWARD_INDEX_FILE_EXTENSION = ".mv.fwd";
     public static final String BITMAP_INVERTED_INDEX_FILE_EXTENSION = ".bitmap.inv";
     public static final String BITMAP_RANGE_INDEX_FILE_EXTENSION = ".bitmap.range";
diff --git a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/compression/ChunkCompressor.java b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/compression/ChunkCompressor.java
index 9415de8..d636fb1 100644
--- a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/compression/ChunkCompressor.java
+++ b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/compression/ChunkCompressor.java
@@ -38,4 +38,6 @@ public interface ChunkCompressor {
    */
   int compress(ByteBuffer inUncompressed, ByteBuffer outCompressed)
       throws IOException;
+
+  /**
+   * @return an upper bound on the size of the compressed output for an input of the given uncompressed size
+   */
+  int maxCompressedSize(int uncompressedSize);
 }
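
Per the commit message, the Zstandard implementation can delegate to the codec's own compress bound. A hedged sketch of plausible per-codec bodies (the real ones live in the *Compressor classes listed in this commit):

    // zstd-jni exposes its worst-case bound directly.
    public int maxCompressedSize(int uncompressedSize) {
      return (int) com.github.luben.zstd.Zstd.compressBound(uncompressedSize);
    }

    // snappy-java offers an equivalent helper.
    public int maxCompressedSize(int uncompressedSize) {
      return org.xerial.snappy.Snappy.maxCompressedLength(uncompressedSize);
    }

    // PassThroughCompressor does no compression, so the bound is the input size itself.
    public int maxCompressedSize(int uncompressedSize) {
      return uncompressedSize;
    }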
diff --git a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/creator/ColumnIndexCreationInfo.java b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/creator/ColumnIndexCreationInfo.java
index da30237..9707107 100644
--- a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/creator/ColumnIndexCreationInfo.java
+++ b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/creator/ColumnIndexCreationInfo.java
@@ -85,6 +85,10 @@ public class ColumnIndexCreationInfo implements Serializable {
     return _columnStatistics.getMaxNumberOfMultiValues();
   }
 
+  public int getMaxRowLengthInBytes() {
+    return _columnStatistics.getMaxRowLengthInBytes();
+  }
+
   public boolean isAutoGenerated() {
     return _isAutoGenerated;
   }
diff --git a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/creator/ColumnStatistics.java b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/creator/ColumnStatistics.java
index 825c331..70bd584 100644
--- a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/creator/ColumnStatistics.java
+++ b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/creator/ColumnStatistics.java
@@ -83,6 +83,13 @@ public interface ColumnStatistics extends Serializable {
   int getMaxNumberOfMultiValues();
 
   /**
+   * @return the length of the largest row in bytes for variable length types
+   */
+  default int getMaxRowLengthInBytes() {
+    return -1;
+  }
+
+  /**
    * @return Returns if any of the values have nulls in the segments.
    */
   boolean hasNull();
diff --git a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/index/creator/ForwardIndexCreator.java b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/index/creator/ForwardIndexCreator.java
index dee4db1..e5a21e9 100644
--- a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/index/creator/ForwardIndexCreator.java
+++ b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/index/creator/ForwardIndexCreator.java
@@ -173,4 +173,13 @@ public interface ForwardIndexCreator extends Closeable {
   default void putStringMV(String[] values) {
     throw new UnsupportedOperationException();
   }
+
+  /**
+   * Writes the next byte[] type multi-value into the forward index.
+   *
+   * @param values Values to write
+   */
+  default void putBytesMV(byte[][] values) {
+    throw new UnsupportedOperationException();
+  }
 }
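
A hedged sketch of how a caller can drive the new MV write path through this interface (writeAllBytesMV and its arguments are illustrative; requires java.io.IOException and java.util.List):

    // Writes every multi-value BYTES row through a (non-single-value) forward index creator.
    static void writeAllBytesMV(ForwardIndexCreator creator, List<byte[][]> rows)
        throws IOException {
      for (byte[][] row : rows) {
        creator.putBytesMV(row); // the default implementation throws for SV creators
      }
      creator.close();
    }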
diff --git a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/index/reader/ForwardIndexReader.java b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/index/reader/ForwardIndexReader.java
index fb92bec..6393aaf 100644
--- a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/index/reader/ForwardIndexReader.java
+++ b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/index/reader/ForwardIndexReader.java
@@ -242,4 +242,23 @@ public interface ForwardIndexReader<T extends ForwardIndexReaderContext> extends
   default int getStringMV(int docId, String[] valueBuffer, T context) {
     throw new UnsupportedOperationException();
   }
+
+  /**
+   * Reads the bytes type multi-value at the given document id into the passed in value buffer
+   * (the buffer size must be enough to hold all the values for the multi-value entry) and
+   * returns the number of values within the multi-value entry.
+   *
+   * @param docId Document id
+   * @param valueBuffer Value buffer
+   * @param context Reader context
+   * @return Number of values within the multi-value entry
+   */
+  default int getBytesMV(int docId, byte[][] valueBuffer, T context) {
+    throw new UnsupportedOperationException();
+  }
+
+  default int getFloatMV(int docId, float[] valueBuffer, T context, int[] parentIndices) {
+    throw new UnsupportedOperationException();
+  }
+
 }
diff --git a/pinot-tools/src/main/java/org/apache/pinot/tools/segment/converter/DictionaryToRawIndexConverter.java b/pinot-tools/src/main/java/org/apache/pinot/tools/segment/converter/DictionaryToRawIndexConverter.java
index b506106..aa41810 100644
--- a/pinot-tools/src/main/java/org/apache/pinot/tools/segment/converter/DictionaryToRawIndexConverter.java
+++ b/pinot-tools/src/main/java/org/apache/pinot/tools/segment/converter/DictionaryToRawIndexConverter.java
@@ -309,7 +309,7 @@ public class DictionaryToRawIndexConverter {
     int lengthOfLongestEntry = (storedType == DataType.STRING) ? getLengthOfLongestEntry(dictionary) : -1;
 
     try (ForwardIndexCreator rawIndexCreator = SegmentColumnarIndexCreator
-        .getRawIndexCreatorForColumn(newSegment, compressionType, column, storedType, numDocs, lengthOfLongestEntry,
+        .getRawIndexCreatorForSVColumn(newSegment, compressionType, column, storedType, numDocs, lengthOfLongestEntry,
             false, BaseChunkSVForwardIndexWriter.DEFAULT_VERSION);
         ForwardIndexReaderContext readerContext = reader.createContext()) {
       switch (storedType) {

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org
