This is an automated email from the ASF dual-hosted git repository. siddteotia pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push: new bed2e30 Power of 2 fixed size chunks (#7934) bed2e30 is described below commit bed2e307284ca2e43ff6e572be57c01f970645c3 Author: Richard Startin <rich...@startree.ai> AuthorDate: Thu Dec 23 18:48:12 2021 +0000 Power of 2 fixed size chunks (#7934) * power of 2 fixed-byte chunk reader * change method name --- .../BenchmarkFixedByteSVForwardIndexReader.java | 147 +++++++++++++++++++ .../pinot/perf/BenchmarkRawForwardIndexReader.java | 4 +- .../writer/impl/BaseChunkSVForwardIndexWriter.java | 7 +- .../impl/FixedByteChunkSVForwardIndexWriter.java | 13 +- .../impl/VarByteChunkSVForwardIndexWriter.java | 2 +- .../index/readers/DefaultIndexReaderProvider.java | 7 +- .../forward/BaseChunkSVForwardIndexReader.java | 49 +------ .../index/readers/forward/ChunkReaderContext.java | 66 +++++++++ .../FixedBytePower2ChunkSVForwardIndexReader.java | 114 +++++++++++++++ .../MultiValueFixedByteRawIndexCreatorTest.java | 2 +- .../MultiValueVarByteRawIndexCreatorTest.java | 2 +- .../segment/index/creator/RawIndexCreatorTest.java | 21 ++- .../forward/FixedByteChunkSVForwardIndexTest.java | 162 ++++++++++----------- .../forward/VarByteChunkSVForwardIndexTest.java | 12 +- 14 files changed, 449 insertions(+), 159 deletions(-) diff --git a/pinot-perf/src/main/java/org/apache/pinot/perf/BenchmarkFixedByteSVForwardIndexReader.java b/pinot-perf/src/main/java/org/apache/pinot/perf/BenchmarkFixedByteSVForwardIndexReader.java new file mode 100644 index 0000000..f5c7e37 --- /dev/null +++ b/pinot-perf/src/main/java/org/apache/pinot/perf/BenchmarkFixedByteSVForwardIndexReader.java @@ -0,0 +1,147 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.perf; + +import java.io.File; +import java.io.IOException; +import java.util.UUID; +import java.util.concurrent.ThreadLocalRandom; +import org.apache.commons.io.FileUtils; +import org.apache.pinot.segment.local.io.writer.impl.FixedByteChunkSVForwardIndexWriter; +import org.apache.pinot.segment.local.segment.index.readers.forward.ChunkReaderContext; +import org.apache.pinot.segment.local.segment.index.readers.forward.FixedByteChunkSVForwardIndexReader; +import org.apache.pinot.segment.local.segment.index.readers.forward.FixedBytePower2ChunkSVForwardIndexReader; +import org.apache.pinot.segment.spi.compression.ChunkCompressionType; +import org.apache.pinot.segment.spi.index.reader.ForwardIndexReader; +import org.apache.pinot.segment.spi.memory.PinotDataBuffer; +import org.apache.pinot.spi.data.FieldSpec; +import org.openjdk.jmh.annotations.Benchmark; +import org.openjdk.jmh.annotations.Level; +import org.openjdk.jmh.annotations.Param; +import org.openjdk.jmh.annotations.Scope; +import org.openjdk.jmh.annotations.Setup; +import org.openjdk.jmh.annotations.State; +import org.openjdk.jmh.annotations.TearDown; +import org.openjdk.jmh.infra.Blackhole; + + +@State(Scope.Benchmark) +public class BenchmarkFixedByteSVForwardIndexReader { + + private static final File INDEX_DIR = + new File(FileUtils.getTempDirectory(), "BenchmarkFixedByteSVForwardIndexReader"); + + @Param("10000") + int _blockSize; + + @Param("1000") + int _numBlocks; + + private int[] _docIds; + private double[] _doubleBuffer; + private long[] _longBuffer; + private FixedByteChunkSVForwardIndexReader _compressedReader; + private FixedBytePower2ChunkSVForwardIndexReader _compressedPow2Reader; + + @Setup(Level.Trial) + public void setup() + throws IOException { + FileUtils.forceMkdir(INDEX_DIR); + File compressedIndexFile = new File(INDEX_DIR, UUID.randomUUID().toString()); + File pow2CompressedIndexFile = new File(INDEX_DIR, UUID.randomUUID().toString()); + _doubleBuffer = new double[_blockSize]; + _longBuffer = new long[_blockSize]; + try (FixedByteChunkSVForwardIndexWriter writer = new FixedByteChunkSVForwardIndexWriter(compressedIndexFile, + ChunkCompressionType.LZ4, _numBlocks * _blockSize, 1000, Long.BYTES, 3); + FixedByteChunkSVForwardIndexWriter pow2Writer = new FixedByteChunkSVForwardIndexWriter(pow2CompressedIndexFile, + ChunkCompressionType.LZ4, _numBlocks * _blockSize, 1000, Long.BYTES, 4)) { + for (int i = 0; i < _numBlocks * _blockSize; i++) { + long next = ThreadLocalRandom.current().nextLong(); + writer.putLong(next); + pow2Writer.putLong(next); + } + } + _compressedReader = new FixedByteChunkSVForwardIndexReader(PinotDataBuffer.loadBigEndianFile(compressedIndexFile), + FieldSpec.DataType.LONG); + _compressedPow2Reader = + new FixedBytePower2ChunkSVForwardIndexReader(PinotDataBuffer.loadBigEndianFile(pow2CompressedIndexFile), + FieldSpec.DataType.LONG); + _docIds = new int[_blockSize]; + } + + @TearDown(Level.Trial) + public void teardown() + throws IOException { + FileUtils.deleteDirectory(INDEX_DIR); + } + + @Benchmark + public void readCompressedDoublesNonContiguousV3(Blackhole bh) + throws IOException { + readCompressedDoublesNonContiguous(bh, _compressedReader); + } + + @Benchmark + public void readCompressedDoublesNonContiguousV4(Blackhole bh) + throws IOException { + readCompressedDoublesNonContiguous(bh, _compressedPow2Reader); + } + + @Benchmark + public void readCompressedLongsNonContiguousV3(Blackhole bh) + throws IOException { + readCompressedLongsNonContiguous(bh, _compressedReader); + } + + @Benchmark + public void readCompressedLongsNonContiguousV4(Blackhole bh) + throws IOException { + readCompressedLongsNonContiguous(bh, _compressedPow2Reader); + } + + private void readCompressedLongsNonContiguous(Blackhole bh, ForwardIndexReader<ChunkReaderContext> reader) + throws IOException { + try (ChunkReaderContext context = reader.createContext()) { + for (int block = 0; block < _numBlocks / 2; block++) { + for (int i = 0; i < _docIds.length; i++) { + _docIds[i] = block * _blockSize + i * 2; + } + for (int i = 0; i < _docIds.length; i++) { + _longBuffer[i] = reader.getLong(_docIds[i], context); + } + bh.consume(_longBuffer); + } + } + } + + private void readCompressedDoublesNonContiguous(Blackhole bh, ForwardIndexReader<ChunkReaderContext> reader) + throws IOException { + try (ChunkReaderContext context = reader.createContext()) { + for (int block = 0; block < _numBlocks / 2; block++) { + for (int i = 0; i < _docIds.length; i++) { + _docIds[i] = block * _blockSize + i * 2; + } + for (int i = 0; i < _docIds.length; i++) { + _doubleBuffer[i] = reader.getDouble(_docIds[i], context); + } + bh.consume(_doubleBuffer); + } + } + } +} diff --git a/pinot-perf/src/main/java/org/apache/pinot/perf/BenchmarkRawForwardIndexReader.java b/pinot-perf/src/main/java/org/apache/pinot/perf/BenchmarkRawForwardIndexReader.java index 7bc3fd8..ae223e7 100644 --- a/pinot-perf/src/main/java/org/apache/pinot/perf/BenchmarkRawForwardIndexReader.java +++ b/pinot-perf/src/main/java/org/apache/pinot/perf/BenchmarkRawForwardIndexReader.java @@ -27,7 +27,7 @@ import java.util.function.LongSupplier; import org.apache.commons.io.FileUtils; import org.apache.pinot.segment.local.io.writer.impl.VarByteChunkSVForwardIndexWriter; import org.apache.pinot.segment.local.io.writer.impl.VarByteChunkSVForwardIndexWriterV4; -import org.apache.pinot.segment.local.segment.index.readers.forward.BaseChunkSVForwardIndexReader; +import org.apache.pinot.segment.local.segment.index.readers.forward.ChunkReaderContext; import org.apache.pinot.segment.local.segment.index.readers.forward.VarByteChunkSVForwardIndexReader; import org.apache.pinot.segment.local.segment.index.readers.forward.VarByteChunkSVForwardIndexReaderV4; import org.apache.pinot.segment.spi.compression.ChunkCompressionType; @@ -200,7 +200,7 @@ public class BenchmarkRawForwardIndexReader { try (PinotDataBuffer buffer = PinotDataBuffer.loadBigEndianFile(state._file); VarByteChunkSVForwardIndexReader reader = new VarByteChunkSVForwardIndexReader(buffer, FieldSpec.DataType.BYTES); - BaseChunkSVForwardIndexReader.ChunkReaderContext context = reader.createContext()) { + ChunkReaderContext context = reader.createContext()) { for (int i = 0; i < state._records; i++) { bh.consume(reader.getBytes(i, context)); } diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/io/writer/impl/BaseChunkSVForwardIndexWriter.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/io/writer/impl/BaseChunkSVForwardIndexWriter.java index 11b4361..71588d6 100644 --- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/io/writer/impl/BaseChunkSVForwardIndexWriter.java +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/io/writer/impl/BaseChunkSVForwardIndexWriter.java @@ -65,12 +65,14 @@ public abstract class BaseChunkSVForwardIndexWriter implements Closeable { * @param chunkSize Size of chunk * @param sizeOfEntry Size of entry (in bytes), max size for variable byte implementation. * @param version version of File + * @param fixed if the data type is fixed width (required for version validation) * @throws IOException if the file isn't found or can't be mapped */ protected BaseChunkSVForwardIndexWriter(File file, ChunkCompressionType compressionType, int totalDocs, - int numDocsPerChunk, int chunkSize, int sizeOfEntry, int version) + int numDocsPerChunk, int chunkSize, int sizeOfEntry, int version, boolean fixed) throws IOException { - Preconditions.checkArgument(version == DEFAULT_VERSION || version == CURRENT_VERSION); + Preconditions.checkArgument(version == DEFAULT_VERSION || version == CURRENT_VERSION + || (fixed && version == 4)); _chunkSize = chunkSize; _chunkCompressor = ChunkCompressorFactory.getCompressor(compressionType); _headerEntryChunkOffsetSize = getHeaderEntryChunkOffsetSize(version); @@ -87,6 +89,7 @@ public abstract class BaseChunkSVForwardIndexWriter implements Closeable { case 2: return FILE_HEADER_ENTRY_CHUNK_OFFSET_SIZE_V1V2; case 3: + case 4: return FILE_HEADER_ENTRY_CHUNK_OFFSET_SIZE_V3; default: throw new IllegalStateException("Invalid version: " + version); diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/io/writer/impl/FixedByteChunkSVForwardIndexWriter.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/io/writer/impl/FixedByteChunkSVForwardIndexWriter.java index 8d9ad7e..7b942b1 100644 --- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/io/writer/impl/FixedByteChunkSVForwardIndexWriter.java +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/io/writer/impl/FixedByteChunkSVForwardIndexWriter.java @@ -71,8 +71,9 @@ public class FixedByteChunkSVForwardIndexWriter extends BaseChunkSVForwardIndexW public FixedByteChunkSVForwardIndexWriter(File file, ChunkCompressionType compressionType, int totalDocs, int numDocsPerChunk, int sizeOfEntry, int writerVersion) throws IOException { - super(file, compressionType, totalDocs, numDocsPerChunk, (sizeOfEntry * numDocsPerChunk), sizeOfEntry, - writerVersion); + super(file, compressionType, totalDocs, normalizeDocsPerChunk(writerVersion, numDocsPerChunk), + (sizeOfEntry * normalizeDocsPerChunk(writerVersion, numDocsPerChunk)), sizeOfEntry, + writerVersion, true); _chunkDataOffset = 0; } @@ -112,4 +113,12 @@ public class FixedByteChunkSVForwardIndexWriter extends BaseChunkSVForwardIndexW writeChunk(); } } + + private static int normalizeDocsPerChunk(int version, int numDocsPerChunk) { + // V4 uses power of 2 chunk sizes for random access efficiency + if (version >= 4 && (numDocsPerChunk & (numDocsPerChunk - 1)) != 0) { + return 1 << (32 - Integer.numberOfLeadingZeros(numDocsPerChunk - 1)); + } + return numDocsPerChunk; + } } diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/io/writer/impl/VarByteChunkSVForwardIndexWriter.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/io/writer/impl/VarByteChunkSVForwardIndexWriter.java index f035a62..7e99772 100644 --- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/io/writer/impl/VarByteChunkSVForwardIndexWriter.java +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/io/writer/impl/VarByteChunkSVForwardIndexWriter.java @@ -80,7 +80,7 @@ public class VarByteChunkSVForwardIndexWriter extends BaseChunkSVForwardIndexWri super(file, compressionType, totalDocs, numDocsPerChunk, numDocsPerChunk * (CHUNK_HEADER_ENTRY_ROW_OFFSET_SIZE + lengthOfLongestEntry), // chunkSize - lengthOfLongestEntry, writerVersion); + lengthOfLongestEntry, writerVersion, false); _chunkHeaderOffset = 0; _chunkHeaderSize = numDocsPerChunk * CHUNK_HEADER_ENTRY_ROW_OFFSET_SIZE; diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/DefaultIndexReaderProvider.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/DefaultIndexReaderProvider.java index 79cb55d..a342e85 100644 --- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/DefaultIndexReaderProvider.java +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/DefaultIndexReaderProvider.java @@ -30,6 +30,7 @@ import org.apache.pinot.segment.local.segment.index.readers.forward.FixedBitMVFo import org.apache.pinot.segment.local.segment.index.readers.forward.FixedBitSVForwardIndexReaderV2; import org.apache.pinot.segment.local.segment.index.readers.forward.FixedByteChunkMVForwardIndexReader; import org.apache.pinot.segment.local.segment.index.readers.forward.FixedByteChunkSVForwardIndexReader; +import org.apache.pinot.segment.local.segment.index.readers.forward.FixedBytePower2ChunkSVForwardIndexReader; import org.apache.pinot.segment.local.segment.index.readers.forward.VarByteChunkMVForwardIndexReader; import org.apache.pinot.segment.local.segment.index.readers.forward.VarByteChunkSVForwardIndexReader; import org.apache.pinot.segment.local.segment.index.readers.forward.VarByteChunkSVForwardIndexReaderV4; @@ -89,10 +90,12 @@ public class DefaultIndexReaderProvider implements IndexReaderProvider { } else { FieldSpec.DataType storedType = columnMetadata.getDataType().getStoredType(); if (columnMetadata.isSingleValue()) { + int version = dataBuffer.getInt(0); if (storedType.isFixedWidth()) { - return new FixedByteChunkSVForwardIndexReader(dataBuffer, storedType); + return version >= FixedBytePower2ChunkSVForwardIndexReader.VERSION + ? new FixedBytePower2ChunkSVForwardIndexReader(dataBuffer, storedType) + : new FixedByteChunkSVForwardIndexReader(dataBuffer, storedType); } - int version = dataBuffer.getInt(0); if (version >= VarByteChunkSVForwardIndexWriterV4.VERSION) { return new VarByteChunkSVForwardIndexReaderV4(dataBuffer, storedType); } diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/forward/BaseChunkSVForwardIndexReader.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/forward/BaseChunkSVForwardIndexReader.java index 56090ca..dce4a90 100644 --- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/forward/BaseChunkSVForwardIndexReader.java +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/forward/BaseChunkSVForwardIndexReader.java @@ -26,8 +26,6 @@ import org.apache.pinot.segment.local.io.writer.impl.BaseChunkSVForwardIndexWrit import org.apache.pinot.segment.spi.compression.ChunkCompressionType; import org.apache.pinot.segment.spi.compression.ChunkDecompressor; import org.apache.pinot.segment.spi.index.reader.ForwardIndexReader; -import org.apache.pinot.segment.spi.index.reader.ForwardIndexReaderContext; -import org.apache.pinot.segment.spi.memory.CleanerUtil; import org.apache.pinot.segment.spi.memory.PinotDataBuffer; import org.apache.pinot.spi.data.FieldSpec.DataType; import org.slf4j.Logger; @@ -37,8 +35,7 @@ import org.slf4j.LoggerFactory; /** * Base implementation for chunk-based single-value raw (non-dictionary-encoded) forward index reader. */ -public abstract class BaseChunkSVForwardIndexReader - implements ForwardIndexReader<BaseChunkSVForwardIndexReader.ChunkReaderContext> { +public abstract class BaseChunkSVForwardIndexReader implements ForwardIndexReader<ChunkReaderContext> { private static final Logger LOGGER = LoggerFactory.getLogger(BaseChunkSVForwardIndexReader.class); protected final PinotDataBuffer _dataBuffer; @@ -114,7 +111,10 @@ public abstract class BaseChunkSVForwardIndexReader if (context.getChunkId() == chunkId) { return context.getChunkBuffer(); } + return decompressChunk(chunkId, context); + } + protected ByteBuffer decompressChunk(int chunkId, ChunkReaderContext context) { int chunkSize; long chunkPosition = getChunkPosition(chunkId); @@ -172,45 +172,4 @@ public abstract class BaseChunkSVForwardIndexReader // NOTE: DO NOT close the PinotDataBuffer here because it is tracked by the caller and might be reused later. The // caller is responsible of closing the PinotDataBuffer. } - - /** - * Context for the chunk-based forward index readers. - * <p>Information saved in the context can be used by subsequent reads as cache: - * <ul> - * <li> - * Chunk Buffer from the previous read. Useful if the subsequent read is from the same buffer, as it avoids extra - * chunk decompression. - * </li> - * <li>Id for the chunk</li> - * </ul> - */ - public static class ChunkReaderContext implements ForwardIndexReaderContext { - private final ByteBuffer _chunkBuffer; - private int _chunkId; - - public ChunkReaderContext(int maxChunkSize) { - _chunkBuffer = ByteBuffer.allocateDirect(maxChunkSize); - _chunkId = -1; - } - - public ByteBuffer getChunkBuffer() { - return _chunkBuffer; - } - - public int getChunkId() { - return _chunkId; - } - - public void setChunkId(int chunkId) { - _chunkId = chunkId; - } - - @Override - public void close() - throws IOException { - if (CleanerUtil.UNMAP_SUPPORTED) { - CleanerUtil.getCleaner().freeBuffer(_chunkBuffer); - } - } - } } diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/forward/ChunkReaderContext.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/forward/ChunkReaderContext.java new file mode 100644 index 0000000..1adf685 --- /dev/null +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/forward/ChunkReaderContext.java @@ -0,0 +1,66 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.segment.local.segment.index.readers.forward; + +import java.io.IOException; +import java.nio.ByteBuffer; +import org.apache.pinot.segment.spi.index.reader.ForwardIndexReaderContext; +import org.apache.pinot.segment.spi.memory.CleanerUtil; + + +/** + * Context for the chunk-based forward index readers. + * <p>Information saved in the context can be used by subsequent reads as cache: + * <ul> + * <li> + * Chunk Buffer from the previous read. Useful if the subsequent read is from the same buffer, as it avoids extra + * chunk decompression. + * </li> + * <li>Id for the chunk</li> + * </ul> + */ +public class ChunkReaderContext implements ForwardIndexReaderContext { + private final ByteBuffer _chunkBuffer; + private int _chunkId; + + public ChunkReaderContext(int maxChunkSize) { + _chunkBuffer = ByteBuffer.allocateDirect(maxChunkSize); + _chunkId = -1; + } + + public ByteBuffer getChunkBuffer() { + return _chunkBuffer; + } + + public int getChunkId() { + return _chunkId; + } + + public void setChunkId(int chunkId) { + _chunkId = chunkId; + } + + @Override + public void close() + throws IOException { + if (CleanerUtil.UNMAP_SUPPORTED) { + CleanerUtil.getCleaner().freeBuffer(_chunkBuffer); + } + } +} diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/forward/FixedBytePower2ChunkSVForwardIndexReader.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/forward/FixedBytePower2ChunkSVForwardIndexReader.java new file mode 100644 index 0000000..0effd62 --- /dev/null +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/forward/FixedBytePower2ChunkSVForwardIndexReader.java @@ -0,0 +1,114 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.segment.local.segment.index.readers.forward; + +import java.nio.ByteBuffer; +import javax.annotation.Nullable; +import org.apache.pinot.segment.local.io.writer.impl.FixedByteChunkSVForwardIndexWriter; +import org.apache.pinot.segment.spi.memory.PinotDataBuffer; +import org.apache.pinot.spi.data.FieldSpec.DataType; + + +/** + * Chunk-based single-value raw (non-dictionary-encoded) forward index reader for values of fixed length data type (INT, + * LONG, FLOAT, DOUBLE). + * <p>For data layout, please refer to the documentation for {@link FixedByteChunkSVForwardIndexWriter} + */ +public final class FixedBytePower2ChunkSVForwardIndexReader extends BaseChunkSVForwardIndexReader { + public static final int VERSION = 4; + + private final int _shift; + + public FixedBytePower2ChunkSVForwardIndexReader(PinotDataBuffer dataBuffer, DataType valueType) { + super(dataBuffer, valueType); + _shift = Integer.numberOfTrailingZeros(_numDocsPerChunk); + } + + @Nullable + @Override + public ChunkReaderContext createContext() { + if (_isCompressed) { + return new ChunkReaderContext(_numDocsPerChunk * _valueType.size()); + } else { + return null; + } + } + + @Override + public int getInt(int docId, ChunkReaderContext context) { + if (_isCompressed) { + int chunkRowId = docId & (_numDocsPerChunk - 1); + ByteBuffer chunkBuffer = getChunkBuffer(docId, context); + return chunkBuffer.getInt(chunkRowId * Integer.BYTES); + } else { + return _rawData.getInt(docId * Integer.BYTES); + } + } + + @Override + public long getLong(int docId, ChunkReaderContext context) { + if (_isCompressed) { + int chunkRowId = docId & (_numDocsPerChunk - 1); + ByteBuffer chunkBuffer = getChunkBuffer(docId, context); + return chunkBuffer.getLong(chunkRowId * Long.BYTES); + } else { + return _rawData.getLong(docId * Long.BYTES); + } + } + + @Override + public float getFloat(int docId, ChunkReaderContext context) { + if (_isCompressed) { + int chunkRowId = docId & (_numDocsPerChunk - 1); + ByteBuffer chunkBuffer = getChunkBuffer(docId, context); + return chunkBuffer.getFloat(chunkRowId * Float.BYTES); + } else { + return _rawData.getFloat(docId * Float.BYTES); + } + } + + @Override + public double getDouble(int docId, ChunkReaderContext context) { + if (_isCompressed) { + int chunkRowId = docId & (_numDocsPerChunk - 1); + ByteBuffer chunkBuffer = getChunkBuffer(docId, context); + return chunkBuffer.getDouble(chunkRowId * Double.BYTES); + } else { + return _rawData.getDouble(docId * Double.BYTES); + } + } + + /** + * Helper method to return the chunk buffer that contains the value at the given document id. + * <ul> + * <li> If the chunk already exists in the reader context, returns the same. </li> + * <li> Otherwise, loads the chunk for the row, and sets it in the reader context. </li> + * </ul> + * @param docId Document id + * @param context Reader context + * @return Chunk for the row + */ + protected ByteBuffer getChunkBuffer(int docId, ChunkReaderContext context) { + int chunkId = docId >>> _shift; + if (context.getChunkId() == chunkId) { + return context.getChunkBuffer(); + } + return decompressChunk(chunkId, context); + } +} diff --git a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/creator/MultiValueFixedByteRawIndexCreatorTest.java b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/creator/MultiValueFixedByteRawIndexCreatorTest.java index e4b6c15..fb49dce 100644 --- a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/creator/MultiValueFixedByteRawIndexCreatorTest.java +++ b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/creator/MultiValueFixedByteRawIndexCreatorTest.java @@ -30,7 +30,7 @@ import java.util.stream.Collectors; import java.util.stream.IntStream; import org.apache.commons.io.FileUtils; import org.apache.pinot.segment.local.segment.creator.impl.fwd.MultiValueFixedByteRawIndexCreator; -import org.apache.pinot.segment.local.segment.index.readers.forward.BaseChunkSVForwardIndexReader.ChunkReaderContext; +import org.apache.pinot.segment.local.segment.index.readers.forward.ChunkReaderContext; import org.apache.pinot.segment.local.segment.index.readers.forward.FixedByteChunkMVForwardIndexReader; import org.apache.pinot.segment.spi.V1Constants.Indexes; import org.apache.pinot.segment.spi.compression.ChunkCompressionType; diff --git a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/creator/MultiValueVarByteRawIndexCreatorTest.java b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/creator/MultiValueVarByteRawIndexCreatorTest.java index f5ad70d..fb645f5 100644 --- a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/creator/MultiValueVarByteRawIndexCreatorTest.java +++ b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/creator/MultiValueVarByteRawIndexCreatorTest.java @@ -27,7 +27,7 @@ import java.util.List; import java.util.Random; import org.apache.commons.io.FileUtils; import org.apache.pinot.segment.local.segment.creator.impl.fwd.MultiValueVarByteRawIndexCreator; -import org.apache.pinot.segment.local.segment.index.readers.forward.BaseChunkSVForwardIndexReader.ChunkReaderContext; +import org.apache.pinot.segment.local.segment.index.readers.forward.ChunkReaderContext; import org.apache.pinot.segment.local.segment.index.readers.forward.VarByteChunkMVForwardIndexReader; import org.apache.pinot.segment.spi.V1Constants.Indexes; import org.apache.pinot.segment.spi.compression.ChunkCompressionType; diff --git a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/creator/RawIndexCreatorTest.java b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/creator/RawIndexCreatorTest.java index 890292e..af99c5a 100644 --- a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/creator/RawIndexCreatorTest.java +++ b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/creator/RawIndexCreatorTest.java @@ -31,7 +31,7 @@ import org.apache.commons.lang3.RandomStringUtils; import org.apache.pinot.common.utils.StringUtil; import org.apache.pinot.segment.local.segment.creator.impl.SegmentIndexCreationDriverImpl; import org.apache.pinot.segment.local.segment.index.loader.IndexLoadingConfig; -import org.apache.pinot.segment.local.segment.index.readers.forward.BaseChunkSVForwardIndexReader; +import org.apache.pinot.segment.local.segment.index.readers.forward.ChunkReaderContext; import org.apache.pinot.segment.local.segment.index.readers.forward.FixedByteChunkSVForwardIndexReader; import org.apache.pinot.segment.local.segment.index.readers.forward.VarByteChunkMVForwardIndexReader; import org.apache.pinot.segment.local.segment.index.readers.forward.VarByteChunkSVForwardIndexReader; @@ -169,11 +169,9 @@ public class RawIndexCreatorTest { public void testStringRawIndexCreator() throws Exception { PinotDataBuffer indexBuffer = getIndexBufferForColumn(STRING_COLUMN); - try (VarByteChunkSVForwardIndexReader rawIndexReader = new VarByteChunkSVForwardIndexReader( - indexBuffer, + try (VarByteChunkSVForwardIndexReader rawIndexReader = new VarByteChunkSVForwardIndexReader(indexBuffer, DataType.STRING); - BaseChunkSVForwardIndexReader.ChunkReaderContext readerContext = rawIndexReader - .createContext()) { + ChunkReaderContext readerContext = rawIndexReader.createContext()) { _recordReader.rewind(); for (int row = 0; row < NUM_ROWS; row++) { GenericRow expectedRow = _recordReader.next(); @@ -193,9 +191,8 @@ public class RawIndexCreatorTest { throws Exception { PinotDataBuffer indexBuffer = getIndexBufferForColumn(column); try (FixedByteChunkSVForwardIndexReader rawIndexReader = new FixedByteChunkSVForwardIndexReader( - indexBuffer, - dataType); BaseChunkSVForwardIndexReader.ChunkReaderContext readerContext = rawIndexReader - .createContext()) { + indexBuffer, dataType); + ChunkReaderContext readerContext = rawIndexReader.createContext()) { _recordReader.rewind(); for (int row = 0; row < NUM_ROWS; row++) { GenericRow expectedRow = _recordReader.next(); @@ -216,7 +213,7 @@ public class RawIndexCreatorTest { try (VarByteChunkMVForwardIndexReader rawIndexReader = new VarByteChunkMVForwardIndexReader( indexBuffer, DataType.STRING); - BaseChunkSVForwardIndexReader.ChunkReaderContext readerContext = rawIndexReader + ChunkReaderContext readerContext = rawIndexReader .createContext()) { _recordReader.rewind(); int maxNumberOfMultiValues = _segmentDirectory.getSegmentMetadata() @@ -250,7 +247,7 @@ public class RawIndexCreatorTest { PinotDataBuffer indexBuffer = getIndexBufferForColumn(BYTES_MV_COLUMN); try (VarByteChunkMVForwardIndexReader rawIndexReader = new VarByteChunkMVForwardIndexReader( indexBuffer, DataType.BYTES); - BaseChunkSVForwardIndexReader.ChunkReaderContext readerContext = rawIndexReader + ChunkReaderContext readerContext = rawIndexReader .createContext()) { _recordReader.rewind(); int maxNumberOfMultiValues = _segmentDirectory.getSegmentMetadata() @@ -373,8 +370,8 @@ public class RawIndexCreatorTest { * @param docId Document id * @return Value read from index */ - private Object readValueFromIndex(FixedByteChunkSVForwardIndexReader rawIndexReader, - BaseChunkSVForwardIndexReader.ChunkReaderContext readerContext, int docId) { + private Object readValueFromIndex(FixedByteChunkSVForwardIndexReader rawIndexReader, ChunkReaderContext readerContext, + int docId) { switch (rawIndexReader.getValueType()) { case INT: return rawIndexReader.getInt(docId, readerContext); diff --git a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/forward/FixedByteChunkSVForwardIndexTest.java b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/forward/FixedByteChunkSVForwardIndexTest.java index 9afe093..0e2386b 100644 --- a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/forward/FixedByteChunkSVForwardIndexTest.java +++ b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/forward/FixedByteChunkSVForwardIndexTest.java @@ -20,16 +20,20 @@ package org.apache.pinot.segment.local.segment.index.forward; import java.io.File; import java.net.URL; +import java.util.Arrays; import java.util.Random; +import java.util.stream.IntStream; import org.apache.commons.io.FileUtils; -import org.apache.pinot.segment.local.io.writer.impl.BaseChunkSVForwardIndexWriter; import org.apache.pinot.segment.local.io.writer.impl.FixedByteChunkSVForwardIndexWriter; -import org.apache.pinot.segment.local.segment.index.readers.forward.BaseChunkSVForwardIndexReader; +import org.apache.pinot.segment.local.segment.index.readers.forward.ChunkReaderContext; import org.apache.pinot.segment.local.segment.index.readers.forward.FixedByteChunkSVForwardIndexReader; +import org.apache.pinot.segment.local.segment.index.readers.forward.FixedBytePower2ChunkSVForwardIndexReader; import org.apache.pinot.segment.spi.compression.ChunkCompressionType; +import org.apache.pinot.segment.spi.index.reader.ForwardIndexReader; import org.apache.pinot.segment.spi.memory.PinotDataBuffer; import org.apache.pinot.spi.data.FieldSpec.DataType; import org.testng.Assert; +import org.testng.annotations.DataProvider; import org.testng.annotations.Test; @@ -49,47 +53,16 @@ public class FixedByteChunkSVForwardIndexTest { private static final String TEST_FILE = System.getProperty("java.io.tmpdir") + File.separator + "FixedByteSVRTest"; private static final Random RANDOM = new Random(); - @Test - public void testWithCompression() - throws Exception { - ChunkCompressionType compressionType = ChunkCompressionType.SNAPPY; - testInt(compressionType); - testLong(compressionType); - testFloat(compressionType); - testDouble(compressionType); - } - - @Test - public void testWithoutCompression() - throws Exception { - ChunkCompressionType compressionType = ChunkCompressionType.PASS_THROUGH; - testInt(compressionType); - testLong(compressionType); - testFloat(compressionType); - testDouble(compressionType); - } - - @Test - public void testWithZstandardCompression() - throws Exception { - ChunkCompressionType compressionType = ChunkCompressionType.ZSTANDARD; - testInt(compressionType); - testLong(compressionType); - testFloat(compressionType); - testDouble(compressionType); - } - - @Test - public void testWithLZ4Compression() - throws Exception { - ChunkCompressionType compressionType = ChunkCompressionType.LZ4; - testInt(compressionType); - testLong(compressionType); - testFloat(compressionType); - testDouble(compressionType); + @DataProvider(name = "combinations") + public static Object[][] combinations() { + return Arrays.stream(ChunkCompressionType.values()) + .flatMap(chunkCompressionType -> IntStream.of(2, 3, 4) + .mapToObj(version -> new Object[]{chunkCompressionType, version})) + .toArray(Object[][]::new); } - public void testInt(ChunkCompressionType compressionType) + @Test(dataProvider = "combinations") + public void testInt(ChunkCompressionType compressionType, int version) throws Exception { int[] expected = new int[NUM_VALUES]; for (int i = 0; i < NUM_VALUES; i++) { @@ -103,24 +76,28 @@ public class FixedByteChunkSVForwardIndexTest { // test both formats (4-byte chunk offsets and 8-byte chunk offsets) try (FixedByteChunkSVForwardIndexWriter fourByteOffsetWriter = new FixedByteChunkSVForwardIndexWriter( - outFileFourByte, compressionType, NUM_VALUES, NUM_DOCS_PER_CHUNK, Integer.BYTES, - BaseChunkSVForwardIndexWriter.DEFAULT_VERSION); + outFileFourByte, compressionType, NUM_VALUES, NUM_DOCS_PER_CHUNK, Integer.BYTES, version); FixedByteChunkSVForwardIndexWriter eightByteOffsetWriter = new FixedByteChunkSVForwardIndexWriter( - outFileEightByte, compressionType, NUM_VALUES, NUM_DOCS_PER_CHUNK, Integer.BYTES, - BaseChunkSVForwardIndexWriter.CURRENT_VERSION)) { + outFileEightByte, compressionType, NUM_VALUES, NUM_DOCS_PER_CHUNK, Integer.BYTES, version)) { for (int value : expected) { fourByteOffsetWriter.putInt(value); eightByteOffsetWriter.putInt(value); } } - try (FixedByteChunkSVForwardIndexReader fourByteOffsetReader = new FixedByteChunkSVForwardIndexReader( - PinotDataBuffer.mapReadOnlyBigEndianFile(outFileFourByte), DataType.INT); - BaseChunkSVForwardIndexReader.ChunkReaderContext fourByteOffsetReaderContext = fourByteOffsetReader + try (ForwardIndexReader<ChunkReaderContext> fourByteOffsetReader = version >= 4 + ? new FixedBytePower2ChunkSVForwardIndexReader( + PinotDataBuffer.mapReadOnlyBigEndianFile(outFileFourByte), DataType.INT) + : new FixedByteChunkSVForwardIndexReader( + PinotDataBuffer.mapReadOnlyBigEndianFile(outFileFourByte), DataType.INT); + ChunkReaderContext fourByteOffsetReaderContext = fourByteOffsetReader .createContext(); - FixedByteChunkSVForwardIndexReader eightByteOffsetReader = new FixedByteChunkSVForwardIndexReader( - PinotDataBuffer.mapReadOnlyBigEndianFile(outFileEightByte), DataType.INT); - BaseChunkSVForwardIndexReader.ChunkReaderContext eightByteOffsetReaderContext = eightByteOffsetReader + ForwardIndexReader<ChunkReaderContext> eightByteOffsetReader = version >= 4 + ? new FixedBytePower2ChunkSVForwardIndexReader( + PinotDataBuffer.mapReadOnlyBigEndianFile(outFileEightByte), DataType.INT) + : new FixedByteChunkSVForwardIndexReader( + PinotDataBuffer.mapReadOnlyBigEndianFile(outFileEightByte), DataType.INT); + ChunkReaderContext eightByteOffsetReaderContext = eightByteOffsetReader .createContext()) { for (int i = 0; i < NUM_VALUES; i++) { Assert.assertEquals(fourByteOffsetReader.getInt(i, fourByteOffsetReaderContext), expected[i]); @@ -132,7 +109,8 @@ public class FixedByteChunkSVForwardIndexTest { FileUtils.deleteQuietly(outFileEightByte); } - public void testLong(ChunkCompressionType compressionType) + @Test(dataProvider = "combinations") + public void testLong(ChunkCompressionType compressionType, int version) throws Exception { long[] expected = new long[NUM_VALUES]; for (int i = 0; i < NUM_VALUES; i++) { @@ -146,24 +124,28 @@ public class FixedByteChunkSVForwardIndexTest { // test both formats (4-byte chunk offsets and 8-byte chunk offsets) try (FixedByteChunkSVForwardIndexWriter fourByteOffsetWriter = new FixedByteChunkSVForwardIndexWriter( - outFileFourByte, compressionType, NUM_VALUES, NUM_DOCS_PER_CHUNK, Long.BYTES, - BaseChunkSVForwardIndexWriter.DEFAULT_VERSION); + outFileFourByte, compressionType, NUM_VALUES, NUM_DOCS_PER_CHUNK, Long.BYTES, version); FixedByteChunkSVForwardIndexWriter eightByteOffsetWriter = new FixedByteChunkSVForwardIndexWriter( - outFileEightByte, compressionType, NUM_VALUES, NUM_DOCS_PER_CHUNK, Long.BYTES, - BaseChunkSVForwardIndexWriter.CURRENT_VERSION)) { + outFileEightByte, compressionType, NUM_VALUES, NUM_DOCS_PER_CHUNK, Long.BYTES, version)) { for (long value : expected) { fourByteOffsetWriter.putLong(value); eightByteOffsetWriter.putLong(value); } } - try (FixedByteChunkSVForwardIndexReader fourByteOffsetReader = new FixedByteChunkSVForwardIndexReader( - PinotDataBuffer.mapReadOnlyBigEndianFile(outFileFourByte), DataType.LONG); - BaseChunkSVForwardIndexReader.ChunkReaderContext fourByteOffsetReaderContext = fourByteOffsetReader + try (ForwardIndexReader<ChunkReaderContext> fourByteOffsetReader = version >= 4 + ? new FixedBytePower2ChunkSVForwardIndexReader( + PinotDataBuffer.mapReadOnlyBigEndianFile(outFileFourByte), DataType.LONG) + : new FixedByteChunkSVForwardIndexReader( + PinotDataBuffer.mapReadOnlyBigEndianFile(outFileFourByte), DataType.LONG); + ChunkReaderContext fourByteOffsetReaderContext = fourByteOffsetReader .createContext(); - FixedByteChunkSVForwardIndexReader eightByteOffsetReader = new FixedByteChunkSVForwardIndexReader( - PinotDataBuffer.mapReadOnlyBigEndianFile(outFileEightByte), DataType.LONG); - BaseChunkSVForwardIndexReader.ChunkReaderContext eightByteOffsetReaderContext = eightByteOffsetReader + ForwardIndexReader<ChunkReaderContext> eightByteOffsetReader = version >= 4 + ? new FixedBytePower2ChunkSVForwardIndexReader( + PinotDataBuffer.mapReadOnlyBigEndianFile(outFileEightByte), DataType.LONG) + : new FixedByteChunkSVForwardIndexReader( + PinotDataBuffer.mapReadOnlyBigEndianFile(outFileEightByte), DataType.LONG); + ChunkReaderContext eightByteOffsetReaderContext = eightByteOffsetReader .createContext()) { for (int i = 0; i < NUM_VALUES; i++) { Assert.assertEquals(fourByteOffsetReader.getLong(i, fourByteOffsetReaderContext), expected[i]); @@ -175,7 +157,8 @@ public class FixedByteChunkSVForwardIndexTest { FileUtils.deleteQuietly(outFileEightByte); } - public void testFloat(ChunkCompressionType compressionType) + @Test(dataProvider = "combinations") + public void testFloat(ChunkCompressionType compressionType, int version) throws Exception { float[] expected = new float[NUM_VALUES]; for (int i = 0; i < NUM_VALUES; i++) { @@ -189,24 +172,28 @@ public class FixedByteChunkSVForwardIndexTest { // test both formats (4-byte chunk offsets and 8-byte chunk offsets) try (FixedByteChunkSVForwardIndexWriter fourByteOffsetWriter = new FixedByteChunkSVForwardIndexWriter( - outFileFourByte, compressionType, NUM_VALUES, NUM_DOCS_PER_CHUNK, Float.BYTES, - BaseChunkSVForwardIndexWriter.DEFAULT_VERSION); + outFileFourByte, compressionType, NUM_VALUES, NUM_DOCS_PER_CHUNK, Float.BYTES, version); FixedByteChunkSVForwardIndexWriter eightByteOffsetWriter = new FixedByteChunkSVForwardIndexWriter( - outFileEightByte, compressionType, NUM_VALUES, NUM_DOCS_PER_CHUNK, Float.BYTES, - BaseChunkSVForwardIndexWriter.CURRENT_VERSION)) { + outFileEightByte, compressionType, NUM_VALUES, NUM_DOCS_PER_CHUNK, Float.BYTES, version)) { for (float value : expected) { fourByteOffsetWriter.putFloat(value); eightByteOffsetWriter.putFloat(value); } } - try (FixedByteChunkSVForwardIndexReader fourByteOffsetReader = new FixedByteChunkSVForwardIndexReader( - PinotDataBuffer.mapReadOnlyBigEndianFile(outFileFourByte), DataType.FLOAT); - BaseChunkSVForwardIndexReader.ChunkReaderContext fourByteOffsetReaderContext = fourByteOffsetReader + try (ForwardIndexReader<ChunkReaderContext> fourByteOffsetReader = version >= 4 + ? new FixedBytePower2ChunkSVForwardIndexReader( + PinotDataBuffer.mapReadOnlyBigEndianFile(outFileFourByte), DataType.FLOAT) + : new FixedByteChunkSVForwardIndexReader( + PinotDataBuffer.mapReadOnlyBigEndianFile(outFileFourByte), DataType.FLOAT); + ChunkReaderContext fourByteOffsetReaderContext = fourByteOffsetReader .createContext(); - FixedByteChunkSVForwardIndexReader eightByteOffsetReader = new FixedByteChunkSVForwardIndexReader( - PinotDataBuffer.mapReadOnlyBigEndianFile(outFileEightByte), DataType.FLOAT); - BaseChunkSVForwardIndexReader.ChunkReaderContext eightByteOffsetReaderContext = eightByteOffsetReader + ForwardIndexReader<ChunkReaderContext> eightByteOffsetReader = version >= 4 + ? new FixedBytePower2ChunkSVForwardIndexReader(PinotDataBuffer.mapReadOnlyBigEndianFile(outFileEightByte), + DataType.FLOAT) + : new FixedByteChunkSVForwardIndexReader( + PinotDataBuffer.mapReadOnlyBigEndianFile(outFileEightByte), DataType.FLOAT); + ChunkReaderContext eightByteOffsetReaderContext = eightByteOffsetReader .createContext()) { for (int i = 0; i < NUM_VALUES; i++) { Assert.assertEquals(fourByteOffsetReader.getFloat(i, fourByteOffsetReaderContext), expected[i]); @@ -218,7 +205,8 @@ public class FixedByteChunkSVForwardIndexTest { FileUtils.deleteQuietly(outFileEightByte); } - public void testDouble(ChunkCompressionType compressionType) + @Test(dataProvider = "combinations") + public void testDouble(ChunkCompressionType compressionType, int version) throws Exception { double[] expected = new double[NUM_VALUES]; for (int i = 0; i < NUM_VALUES; i++) { @@ -232,24 +220,28 @@ public class FixedByteChunkSVForwardIndexTest { // test both formats (4-byte chunk offsets and 8-byte chunk offsets) try (FixedByteChunkSVForwardIndexWriter fourByteOffsetWriter = new FixedByteChunkSVForwardIndexWriter( - outFileFourByte, compressionType, NUM_VALUES, NUM_DOCS_PER_CHUNK, Double.BYTES, - BaseChunkSVForwardIndexWriter.DEFAULT_VERSION); + outFileFourByte, compressionType, NUM_VALUES, NUM_DOCS_PER_CHUNK, Double.BYTES, version); FixedByteChunkSVForwardIndexWriter eightByteOffsetWriter = new FixedByteChunkSVForwardIndexWriter( - outFileEightByte, compressionType, NUM_VALUES, NUM_DOCS_PER_CHUNK, Double.BYTES, - BaseChunkSVForwardIndexWriter.CURRENT_VERSION)) { + outFileEightByte, compressionType, NUM_VALUES, NUM_DOCS_PER_CHUNK, Double.BYTES, version)) { for (double value : expected) { fourByteOffsetWriter.putDouble(value); eightByteOffsetWriter.putDouble(value); } } - try (FixedByteChunkSVForwardIndexReader fourByteOffsetReader = new FixedByteChunkSVForwardIndexReader( - PinotDataBuffer.mapReadOnlyBigEndianFile(outFileFourByte), DataType.DOUBLE); - BaseChunkSVForwardIndexReader.ChunkReaderContext fourByteOffsetReaderContext = fourByteOffsetReader + try (ForwardIndexReader<ChunkReaderContext> fourByteOffsetReader = version >= 4 + ? new FixedBytePower2ChunkSVForwardIndexReader( + PinotDataBuffer.mapReadOnlyBigEndianFile(outFileFourByte), DataType.DOUBLE) + : new FixedByteChunkSVForwardIndexReader( + PinotDataBuffer.mapReadOnlyBigEndianFile(outFileFourByte), DataType.DOUBLE); + ChunkReaderContext fourByteOffsetReaderContext = fourByteOffsetReader .createContext(); - FixedByteChunkSVForwardIndexReader eightByteOffsetReader = new FixedByteChunkSVForwardIndexReader( - PinotDataBuffer.mapReadOnlyBigEndianFile(outFileEightByte), DataType.DOUBLE); - BaseChunkSVForwardIndexReader.ChunkReaderContext eightByteOffsetReaderContext = eightByteOffsetReader + ForwardIndexReader<ChunkReaderContext> eightByteOffsetReader = version >= 4 + ? new FixedBytePower2ChunkSVForwardIndexReader( + PinotDataBuffer.mapReadOnlyBigEndianFile(outFileEightByte), DataType.DOUBLE) + : new FixedByteChunkSVForwardIndexReader( + PinotDataBuffer.mapReadOnlyBigEndianFile(outFileEightByte), DataType.DOUBLE); + ChunkReaderContext eightByteOffsetReaderContext = eightByteOffsetReader .createContext()) { for (int i = 0; i < NUM_VALUES; i++) { Assert.assertEquals(fourByteOffsetReader.getDouble(i, fourByteOffsetReaderContext), expected[i]); @@ -290,7 +282,7 @@ public class FixedByteChunkSVForwardIndexTest { File file = new File(resource.getFile()); try (FixedByteChunkSVForwardIndexReader reader = new FixedByteChunkSVForwardIndexReader( PinotDataBuffer.mapReadOnlyBigEndianFile(file), DataType.DOUBLE); - BaseChunkSVForwardIndexReader.ChunkReaderContext readerContext = reader.createContext()) { + ChunkReaderContext readerContext = reader.createContext()) { for (int i = 0; i < numDocs; i++) { double actual = reader.getDouble(i, readerContext); Assert.assertEquals(actual, i + startValue); diff --git a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/forward/VarByteChunkSVForwardIndexTest.java b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/forward/VarByteChunkSVForwardIndexTest.java index 3f7f1c0..f6b8f46 100644 --- a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/forward/VarByteChunkSVForwardIndexTest.java +++ b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/forward/VarByteChunkSVForwardIndexTest.java @@ -27,7 +27,7 @@ import org.apache.commons.lang.RandomStringUtils; import org.apache.pinot.segment.local.io.writer.impl.BaseChunkSVForwardIndexWriter; import org.apache.pinot.segment.local.io.writer.impl.VarByteChunkSVForwardIndexWriter; import org.apache.pinot.segment.local.segment.creator.impl.fwd.SingleValueVarByteRawIndexCreator; -import org.apache.pinot.segment.local.segment.index.readers.forward.BaseChunkSVForwardIndexReader; +import org.apache.pinot.segment.local.segment.index.readers.forward.ChunkReaderContext; import org.apache.pinot.segment.local.segment.index.readers.forward.VarByteChunkSVForwardIndexReader; import org.apache.pinot.segment.spi.compression.ChunkCompressionType; import org.apache.pinot.segment.spi.memory.PinotDataBuffer; @@ -117,11 +117,11 @@ public class VarByteChunkSVForwardIndexTest { try (VarByteChunkSVForwardIndexReader fourByteOffsetReader = new VarByteChunkSVForwardIndexReader( PinotDataBuffer.mapReadOnlyBigEndianFile(outFileFourByte), DataType.STRING); - BaseChunkSVForwardIndexReader.ChunkReaderContext fourByteOffsetReaderContext = fourByteOffsetReader + ChunkReaderContext fourByteOffsetReaderContext = fourByteOffsetReader .createContext(); VarByteChunkSVForwardIndexReader eightByteOffsetReader = new VarByteChunkSVForwardIndexReader( PinotDataBuffer.mapReadOnlyBigEndianFile(outFileEightByte), DataType.STRING); - BaseChunkSVForwardIndexReader.ChunkReaderContext eightByteOffsetReaderContext = eightByteOffsetReader + ChunkReaderContext eightByteOffsetReaderContext = eightByteOffsetReader .createContext()) { for (int i = 0; i < NUM_ENTRIES; i++) { Assert.assertEquals(fourByteOffsetReader.getString(i, fourByteOffsetReaderContext), expected[i]); @@ -164,7 +164,7 @@ public class VarByteChunkSVForwardIndexTest { File file = new File(resource.getFile()); try (VarByteChunkSVForwardIndexReader reader = new VarByteChunkSVForwardIndexReader( PinotDataBuffer.mapReadOnlyBigEndianFile(file), DataType.STRING); - BaseChunkSVForwardIndexReader.ChunkReaderContext readerContext = reader.createContext()) { + ChunkReaderContext readerContext = reader.createContext()) { for (int i = 0; i < numDocs; i++) { String actual = reader.getString(i, readerContext); Assert.assertEquals(actual, data[i % data.length]); @@ -237,7 +237,7 @@ public class VarByteChunkSVForwardIndexTest { PinotDataBuffer buffer = PinotDataBuffer.mapReadOnlyBigEndianFile(outFile); try (VarByteChunkSVForwardIndexReader reader = new VarByteChunkSVForwardIndexReader(buffer, DataType.STRING); - BaseChunkSVForwardIndexReader.ChunkReaderContext readerContext = reader.createContext()) { + ChunkReaderContext readerContext = reader.createContext()) { for (int i = 0; i < numDocs; i++) { Assert.assertEquals(reader.getString(i, readerContext), expected[i]); } @@ -257,7 +257,7 @@ public class VarByteChunkSVForwardIndexTest { } try (VarByteChunkSVForwardIndexReader reader = new VarByteChunkSVForwardIndexReader(buffer, DataType.STRING); - BaseChunkSVForwardIndexReader.ChunkReaderContext readerContext = reader.createContext()) { + ChunkReaderContext readerContext = reader.createContext()) { for (int i = 0; i < numDocs; i++) { Assert.assertEquals(reader.getString(i, readerContext), expected[i]); } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org