This is an automated email from the ASF dual-hosted git repository.

jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new b067ae1924 Extract common MV ser/de logic into ArraySerDeUtils (#14209)
b067ae1924 is described below

commit b067ae1924a532282229e0506188b9375f0ec6d2
Author: Xiaotian (Jackie) Jiang <17555551+jackie-ji...@users.noreply.github.com>
AuthorDate: Fri Oct 11 16:00:51 2024 -0700

    Extract common MV ser/de logic into ArraySerDeUtils (#14209)
---
 .../impl/VarByteChunkForwardIndexWriter.java       |  63 ++--
 .../impl/VarByteChunkForwardIndexWriterV4.java     |  59 ++--
 .../local/io/writer/impl/VarByteChunkWriter.java   |   8 +
 .../fwd/MultiValueFixedByteRawIndexCreator.java    |  42 +--
 .../FixedByteChunkMVForwardIndexReader.java        |  64 +---
 .../forward/VarByteChunkForwardIndexReaderV4.java  | 109 +------
 .../forward/VarByteChunkMVForwardIndexReader.java  | 101 +-----
 .../pinot/segment/local/utils/ArraySerDeUtils.java | 363 +++++++++++++++++++++
 8 files changed, 456 insertions(+), 353 deletions(-)

diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/io/writer/impl/VarByteChunkForwardIndexWriter.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/io/writer/impl/VarByteChunkForwardIndexWriter.java
index fadcce827e..0a6bd47925 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/io/writer/impl/VarByteChunkForwardIndexWriter.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/io/writer/impl/VarByteChunkForwardIndexWriter.java
@@ -23,6 +23,7 @@ import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.math.BigDecimal;
 import javax.annotation.concurrent.NotThreadSafe;
+import org.apache.pinot.segment.local.utils.ArraySerDeUtils;
 import org.apache.pinot.segment.spi.compression.ChunkCompressionType;
 import org.apache.pinot.spi.utils.BigDecimalUtils;
 
@@ -97,54 +98,34 @@ public class VarByteChunkForwardIndexWriter extends BaseChunkForwardIndexWriter
     writeChunkIfNecessary();
   }
 
-  // Note: some duplication is tolerated between these overloads for the sake of memory efficiency
+  @Override
+  public void putIntMV(int[] values) {
+    putBytes(ArraySerDeUtils.serializeIntArrayWithLength(values));
+  }
+
+  @Override
+  public void putLongMV(long[] values) {
+    putBytes(ArraySerDeUtils.serializeLongArrayWithLength(values));
+  }
+
+  @Override
+  public void putFloatMV(float[] values) {
+    putBytes(ArraySerDeUtils.serializeFloatArrayWithLength(values));
+  }
+
+  @Override
+  public void putDoubleMV(double[] values) {
+    putBytes(ArraySerDeUtils.serializeDoubleArrayWithLength(values));
+  }
 
   @Override
   public void putStringMV(String[] values) {
-    // the entire String[] will be encoded as a single string, write the header here
-    _chunkBuffer.putInt(_chunkHeaderOffset, _chunkDataOffSet);
-    _chunkHeaderOffset += CHUNK_HEADER_ENTRY_ROW_OFFSET_SIZE;
-    // write all the strings into the data buffer as if it's a single string,
-    // but with its own embedded header so offsets to strings within the body
-    // can be located
-    _chunkBuffer.putInt(_chunkDataOffSet, values.length);
-    _chunkDataOffSet += Integer.BYTES;
-    int headerSize = Integer.BYTES * values.length;
-    int bodyPosition = _chunkDataOffSet + headerSize;
-    _chunkBuffer.position(bodyPosition);
-    int bodySize = 0;
-    for (int i = 0, h = _chunkDataOffSet; i < values.length; i++, h += 
Integer.BYTES) {
-      byte[] utf8 = values[i].getBytes(UTF_8);
-      _chunkBuffer.putInt(h, utf8.length);
-      _chunkBuffer.put(utf8);
-      bodySize += utf8.length;
-    }
-    _chunkDataOffSet += headerSize + bodySize;
-    writeChunkIfNecessary();
+    putBytes(ArraySerDeUtils.serializeStringArray(values));
   }
 
   @Override
   public void putBytesMV(byte[][] values) {
-    // the entire byte[][] will be encoded as a single string, write the header here
-    _chunkBuffer.putInt(_chunkHeaderOffset, _chunkDataOffSet);
-    _chunkHeaderOffset += CHUNK_HEADER_ENTRY_ROW_OFFSET_SIZE;
-    // write all the byte[]s into the data buffer as if it's a single byte[],
-    // but with its own embedded header so offsets to byte[]s within the body
-    // can be located
-    _chunkBuffer.putInt(_chunkDataOffSet, values.length);
-    _chunkDataOffSet += Integer.BYTES;
-    int headerSize = Integer.BYTES * values.length;
-    int bodyPosition = _chunkDataOffSet + headerSize;
-    _chunkBuffer.position(bodyPosition);
-    int bodySize = 0;
-    for (int i = 0, h = _chunkDataOffSet; i < values.length; i++, h += 
Integer.BYTES) {
-      byte[] bytes = values[i];
-      _chunkBuffer.putInt(h, bytes.length);
-      _chunkBuffer.put(bytes);
-      bodySize += bytes.length;
-    }
-    _chunkDataOffSet += headerSize + bodySize;
-    writeChunkIfNecessary();
+    putBytes(ArraySerDeUtils.serializeBytesArray(values));
   }
 
   private void writeChunkIfNecessary() {
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/io/writer/impl/VarByteChunkForwardIndexWriterV4.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/io/writer/impl/VarByteChunkForwardIndexWriterV4.java
index 440808a6b0..e7bc30fc70 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/io/writer/impl/VarByteChunkForwardIndexWriterV4.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/io/writer/impl/VarByteChunkForwardIndexWriterV4.java
@@ -30,6 +30,7 @@ import java.nio.charset.StandardCharsets;
 import javax.annotation.concurrent.NotThreadSafe;
 import org.apache.commons.io.FileUtils;
 import org.apache.pinot.segment.local.io.compression.ChunkCompressorFactory;
+import org.apache.pinot.segment.local.utils.ArraySerDeUtils;
 import org.apache.pinot.segment.spi.compression.ChunkCompressionType;
 import org.apache.pinot.segment.spi.compression.ChunkCompressor;
 import org.apache.pinot.segment.spi.memory.CleanerUtil;
@@ -37,8 +38,6 @@ import org.apache.pinot.spi.utils.BigDecimalUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import static java.nio.charset.StandardCharsets.UTF_8;
-
 
 /**
  * Chunk-based raw (non-dictionary-encoded) forward index writer where each chunk contains variable number of docs, and
@@ -145,49 +144,33 @@ public class VarByteChunkForwardIndexWriterV4 implements VarByteChunkWriter {
   }
 
   @Override
-  public void putStringMV(String[] values) {
-    // num values + length of each value
-    int headerSize = Integer.BYTES + Integer.BYTES * values.length;
-    int size = headerSize;
-    byte[][] stringBytes = new byte[values.length][];
-    for (int i = 0; i < values.length; i++) {
-      stringBytes[i] = values[i].getBytes(UTF_8);
-      size += stringBytes[i].length;
-    }
+  public void putIntMV(int[] values) {
+    putBytes(ArraySerDeUtils.serializeIntArrayWithLength(values));
+  }
 
-    // Format : [numValues][length1][length2]...[lengthN][value1][value2]...[valueN]
-    byte[] serializedBytes = new byte[size];
-    ByteBuffer byteBuffer = ByteBuffer.wrap(serializedBytes);
-    byteBuffer.putInt(values.length);
-    byteBuffer.position(headerSize);
-    for (int i = 0; i < values.length; i++) {
-      byteBuffer.putInt((i + 1) * Integer.BYTES, stringBytes[i].length);
-      byteBuffer.put(stringBytes[i]);
-    }
+  @Override
+  public void putLongMV(long[] values) {
+    putBytes(ArraySerDeUtils.serializeLongArrayWithLength(values));
+  }
 
-    putBytes(serializedBytes);
+  @Override
+  public void putFloatMV(float[] values) {
+    putBytes(ArraySerDeUtils.serializeFloatArrayWithLength(values));
   }
 
   @Override
-  public void putBytesMV(byte[][] values) {
-    // num values + length of each value
-    int headerSize = Integer.BYTES + Integer.BYTES * values.length;
-    int size = headerSize;
-    for (byte[] value : values) {
-      size += value.length;
-    }
+  public void putDoubleMV(double[] values) {
+    putBytes(ArraySerDeUtils.serializeDoubleArrayWithLength(values));
+  }
 
-    // Format : [numValues][length1][length2]...[lengthN][bytes1][bytes2]...[bytesN]
-    byte[] serializedBytes = new byte[size];
-    ByteBuffer byteBuffer = ByteBuffer.wrap(serializedBytes);
-    byteBuffer.putInt(values.length);
-    byteBuffer.position(headerSize);
-    for (int i = 0; i < values.length; i++) {
-      byteBuffer.putInt((i + 1) * Integer.BYTES, values[i].length);
-      byteBuffer.put(values[i]);
-    }
+  @Override
+  public void putStringMV(String[] values) {
+    putBytes(ArraySerDeUtils.serializeStringArray(values));
+  }
 
-    putBytes(serializedBytes);
+  @Override
+  public void putBytesMV(byte[][] values) {
+    putBytes(ArraySerDeUtils.serializeBytesArray(values));
   }
 
   private void writeHugeChunk(byte[] bytes) {
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/io/writer/impl/VarByteChunkWriter.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/io/writer/impl/VarByteChunkWriter.java
index bf3537d67c..82fbd2b469 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/io/writer/impl/VarByteChunkWriter.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/io/writer/impl/VarByteChunkWriter.java
@@ -29,6 +29,14 @@ public interface VarByteChunkWriter extends Closeable {
 
   void putBytes(byte[] value);
 
+  void putIntMV(int[] values);
+
+  void putLongMV(long[] values);
+
+  void putFloatMV(float[] values);
+
+  void putDoubleMV(double[] values);
+
   void putStringMV(String[] values);
 
   void putBytesMV(byte[][] values);
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/fwd/MultiValueFixedByteRawIndexCreator.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/fwd/MultiValueFixedByteRawIndexCreator.java
index dcdcb99705..e7877f8b60 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/fwd/MultiValueFixedByteRawIndexCreator.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/fwd/MultiValueFixedByteRawIndexCreator.java
@@ -20,7 +20,6 @@ package org.apache.pinot.segment.local.segment.creator.impl.fwd;
 
 import java.io.File;
 import java.io.IOException;
-import java.nio.ByteBuffer;
 import 
org.apache.pinot.segment.local.io.writer.impl.VarByteChunkForwardIndexWriter;
 import 
org.apache.pinot.segment.local.io.writer.impl.VarByteChunkForwardIndexWriterV4;
 import org.apache.pinot.segment.local.io.writer.impl.VarByteChunkWriter;
@@ -70,7 +69,6 @@ public class MultiValueFixedByteRawIndexCreator implements ForwardIndexCreator {
         ForwardIndexConfig.DEFAULT_TARGET_DOCS_PER_CHUNK);
   }
 
-
   public MultiValueFixedByteRawIndexCreator(File indexFile, 
ChunkCompressionType compressionType, int totalDocs,
       DataType valueType, int maxNumberOfMultiValueElements, boolean 
deriveNumDocsPerChunk, int writerVersion,
       int targetMaxChunkSizeBytes, int targetDocsPerChunk)
@@ -108,54 +106,22 @@ public class MultiValueFixedByteRawIndexCreator implements ForwardIndexCreator {
 
   @Override
   public void putIntMV(int[] values) {
-    byte[] bytes = new byte[Integer.BYTES + values.length * Integer.BYTES];
-    ByteBuffer byteBuffer = ByteBuffer.wrap(bytes);
-    //write the length
-    byteBuffer.putInt(values.length);
-    //write the content of each element
-    for (int value : values) {
-      byteBuffer.putInt(value);
-    }
-    _indexWriter.putBytes(bytes);
+    _indexWriter.putIntMV(values);
   }
 
   @Override
   public void putLongMV(long[] values) {
-    byte[] bytes = new byte[Integer.BYTES + values.length * Long.BYTES];
-    ByteBuffer byteBuffer = ByteBuffer.wrap(bytes);
-    //write the length
-    byteBuffer.putInt(values.length);
-    //write the content of each element
-    for (long value : values) {
-      byteBuffer.putLong(value);
-    }
-    _indexWriter.putBytes(bytes);
+    _indexWriter.putLongMV(values);
   }
 
   @Override
   public void putFloatMV(float[] values) {
-    byte[] bytes = new byte[Integer.BYTES + values.length * Float.BYTES];
-    ByteBuffer byteBuffer = ByteBuffer.wrap(bytes);
-    //write the length
-    byteBuffer.putInt(values.length);
-    //write the content of each element
-    for (float value : values) {
-      byteBuffer.putFloat(value);
-    }
-    _indexWriter.putBytes(bytes);
+    _indexWriter.putFloatMV(values);
   }
 
   @Override
   public void putDoubleMV(double[] values) {
-    byte[] bytes = new byte[Integer.BYTES + values.length * Double.BYTES];
-    ByteBuffer byteBuffer = ByteBuffer.wrap(bytes);
-    //write the length
-    byteBuffer.putInt(values.length);
-    //write the content of each element
-    for (double value : values) {
-      byteBuffer.putDouble(value);
-    }
-    _indexWriter.putBytes(bytes);
+    _indexWriter.putDoubleMV(values);
   }
 
   @Override
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/forward/FixedByteChunkMVForwardIndexReader.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/forward/FixedByteChunkMVForwardIndexReader.java
index f96ed6e878..8e53ecb156 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/forward/FixedByteChunkMVForwardIndexReader.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/forward/FixedByteChunkMVForwardIndexReader.java
@@ -22,6 +22,7 @@ import java.nio.ByteBuffer;
 import java.util.List;
 import javax.annotation.Nullable;
 import 
org.apache.pinot.segment.local.io.writer.impl.VarByteChunkForwardIndexWriter;
+import org.apache.pinot.segment.local.utils.ArraySerDeUtils;
 import org.apache.pinot.segment.spi.memory.PinotDataBuffer;
 import org.apache.pinot.spi.data.FieldSpec.DataType;
 
@@ -53,92 +54,47 @@ public final class FixedByteChunkMVForwardIndexReader extends BaseChunkForwardIn
 
   @Override
   public int getIntMV(int docId, int[] valueBuffer, ChunkReaderContext 
context) {
-    ByteBuffer byteBuffer = slice(docId, context);
-    int numValues = byteBuffer.getInt();
-    for (int i = 0; i < numValues; i++) {
-      valueBuffer[i] = byteBuffer.getInt();
-    }
-    return numValues;
+    return ArraySerDeUtils.deserializeIntArrayWithLength(slice(docId, 
context), valueBuffer);
   }
 
   @Override
   public int[] getIntMV(int docId, ChunkReaderContext context) {
-    ByteBuffer byteBuffer = slice(docId, context);
-    int numValues = byteBuffer.getInt();
-    int[] valueBuffer = new int[numValues];
-    for (int i = 0; i < numValues; i++) {
-      valueBuffer[i] = byteBuffer.getInt();
-    }
-    return valueBuffer;
+    return ArraySerDeUtils.deserializeIntArrayWithLength(slice(docId, 
context));
   }
 
   @Override
   public int getLongMV(int docId, long[] valueBuffer, ChunkReaderContext 
context) {
-    ByteBuffer byteBuffer = slice(docId, context);
-    int numValues = byteBuffer.getInt();
-    for (int i = 0; i < numValues; i++) {
-      valueBuffer[i] = byteBuffer.getLong();
-    }
-    return numValues;
+    return ArraySerDeUtils.deserializeLongArrayWithLength(slice(docId, 
context), valueBuffer);
   }
 
   @Override
   public long[] getLongMV(int docId, ChunkReaderContext context) {
-    ByteBuffer byteBuffer = slice(docId, context);
-    int numValues = byteBuffer.getInt();
-    long[] valueBuffer = new long[numValues];
-    for (int i = 0; i < numValues; i++) {
-      valueBuffer[i] = byteBuffer.getLong();
-    }
-    return valueBuffer;
+    return ArraySerDeUtils.deserializeLongArrayWithLength(slice(docId, 
context));
   }
 
   @Override
   public int getFloatMV(int docId, float[] valueBuffer, ChunkReaderContext 
context) {
-    ByteBuffer byteBuffer = slice(docId, context);
-    int numValues = byteBuffer.getInt();
-    for (int i = 0; i < numValues; i++) {
-      valueBuffer[i] = byteBuffer.getFloat();
-    }
-    return numValues;
+    return ArraySerDeUtils.deserializeFloatArrayWithLength(slice(docId, 
context), valueBuffer);
   }
 
   @Override
   public float[] getFloatMV(int docId, ChunkReaderContext context) {
-    ByteBuffer byteBuffer = slice(docId, context);
-    int numValues = byteBuffer.getInt();
-    float[] valueBuffer = new float[numValues];
-    for (int i = 0; i < numValues; i++) {
-      valueBuffer[i] = byteBuffer.getFloat();
-    }
-    return valueBuffer;
+    return ArraySerDeUtils.deserializeFloatArrayWithLength(slice(docId, 
context));
   }
 
   @Override
   public int getDoubleMV(int docId, double[] valueBuffer, ChunkReaderContext 
context) {
-    ByteBuffer byteBuffer = slice(docId, context);
-    int numValues = byteBuffer.getInt();
-    for (int i = 0; i < numValues; i++) {
-      valueBuffer[i] = byteBuffer.getDouble();
-    }
-    return numValues;
+    return ArraySerDeUtils.deserializeDoubleArrayWithLength(slice(docId, 
context), valueBuffer);
   }
 
   @Override
   public double[] getDoubleMV(int docId, ChunkReaderContext context) {
-    ByteBuffer byteBuffer = slice(docId, context);
-    int numValues = byteBuffer.getInt();
-    double[] valueBuffer = new double[numValues];
-    for (int i = 0; i < numValues; i++) {
-      valueBuffer[i] = byteBuffer.getDouble();
-    }
-    return valueBuffer;
+    return ArraySerDeUtils.deserializeDoubleArrayWithLength(slice(docId, 
context));
   }
 
   @Override
   public int getNumValuesMV(int docId, ChunkReaderContext context) {
-    ByteBuffer byteBuffer = slice(docId, context);
-    return byteBuffer.getInt();
+    return slice(docId, context).getInt();
   }
 
   private ByteBuffer slice(int docId, ChunkReaderContext context) {
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/forward/VarByteChunkForwardIndexReaderV4.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/forward/VarByteChunkForwardIndexReaderV4.java
index 851f5f3e7d..277805d22c 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/forward/VarByteChunkForwardIndexReaderV4.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/forward/VarByteChunkForwardIndexReaderV4.java
@@ -29,6 +29,7 @@ import java.util.List;
 import java.util.Map;
 import org.apache.pinot.segment.local.io.compression.ChunkCompressorFactory;
 import 
org.apache.pinot.segment.local.io.writer.impl.VarByteChunkForwardIndexWriterV4;
+import org.apache.pinot.segment.local.utils.ArraySerDeUtils;
 import org.apache.pinot.segment.spi.compression.ChunkCompressionType;
 import org.apache.pinot.segment.spi.compression.ChunkDecompressor;
 import org.apache.pinot.segment.spi.index.reader.ForwardIndexReader;
@@ -46,6 +47,8 @@ import org.slf4j.LoggerFactory;
  * Chunk-based raw (non-dictionary-encoded) forward index reader for values of SV variable length data types
  * (BIG_DECIMAL, STRING, BYTES), MV fixed length and MV variable length data types.
  * <p>For data layout, please refer to the documentation for {@link VarByteChunkForwardIndexWriterV4}
+ *
+ * TODO: Consider reading directly from sliced ByteBuffer instead of copying to byte[] first
  */
 public class VarByteChunkForwardIndexReaderV4
     implements 
ForwardIndexReader<VarByteChunkForwardIndexReaderV4.ReaderContext> {
@@ -130,144 +133,62 @@ public class VarByteChunkForwardIndexReaderV4
 
   @Override
   public int getIntMV(int docId, int[] valueBuffer, 
VarByteChunkForwardIndexReaderV4.ReaderContext context) {
-    ByteBuffer byteBuffer = ByteBuffer.wrap(context.getValue(docId));
-    int numValues = byteBuffer.getInt();
-    for (int i = 0; i < numValues; i++) {
-      valueBuffer[i] = byteBuffer.getInt();
-    }
-    return numValues;
+    return 
ArraySerDeUtils.deserializeIntArrayWithLength(context.getValue(docId), 
valueBuffer);
   }
 
   @Override
   public int[] getIntMV(int docId, 
VarByteChunkForwardIndexReaderV4.ReaderContext context) {
-    ByteBuffer byteBuffer = ByteBuffer.wrap(context.getValue(docId));
-    int numValues = byteBuffer.getInt();
-    int[] valueBuffer = new int[numValues];
-    for (int i = 0; i < numValues; i++) {
-      valueBuffer[i] = byteBuffer.getInt();
-    }
-    return valueBuffer;
+    return 
ArraySerDeUtils.deserializeIntArrayWithLength(context.getValue(docId));
   }
 
   @Override
   public int getLongMV(int docId, long[] valueBuffer, 
VarByteChunkForwardIndexReaderV4.ReaderContext context) {
-    ByteBuffer byteBuffer = ByteBuffer.wrap(context.getValue(docId));
-    int numValues = byteBuffer.getInt();
-    for (int i = 0; i < numValues; i++) {
-      valueBuffer[i] = byteBuffer.getLong();
-    }
-    return numValues;
+    return 
ArraySerDeUtils.deserializeLongArrayWithLength(context.getValue(docId), 
valueBuffer);
   }
 
   @Override
   public long[] getLongMV(int docId, 
VarByteChunkForwardIndexReaderV4.ReaderContext context) {
-    ByteBuffer byteBuffer = ByteBuffer.wrap(context.getValue(docId));
-    int numValues = byteBuffer.getInt();
-    long[] valueBuffer = new long[numValues];
-    for (int i = 0; i < numValues; i++) {
-      valueBuffer[i] = byteBuffer.getLong();
-    }
-    return valueBuffer;
+    return 
ArraySerDeUtils.deserializeLongArrayWithLength(context.getValue(docId));
   }
 
   @Override
   public int getFloatMV(int docId, float[] valueBuffer, 
VarByteChunkForwardIndexReaderV4.ReaderContext context) {
-    ByteBuffer byteBuffer = ByteBuffer.wrap(context.getValue(docId));
-    int numValues = byteBuffer.getInt();
-    for (int i = 0; i < numValues; i++) {
-      valueBuffer[i] = byteBuffer.getFloat();
-    }
-    return numValues;
+    return 
ArraySerDeUtils.deserializeFloatArrayWithLength(context.getValue(docId), 
valueBuffer);
   }
 
   @Override
   public float[] getFloatMV(int docId, 
VarByteChunkForwardIndexReaderV4.ReaderContext context) {
-    ByteBuffer byteBuffer = ByteBuffer.wrap(context.getValue(docId));
-    int numValues = byteBuffer.getInt();
-    float[] valueBuffer = new float[numValues];
-    for (int i = 0; i < numValues; i++) {
-      valueBuffer[i] = byteBuffer.getFloat();
-    }
-    return valueBuffer;
+    return 
ArraySerDeUtils.deserializeFloatArrayWithLength(context.getValue(docId));
   }
 
   @Override
   public int getDoubleMV(int docId, double[] valueBuffer, 
VarByteChunkForwardIndexReaderV4.ReaderContext context) {
-    ByteBuffer byteBuffer = ByteBuffer.wrap(context.getValue(docId));
-    int numValues = byteBuffer.getInt();
-    for (int i = 0; i < numValues; i++) {
-      valueBuffer[i] = byteBuffer.getDouble();
-    }
-    return numValues;
+    return 
ArraySerDeUtils.deserializeDoubleArrayWithLength(context.getValue(docId), 
valueBuffer);
   }
 
   @Override
   public double[] getDoubleMV(int docId, 
VarByteChunkForwardIndexReaderV4.ReaderContext context) {
-    ByteBuffer byteBuffer = ByteBuffer.wrap(context.getValue(docId));
-    int numValues = byteBuffer.getInt();
-    double[] valueBuffer = new double[numValues];
-    for (int i = 0; i < numValues; i++) {
-      valueBuffer[i] = byteBuffer.getFloat();
-    }
-    return valueBuffer;
+    return 
ArraySerDeUtils.deserializeDoubleArrayWithLength(context.getValue(docId));
   }
 
   @Override
   public int getStringMV(int docId, String[] valueBuffer, 
VarByteChunkForwardIndexReaderV4.ReaderContext context) {
-    byte[] bytes = context.getValue(docId);
-    ByteBuffer byteBuffer = ByteBuffer.wrap(bytes);
-    int numValues = byteBuffer.getInt();
-    int offset = (numValues + 1) * Integer.BYTES;
-    for (int i = 0; i < numValues; i++) {
-      int length = byteBuffer.getInt((i + 1) * Integer.BYTES);
-      valueBuffer[i] = new String(bytes, offset, length, 
StandardCharsets.UTF_8);
-      offset += length;
-    }
-    return numValues;
+    return ArraySerDeUtils.deserializeStringArray(context.getValue(docId), 
valueBuffer);
   }
 
   @Override
   public String[] getStringMV(int docId, 
VarByteChunkForwardIndexReaderV4.ReaderContext context) {
-    byte[] bytes = context.getValue(docId);
-    ByteBuffer byteBuffer = ByteBuffer.wrap(bytes);
-    int numValues = byteBuffer.getInt();
-    int offset = (numValues + 1) * Integer.BYTES;
-    String[] valueBuffer = new String[numValues];
-    for (int i = 0; i < numValues; i++) {
-      int length = byteBuffer.getInt((i + 1) * Integer.BYTES);
-      valueBuffer[i] = new String(bytes, offset, length, 
StandardCharsets.UTF_8);
-      offset += length;
-    }
-    return valueBuffer;
+    return ArraySerDeUtils.deserializeStringArray(context.getValue(docId));
   }
 
   @Override
   public int getBytesMV(int docId, byte[][] valueBuffer, 
VarByteChunkForwardIndexReaderV4.ReaderContext context) {
-    ByteBuffer byteBuffer = ByteBuffer.wrap(context.getValue(docId));
-    int numValues = byteBuffer.getInt();
-    byteBuffer.position((numValues + 1) * Integer.BYTES);
-    for (int i = 0; i < numValues; i++) {
-      int length = byteBuffer.getInt((i + 1) * Integer.BYTES);
-      byte[] bytes = new byte[length];
-      byteBuffer.get(bytes, 0, length);
-      valueBuffer[i] = bytes;
-    }
-    return numValues;
+    return ArraySerDeUtils.deserializeBytesArray(context.getValue(docId), 
valueBuffer);
   }
 
   @Override
   public byte[][] getBytesMV(int docId, 
VarByteChunkForwardIndexReaderV4.ReaderContext context) {
-    ByteBuffer byteBuffer = ByteBuffer.wrap(context.getValue(docId));
-    int numValues = byteBuffer.getInt();
-    byteBuffer.position((numValues + 1) * Integer.BYTES);
-    byte[][] valueBuffer = new byte[numValues][];
-    for (int i = 0; i < numValues; i++) {
-      int length = byteBuffer.getInt((i + 1) * Integer.BYTES);
-      byte[] bytes = new byte[length];
-      byteBuffer.get(bytes, 0, length);
-      valueBuffer[i] = bytes;
-    }
-    return valueBuffer;
+    return ArraySerDeUtils.deserializeBytesArray(context.getValue(docId));
   }
 
   @Override
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/forward/VarByteChunkMVForwardIndexReader.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/forward/VarByteChunkMVForwardIndexReader.java
index 1d133bb896..bd7dc5dbb7 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/forward/VarByteChunkMVForwardIndexReader.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/forward/VarByteChunkMVForwardIndexReader.java
@@ -19,10 +19,10 @@
 package org.apache.pinot.segment.local.segment.index.readers.forward;
 
 import java.nio.ByteBuffer;
-import java.nio.charset.StandardCharsets;
 import java.util.List;
 import javax.annotation.Nullable;
 import 
org.apache.pinot.segment.local.io.writer.impl.VarByteChunkForwardIndexWriter;
+import org.apache.pinot.segment.local.utils.ArraySerDeUtils;
 import org.apache.pinot.segment.spi.memory.PinotDataBuffer;
 import org.apache.pinot.spi.data.FieldSpec.DataType;
 
@@ -31,6 +31,8 @@ import org.apache.pinot.spi.data.FieldSpec.DataType;
  * Chunk-based multi-value raw (non-dictionary-encoded) forward index reader for values of variable length data type
  * (STRING, BYTES).
  * <p>For data layout, please refer to the documentation for {@link VarByteChunkForwardIndexWriter}
+ *
+ * TODO: Consider reading directly from sliced ByteBuffer instead of copying to byte[] first
  */
 public final class VarByteChunkMVForwardIndexReader extends 
BaseChunkForwardIndexReader {
   private static final int ROW_OFFSET_SIZE = 
VarByteChunkForwardIndexWriter.CHUNK_HEADER_ENTRY_ROW_OFFSET_SIZE;
@@ -53,105 +55,28 @@ public final class VarByteChunkMVForwardIndexReader extends BaseChunkForwardInde
   }
 
   @Override
-  public int getStringMV(final int docId, final String[] valueBuffer, final 
ChunkReaderContext context) {
-    byte[] compressedBytes;
-    if (_isCompressed) {
-      compressedBytes = getBytesCompressed(docId, context);
-    } else {
-      compressedBytes = getBytesUncompressed(docId);
-    }
-    ByteBuffer byteBuffer = ByteBuffer.wrap(compressedBytes);
-    int numValues = byteBuffer.getInt();
-    int contentOffset = (numValues + 1) * Integer.BYTES;
-    for (int i = 0; i < numValues; i++) {
-      int length = byteBuffer.getInt((i + 1) * Integer.BYTES);
-      byte[] bytes = new byte[length];
-      byteBuffer.position(contentOffset);
-      byteBuffer.get(bytes, 0, length);
-      valueBuffer[i] = new String(bytes, StandardCharsets.UTF_8);
-      contentOffset += length;
-    }
-    return numValues;
+  public int getStringMV(int docId, String[] valueBuffer, ChunkReaderContext 
context) {
+    return ArraySerDeUtils.deserializeStringArray(getBytes(docId, context), 
valueBuffer);
   }
 
   @Override
-  public String[] getStringMV(final int docId, final ChunkReaderContext 
context) {
-    byte[] compressedBytes;
-    if (_isCompressed) {
-      compressedBytes = getBytesCompressed(docId, context);
-    } else {
-      compressedBytes = getBytesUncompressed(docId);
-    }
-    ByteBuffer byteBuffer = ByteBuffer.wrap(compressedBytes);
-    int numValues = byteBuffer.getInt();
-    int contentOffset = (numValues + 1) * Integer.BYTES;
-    String[] valueBuffer = new String[numValues];
-    for (int i = 0; i < numValues; i++) {
-      int length = byteBuffer.getInt((i + 1) * Integer.BYTES);
-      byte[] bytes = new byte[length];
-      byteBuffer.position(contentOffset);
-      byteBuffer.get(bytes, 0, length);
-      valueBuffer[i] = new String(bytes, StandardCharsets.UTF_8);
-      contentOffset += length;
-    }
-    return valueBuffer;
+  public String[] getStringMV(int docId, ChunkReaderContext context) {
+    return ArraySerDeUtils.deserializeStringArray(getBytes(docId, context));
   }
 
   @Override
-  public int getBytesMV(final int docId, final byte[][] valueBuffer, final 
ChunkReaderContext context) {
-    byte[] compressedBytes;
-    if (_isCompressed) {
-      compressedBytes = getBytesCompressed(docId, context);
-    } else {
-      compressedBytes = getBytesUncompressed(docId);
-    }
-    ByteBuffer byteBuffer = ByteBuffer.wrap(compressedBytes);
-    int numValues = byteBuffer.getInt();
-    int contentOffset = (numValues + 1) * Integer.BYTES;
-    for (int i = 0; i < numValues; i++) {
-      int length = byteBuffer.getInt((i + 1) * Integer.BYTES);
-      byte[] bytes = new byte[length];
-      byteBuffer.position(contentOffset);
-      byteBuffer.get(bytes, 0, length);
-      valueBuffer[i] = bytes;
-      contentOffset += length;
-    }
-    return numValues;
+  public int getBytesMV(int docId, byte[][] valueBuffer, ChunkReaderContext 
context) {
+    return ArraySerDeUtils.deserializeBytesArray(getBytes(docId, context), 
valueBuffer);
   }
 
   @Override
-  public byte[][] getBytesMV(final int docId, final ChunkReaderContext 
context) {
-    byte[] compressedBytes;
-    if (_isCompressed) {
-      compressedBytes = getBytesCompressed(docId, context);
-    } else {
-      compressedBytes = getBytesUncompressed(docId);
-    }
-    ByteBuffer byteBuffer = ByteBuffer.wrap(compressedBytes);
-    int numValues = byteBuffer.getInt();
-    int contentOffset = (numValues + 1) * Integer.BYTES;
-    byte[][] valueBuffer = new byte[numValues][];
-    for (int i = 0; i < numValues; i++) {
-      int length = byteBuffer.getInt((i + 1) * Integer.BYTES);
-      byte[] bytes = new byte[length];
-      byteBuffer.position(contentOffset);
-      byteBuffer.get(bytes, 0, length);
-      valueBuffer[i] = bytes;
-      contentOffset += length;
-    }
-    return valueBuffer;
+  public byte[][] getBytesMV(int docId, ChunkReaderContext context) {
+    return ArraySerDeUtils.deserializeBytesArray(getBytes(docId, context));
   }
 
   @Override
-  public int getNumValuesMV(final int docId, final ChunkReaderContext context) 
{
-    byte[] compressedBytes;
-    if (_isCompressed) {
-      compressedBytes = getBytesCompressed(docId, context);
-    } else {
-      compressedBytes = getBytesUncompressed(docId);
-    }
-    ByteBuffer byteBuffer = ByteBuffer.wrap(compressedBytes);
-    return byteBuffer.getInt();
+  public int getNumValuesMV(int docId, ChunkReaderContext context) {
+    return ByteBuffer.wrap(getBytes(docId, context)).getInt();
   }
 
   @Override
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/ArraySerDeUtils.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/ArraySerDeUtils.java
new file mode 100644
index 0000000000..58238a33e0
--- /dev/null
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/ArraySerDeUtils.java
@@ -0,0 +1,363 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.segment.local.utils;
+
+import java.nio.ByteBuffer;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+
+
+public class ArraySerDeUtils {
+  private ArraySerDeUtils() {
+  }
+
+  public static byte[] serializeIntArrayWithLength(int[] values) {
+    byte[] bytes = new byte[Integer.BYTES + values.length * Integer.BYTES];
+    ByteBuffer byteBuffer = ByteBuffer.wrap(bytes);
+    byteBuffer.putInt(values.length);
+    writeValues(byteBuffer, values);
+    return bytes;
+  }
+
+  public static int[] deserializeIntArrayWithLength(byte[] bytes) {
+    return deserializeIntArrayWithLength(ByteBuffer.wrap(bytes));
+  }
+
+  public static int[] deserializeIntArrayWithLength(ByteBuffer byteBuffer) {
+    int length = byteBuffer.getInt();
+    int[] values = new int[length];
+    readValues(byteBuffer, values, length);
+    return values;
+  }
+
+  public static int deserializeIntArrayWithLength(byte[] bytes, int[] values) {
+    return deserializeIntArrayWithLength(ByteBuffer.wrap(bytes), values);
+  }
+
+  public static int deserializeIntArrayWithLength(ByteBuffer byteBuffer, int[] values) {
+    int length = byteBuffer.getInt();
+    readValues(byteBuffer, values, length);
+    return length;
+  }
+
+  public static byte[] serializeIntArrayWithoutLength(int[] values) {
+    byte[] bytes = new byte[values.length * Integer.BYTES];
+    ByteBuffer byteBuffer = ByteBuffer.wrap(bytes);
+    writeValues(byteBuffer, values);
+    return bytes;
+  }
+
+  public static int[] deserializeIntArrayWithoutLength(byte[] bytes) {
+    ByteBuffer byteBuffer = ByteBuffer.wrap(bytes);
+    int length = bytes.length / Integer.BYTES;
+    int[] values = new int[length];
+    readValues(byteBuffer, values, length);
+    return values;
+  }
+
+  public static int deserializeIntArrayWithoutLength(byte[] bytes, int[] values) {
+    ByteBuffer byteBuffer = ByteBuffer.wrap(bytes);
+    int length = bytes.length / Integer.BYTES;
+    readValues(byteBuffer, values, length);
+    return length;
+  }
+
+  private static void writeValues(ByteBuffer byteBuffer, int[] values) {
+    for (int value : values) {
+      byteBuffer.putInt(value);
+    }
+  }
+
+  private static void readValues(ByteBuffer byteBuffer, int[] values, int length) {
+    for (int i = 0; i < length; i++) {
+      values[i] = byteBuffer.getInt();
+    }
+  }
+
+  public static byte[] serializeLongArrayWithLength(long[] values) {
+    byte[] bytes = new byte[Integer.BYTES + values.length * Long.BYTES];
+    ByteBuffer byteBuffer = ByteBuffer.wrap(bytes);
+    byteBuffer.putInt(values.length);
+    writeValues(byteBuffer, values);
+    return bytes;
+  }
+
+  public static long[] deserializeLongArrayWithLength(byte[] bytes) {
+    return deserializeLongArrayWithLength(ByteBuffer.wrap(bytes));
+  }
+
+  public static long[] deserializeLongArrayWithLength(ByteBuffer byteBuffer) {
+    int length = byteBuffer.getInt();
+    long[] values = new long[length];
+    readValues(byteBuffer, values, length);
+    return values;
+  }
+
+  public static int deserializeLongArrayWithLength(byte[] bytes, long[] values) {
+    return deserializeLongArrayWithLength(ByteBuffer.wrap(bytes), values);
+  }
+
+  public static int deserializeLongArrayWithLength(ByteBuffer byteBuffer, long[] values) {
+    int length = byteBuffer.getInt();
+    readValues(byteBuffer, values, length);
+    return length;
+  }
+
+  public static byte[] serializeLongArrayWithoutLength(long[] values) {
+    byte[] bytes = new byte[values.length * Long.BYTES];
+    ByteBuffer byteBuffer = ByteBuffer.wrap(bytes);
+    writeValues(byteBuffer, values);
+    return bytes;
+  }
+
+  public static long[] deserializeLongArrayWithoutLength(byte[] bytes) {
+    ByteBuffer byteBuffer = ByteBuffer.wrap(bytes);
+    int length = bytes.length / Long.BYTES;
+    long[] values = new long[length];
+    readValues(byteBuffer, values, length);
+    return values;
+  }
+
+  public static int deserializeLongArrayWithoutLength(byte[] bytes, long[] values) {
+    ByteBuffer byteBuffer = ByteBuffer.wrap(bytes);
+    int length = bytes.length / Long.BYTES;
+    readValues(byteBuffer, values, length);
+    return length;
+  }
+
+  private static void writeValues(ByteBuffer byteBuffer, long[] values) {
+    for (long value : values) {
+      byteBuffer.putLong(value);
+    }
+  }
+
+  private static void readValues(ByteBuffer byteBuffer, long[] values, int length) {
+    for (int i = 0; i < length; i++) {
+      values[i] = byteBuffer.getLong();
+    }
+  }
+
+  public static byte[] serializeFloatArrayWithLength(float[] values) {
+    byte[] bytes = new byte[Integer.BYTES + values.length * Float.BYTES];
+    ByteBuffer byteBuffer = ByteBuffer.wrap(bytes);
+    byteBuffer.putInt(values.length);
+    writeValues(byteBuffer, values);
+    return bytes;
+  }
+
+  public static float[] deserializeFloatArrayWithLength(byte[] bytes) {
+    return deserializeFloatArrayWithLength(ByteBuffer.wrap(bytes));
+  }
+
+  public static float[] deserializeFloatArrayWithLength(ByteBuffer byteBuffer) {
+    int length = byteBuffer.getInt();
+    float[] values = new float[length];
+    readValues(byteBuffer, values, length);
+    return values;
+  }
+
+  public static int deserializeFloatArrayWithLength(byte[] bytes, float[] values) {
+    return deserializeFloatArrayWithLength(ByteBuffer.wrap(bytes), values);
+  }
+
+  public static int deserializeFloatArrayWithLength(ByteBuffer byteBuffer, float[] values) {
+    int length = byteBuffer.getInt();
+    readValues(byteBuffer, values, length);
+    return length;
+  }
+
+  public static byte[] serializeFloatArrayWithoutLength(float[] values) {
+    byte[] bytes = new byte[values.length * Float.BYTES];
+    ByteBuffer byteBuffer = ByteBuffer.wrap(bytes);
+    writeValues(byteBuffer, values);
+    return bytes;
+  }
+
+  public static float[] deserializeFloatArrayWithoutLength(byte[] bytes) {
+    ByteBuffer byteBuffer = ByteBuffer.wrap(bytes);
+    int length = bytes.length / Float.BYTES;
+    float[] values = new float[length];
+    readValues(byteBuffer, values, length);
+    return values;
+  }
+
+  public static int deserializeFloatArrayWithoutLength(byte[] bytes, float[] values) {
+    ByteBuffer byteBuffer = ByteBuffer.wrap(bytes);
+    int length = bytes.length / Float.BYTES;
+    readValues(byteBuffer, values, length);
+    return length;
+  }
+
+  private static void writeValues(ByteBuffer byteBuffer, float[] values) {
+    for (float value : values) {
+      byteBuffer.putFloat(value);
+    }
+  }
+
+  private static void readValues(ByteBuffer byteBuffer, float[] values, int length) {
+    for (int i = 0; i < length; i++) {
+      values[i] = byteBuffer.getFloat();
+    }
+  }
+
+  public static byte[] serializeDoubleArrayWithLength(double[] values) {
+    byte[] bytes = new byte[Integer.BYTES + values.length * Double.BYTES];
+    ByteBuffer byteBuffer = ByteBuffer.wrap(bytes);
+    byteBuffer.putInt(values.length);
+    writeValues(byteBuffer, values);
+    return bytes;
+  }
+
+  public static double[] deserializeDoubleArrayWithLength(byte[] bytes) {
+    return deserializeDoubleArrayWithLength(ByteBuffer.wrap(bytes));
+  }
+
+  public static double[] deserializeDoubleArrayWithLength(ByteBuffer byteBuffer) {
+    int length = byteBuffer.getInt();
+    double[] values = new double[length];
+    readValues(byteBuffer, values, length);
+    return values;
+  }
+
+  public static int deserializeDoubleArrayWithLength(byte[] bytes, double[] values) {
+    return deserializeDoubleArrayWithLength(ByteBuffer.wrap(bytes), values);
+  }
+
+  public static int deserializeDoubleArrayWithLength(ByteBuffer byteBuffer, double[] values) {
+    int length = byteBuffer.getInt();
+    readValues(byteBuffer, values, length);
+    return length;
+  }
+
+  public static byte[] serializeDoubleArrayWithoutLength(double[] values) {
+    byte[] bytes = new byte[values.length * Double.BYTES];
+    ByteBuffer byteBuffer = ByteBuffer.wrap(bytes);
+    writeValues(byteBuffer, values);
+    return bytes;
+  }
+
+  public static double[] deserializeDoubleArrayWithoutLength(byte[] bytes) {
+    ByteBuffer byteBuffer = ByteBuffer.wrap(bytes);
+    int length = bytes.length / Double.BYTES;
+    double[] values = new double[length];
+    readValues(byteBuffer, values, length);
+    return values;
+  }
+
+  public static int deserializeDoubleArrayWithoutLength(byte[] bytes, double[] values) {
+    ByteBuffer byteBuffer = ByteBuffer.wrap(bytes);
+    int length = bytes.length / Double.BYTES;
+    readValues(byteBuffer, values, length);
+    return length;
+  }
+
+  private static void writeValues(ByteBuffer byteBuffer, double[] values) {
+    for (double value : values) {
+      byteBuffer.putDouble(value);
+    }
+  }
+
+  private static void readValues(ByteBuffer byteBuffer, double[] values, int length) {
+    for (int i = 0; i < length; i++) {
+      values[i] = byteBuffer.getDouble();
+    }
+  }
+
+  public static byte[] serializeStringArray(String[] values) {
+    // Format: [numValues][length1][length2]...[lengthN][value1][value2]...[valueN]
+    int headerSize = Integer.BYTES + Integer.BYTES * values.length;
+    int size = headerSize;
+    byte[][] stringBytes = new byte[values.length][];
+    for (int i = 0; i < values.length; i++) {
+      stringBytes[i] = values[i].getBytes(UTF_8);
+      size += stringBytes[i].length;
+    }
+    return writeValues(stringBytes, size, headerSize);
+  }
+
+  public static String[] deserializeStringArray(byte[] bytes) {
+    ByteBuffer byteBuffer = ByteBuffer.wrap(bytes);
+    int numValues = byteBuffer.getInt();
+    String[] values = new String[numValues];
+    readValues(bytes, byteBuffer, values, numValues);
+    return values;
+  }
+
+  public static int deserializeStringArray(byte[] bytes, String[] values) {
+    ByteBuffer byteBuffer = ByteBuffer.wrap(bytes);
+    int numValues = byteBuffer.getInt();
+    readValues(bytes, byteBuffer, values, numValues);
+    return numValues;
+  }
+
+  public static byte[] serializeBytesArray(byte[][] values) {
+    // Format: [numValues][length1][length2]...[lengthN][value1][value2]...[valueN]
+    int headerSize = Integer.BYTES + Integer.BYTES * values.length;
+    int size = headerSize;
+    for (byte[] value : values) {
+      size += value.length;
+    }
+    return writeValues(values, size, headerSize);
+  }
+
+  public static byte[][] deserializeBytesArray(byte[] bytes) {
+    ByteBuffer byteBuffer = ByteBuffer.wrap(bytes);
+    int numValues = byteBuffer.getInt();
+    byte[][] values = new byte[numValues][];
+    readValues(byteBuffer, values, numValues);
+    return values;
+  }
+
+  public static int deserializeBytesArray(byte[] bytes, byte[][] values) {
+    ByteBuffer byteBuffer = ByteBuffer.wrap(bytes);
+    int numValues = byteBuffer.getInt();
+    readValues(byteBuffer, values, numValues);
+    return numValues;
+  }
+
+  private static byte[] writeValues(byte[][] values, int size, int headerSize) {
+    byte[] bytes = new byte[size];
+    ByteBuffer byteBuffer = ByteBuffer.wrap(bytes);
+    byteBuffer.putInt(values.length);
+    byteBuffer.position(headerSize);
+    for (int i = 0; i < values.length; i++) {
+      byteBuffer.putInt((i + 1) * Integer.BYTES, values[i].length);
+      byteBuffer.put(values[i]);
+    }
+    return bytes;
+  }
+
+  private static void readValues(byte[] bytes, ByteBuffer byteBuffer, String[] values, int numValues) {
+    int offset = (numValues + 1) * Integer.BYTES;
+    for (int i = 0; i < numValues; i++) {
+      int length = byteBuffer.getInt();
+      values[i] = new String(bytes, offset, length, UTF_8);
+      offset += length;
+    }
+  }
+
+  private static void readValues(ByteBuffer byteBuffer, byte[][] values, int numValues) {
+    byteBuffer.position((numValues + 1) * Integer.BYTES);
+    for (int i = 0; i < numValues; i++) {
+      int length = byteBuffer.getInt((i + 1) * Integer.BYTES);
+      values[i] = new byte[length];
+      byteBuffer.get(values[i]);
+    }
+  }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org


Reply via email to