This is an automated email from the ASF dual-hosted git repository.

jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new cf8fd93759 MultiValue VarByte V4 index writer and consolidate V4 reader for all types (#11674)
cf8fd93759 is described below

commit cf8fd93759ebfc521141190196922aa26787c7e1
Author: Saurabh Dubey <saurabhd...@gmail.com>
AuthorDate: Thu Sep 28 23:22:41 2023 +0530

    MultiValue VarByte V4 index writer and consolidate V4 reader for all types (#11674)
---
 .../pinot/perf/BenchmarkRawForwardIndexReader.java |   8 +-
 .../impl/VarByteChunkForwardIndexWriter.java       |   6 +-
 .../impl/VarByteChunkForwardIndexWriterV4.java     |  48 +++++++
 .../local/io/writer/impl/VarByteChunkWriter.java   |   4 +
 .../impl/fwd/MultiValueVarByteRawIndexCreator.java |  17 +--
 .../index/forward/ForwardIndexReaderFactory.java   |  32 +++--
 ....java => VarByteChunkForwardIndexReaderV4.java} | 159 ++++++++++++++++++++-
 .../impl/VarByteChunkSVForwardIndexWriterTest.java |   4 +-
 .../MultiValueFixedByteRawIndexCreatorTest.java    |  59 ++++----
 .../MultiValueVarByteRawIndexCreatorTest.java      |  43 +++---
 .../segment/index/creator/VarByteChunkV4Test.java  |   8 +-
 11 files changed, 300 insertions(+), 88 deletions(-)

diff --git a/pinot-perf/src/main/java/org/apache/pinot/perf/BenchmarkRawForwardIndexReader.java b/pinot-perf/src/main/java/org/apache/pinot/perf/BenchmarkRawForwardIndexReader.java
index 6da32f2906..31c106cd03 100644
--- a/pinot-perf/src/main/java/org/apache/pinot/perf/BenchmarkRawForwardIndexReader.java
+++ b/pinot-perf/src/main/java/org/apache/pinot/perf/BenchmarkRawForwardIndexReader.java
@@ -28,8 +28,8 @@ import org.apache.commons.io.FileUtils;
 import org.apache.pinot.segment.local.io.writer.impl.VarByteChunkForwardIndexWriter;
 import org.apache.pinot.segment.local.io.writer.impl.VarByteChunkForwardIndexWriterV4;
 import org.apache.pinot.segment.local.segment.index.readers.forward.ChunkReaderContext;
+import org.apache.pinot.segment.local.segment.index.readers.forward.VarByteChunkForwardIndexReaderV4;
 import org.apache.pinot.segment.local.segment.index.readers.forward.VarByteChunkSVForwardIndexReader;
-import org.apache.pinot.segment.local.segment.index.readers.forward.VarByteChunkSVForwardIndexReaderV4;
 import org.apache.pinot.segment.spi.compression.ChunkCompressionType;
 import org.apache.pinot.segment.spi.memory.PinotDataBuffer;
 import org.apache.pinot.spi.data.FieldSpec;
@@ -184,9 +184,9 @@ public class BenchmarkRawForwardIndexReader {
   public void readV4(V4State state, Blackhole bh)
       throws IOException {
     try (PinotDataBuffer buffer = PinotDataBuffer.loadBigEndianFile(state._file);
-        VarByteChunkSVForwardIndexReaderV4 reader =
-        new VarByteChunkSVForwardIndexReaderV4(buffer, FieldSpec.DataType.BYTES);
-        VarByteChunkSVForwardIndexReaderV4.ReaderContext context = reader.createContext()) {
+        VarByteChunkForwardIndexReaderV4 reader =
+        new VarByteChunkForwardIndexReaderV4(buffer, FieldSpec.DataType.BYTES, true);
+        VarByteChunkForwardIndexReaderV4.ReaderContext context = reader.createContext()) {
       for (int i = 0; i < state._records; i++) {
         bh.consume(reader.getBytes(i, context));
       }
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/io/writer/impl/VarByteChunkForwardIndexWriter.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/io/writer/impl/VarByteChunkForwardIndexWriter.java
index b6daaf7fe7..fadcce827e 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/io/writer/impl/VarByteChunkForwardIndexWriter.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/io/writer/impl/VarByteChunkForwardIndexWriter.java
@@ -99,7 +99,8 @@ public class VarByteChunkForwardIndexWriter extends BaseChunkForwardIndexWriter
 
   // Note: some duplication is tolerated between these overloads for the sake of memory efficiency
 
-  public void putStrings(String[] values) {
+  @Override
+  public void putStringMV(String[] values) {
     // the entire String[] will be encoded as a single string, write the header here
     _chunkBuffer.putInt(_chunkHeaderOffset, _chunkDataOffSet);
     _chunkHeaderOffset += CHUNK_HEADER_ENTRY_ROW_OFFSET_SIZE;
@@ -122,7 +123,8 @@ public class VarByteChunkForwardIndexWriter extends BaseChunkForwardIndexWriter
     writeChunkIfNecessary();
   }
 
-  public void putByteArrays(byte[][] values) {
+  @Override
+  public void putBytesMV(byte[][] values) {
     // the entire byte[][] will be encoded as a single string, write the header here
     _chunkBuffer.putInt(_chunkHeaderOffset, _chunkDataOffSet);
     _chunkHeaderOffset += CHUNK_HEADER_ENTRY_ROW_OFFSET_SIZE;
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/io/writer/impl/VarByteChunkForwardIndexWriterV4.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/io/writer/impl/VarByteChunkForwardIndexWriterV4.java
index d70ed2dcbc..35c61f35f5 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/io/writer/impl/VarByteChunkForwardIndexWriterV4.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/io/writer/impl/VarByteChunkForwardIndexWriterV4.java
@@ -37,6 +37,8 @@ import org.apache.pinot.spi.utils.BigDecimalUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import static java.nio.charset.StandardCharsets.UTF_8;
+
 
 /**
  * Chunk-based raw (non-dictionary-encoded) forward index writer where each chunk contains variable number of docs, and
@@ -142,6 +144,52 @@ public class VarByteChunkForwardIndexWriterV4 implements VarByteChunkWriter {
     _nextDocId++;
   }
 
+  @Override
+  public void putStringMV(String[] values) {
+    // num values + length of each value
+    int headerSize = Integer.BYTES + Integer.BYTES * values.length;
+    int size = headerSize;
+    byte[][] stringBytes = new byte[values.length][];
+    for (int i = 0; i < values.length; i++) {
+      stringBytes[i] = values[i].getBytes(UTF_8);
+      size += stringBytes[i].length;
+    }
+
+    // Format : [numValues][length1][length2]...[lengthN][value1][value2]...[valueN]
+    byte[] serializedBytes = new byte[size];
+    ByteBuffer byteBuffer = ByteBuffer.wrap(serializedBytes);
+    byteBuffer.putInt(values.length);
+    byteBuffer.position(headerSize);
+    for (int i = 0; i < values.length; i++) {
+      byteBuffer.putInt((i + 1) * Integer.BYTES, stringBytes[i].length);
+      byteBuffer.put(stringBytes[i]);
+    }
+
+    putBytes(serializedBytes);
+  }
+
+  @Override
+  public void putBytesMV(byte[][] values) {
+    // num values + length of each value
+    int headerSize = Integer.BYTES + Integer.BYTES * values.length;
+    int size = headerSize;
+    for (byte[] value : values) {
+      size += value.length;
+    }
+
+    // Format : [numValues][length1][length2]...[lengthN][bytes1][bytes2]...[bytesN]
+    byte[] serializedBytes = new byte[size];
+    ByteBuffer byteBuffer = ByteBuffer.wrap(serializedBytes);
+    byteBuffer.putInt(values.length);
+    byteBuffer.position(headerSize);
+    for (int i = 0; i < values.length; i++) {
+      byteBuffer.putInt((i + 1) * Integer.BYTES, values[i].length);
+      byteBuffer.put(values[i]);
+    }
+
+    putBytes(serializedBytes);
+  }
+
   private void writeHugeChunk(byte[] bytes) {
    // huge values where the bytes and their length prefix don't fit in to the remainder of the buffer after the prefix
    // for the number of documents in a regular chunk are written as a single value without metadata, and these chunks
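
For readers skimming the hunk above: each multi-value row is packed into one V4 entry laid out as [numValues][length1]...[lengthN][value1]...[valueN] and then handed to putBytes(). The following self-contained sketch (not part of this commit; the class and method names are made up for illustration) shows just that packing step, including the mix of absolute header writes and relative payload writes:

    import java.nio.ByteBuffer;
    import java.nio.charset.StandardCharsets;

    // Illustrative only: packs a String[] row the same way putStringMV above does.
    final class MvRowPacker {
      static byte[] pack(String[] values) {
        // Header: one int for the count, plus one int per value for its length.
        int headerSize = Integer.BYTES + Integer.BYTES * values.length;
        int size = headerSize;
        byte[][] utf8 = new byte[values.length][];
        for (int i = 0; i < values.length; i++) {
          utf8[i] = values[i].getBytes(StandardCharsets.UTF_8);
          size += utf8[i].length;
        }
        ByteBuffer buffer = ByteBuffer.wrap(new byte[size]);
        buffer.putInt(values.length);   // relative write: numValues
        buffer.position(headerSize);    // payload starts right after the length header
        for (int i = 0; i < values.length; i++) {
          buffer.putInt((i + 1) * Integer.BYTES, utf8[i].length); // absolute write into the header
          buffer.put(utf8[i]);                                    // relative write of the payload
        }
        return buffer.array();          // this byte[] is what the writer passes to putBytes(...)
      }
    }
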
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/io/writer/impl/VarByteChunkWriter.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/io/writer/impl/VarByteChunkWriter.java
index 1e6dbc2837..bf3537d67c 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/io/writer/impl/VarByteChunkWriter.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/io/writer/impl/VarByteChunkWriter.java
@@ -28,4 +28,8 @@ public interface VarByteChunkWriter extends Closeable {
   void putString(String value);
 
   void putBytes(byte[] value);
+
+  void putStringMV(String[] values);
+
+  void putBytesMV(byte[][] values);
 }
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/fwd/MultiValueVarByteRawIndexCreator.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/fwd/MultiValueVarByteRawIndexCreator.java
index 85dba85ab9..0c41ce2c6e 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/fwd/MultiValueVarByteRawIndexCreator.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/fwd/MultiValueVarByteRawIndexCreator.java
@@ -23,6 +23,7 @@ import java.io.File;
 import java.io.IOException;
 import org.apache.pinot.segment.local.io.writer.impl.VarByteChunkForwardIndexWriter;
 import org.apache.pinot.segment.local.io.writer.impl.VarByteChunkForwardIndexWriterV4;
+import org.apache.pinot.segment.local.io.writer.impl.VarByteChunkWriter;
 import org.apache.pinot.segment.spi.V1Constants.Indexes;
 import org.apache.pinot.segment.spi.compression.ChunkCompressionType;
 import org.apache.pinot.segment.spi.index.ForwardIndexConfig;
@@ -37,7 +38,7 @@ import org.apache.pinot.spi.data.FieldSpec.DataType;
 public class MultiValueVarByteRawIndexCreator implements ForwardIndexCreator {
   private static final int TARGET_MAX_CHUNK_SIZE = 1024 * 1024;
 
-  private final VarByteChunkForwardIndexWriter _indexWriter;
+  private final VarByteChunkWriter _indexWriter;
   private final DataType _valueType;
 
   /**
@@ -80,13 +81,9 @@ public class MultiValueVarByteRawIndexCreator implements ForwardIndexCreator {
     int numDocsPerChunk = Math.max(
         TARGET_MAX_CHUNK_SIZE / (totalMaxLength + VarByteChunkForwardIndexWriter.CHUNK_HEADER_ENTRY_ROW_OFFSET_SIZE),
         1);
-    // TODO: Support V4 MV reader
-    // Currently fall back to V2 for backward compatible
-    if (writerVersion == VarByteChunkForwardIndexWriterV4.VERSION) {
-      writerVersion = 2;
-    }
-    _indexWriter = new VarByteChunkForwardIndexWriter(file, compressionType, totalDocs, numDocsPerChunk, totalMaxLength,
-        writerVersion);
+    _indexWriter = writerVersion < VarByteChunkForwardIndexWriterV4.VERSION ? new VarByteChunkForwardIndexWriter(file,
+        compressionType, totalDocs, numDocsPerChunk, totalMaxLength, writerVersion)
+        : new VarByteChunkForwardIndexWriterV4(file, compressionType, TARGET_MAX_CHUNK_SIZE);
     _valueType = valueType;
   }
 
@@ -107,12 +104,12 @@ public class MultiValueVarByteRawIndexCreator implements ForwardIndexCreator {
 
   @Override
   public void putStringMV(final String[] values) {
-    _indexWriter.putStrings(values);
+    _indexWriter.putStringMV(values);
   }
 
   @Override
   public void putBytesMV(final byte[][] values) {
-    _indexWriter.putByteArrays(values);
+    _indexWriter.putBytesMV(values);
   }
 
   @Override
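
With the fallback removed above, asking for writerVersion 4 now really produces a V4 multi-value index instead of silently dropping to V2. A usage sketch, modelled on the updated MultiValueVarByteRawIndexCreatorTest further down in this patch; the output directory, column name and sizing values are invented, and the constructor argument order is taken from that test rather than verified against every overload:

    import java.io.File;
    import java.io.IOException;
    import org.apache.pinot.segment.local.segment.creator.impl.fwd.MultiValueVarByteRawIndexCreator;
    import org.apache.pinot.segment.spi.compression.ChunkCompressionType;
    import org.apache.pinot.spi.data.FieldSpec.DataType;

    public class MvV4WriteSketch {
      public static void main(String[] args) throws IOException {
        File outputDir = new File("/tmp/mv-v4-demo");   // hypothetical, must already exist
        int numDocs = 2;
        int maxRowLengthInBytes = 64;                   // hypothetical sizing
        int maxNumElements = 3;
        try (MultiValueVarByteRawIndexCreator creator = new MultiValueVarByteRawIndexCreator(outputDir,
            ChunkCompressionType.LZ4, "myMvCol", numDocs, DataType.STRING, maxRowLengthInBytes, maxNumElements,
            4 /* writerVersion */)) {
          creator.putStringMV(new String[]{"a", "bb", "ccc"});
          creator.putStringMV(new String[]{"d"});
        }
      }
    }
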
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/forward/ForwardIndexReaderFactory.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/forward/ForwardIndexReaderFactory.java
index ecaaf875a5..6de6e1294b 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/forward/ForwardIndexReaderFactory.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/forward/ForwardIndexReaderFactory.java
@@ -25,9 +25,9 @@ import org.apache.pinot.segment.local.segment.index.readers.forward.FixedBitSVFo
 import org.apache.pinot.segment.local.segment.index.readers.forward.FixedByteChunkMVForwardIndexReader;
 import org.apache.pinot.segment.local.segment.index.readers.forward.FixedByteChunkSVForwardIndexReader;
 import org.apache.pinot.segment.local.segment.index.readers.forward.FixedBytePower2ChunkSVForwardIndexReader;
+import org.apache.pinot.segment.local.segment.index.readers.forward.VarByteChunkForwardIndexReaderV4;
 import org.apache.pinot.segment.local.segment.index.readers.forward.VarByteChunkMVForwardIndexReader;
 import org.apache.pinot.segment.local.segment.index.readers.forward.VarByteChunkSVForwardIndexReader;
-import org.apache.pinot.segment.local.segment.index.readers.forward.VarByteChunkSVForwardIndexReaderV4;
 import org.apache.pinot.segment.local.segment.index.readers.sorted.SortedIndexReaderImpl;
 import org.apache.pinot.segment.spi.ColumnMetadata;
 import org.apache.pinot.segment.spi.index.ForwardIndexConfig;
@@ -75,19 +75,31 @@ public class ForwardIndexReaderFactory extends IndexReaderFactory.Default<Forwar
   public static ForwardIndexReader createRawIndexReader(PinotDataBuffer dataBuffer, DataType storedType,
       boolean isSingleValue) {
     int version = dataBuffer.getInt(0);
+    if (isSingleValue && storedType.isFixedWidth()) {
+      return version == FixedBytePower2ChunkSVForwardIndexReader.VERSION
+          ? new FixedBytePower2ChunkSVForwardIndexReader(dataBuffer, storedType)
+          : new FixedByteChunkSVForwardIndexReader(dataBuffer, storedType);
+    }
+
+    if (version == VarByteChunkForwardIndexWriterV4.VERSION) {
+      // V4 reader is common for sv var byte, mv fixed byte and mv var byte
+      return new VarByteChunkForwardIndexReaderV4(dataBuffer, storedType, isSingleValue);
+    } else {
+      return createNonV4RawIndexReader(dataBuffer, storedType, isSingleValue);
+    }
+  }
+
+  private static ForwardIndexReader createNonV4RawIndexReader(PinotDataBuffer dataBuffer, DataType storedType,
+      boolean isSingleValue) {
+    // Only reach here if SV + raw + var byte + non v4 or MV + non v4
     if (isSingleValue) {
+      return new VarByteChunkSVForwardIndexReader(dataBuffer, storedType);
+    } else {
       if (storedType.isFixedWidth()) {
-        return version == FixedBytePower2ChunkSVForwardIndexReader.VERSION
-            ? new FixedBytePower2ChunkSVForwardIndexReader(dataBuffer, storedType)
-            : new FixedByteChunkSVForwardIndexReader(dataBuffer, storedType);
+        return new FixedByteChunkMVForwardIndexReader(dataBuffer, storedType);
       } else {
-        return version == VarByteChunkForwardIndexWriterV4.VERSION ? new VarByteChunkSVForwardIndexReaderV4(dataBuffer,
-            storedType) : new VarByteChunkSVForwardIndexReader(dataBuffer, storedType);
+        return new VarByteChunkMVForwardIndexReader(dataBuffer, storedType);
+        return new VarByteChunkMVForwardIndexReader(dataBuffer, storedType);
       }
-    } else {
-      // TODO: Support V4 MV reader
-      return storedType.isFixedWidth() ? new FixedByteChunkMVForwardIndexReader(dataBuffer, storedType)
-          : new VarByteChunkMVForwardIndexReader(dataBuffer, storedType);
     }
   }
 }
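
Since createRawIndexReader() now routes every version-4 buffer, single- or multi-value, to the consolidated VarByteChunkForwardIndexReaderV4, callers only need the stored type and the single/multi-value flag. A hedged sketch of that call path, modelled on the updated MultiValueVarByteRawIndexCreatorTest below; the file path and array sizing are invented:

    import java.io.File;
    import java.io.IOException;
    import java.nio.ByteOrder;
    import org.apache.pinot.segment.local.segment.index.forward.ForwardIndexReaderFactory;
    import org.apache.pinot.segment.spi.index.reader.ForwardIndexReader;
    import org.apache.pinot.segment.spi.index.reader.ForwardIndexReaderContext;
    import org.apache.pinot.segment.spi.memory.PinotDataBuffer;
    import org.apache.pinot.spi.data.FieldSpec.DataType;

    public class MvV4ReadSketch {
      public static void main(String[] args) throws IOException {
        // Hypothetical raw MV STRING forward index file produced by MultiValueVarByteRawIndexCreator.
        File file = new File("/tmp/mv-v4-demo/myMvCol.mv.raw.fwd");
        try (PinotDataBuffer buffer = PinotDataBuffer.mapFile(file, true, 0, file.length(), ByteOrder.BIG_ENDIAN, "")) {
          // The factory reads the version int at offset 0; for version 4 it returns the
          // consolidated VarByteChunkForwardIndexReaderV4 regardless of single/multi value.
          ForwardIndexReader reader = ForwardIndexReaderFactory.createRawIndexReader(buffer, DataType.STRING, false);
          ForwardIndexReaderContext context = reader.createContext();
          String[] values = new String[16];  // sized to the column's max number of elements
          int numValues = reader.getStringMV(0, values, context);
          System.out.println(numValues + " values in doc 0");
        }
      }
    }
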
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/forward/VarByteChunkSVForwardIndexReaderV4.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/forward/VarByteChunkForwardIndexReaderV4.java
similarity index 65%
rename from pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/forward/VarByteChunkSVForwardIndexReaderV4.java
rename to pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/forward/VarByteChunkForwardIndexReaderV4.java
index c1d842b23c..4858e790cb 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/forward/VarByteChunkSVForwardIndexReaderV4.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/forward/VarByteChunkForwardIndexReaderV4.java
@@ -39,13 +39,13 @@ import org.slf4j.LoggerFactory;
 
 
 /**
- * Chunk-based single-value raw (non-dictionary-encoded) forward index reader for values of variable length data type
- * (BIG_DECIMAL, STRING, BYTES).
+ * Chunk-based raw (non-dictionary-encoded) forward index reader for values of SV variable length data types
+ * (BIG_DECIMAL, STRING, BYTES), MV fixed length and MV variable length data types.
  * <p>For data layout, please refer to the documentation for {@link VarByteChunkForwardIndexWriterV4}
  */
-public class VarByteChunkSVForwardIndexReaderV4
-    implements ForwardIndexReader<VarByteChunkSVForwardIndexReaderV4.ReaderContext> {
-  private static final Logger LOGGER = LoggerFactory.getLogger(VarByteChunkSVForwardIndexReaderV4.class);
+public class VarByteChunkForwardIndexReaderV4
+    implements ForwardIndexReader<VarByteChunkForwardIndexReaderV4.ReaderContext> {
+  private static final Logger LOGGER = LoggerFactory.getLogger(VarByteChunkForwardIndexReaderV4.class);
   private static final int METADATA_ENTRY_SIZE = 8;
 
   private final FieldSpec.DataType _storedType;
@@ -55,8 +55,10 @@ public class VarByteChunkSVForwardIndexReaderV4
 
   private final PinotDataBuffer _metadata;
   private final PinotDataBuffer _chunks;
+  private final boolean _isSingleValue;
 
-  public VarByteChunkSVForwardIndexReaderV4(PinotDataBuffer dataBuffer, FieldSpec.DataType storedType) {
+  public VarByteChunkForwardIndexReaderV4(PinotDataBuffer dataBuffer, FieldSpec.DataType storedType,
+      boolean isSingleValue) {
     int version = dataBuffer.getInt(0);
     Preconditions.checkState(version == VarByteChunkForwardIndexWriterV4.VERSION, "Illegal index version: %s", version);
     _storedType = storedType;
@@ -67,6 +69,7 @@ public class VarByteChunkSVForwardIndexReaderV4
     // the file has a BE header for compatability reasons (version selection) but the content is LE
     _metadata = dataBuffer.view(16, chunksOffset, ByteOrder.LITTLE_ENDIAN);
     _chunks = dataBuffer.view(chunksOffset, dataBuffer.size(), ByteOrder.LITTLE_ENDIAN);
+    _isSingleValue = isSingleValue;
   }
 
   @Override
@@ -76,7 +79,7 @@ public class VarByteChunkSVForwardIndexReaderV4
 
   @Override
   public boolean isSingleValue() {
-    return true;
+    return _isSingleValue;
   }
 
   @Override
@@ -113,6 +116,148 @@ public class VarByteChunkSVForwardIndexReaderV4
     return context.getValue(docId);
   }
 
+  @Override
+  public int getIntMV(int docId, int[] valueBuffer, VarByteChunkForwardIndexReaderV4.ReaderContext context) {
+    ByteBuffer byteBuffer = ByteBuffer.wrap(context.getValue(docId));
+    int numValues = byteBuffer.getInt();
+    for (int i = 0; i < numValues; i++) {
+      valueBuffer[i] = byteBuffer.getInt();
+    }
+    return numValues;
+  }
+
+  @Override
+  public int[] getIntMV(int docId, VarByteChunkForwardIndexReaderV4.ReaderContext context) {
+    ByteBuffer byteBuffer = ByteBuffer.wrap(context.getValue(docId));
+    int numValues = byteBuffer.getInt();
+    int[] valueBuffer = new int[numValues];
+    for (int i = 0; i < numValues; i++) {
+      valueBuffer[i] = byteBuffer.getInt();
+    }
+    return valueBuffer;
+  }
+
+  @Override
+  public int getLongMV(int docId, long[] valueBuffer, VarByteChunkForwardIndexReaderV4.ReaderContext context) {
+    ByteBuffer byteBuffer = ByteBuffer.wrap(context.getValue(docId));
+    int numValues = byteBuffer.getInt();
+    for (int i = 0; i < numValues; i++) {
+      valueBuffer[i] = byteBuffer.getLong();
+    }
+    return numValues;
+  }
+
+  @Override
+  public long[] getLongMV(int docId, VarByteChunkForwardIndexReaderV4.ReaderContext context) {
+    ByteBuffer byteBuffer = ByteBuffer.wrap(context.getValue(docId));
+    int numValues = byteBuffer.getInt();
+    long[] valueBuffer = new long[numValues];
+    for (int i = 0; i < numValues; i++) {
+      valueBuffer[i] = byteBuffer.getLong();
+    }
+    return valueBuffer;
+  }
+
+  @Override
+  public int getFloatMV(int docId, float[] valueBuffer, VarByteChunkForwardIndexReaderV4.ReaderContext context) {
+    ByteBuffer byteBuffer = ByteBuffer.wrap(context.getValue(docId));
+    int numValues = byteBuffer.getInt();
+    for (int i = 0; i < numValues; i++) {
+      valueBuffer[i] = byteBuffer.getFloat();
+    }
+    return numValues;
+  }
+
+  @Override
+  public float[] getFloatMV(int docId, VarByteChunkForwardIndexReaderV4.ReaderContext context) {
+    ByteBuffer byteBuffer = ByteBuffer.wrap(context.getValue(docId));
+    int numValues = byteBuffer.getInt();
+    float[] valueBuffer = new float[numValues];
+    for (int i = 0; i < numValues; i++) {
+      valueBuffer[i] = byteBuffer.getFloat();
+    }
+    return valueBuffer;
+  }
+
+  @Override
+  public int getDoubleMV(int docId, double[] valueBuffer, VarByteChunkForwardIndexReaderV4.ReaderContext context) {
+    ByteBuffer byteBuffer = ByteBuffer.wrap(context.getValue(docId));
+    int numValues = byteBuffer.getInt();
+    for (int i = 0; i < numValues; i++) {
+      valueBuffer[i] = byteBuffer.getDouble();
+    }
+    return numValues;
+  }
+
+  @Override
+  public double[] getDoubleMV(int docId, VarByteChunkForwardIndexReaderV4.ReaderContext context) {
+    ByteBuffer byteBuffer = ByteBuffer.wrap(context.getValue(docId));
+    int numValues = byteBuffer.getInt();
+    double[] valueBuffer = new double[numValues];
+    for (int i = 0; i < numValues; i++) {
+      valueBuffer[i] = byteBuffer.getDouble();
+    }
+    return valueBuffer;
+  }
+
+  @Override
+  public int getStringMV(int docId, String[] valueBuffer, VarByteChunkForwardIndexReaderV4.ReaderContext context) {
+    ByteBuffer byteBuffer = ByteBuffer.wrap(context.getValue(docId));
+    int numValues = byteBuffer.getInt();
+    byteBuffer.position((numValues + 1) * Integer.BYTES);
+    for (int i = 0; i < numValues; i++) {
+      int length = byteBuffer.getInt((i + 1) * Integer.BYTES);
+      byte[] bytes = new byte[length];
+      byteBuffer.get(bytes);
+      valueBuffer[i] = new String(bytes, StandardCharsets.UTF_8);
+    }
+    return numValues;
+  }
+
+  @Override
+  public String[] getStringMV(int docId, VarByteChunkForwardIndexReaderV4.ReaderContext context) {
+    ByteBuffer byteBuffer = ByteBuffer.wrap(context.getValue(docId));
+    int numValues = byteBuffer.getInt();
+    byteBuffer.position((numValues + 1) * Integer.BYTES);
+    String[] valueBuffer = new String[numValues];
+    for (int i = 0; i < numValues; i++) {
+      int length = byteBuffer.getInt((i + 1) * Integer.BYTES);
+      byte[] bytes = new byte[length];
+      byteBuffer.get(bytes);
+      valueBuffer[i] = new String(bytes, StandardCharsets.UTF_8);
+    }
+    return valueBuffer;
+  }
+
+  @Override
+  public int getBytesMV(int docId, byte[][] valueBuffer, VarByteChunkForwardIndexReaderV4.ReaderContext context) {
+    ByteBuffer byteBuffer = ByteBuffer.wrap(context.getValue(docId));
+    int numValues = byteBuffer.getInt();
+    byteBuffer.position((numValues + 1) * Integer.BYTES);
+    for (int i = 0; i < numValues; i++) {
+      int length = byteBuffer.getInt((i + 1) * Integer.BYTES);
+      byte[] bytes = new byte[length];
+      byteBuffer.get(bytes, 0, length);
+      valueBuffer[i] = bytes;
+    }
+    return numValues;
+  }
+
+  @Override
+  public byte[][] getBytesMV(int docId, VarByteChunkForwardIndexReaderV4.ReaderContext context) {
+    ByteBuffer byteBuffer = ByteBuffer.wrap(context.getValue(docId));
+    int numValues = byteBuffer.getInt();
+    byteBuffer.position((numValues + 1) * Integer.BYTES);
+    byte[][] valueBuffer = new byte[numValues][];
+    for (int i = 0; i < numValues; i++) {
+      int length = byteBuffer.getInt((i + 1) * Integer.BYTES);
+      byte[] bytes = new byte[length];
+      byteBuffer.get(bytes, 0, length);
+      valueBuffer[i] = bytes;
+    }
+    return valueBuffer;
+  }
+
   @Override
   public void close()
       throws IOException {
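
A note on the decoding pattern repeated in the MV getters above: the per-value lengths are read with the absolute getInt(index) overload, which does not move the buffer's position, while the payload is consumed with relative get() calls after position() has been advanced past the (numValues + 1) * Integer.BYTES header. A tiny standalone illustration of that java.nio behaviour (not taken from the commit):

    import java.nio.ByteBuffer;

    public class AbsoluteVsRelativeDemo {
      public static void main(String[] args) {
        // Entry with 2 values of lengths 1 and 2: [numValues][len1][len2][payload...]
        ByteBuffer buf = ByteBuffer.allocate(Integer.BYTES * 3 + 3);
        buf.putInt(2).putInt(1).putInt(2).put(new byte[]{7, 8, 9}).flip();

        int numValues = buf.getInt();                    // relative read: advances position to 4
        buf.position((numValues + 1) * Integer.BYTES);   // skip the rest of the length header
        for (int i = 0; i < numValues; i++) {
          int length = buf.getInt((i + 1) * Integer.BYTES);  // absolute read: position unchanged
          byte[] value = new byte[length];
          buf.get(value);                                    // relative read: consumes the payload
          System.out.println(length + " bytes");
        }
      }
    }
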
diff --git a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/io/writer/impl/VarByteChunkSVForwardIndexWriterTest.java b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/io/writer/impl/VarByteChunkSVForwardIndexWriterTest.java
index e3d89157db..5a46c9d3f3 100644
--- a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/io/writer/impl/VarByteChunkSVForwardIndexWriterTest.java
+++ b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/io/writer/impl/VarByteChunkSVForwardIndexWriterTest.java
@@ -87,7 +87,7 @@ public class VarByteChunkSVForwardIndexWriterTest {
     try (VarByteChunkForwardIndexWriter writer = new VarByteChunkForwardIndexWriter(file, compressionType, totalDocs,
         numDocsPerChunk, maxEntryLengthInBytes, version)) {
       for (String[] array : arrays) {
-        writer.putStrings(array);
+        writer.putStringMV(array);
       }
     }
     try (VarByteChunkSVForwardIndexReader reader = new VarByteChunkSVForwardIndexReader(
@@ -122,7 +122,7 @@ public class VarByteChunkSVForwardIndexWriterTest {
     try (VarByteChunkForwardIndexWriter writer = new VarByteChunkForwardIndexWriter(file, compressionType, totalDocs,
         numDocsPerChunk, maxEntryLengthInBytes, version)) {
       for (String[] array : arrays) {
-        writer.putByteArrays(Arrays.stream(array).map(str -> str.getBytes(UTF_8)).toArray(byte[][]::new));
+        writer.putBytesMV(Arrays.stream(array).map(str -> str.getBytes(UTF_8)).toArray(byte[][]::new));
       }
     }
     try (VarByteChunkSVForwardIndexReader reader = new VarByteChunkSVForwardIndexReader(
diff --git a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/creator/MultiValueFixedByteRawIndexCreatorTest.java b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/creator/MultiValueFixedByteRawIndexCreatorTest.java
index f6ae39aa3b..a0cf8c7a97 100644
--- a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/creator/MultiValueFixedByteRawIndexCreatorTest.java
+++ b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/creator/MultiValueFixedByteRawIndexCreatorTest.java
@@ -29,11 +29,14 @@ import java.util.function.ToIntFunction;
 import java.util.stream.Collectors;
 import java.util.stream.IntStream;
 import org.apache.commons.io.FileUtils;
+import org.apache.pinot.segment.local.io.writer.impl.VarByteChunkForwardIndexWriterV4;
 import org.apache.pinot.segment.local.segment.creator.impl.fwd.MultiValueFixedByteRawIndexCreator;
-import org.apache.pinot.segment.local.segment.index.readers.forward.ChunkReaderContext;
 import org.apache.pinot.segment.local.segment.index.readers.forward.FixedByteChunkMVForwardIndexReader;
+import org.apache.pinot.segment.local.segment.index.readers.forward.VarByteChunkForwardIndexReaderV4;
 import org.apache.pinot.segment.spi.V1Constants.Indexes;
 import org.apache.pinot.segment.spi.compression.ChunkCompressionType;
+import org.apache.pinot.segment.spi.index.reader.ForwardIndexReader;
+import org.apache.pinot.segment.spi.index.reader.ForwardIndexReaderContext;
 import org.apache.pinot.segment.spi.memory.PinotDataBuffer;
 import org.apache.pinot.spi.data.FieldSpec.DataType;
 import org.testng.Assert;
@@ -52,7 +55,9 @@ public class MultiValueFixedByteRawIndexCreatorTest {
 
   @DataProvider(name = "compressionTypes")
   public Object[][] compressionTypes() {
-    return Arrays.stream(ChunkCompressionType.values()).map(ct -> new Object[]{ct}).toArray(Object[][]::new);
+    return Arrays.stream(ChunkCompressionType.values())
+        .flatMap(ct -> IntStream.of(2, 4).boxed()
+            .map(writerVersion -> new Object[]{ct, writerVersion})).toArray(Object[][]::new);
   }
 
   @BeforeClass
@@ -70,98 +75,98 @@ public class MultiValueFixedByteRawIndexCreatorTest {
   }
 
   @Test(dataProvider = "compressionTypes")
-  public void testMVInt(ChunkCompressionType compressionType)
+  public void testMVInt(ChunkCompressionType compressionType, int writerVersion)
       throws IOException {
     // This tests varying lengths of MV rows
     testMV(DataType.INT, ints(false), x -> x.length, int[]::new, MultiValueFixedByteRawIndexCreator::putIntMV,
         (reader, context, docId, buffer) -> {
           int length = reader.getIntMV(docId, buffer, context);
           return Arrays.copyOf(buffer, length);
-        }, compressionType);
+        }, compressionType, writerVersion);
 
     // This tests a fixed length of MV rows to ensure there are no BufferOverflowExceptions on filling up the chunk
     testMV(DataType.INT, ints(true), x -> x.length, int[]::new, MultiValueFixedByteRawIndexCreator::putIntMV,
         (reader, context, docId, buffer) -> {
           int length = reader.getIntMV(docId, buffer, context);
           return Arrays.copyOf(buffer, length);
-        }, compressionType);
+        }, compressionType, writerVersion);
   }
 
   @Test(dataProvider = "compressionTypes")
-  public void testMVLong(ChunkCompressionType compressionType)
+  public void testMVLong(ChunkCompressionType compressionType, int writerVersion)
       throws IOException {
     // This tests varying lengths of MV rows
     testMV(DataType.LONG, longs(false), x -> x.length, long[]::new, MultiValueFixedByteRawIndexCreator::putLongMV,
         (reader, context, docId, buffer) -> {
           int length = reader.getLongMV(docId, buffer, context);
           return Arrays.copyOf(buffer, length);
-        }, compressionType);
+        }, compressionType, writerVersion);
 
     // This tests a fixed length of MV rows to ensure there are no BufferOverflowExceptions on filling up the chunk
     testMV(DataType.LONG, longs(true), x -> x.length, long[]::new, MultiValueFixedByteRawIndexCreator::putLongMV,
         (reader, context, docId, buffer) -> {
           int length = reader.getLongMV(docId, buffer, context);
           return Arrays.copyOf(buffer, length);
-        }, compressionType);
+        }, compressionType, writerVersion);
   }
 
   @Test(dataProvider = "compressionTypes")
-  public void testMVFloat(ChunkCompressionType compressionType)
+  public void testMVFloat(ChunkCompressionType compressionType, int writerVersion)
       throws IOException {
     // This tests varying lengths of MV rows
     testMV(DataType.FLOAT, floats(false), x -> x.length, float[]::new, MultiValueFixedByteRawIndexCreator::putFloatMV,
         (reader, context, docId, buffer) -> {
           int length = reader.getFloatMV(docId, buffer, context);
           return Arrays.copyOf(buffer, length);
-        }, compressionType);
+        }, compressionType, writerVersion);
 
     // This tests a fixed length of MV rows to ensure there are no BufferOverflowExceptions on filling up the chunk
     testMV(DataType.FLOAT, floats(true), x -> x.length, float[]::new, MultiValueFixedByteRawIndexCreator::putFloatMV,
         (reader, context, docId, buffer) -> {
           int length = reader.getFloatMV(docId, buffer, context);
           return Arrays.copyOf(buffer, length);
-        }, compressionType);
+        }, compressionType, writerVersion);
   }
 
   @Test(dataProvider = "compressionTypes")
-  public void testMVDouble(ChunkCompressionType compressionType)
+  public void testMVDouble(ChunkCompressionType compressionType, int writerVersion)
       throws IOException {
     // This tests varying lengths of MV rows
     testMV(DataType.DOUBLE, doubles(false), x -> x.length, double[]::new,
-        MultiValueFixedByteRawIndexCreator::putDoubleMV,
-        (reader, context, docId, buffer) -> {
+        MultiValueFixedByteRawIndexCreator::putDoubleMV, (reader, context, docId, buffer) -> {
           int length = reader.getDoubleMV(docId, buffer, context);
           return Arrays.copyOf(buffer, length);
-        }, compressionType);
+        }, compressionType, writerVersion);
 
     // This tests a fixed length of MV rows to ensure there are no BufferOverflowExceptions on filling up the chunk
     testMV(DataType.DOUBLE, doubles(true), x -> x.length, double[]::new,
-        MultiValueFixedByteRawIndexCreator::putDoubleMV,
-        (reader, context, docId, buffer) -> {
+        MultiValueFixedByteRawIndexCreator::putDoubleMV, (reader, context, docId, buffer) -> {
           int length = reader.getDoubleMV(docId, buffer, context);
           return Arrays.copyOf(buffer, length);
-        }, compressionType);
+        }, compressionType, writerVersion);
   }
 
   public <T> void testMV(DataType dataType, List<T> inputs, ToIntFunction<T> sizeof, IntFunction<T> constructor,
-      Injector<T> injector, Extractor<T> extractor, ChunkCompressionType compressionType)
+      Injector<T> injector, Extractor<T> extractor, ChunkCompressionType compressionType, int writerVersion)
       throws IOException {
     String column = "testCol_" + dataType;
     int numDocs = inputs.size();
     int maxElements = inputs.stream().mapToInt(sizeof).max().orElseThrow(RuntimeException::new);
     File file = new File(OUTPUT_DIR, column + Indexes.RAW_MV_FORWARD_INDEX_FILE_EXTENSION);
     file.delete();
-    MultiValueFixedByteRawIndexCreator creator = new MultiValueFixedByteRawIndexCreator(new File(OUTPUT_DIR),
-        compressionType, column, numDocs, dataType, maxElements);
+    MultiValueFixedByteRawIndexCreator creator =
+        new MultiValueFixedByteRawIndexCreator(new File(OUTPUT_DIR), compressionType, column, numDocs, dataType,
+            maxElements, false, writerVersion);
     inputs.forEach(input -> injector.inject(creator, input));
     creator.close();
 
     //read
-    final PinotDataBuffer buffer = PinotDataBuffer
-        .mapFile(file, true, 0, file.length(), ByteOrder.BIG_ENDIAN, "");
-    FixedByteChunkMVForwardIndexReader reader = new FixedByteChunkMVForwardIndexReader(buffer,
-        dataType.getStoredType());
-    final ChunkReaderContext context = reader.createContext();
+    final PinotDataBuffer buffer = PinotDataBuffer.mapFile(file, true, 0, file.length(), ByteOrder.BIG_ENDIAN, "");
+    ForwardIndexReader reader =
+        writerVersion == VarByteChunkForwardIndexWriterV4.VERSION ? new VarByteChunkForwardIndexReaderV4(buffer,
+            dataType.getStoredType(), false) : new FixedByteChunkMVForwardIndexReader(buffer, dataType.getStoredType());
+
+    final ForwardIndexReaderContext context = reader.createContext();
     T valueBuffer = constructor.apply(maxElements);
     for (int i = 0; i < numDocs; i++) {
       Assert.assertEquals(inputs.get(i), extractor.extract(reader, context, i, valueBuffer));
@@ -169,7 +174,7 @@ public class MultiValueFixedByteRawIndexCreatorTest {
   }
 
   interface Extractor<T> {
-    T extract(FixedByteChunkMVForwardIndexReader reader, ChunkReaderContext context, int offset, T buffer);
+    T extract(ForwardIndexReader reader, ForwardIndexReaderContext context, int offset, T buffer);
   }
 
   interface Injector<T> {
diff --git a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/creator/MultiValueVarByteRawIndexCreatorTest.java b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/creator/MultiValueVarByteRawIndexCreatorTest.java
index 1006245586..32f4ff1a2c 100644
--- a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/creator/MultiValueVarByteRawIndexCreatorTest.java
+++ b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/creator/MultiValueVarByteRawIndexCreatorTest.java
@@ -30,10 +30,11 @@ import java.util.stream.IntStream;
 import java.util.stream.Stream;
 import org.apache.commons.io.FileUtils;
 import org.apache.pinot.segment.local.segment.creator.impl.fwd.MultiValueVarByteRawIndexCreator;
-import org.apache.pinot.segment.local.segment.index.readers.forward.ChunkReaderContext;
-import org.apache.pinot.segment.local.segment.index.readers.forward.VarByteChunkMVForwardIndexReader;
+import org.apache.pinot.segment.local.segment.index.forward.ForwardIndexReaderFactory;
 import org.apache.pinot.segment.spi.V1Constants.Indexes;
 import org.apache.pinot.segment.spi.compression.ChunkCompressionType;
+import org.apache.pinot.segment.spi.index.reader.ForwardIndexReader;
+import org.apache.pinot.segment.spi.index.reader.ForwardIndexReaderContext;
 import org.apache.pinot.segment.spi.memory.PinotDataBuffer;
 import org.apache.pinot.spi.data.FieldSpec.DataType;
 import org.testng.Assert;
@@ -56,12 +57,12 @@ public class MultiValueVarByteRawIndexCreatorTest {
 
   @DataProvider
   public Object[][] params() {
-    return Arrays.stream(ChunkCompressionType.values())
-        .flatMap(chunkCompressionType -> IntStream.of(10, 15, 20, 1000).boxed()
-            .flatMap(useFullSize -> Stream.of(true, false)
-                .flatMap(maxLength -> IntStream.range(1, 20).map(i -> i * 2 - 1).boxed()
-                    .map(maxNumEntries -> new Object[]{chunkCompressionType, useFullSize, maxLength,
-                        maxNumEntries}))))
+    return Arrays.stream(ChunkCompressionType.values()).flatMap(chunkCompressionType -> IntStream.of(2, 4).boxed()
+            .flatMap(writerVersion -> IntStream.of(10, 15, 20, 1000).boxed().flatMap(maxLength -> Stream.of(true, false)
+                .flatMap(
+                    useFullSize -> IntStream.range(1, 20).map(i -> i * 2 - 1).boxed().map(maxNumEntries -> new Object[]{
+                        chunkCompressionType, useFullSize, writerVersion, maxLength, maxNumEntries
+                    })))))
         .toArray(Object[][]::new);
   }
 
@@ -86,7 +87,8 @@ public class MultiValueVarByteRawIndexCreatorTest {
   }
 
   @Test(dataProvider = "params")
-  public void testMVString(ChunkCompressionType compressionType, int maxLength, boolean useFullSize, int maxNumEntries)
+  public void testMVString(ChunkCompressionType compressionType, boolean useFullSize, int writerVersion, int maxLength,
+      int maxNumEntries)
       throws IOException {
     String column = "testCol-" + UUID.randomUUID();
     int numDocs = 1000;
@@ -117,18 +119,16 @@ public class MultiValueVarByteRawIndexCreatorTest {
       inputs.add(values);
     }
     try (MultiValueVarByteRawIndexCreator creator = new MultiValueVarByteRawIndexCreator(OUTPUT_DIR, compressionType,
-        column, numDocs, DataType.STRING, maxTotalLength, maxElements)) {
+        column, numDocs, DataType.STRING, maxTotalLength, maxElements, writerVersion)) {
       for (String[] input : inputs) {
         creator.putStringMV(input);
       }
     }
 
     //read
-    final PinotDataBuffer buffer = PinotDataBuffer
-        .mapFile(file, true, 0, file.length(), ByteOrder.BIG_ENDIAN, "");
-    VarByteChunkMVForwardIndexReader reader = new VarByteChunkMVForwardIndexReader(buffer,
-        DataType.STRING);
-    final ChunkReaderContext context = reader.createContext();
+    final PinotDataBuffer buffer = PinotDataBuffer.mapFile(file, true, 0, file.length(), ByteOrder.BIG_ENDIAN, "");
+    ForwardIndexReader reader = ForwardIndexReaderFactory.createRawIndexReader(buffer, DataType.STRING, false);
+    final ForwardIndexReaderContext context = reader.createContext();
     String[] values = new String[maxElements];
     for (int i = 0; i < numDocs; i++) {
       int length = reader.getStringMV(i, values, context);
@@ -138,7 +138,8 @@ public class MultiValueVarByteRawIndexCreatorTest {
   }
 
   @Test(dataProvider = "params")
-  public void testMVBytes(ChunkCompressionType compressionType, int maxLength, boolean useFullSize, int maxNumEntries)
+  public void testMVBytes(ChunkCompressionType compressionType, boolean useFullSize, int writerVersion, int maxLength,
+      int maxNumEntries)
       throws IOException {
     String column = "testCol-" + UUID.randomUUID();
     int numDocs = 1000;
@@ -169,18 +170,16 @@ public class MultiValueVarByteRawIndexCreatorTest {
       inputs.add(values);
     }
     try (MultiValueVarByteRawIndexCreator creator = new MultiValueVarByteRawIndexCreator(OUTPUT_DIR, compressionType,
-        column, numDocs, DataType.STRING, maxTotalLength, maxElements)) {
+        column, numDocs, DataType.BYTES, writerVersion, maxTotalLength, maxElements)) {
       for (byte[][] input : inputs) {
         creator.putBytesMV(input);
       }
     }
 
     //read
-    final PinotDataBuffer buffer = PinotDataBuffer
-        .mapFile(file, true, 0, file.length(), ByteOrder.BIG_ENDIAN, "");
-    VarByteChunkMVForwardIndexReader reader = new VarByteChunkMVForwardIndexReader(buffer,
-        DataType.BYTES);
-    final ChunkReaderContext context = reader.createContext();
+    final PinotDataBuffer buffer = PinotDataBuffer.mapFile(file, true, 0, file.length(), ByteOrder.BIG_ENDIAN, "");
+    ForwardIndexReader reader = ForwardIndexReaderFactory.createRawIndexReader(buffer, DataType.BYTES, false);
+    final ForwardIndexReaderContext context = reader.createContext();
     byte[][] values = new byte[maxElements][];
     for (int i = 0; i < numDocs; i++) {
       int length = reader.getBytesMV(i, values, context);
diff --git a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/creator/VarByteChunkV4Test.java b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/creator/VarByteChunkV4Test.java
index ba59d057a7..88414b8b58 100644
--- a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/creator/VarByteChunkV4Test.java
+++ b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/creator/VarByteChunkV4Test.java
@@ -31,7 +31,7 @@ import java.util.stream.IntStream;
 import java.util.stream.Stream;
 import org.apache.commons.io.FileUtils;
 import org.apache.pinot.segment.local.io.writer.impl.VarByteChunkForwardIndexWriterV4;
-import org.apache.pinot.segment.local.segment.index.readers.forward.VarByteChunkSVForwardIndexReaderV4;
+import org.apache.pinot.segment.local.segment.index.readers.forward.VarByteChunkForwardIndexReaderV4;
 import org.apache.pinot.segment.spi.compression.ChunkCompressionType;
 import org.apache.pinot.segment.spi.memory.PinotDataBuffer;
 import org.apache.pinot.spi.data.FieldSpec;
@@ -113,8 +113,8 @@ public class VarByteChunkV4Test {
       }
     }
     try (PinotDataBuffer buffer = PinotDataBuffer.mapReadOnlyBigEndianFile(_file)) {
-      try (VarByteChunkSVForwardIndexReaderV4 reader = new VarByteChunkSVForwardIndexReaderV4(buffer, dataType);
-          VarByteChunkSVForwardIndexReaderV4.ReaderContext context = reader.createContext()) {
+      try (VarByteChunkForwardIndexReaderV4 reader = new VarByteChunkForwardIndexReaderV4(buffer, dataType,
+          true); VarByteChunkForwardIndexReaderV4.ReaderContext context = reader.createContext()) {
         for (int i = 0; i < values.size(); i++) {
           assertEquals(read.read(reader, context, i), values.get(i));
         }
@@ -161,7 +161,7 @@ public class VarByteChunkV4Test {
 
   @FunctionalInterface
   interface Read<T> {
-    T read(VarByteChunkSVForwardIndexReaderV4 reader, VarByteChunkSVForwardIndexReaderV4.ReaderContext context,
+    T read(VarByteChunkForwardIndexReaderV4 reader, VarByteChunkForwardIndexReaderV4.ReaderContext context,
         int docId);
   }
 }

