This is an automated email from the ASF dual-hosted git repository. jackie pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push: new cf8fd93759 MultiValue VarByte V4 index writer and consolidate V4 reader for all types (#11674) cf8fd93759 is described below commit cf8fd93759ebfc521141190196922aa26787c7e1 Author: Saurabh Dubey <saurabhd...@gmail.com> AuthorDate: Thu Sep 28 23:22:41 2023 +0530 MultiValue VarByte V4 index writer and consolidate V4 reader for all types (#11674) --- .../pinot/perf/BenchmarkRawForwardIndexReader.java | 8 +- .../impl/VarByteChunkForwardIndexWriter.java | 6 +- .../impl/VarByteChunkForwardIndexWriterV4.java | 48 +++++++ .../local/io/writer/impl/VarByteChunkWriter.java | 4 + .../impl/fwd/MultiValueVarByteRawIndexCreator.java | 17 +-- .../index/forward/ForwardIndexReaderFactory.java | 32 +++-- ....java => VarByteChunkForwardIndexReaderV4.java} | 159 ++++++++++++++++++++- .../impl/VarByteChunkSVForwardIndexWriterTest.java | 4 +- .../MultiValueFixedByteRawIndexCreatorTest.java | 59 ++++---- .../MultiValueVarByteRawIndexCreatorTest.java | 43 +++--- .../segment/index/creator/VarByteChunkV4Test.java | 8 +- 11 files changed, 300 insertions(+), 88 deletions(-) diff --git a/pinot-perf/src/main/java/org/apache/pinot/perf/BenchmarkRawForwardIndexReader.java b/pinot-perf/src/main/java/org/apache/pinot/perf/BenchmarkRawForwardIndexReader.java index 6da32f2906..31c106cd03 100644 --- a/pinot-perf/src/main/java/org/apache/pinot/perf/BenchmarkRawForwardIndexReader.java +++ b/pinot-perf/src/main/java/org/apache/pinot/perf/BenchmarkRawForwardIndexReader.java @@ -28,8 +28,8 @@ import org.apache.commons.io.FileUtils; import org.apache.pinot.segment.local.io.writer.impl.VarByteChunkForwardIndexWriter; import org.apache.pinot.segment.local.io.writer.impl.VarByteChunkForwardIndexWriterV4; import org.apache.pinot.segment.local.segment.index.readers.forward.ChunkReaderContext; +import org.apache.pinot.segment.local.segment.index.readers.forward.VarByteChunkForwardIndexReaderV4; import 
org.apache.pinot.segment.local.segment.index.readers.forward.VarByteChunkSVForwardIndexReader; -import org.apache.pinot.segment.local.segment.index.readers.forward.VarByteChunkSVForwardIndexReaderV4; import org.apache.pinot.segment.spi.compression.ChunkCompressionType; import org.apache.pinot.segment.spi.memory.PinotDataBuffer; import org.apache.pinot.spi.data.FieldSpec; @@ -184,9 +184,9 @@ public class BenchmarkRawForwardIndexReader { public void readV4(V4State state, Blackhole bh) throws IOException { try (PinotDataBuffer buffer = PinotDataBuffer.loadBigEndianFile(state._file); - VarByteChunkSVForwardIndexReaderV4 reader = - new VarByteChunkSVForwardIndexReaderV4(buffer, FieldSpec.DataType.BYTES); - VarByteChunkSVForwardIndexReaderV4.ReaderContext context = reader.createContext()) { + VarByteChunkForwardIndexReaderV4 reader = + new VarByteChunkForwardIndexReaderV4(buffer, FieldSpec.DataType.BYTES, true); + VarByteChunkForwardIndexReaderV4.ReaderContext context = reader.createContext()) { for (int i = 0; i < state._records; i++) { bh.consume(reader.getBytes(i, context)); } diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/io/writer/impl/VarByteChunkForwardIndexWriter.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/io/writer/impl/VarByteChunkForwardIndexWriter.java index b6daaf7fe7..fadcce827e 100644 --- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/io/writer/impl/VarByteChunkForwardIndexWriter.java +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/io/writer/impl/VarByteChunkForwardIndexWriter.java @@ -99,7 +99,8 @@ public class VarByteChunkForwardIndexWriter extends BaseChunkForwardIndexWriter // Note: some duplication is tolerated between these overloads for the sake of memory efficiency - public void putStrings(String[] values) { + @Override + public void putStringMV(String[] values) { // the entire String[] will be encoded as a single string, write the header here 
_chunkBuffer.putInt(_chunkHeaderOffset, _chunkDataOffSet); _chunkHeaderOffset += CHUNK_HEADER_ENTRY_ROW_OFFSET_SIZE; @@ -122,7 +123,8 @@ public class VarByteChunkForwardIndexWriter extends BaseChunkForwardIndexWriter writeChunkIfNecessary(); } - public void putByteArrays(byte[][] values) { + @Override + public void putBytesMV(byte[][] values) { // the entire byte[][] will be encoded as a single string, write the header here _chunkBuffer.putInt(_chunkHeaderOffset, _chunkDataOffSet); _chunkHeaderOffset += CHUNK_HEADER_ENTRY_ROW_OFFSET_SIZE; diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/io/writer/impl/VarByteChunkForwardIndexWriterV4.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/io/writer/impl/VarByteChunkForwardIndexWriterV4.java index d70ed2dcbc..35c61f35f5 100644 --- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/io/writer/impl/VarByteChunkForwardIndexWriterV4.java +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/io/writer/impl/VarByteChunkForwardIndexWriterV4.java @@ -37,6 +37,8 @@ import org.apache.pinot.spi.utils.BigDecimalUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import static java.nio.charset.StandardCharsets.UTF_8; + /** * Chunk-based raw (non-dictionary-encoded) forward index writer where each chunk contains variable number of docs, and @@ -142,6 +144,52 @@ public class VarByteChunkForwardIndexWriterV4 implements VarByteChunkWriter { _nextDocId++; } + @Override + public void putStringMV(String[] values) { + // num values + length of each value + int headerSize = Integer.BYTES + Integer.BYTES * values.length; + int size = headerSize; + byte[][] stringBytes = new byte[values.length][]; + for (int i = 0; i < values.length; i++) { + stringBytes[i] = values[i].getBytes(UTF_8); + size += stringBytes[i].length; + } + + // Format : [numValues][length1][length2]...[lengthN][value1][value2]...[valueN] + byte[] serializedBytes = new byte[size]; + 
ByteBuffer byteBuffer = ByteBuffer.wrap(serializedBytes); + byteBuffer.putInt(values.length); + byteBuffer.position(headerSize); + for (int i = 0; i < values.length; i++) { + byteBuffer.putInt((i + 1) * Integer.BYTES, stringBytes[i].length); + byteBuffer.put(stringBytes[i]); + } + + putBytes(serializedBytes); + } + + @Override + public void putBytesMV(byte[][] values) { + // num values + length of each value + int headerSize = Integer.BYTES + Integer.BYTES * values.length; + int size = headerSize; + for (byte[] value : values) { + size += value.length; + } + + // Format : [numValues][length1][length2]...[lengthN][bytes1][bytes2]...[bytesN] + byte[] serializedBytes = new byte[size]; + ByteBuffer byteBuffer = ByteBuffer.wrap(serializedBytes); + byteBuffer.putInt(values.length); + byteBuffer.position(headerSize); + for (int i = 0; i < values.length; i++) { + byteBuffer.putInt((i + 1) * Integer.BYTES, values[i].length); + byteBuffer.put(values[i]); + } + + putBytes(serializedBytes); + } + private void writeHugeChunk(byte[] bytes) { // huge values where the bytes and their length prefix don't fit in to the remainder of the buffer after the prefix // for the number of documents in a regular chunk are written as a single value without metadata, and these chunks diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/io/writer/impl/VarByteChunkWriter.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/io/writer/impl/VarByteChunkWriter.java index 1e6dbc2837..bf3537d67c 100644 --- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/io/writer/impl/VarByteChunkWriter.java +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/io/writer/impl/VarByteChunkWriter.java @@ -28,4 +28,8 @@ public interface VarByteChunkWriter extends Closeable { void putString(String value); void putBytes(byte[] value); + + void putStringMV(String[] values); + + void putBytesMV(byte[][] values); } diff --git 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/fwd/MultiValueVarByteRawIndexCreator.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/fwd/MultiValueVarByteRawIndexCreator.java index 85dba85ab9..0c41ce2c6e 100644 --- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/fwd/MultiValueVarByteRawIndexCreator.java +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/fwd/MultiValueVarByteRawIndexCreator.java @@ -23,6 +23,7 @@ import java.io.File; import java.io.IOException; import org.apache.pinot.segment.local.io.writer.impl.VarByteChunkForwardIndexWriter; import org.apache.pinot.segment.local.io.writer.impl.VarByteChunkForwardIndexWriterV4; +import org.apache.pinot.segment.local.io.writer.impl.VarByteChunkWriter; import org.apache.pinot.segment.spi.V1Constants.Indexes; import org.apache.pinot.segment.spi.compression.ChunkCompressionType; import org.apache.pinot.segment.spi.index.ForwardIndexConfig; @@ -37,7 +38,7 @@ import org.apache.pinot.spi.data.FieldSpec.DataType; public class MultiValueVarByteRawIndexCreator implements ForwardIndexCreator { private static final int TARGET_MAX_CHUNK_SIZE = 1024 * 1024; - private final VarByteChunkForwardIndexWriter _indexWriter; + private final VarByteChunkWriter _indexWriter; private final DataType _valueType; /** @@ -80,13 +81,9 @@ public class MultiValueVarByteRawIndexCreator implements ForwardIndexCreator { int numDocsPerChunk = Math.max( TARGET_MAX_CHUNK_SIZE / (totalMaxLength + VarByteChunkForwardIndexWriter.CHUNK_HEADER_ENTRY_ROW_OFFSET_SIZE), 1); - // TODO: Support V4 MV reader - // Currently fall back to V2 for backward compatible - if (writerVersion == VarByteChunkForwardIndexWriterV4.VERSION) { - writerVersion = 2; - } - _indexWriter = new VarByteChunkForwardIndexWriter(file, compressionType, totalDocs, numDocsPerChunk, totalMaxLength, - writerVersion); + 
_indexWriter = writerVersion < VarByteChunkForwardIndexWriterV4.VERSION ? new VarByteChunkForwardIndexWriter(file, + compressionType, totalDocs, numDocsPerChunk, totalMaxLength, writerVersion) + : new VarByteChunkForwardIndexWriterV4(file, compressionType, TARGET_MAX_CHUNK_SIZE); _valueType = valueType; } @@ -107,12 +104,12 @@ public class MultiValueVarByteRawIndexCreator implements ForwardIndexCreator { @Override public void putStringMV(final String[] values) { - _indexWriter.putStrings(values); + _indexWriter.putStringMV(values); } @Override public void putBytesMV(final byte[][] values) { - _indexWriter.putByteArrays(values); + _indexWriter.putBytesMV(values); } @Override diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/forward/ForwardIndexReaderFactory.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/forward/ForwardIndexReaderFactory.java index ecaaf875a5..6de6e1294b 100644 --- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/forward/ForwardIndexReaderFactory.java +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/forward/ForwardIndexReaderFactory.java @@ -25,9 +25,9 @@ import org.apache.pinot.segment.local.segment.index.readers.forward.FixedBitSVFo import org.apache.pinot.segment.local.segment.index.readers.forward.FixedByteChunkMVForwardIndexReader; import org.apache.pinot.segment.local.segment.index.readers.forward.FixedByteChunkSVForwardIndexReader; import org.apache.pinot.segment.local.segment.index.readers.forward.FixedBytePower2ChunkSVForwardIndexReader; +import org.apache.pinot.segment.local.segment.index.readers.forward.VarByteChunkForwardIndexReaderV4; import org.apache.pinot.segment.local.segment.index.readers.forward.VarByteChunkMVForwardIndexReader; import org.apache.pinot.segment.local.segment.index.readers.forward.VarByteChunkSVForwardIndexReader; -import 
org.apache.pinot.segment.local.segment.index.readers.forward.VarByteChunkSVForwardIndexReaderV4; import org.apache.pinot.segment.local.segment.index.readers.sorted.SortedIndexReaderImpl; import org.apache.pinot.segment.spi.ColumnMetadata; import org.apache.pinot.segment.spi.index.ForwardIndexConfig; @@ -75,19 +75,31 @@ public class ForwardIndexReaderFactory extends IndexReaderFactory.Default<Forwar public static ForwardIndexReader createRawIndexReader(PinotDataBuffer dataBuffer, DataType storedType, boolean isSingleValue) { int version = dataBuffer.getInt(0); + if (isSingleValue && storedType.isFixedWidth()) { + return version == FixedBytePower2ChunkSVForwardIndexReader.VERSION + ? new FixedBytePower2ChunkSVForwardIndexReader(dataBuffer, storedType) + : new FixedByteChunkSVForwardIndexReader(dataBuffer, storedType); + } + + if (version == VarByteChunkForwardIndexWriterV4.VERSION) { + // V4 reader is common for sv var byte, mv fixed byte and mv var byte + return new VarByteChunkForwardIndexReaderV4(dataBuffer, storedType, isSingleValue); + } else { + return createNonV4RawIndexReader(dataBuffer, storedType, isSingleValue); + } + } + + private static ForwardIndexReader createNonV4RawIndexReader(PinotDataBuffer dataBuffer, DataType storedType, + boolean isSingleValue) { + // Only reach here if SV + raw + var byte + non v4 or MV + non v4 if (isSingleValue) { + return new VarByteChunkSVForwardIndexReader(dataBuffer, storedType); + } else { if (storedType.isFixedWidth()) { - return version == FixedBytePower2ChunkSVForwardIndexReader.VERSION - ? new FixedBytePower2ChunkSVForwardIndexReader(dataBuffer, storedType) - : new FixedByteChunkSVForwardIndexReader(dataBuffer, storedType); + return new FixedByteChunkMVForwardIndexReader(dataBuffer, storedType); } else { - return version == VarByteChunkForwardIndexWriterV4.VERSION ? 
new VarByteChunkSVForwardIndexReaderV4(dataBuffer, - storedType) : new VarByteChunkSVForwardIndexReader(dataBuffer, storedType); + return new VarByteChunkMVForwardIndexReader(dataBuffer, storedType); } - } else { - // TODO: Support V4 MV reader - return storedType.isFixedWidth() ? new FixedByteChunkMVForwardIndexReader(dataBuffer, storedType) - : new VarByteChunkMVForwardIndexReader(dataBuffer, storedType); } } } diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/forward/VarByteChunkSVForwardIndexReaderV4.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/forward/VarByteChunkForwardIndexReaderV4.java similarity index 65% rename from pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/forward/VarByteChunkSVForwardIndexReaderV4.java rename to pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/forward/VarByteChunkForwardIndexReaderV4.java index c1d842b23c..4858e790cb 100644 --- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/forward/VarByteChunkSVForwardIndexReaderV4.java +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/forward/VarByteChunkForwardIndexReaderV4.java @@ -39,13 +39,13 @@ import org.slf4j.LoggerFactory; /** - * Chunk-based single-value raw (non-dictionary-encoded) forward index reader for values of variable length data type - * (BIG_DECIMAL, STRING, BYTES). + * Chunk-based raw (non-dictionary-encoded) forward index reader for values of SV variable length data types + * (BIG_DECIMAL, STRING, BYTES), MV fixed length and MV variable length data types. 
* <p>For data layout, please refer to the documentation for {@link VarByteChunkForwardIndexWriterV4} */ -public class VarByteChunkSVForwardIndexReaderV4 - implements ForwardIndexReader<VarByteChunkSVForwardIndexReaderV4.ReaderContext> { - private static final Logger LOGGER = LoggerFactory.getLogger(VarByteChunkSVForwardIndexReaderV4.class); +public class VarByteChunkForwardIndexReaderV4 + implements ForwardIndexReader<VarByteChunkForwardIndexReaderV4.ReaderContext> { + private static final Logger LOGGER = LoggerFactory.getLogger(VarByteChunkForwardIndexReaderV4.class); private static final int METADATA_ENTRY_SIZE = 8; private final FieldSpec.DataType _storedType; @@ -55,8 +55,10 @@ public class VarByteChunkSVForwardIndexReaderV4 private final PinotDataBuffer _metadata; private final PinotDataBuffer _chunks; + private final boolean _isSingleValue; - public VarByteChunkSVForwardIndexReaderV4(PinotDataBuffer dataBuffer, FieldSpec.DataType storedType) { + public VarByteChunkForwardIndexReaderV4(PinotDataBuffer dataBuffer, FieldSpec.DataType storedType, + boolean isSingleValue) { int version = dataBuffer.getInt(0); Preconditions.checkState(version == VarByteChunkForwardIndexWriterV4.VERSION, "Illegal index version: %s", version); _storedType = storedType; @@ -67,6 +69,7 @@ public class VarByteChunkSVForwardIndexReaderV4 // the file has a BE header for compatability reasons (version selection) but the content is LE _metadata = dataBuffer.view(16, chunksOffset, ByteOrder.LITTLE_ENDIAN); _chunks = dataBuffer.view(chunksOffset, dataBuffer.size(), ByteOrder.LITTLE_ENDIAN); + _isSingleValue = isSingleValue; } @Override @@ -76,7 +79,7 @@ public class VarByteChunkSVForwardIndexReaderV4 @Override public boolean isSingleValue() { - return true; + return _isSingleValue; } @Override @@ -113,6 +116,148 @@ public class VarByteChunkSVForwardIndexReaderV4 return context.getValue(docId); } + @Override + public int getIntMV(int docId, int[] valueBuffer, 
VarByteChunkForwardIndexReaderV4.ReaderContext context) { + ByteBuffer byteBuffer = ByteBuffer.wrap(context.getValue(docId)); + int numValues = byteBuffer.getInt(); + for (int i = 0; i < numValues; i++) { + valueBuffer[i] = byteBuffer.getInt(); + } + return numValues; + } + + @Override + public int[] getIntMV(int docId, VarByteChunkForwardIndexReaderV4.ReaderContext context) { + ByteBuffer byteBuffer = ByteBuffer.wrap(context.getValue(docId)); + int numValues = byteBuffer.getInt(); + int[] valueBuffer = new int[numValues]; + for (int i = 0; i < numValues; i++) { + valueBuffer[i] = byteBuffer.getInt(); + } + return valueBuffer; + } + + @Override + public int getLongMV(int docId, long[] valueBuffer, VarByteChunkForwardIndexReaderV4.ReaderContext context) { + ByteBuffer byteBuffer = ByteBuffer.wrap(context.getValue(docId)); + int numValues = byteBuffer.getInt(); + for (int i = 0; i < numValues; i++) { + valueBuffer[i] = byteBuffer.getLong(); + } + return numValues; + } + + @Override + public long[] getLongMV(int docId, VarByteChunkForwardIndexReaderV4.ReaderContext context) { + ByteBuffer byteBuffer = ByteBuffer.wrap(context.getValue(docId)); + int numValues = byteBuffer.getInt(); + long[] valueBuffer = new long[numValues]; + for (int i = 0; i < numValues; i++) { + valueBuffer[i] = byteBuffer.getLong(); + } + return valueBuffer; + } + + @Override + public int getFloatMV(int docId, float[] valueBuffer, VarByteChunkForwardIndexReaderV4.ReaderContext context) { + ByteBuffer byteBuffer = ByteBuffer.wrap(context.getValue(docId)); + int numValues = byteBuffer.getInt(); + for (int i = 0; i < numValues; i++) { + valueBuffer[i] = byteBuffer.getFloat(); + } + return numValues; + } + + @Override + public float[] getFloatMV(int docId, VarByteChunkForwardIndexReaderV4.ReaderContext context) { + ByteBuffer byteBuffer = ByteBuffer.wrap(context.getValue(docId)); + int numValues = byteBuffer.getInt(); + float[] valueBuffer = new float[numValues]; + for (int i = 0; i < numValues; i++) 
{ + valueBuffer[i] = byteBuffer.getFloat(); + } + return valueBuffer; + } + + @Override + public int getDoubleMV(int docId, double[] valueBuffer, VarByteChunkForwardIndexReaderV4.ReaderContext context) { + ByteBuffer byteBuffer = ByteBuffer.wrap(context.getValue(docId)); + int numValues = byteBuffer.getInt(); + for (int i = 0; i < numValues; i++) { + valueBuffer[i] = byteBuffer.getDouble(); + } + return numValues; + } + + @Override + public double[] getDoubleMV(int docId, VarByteChunkForwardIndexReaderV4.ReaderContext context) { + ByteBuffer byteBuffer = ByteBuffer.wrap(context.getValue(docId)); + int numValues = byteBuffer.getInt(); + double[] valueBuffer = new double[numValues]; + for (int i = 0; i < numValues; i++) { + valueBuffer[i] = byteBuffer.getDouble(); + } + return valueBuffer; + } + + @Override + public int getStringMV(int docId, String[] valueBuffer, VarByteChunkForwardIndexReaderV4.ReaderContext context) { + ByteBuffer byteBuffer = ByteBuffer.wrap(context.getValue(docId)); + int numValues = byteBuffer.getInt(); + byteBuffer.position((numValues + 1) * Integer.BYTES); + for (int i = 0; i < numValues; i++) { + int length = byteBuffer.getInt((i + 1) * Integer.BYTES); + byte[] bytes = new byte[length]; + byteBuffer.get(bytes); + valueBuffer[i] = new String(bytes, StandardCharsets.UTF_8); + } + return numValues; + } + + @Override + public String[] getStringMV(int docId, VarByteChunkForwardIndexReaderV4.ReaderContext context) { + ByteBuffer byteBuffer = ByteBuffer.wrap(context.getValue(docId)); + int numValues = byteBuffer.getInt(); + byteBuffer.position((numValues + 1) * Integer.BYTES); + String[] valueBuffer = new String[numValues]; + for (int i = 0; i < numValues; i++) { + int length = byteBuffer.getInt((i + 1) * Integer.BYTES); + byte[] bytes = new byte[length]; + byteBuffer.get(bytes); + valueBuffer[i] = new String(bytes, StandardCharsets.UTF_8); + } + return valueBuffer; + } + + @Override + public int getBytesMV(int docId, byte[][] valueBuffer, 
VarByteChunkForwardIndexReaderV4.ReaderContext context) { + ByteBuffer byteBuffer = ByteBuffer.wrap(context.getValue(docId)); + int numValues = byteBuffer.getInt(); + byteBuffer.position((numValues + 1) * Integer.BYTES); + for (int i = 0; i < numValues; i++) { + int length = byteBuffer.getInt((i + 1) * Integer.BYTES); + byte[] bytes = new byte[length]; + byteBuffer.get(bytes, 0, length); + valueBuffer[i] = bytes; + } + return numValues; + } + + @Override + public byte[][] getBytesMV(int docId, VarByteChunkForwardIndexReaderV4.ReaderContext context) { + ByteBuffer byteBuffer = ByteBuffer.wrap(context.getValue(docId)); + int numValues = byteBuffer.getInt(); + byteBuffer.position((numValues + 1) * Integer.BYTES); + byte[][] valueBuffer = new byte[numValues][]; + for (int i = 0; i < numValues; i++) { + int length = byteBuffer.getInt((i + 1) * Integer.BYTES); + byte[] bytes = new byte[length]; + byteBuffer.get(bytes, 0, length); + valueBuffer[i] = bytes; + } + return valueBuffer; + } + @Override public void close() throws IOException { diff --git a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/io/writer/impl/VarByteChunkSVForwardIndexWriterTest.java b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/io/writer/impl/VarByteChunkSVForwardIndexWriterTest.java index e3d89157db..5a46c9d3f3 100644 --- a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/io/writer/impl/VarByteChunkSVForwardIndexWriterTest.java +++ b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/io/writer/impl/VarByteChunkSVForwardIndexWriterTest.java @@ -87,7 +87,7 @@ public class VarByteChunkSVForwardIndexWriterTest { try (VarByteChunkForwardIndexWriter writer = new VarByteChunkForwardIndexWriter(file, compressionType, totalDocs, numDocsPerChunk, maxEntryLengthInBytes, version)) { for (String[] array : arrays) { - writer.putStrings(array); + writer.putStringMV(array); } } try (VarByteChunkSVForwardIndexReader reader = new 
VarByteChunkSVForwardIndexReader( @@ -122,7 +122,7 @@ public class VarByteChunkSVForwardIndexWriterTest { try (VarByteChunkForwardIndexWriter writer = new VarByteChunkForwardIndexWriter(file, compressionType, totalDocs, numDocsPerChunk, maxEntryLengthInBytes, version)) { for (String[] array : arrays) { - writer.putByteArrays(Arrays.stream(array).map(str -> str.getBytes(UTF_8)).toArray(byte[][]::new)); + writer.putBytesMV(Arrays.stream(array).map(str -> str.getBytes(UTF_8)).toArray(byte[][]::new)); } } try (VarByteChunkSVForwardIndexReader reader = new VarByteChunkSVForwardIndexReader( diff --git a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/creator/MultiValueFixedByteRawIndexCreatorTest.java b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/creator/MultiValueFixedByteRawIndexCreatorTest.java index f6ae39aa3b..a0cf8c7a97 100644 --- a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/creator/MultiValueFixedByteRawIndexCreatorTest.java +++ b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/creator/MultiValueFixedByteRawIndexCreatorTest.java @@ -29,11 +29,14 @@ import java.util.function.ToIntFunction; import java.util.stream.Collectors; import java.util.stream.IntStream; import org.apache.commons.io.FileUtils; +import org.apache.pinot.segment.local.io.writer.impl.VarByteChunkForwardIndexWriterV4; import org.apache.pinot.segment.local.segment.creator.impl.fwd.MultiValueFixedByteRawIndexCreator; -import org.apache.pinot.segment.local.segment.index.readers.forward.ChunkReaderContext; import org.apache.pinot.segment.local.segment.index.readers.forward.FixedByteChunkMVForwardIndexReader; +import org.apache.pinot.segment.local.segment.index.readers.forward.VarByteChunkForwardIndexReaderV4; import org.apache.pinot.segment.spi.V1Constants.Indexes; import org.apache.pinot.segment.spi.compression.ChunkCompressionType; +import 
org.apache.pinot.segment.spi.index.reader.ForwardIndexReader; +import org.apache.pinot.segment.spi.index.reader.ForwardIndexReaderContext; import org.apache.pinot.segment.spi.memory.PinotDataBuffer; import org.apache.pinot.spi.data.FieldSpec.DataType; import org.testng.Assert; @@ -52,7 +55,9 @@ public class MultiValueFixedByteRawIndexCreatorTest { @DataProvider(name = "compressionTypes") public Object[][] compressionTypes() { - return Arrays.stream(ChunkCompressionType.values()).map(ct -> new Object[]{ct}).toArray(Object[][]::new); + return Arrays.stream(ChunkCompressionType.values()) + .flatMap(ct -> IntStream.of(2, 4).boxed() + .map(writerVersion -> new Object[]{ct, writerVersion})).toArray(Object[][]::new); } @BeforeClass @@ -70,98 +75,98 @@ public class MultiValueFixedByteRawIndexCreatorTest { } @Test(dataProvider = "compressionTypes") - public void testMVInt(ChunkCompressionType compressionType) + public void testMVInt(ChunkCompressionType compressionType, int writerVersion) throws IOException { // This tests varying lengths of MV rows testMV(DataType.INT, ints(false), x -> x.length, int[]::new, MultiValueFixedByteRawIndexCreator::putIntMV, (reader, context, docId, buffer) -> { int length = reader.getIntMV(docId, buffer, context); return Arrays.copyOf(buffer, length); - }, compressionType); + }, compressionType, writerVersion); // This tests a fixed length of MV rows to ensure there are no BufferOverflowExceptions on filling up the chunk testMV(DataType.INT, ints(true), x -> x.length, int[]::new, MultiValueFixedByteRawIndexCreator::putIntMV, (reader, context, docId, buffer) -> { int length = reader.getIntMV(docId, buffer, context); return Arrays.copyOf(buffer, length); - }, compressionType); + }, compressionType, writerVersion); } @Test(dataProvider = "compressionTypes") - public void testMVLong(ChunkCompressionType compressionType) + public void testMVLong(ChunkCompressionType compressionType, int writerVersion) throws IOException { // This tests varying 
lengths of MV rows testMV(DataType.LONG, longs(false), x -> x.length, long[]::new, MultiValueFixedByteRawIndexCreator::putLongMV, (reader, context, docId, buffer) -> { int length = reader.getLongMV(docId, buffer, context); return Arrays.copyOf(buffer, length); - }, compressionType); + }, compressionType, writerVersion); // This tests a fixed length of MV rows to ensure there are no BufferOverflowExceptions on filling up the chunk testMV(DataType.LONG, longs(true), x -> x.length, long[]::new, MultiValueFixedByteRawIndexCreator::putLongMV, (reader, context, docId, buffer) -> { int length = reader.getLongMV(docId, buffer, context); return Arrays.copyOf(buffer, length); - }, compressionType); + }, compressionType, writerVersion); } @Test(dataProvider = "compressionTypes") - public void testMVFloat(ChunkCompressionType compressionType) + public void testMVFloat(ChunkCompressionType compressionType, int writerVersion) throws IOException { // This tests varying lengths of MV rows testMV(DataType.FLOAT, floats(false), x -> x.length, float[]::new, MultiValueFixedByteRawIndexCreator::putFloatMV, (reader, context, docId, buffer) -> { int length = reader.getFloatMV(docId, buffer, context); return Arrays.copyOf(buffer, length); - }, compressionType); + }, compressionType, writerVersion); // This tests a fixed length of MV rows to ensure there are no BufferOverflowExceptions on filling up the chunk testMV(DataType.FLOAT, floats(true), x -> x.length, float[]::new, MultiValueFixedByteRawIndexCreator::putFloatMV, (reader, context, docId, buffer) -> { int length = reader.getFloatMV(docId, buffer, context); return Arrays.copyOf(buffer, length); - }, compressionType); + }, compressionType, writerVersion); } @Test(dataProvider = "compressionTypes") - public void testMVDouble(ChunkCompressionType compressionType) + public void testMVDouble(ChunkCompressionType compressionType, int writerVersion) throws IOException { // This tests varying lengths of MV rows testMV(DataType.DOUBLE, 
doubles(false), x -> x.length, double[]::new, - MultiValueFixedByteRawIndexCreator::putDoubleMV, - (reader, context, docId, buffer) -> { + MultiValueFixedByteRawIndexCreator::putDoubleMV, (reader, context, docId, buffer) -> { int length = reader.getDoubleMV(docId, buffer, context); return Arrays.copyOf(buffer, length); - }, compressionType); + }, compressionType, writerVersion); // This tests a fixed length of MV rows to ensure there are no BufferOverflowExceptions on filling up the chunk testMV(DataType.DOUBLE, doubles(true), x -> x.length, double[]::new, - MultiValueFixedByteRawIndexCreator::putDoubleMV, - (reader, context, docId, buffer) -> { + MultiValueFixedByteRawIndexCreator::putDoubleMV, (reader, context, docId, buffer) -> { int length = reader.getDoubleMV(docId, buffer, context); return Arrays.copyOf(buffer, length); - }, compressionType); + }, compressionType, writerVersion); } public <T> void testMV(DataType dataType, List<T> inputs, ToIntFunction<T> sizeof, IntFunction<T> constructor, - Injector<T> injector, Extractor<T> extractor, ChunkCompressionType compressionType) + Injector<T> injector, Extractor<T> extractor, ChunkCompressionType compressionType, int writerVersion) throws IOException { String column = "testCol_" + dataType; int numDocs = inputs.size(); int maxElements = inputs.stream().mapToInt(sizeof).max().orElseThrow(RuntimeException::new); File file = new File(OUTPUT_DIR, column + Indexes.RAW_MV_FORWARD_INDEX_FILE_EXTENSION); file.delete(); - MultiValueFixedByteRawIndexCreator creator = new MultiValueFixedByteRawIndexCreator(new File(OUTPUT_DIR), - compressionType, column, numDocs, dataType, maxElements); + MultiValueFixedByteRawIndexCreator creator = + new MultiValueFixedByteRawIndexCreator(new File(OUTPUT_DIR), compressionType, column, numDocs, dataType, + maxElements, false, writerVersion); inputs.forEach(input -> injector.inject(creator, input)); creator.close(); //read - final PinotDataBuffer buffer = PinotDataBuffer - .mapFile(file, 
true, 0, file.length(), ByteOrder.BIG_ENDIAN, ""); - FixedByteChunkMVForwardIndexReader reader = new FixedByteChunkMVForwardIndexReader(buffer, - dataType.getStoredType()); - final ChunkReaderContext context = reader.createContext(); + final PinotDataBuffer buffer = PinotDataBuffer.mapFile(file, true, 0, file.length(), ByteOrder.BIG_ENDIAN, ""); + ForwardIndexReader reader = + writerVersion == VarByteChunkForwardIndexWriterV4.VERSION ? new VarByteChunkForwardIndexReaderV4(buffer, + dataType.getStoredType(), false) : new FixedByteChunkMVForwardIndexReader(buffer, dataType.getStoredType()); + + final ForwardIndexReaderContext context = reader.createContext(); T valueBuffer = constructor.apply(maxElements); for (int i = 0; i < numDocs; i++) { Assert.assertEquals(inputs.get(i), extractor.extract(reader, context, i, valueBuffer)); @@ -169,7 +174,7 @@ public class MultiValueFixedByteRawIndexCreatorTest { } interface Extractor<T> { - T extract(FixedByteChunkMVForwardIndexReader reader, ChunkReaderContext context, int offset, T buffer); + T extract(ForwardIndexReader reader, ForwardIndexReaderContext context, int offset, T buffer); } interface Injector<T> { diff --git a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/creator/MultiValueVarByteRawIndexCreatorTest.java b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/creator/MultiValueVarByteRawIndexCreatorTest.java index 1006245586..32f4ff1a2c 100644 --- a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/creator/MultiValueVarByteRawIndexCreatorTest.java +++ b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/creator/MultiValueVarByteRawIndexCreatorTest.java @@ -30,10 +30,11 @@ import java.util.stream.IntStream; import java.util.stream.Stream; import org.apache.commons.io.FileUtils; import org.apache.pinot.segment.local.segment.creator.impl.fwd.MultiValueVarByteRawIndexCreator; -import 
org.apache.pinot.segment.local.segment.index.readers.forward.ChunkReaderContext; -import org.apache.pinot.segment.local.segment.index.readers.forward.VarByteChunkMVForwardIndexReader; +import org.apache.pinot.segment.local.segment.index.forward.ForwardIndexReaderFactory; import org.apache.pinot.segment.spi.V1Constants.Indexes; import org.apache.pinot.segment.spi.compression.ChunkCompressionType; +import org.apache.pinot.segment.spi.index.reader.ForwardIndexReader; +import org.apache.pinot.segment.spi.index.reader.ForwardIndexReaderContext; import org.apache.pinot.segment.spi.memory.PinotDataBuffer; import org.apache.pinot.spi.data.FieldSpec.DataType; import org.testng.Assert; @@ -56,12 +57,12 @@ public class MultiValueVarByteRawIndexCreatorTest { @DataProvider public Object[][] params() { - return Arrays.stream(ChunkCompressionType.values()) - .flatMap(chunkCompressionType -> IntStream.of(10, 15, 20, 1000).boxed() - .flatMap(useFullSize -> Stream.of(true, false) - .flatMap(maxLength -> IntStream.range(1, 20).map(i -> i * 2 - 1).boxed() - .map(maxNumEntries -> new Object[]{chunkCompressionType, useFullSize, maxLength, - maxNumEntries})))) + return Arrays.stream(ChunkCompressionType.values()).flatMap(chunkCompressionType -> IntStream.of(2, 4).boxed() + .flatMap(writerVersion -> IntStream.of(10, 15, 20, 1000).boxed().flatMap(maxLength -> Stream.of(true, false) + .flatMap( + useFullSize -> IntStream.range(1, 20).map(i -> i * 2 - 1).boxed().map(maxNumEntries -> new Object[]{ + chunkCompressionType, useFullSize, writerVersion, maxLength, maxNumEntries + }))))) .toArray(Object[][]::new); } @@ -86,7 +87,8 @@ public class MultiValueVarByteRawIndexCreatorTest { } @Test(dataProvider = "params") - public void testMVString(ChunkCompressionType compressionType, int maxLength, boolean useFullSize, int maxNumEntries) + public void testMVString(ChunkCompressionType compressionType, boolean useFullSize, int writerVersion, int maxLength, + int maxNumEntries) throws IOException { 
String column = "testCol-" + UUID.randomUUID(); int numDocs = 1000; @@ -117,18 +119,16 @@ public class MultiValueVarByteRawIndexCreatorTest { inputs.add(values); } try (MultiValueVarByteRawIndexCreator creator = new MultiValueVarByteRawIndexCreator(OUTPUT_DIR, compressionType, - column, numDocs, DataType.STRING, maxTotalLength, maxElements)) { + column, numDocs, DataType.STRING, maxTotalLength, maxElements, writerVersion)) { for (String[] input : inputs) { creator.putStringMV(input); } } //read - final PinotDataBuffer buffer = PinotDataBuffer - .mapFile(file, true, 0, file.length(), ByteOrder.BIG_ENDIAN, ""); - VarByteChunkMVForwardIndexReader reader = new VarByteChunkMVForwardIndexReader(buffer, - DataType.STRING); - final ChunkReaderContext context = reader.createContext(); + final PinotDataBuffer buffer = PinotDataBuffer.mapFile(file, true, 0, file.length(), ByteOrder.BIG_ENDIAN, ""); + ForwardIndexReader reader = ForwardIndexReaderFactory.createRawIndexReader(buffer, DataType.STRING, false); + final ForwardIndexReaderContext context = reader.createContext(); String[] values = new String[maxElements]; for (int i = 0; i < numDocs; i++) { int length = reader.getStringMV(i, values, context); @@ -138,7 +138,8 @@ public class MultiValueVarByteRawIndexCreatorTest { } @Test(dataProvider = "params") - public void testMVBytes(ChunkCompressionType compressionType, int maxLength, boolean useFullSize, int maxNumEntries) + public void testMVBytes(ChunkCompressionType compressionType, boolean useFullSize, int writerVersion, int maxLength, + int maxNumEntries) throws IOException { String column = "testCol-" + UUID.randomUUID(); int numDocs = 1000; @@ -169,18 +170,16 @@ public class MultiValueVarByteRawIndexCreatorTest { inputs.add(values); } try (MultiValueVarByteRawIndexCreator creator = new MultiValueVarByteRawIndexCreator(OUTPUT_DIR, compressionType, - column, numDocs, DataType.STRING, maxTotalLength, maxElements)) { + column, numDocs, DataType.BYTES, writerVersion, 
maxTotalLength, maxElements)) { for (byte[][] input : inputs) { creator.putBytesMV(input); } } //read - final PinotDataBuffer buffer = PinotDataBuffer - .mapFile(file, true, 0, file.length(), ByteOrder.BIG_ENDIAN, ""); - VarByteChunkMVForwardIndexReader reader = new VarByteChunkMVForwardIndexReader(buffer, - DataType.BYTES); - final ChunkReaderContext context = reader.createContext(); + final PinotDataBuffer buffer = PinotDataBuffer.mapFile(file, true, 0, file.length(), ByteOrder.BIG_ENDIAN, ""); + ForwardIndexReader reader = ForwardIndexReaderFactory.createRawIndexReader(buffer, DataType.BYTES, false); + final ForwardIndexReaderContext context = reader.createContext(); byte[][] values = new byte[maxElements][]; for (int i = 0; i < numDocs; i++) { int length = reader.getBytesMV(i, values, context); diff --git a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/creator/VarByteChunkV4Test.java b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/creator/VarByteChunkV4Test.java index ba59d057a7..88414b8b58 100644 --- a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/creator/VarByteChunkV4Test.java +++ b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/creator/VarByteChunkV4Test.java @@ -31,7 +31,7 @@ import java.util.stream.IntStream; import java.util.stream.Stream; import org.apache.commons.io.FileUtils; import org.apache.pinot.segment.local.io.writer.impl.VarByteChunkForwardIndexWriterV4; -import org.apache.pinot.segment.local.segment.index.readers.forward.VarByteChunkSVForwardIndexReaderV4; +import org.apache.pinot.segment.local.segment.index.readers.forward.VarByteChunkForwardIndexReaderV4; import org.apache.pinot.segment.spi.compression.ChunkCompressionType; import org.apache.pinot.segment.spi.memory.PinotDataBuffer; import org.apache.pinot.spi.data.FieldSpec; @@ -113,8 +113,8 @@ public class VarByteChunkV4Test { } } try (PinotDataBuffer buffer = 
PinotDataBuffer.mapReadOnlyBigEndianFile(_file)) { - try (VarByteChunkSVForwardIndexReaderV4 reader = new VarByteChunkSVForwardIndexReaderV4(buffer, dataType); - VarByteChunkSVForwardIndexReaderV4.ReaderContext context = reader.createContext()) { + try (VarByteChunkForwardIndexReaderV4 reader = new VarByteChunkForwardIndexReaderV4(buffer, dataType, + true); VarByteChunkForwardIndexReaderV4.ReaderContext context = reader.createContext()) { for (int i = 0; i < values.size(); i++) { assertEquals(read.read(reader, context, i), values.get(i)); } @@ -161,7 +161,7 @@ public class VarByteChunkV4Test { @FunctionalInterface interface Read<T> { - T read(VarByteChunkSVForwardIndexReaderV4 reader, VarByteChunkSVForwardIndexReaderV4.ReaderContext context, + T read(VarByteChunkForwardIndexReaderV4 reader, VarByteChunkForwardIndexReaderV4.ReaderContext context, int docId); } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org