This is an automated email from the ASF dual-hosted git repository.

siddteotia pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new bed2e30  Power of 2 fixed size chunks (#7934)
bed2e30 is described below

commit bed2e307284ca2e43ff6e572be57c01f970645c3
Author: Richard Startin <rich...@startree.ai>
AuthorDate: Thu Dec 23 18:48:12 2021 +0000

    Power of 2 fixed size chunks (#7934)
    
    * power of 2 fixed-byte chunk reader
    
    * change method name
---
 .../BenchmarkFixedByteSVForwardIndexReader.java    | 147 +++++++++++++++++++
 .../pinot/perf/BenchmarkRawForwardIndexReader.java |   4 +-
 .../writer/impl/BaseChunkSVForwardIndexWriter.java |   7 +-
 .../impl/FixedByteChunkSVForwardIndexWriter.java   |  13 +-
 .../impl/VarByteChunkSVForwardIndexWriter.java     |   2 +-
 .../index/readers/DefaultIndexReaderProvider.java  |   7 +-
 .../forward/BaseChunkSVForwardIndexReader.java     |  49 +------
 .../index/readers/forward/ChunkReaderContext.java  |  66 +++++++++
 .../FixedBytePower2ChunkSVForwardIndexReader.java  | 114 +++++++++++++++
 .../MultiValueFixedByteRawIndexCreatorTest.java    |   2 +-
 .../MultiValueVarByteRawIndexCreatorTest.java      |   2 +-
 .../segment/index/creator/RawIndexCreatorTest.java |  21 ++-
 .../forward/FixedByteChunkSVForwardIndexTest.java  | 162 ++++++++++-----------
 .../forward/VarByteChunkSVForwardIndexTest.java    |  12 +-
 14 files changed, 449 insertions(+), 159 deletions(-)

diff --git a/pinot-perf/src/main/java/org/apache/pinot/perf/BenchmarkFixedByteSVForwardIndexReader.java b/pinot-perf/src/main/java/org/apache/pinot/perf/BenchmarkFixedByteSVForwardIndexReader.java
new file mode 100644
index 0000000..f5c7e37
--- /dev/null
+++ b/pinot-perf/src/main/java/org/apache/pinot/perf/BenchmarkFixedByteSVForwardIndexReader.java
@@ -0,0 +1,147 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.perf;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.UUID;
+import java.util.concurrent.ThreadLocalRandom;
+import org.apache.commons.io.FileUtils;
+import 
org.apache.pinot.segment.local.io.writer.impl.FixedByteChunkSVForwardIndexWriter;
+import 
org.apache.pinot.segment.local.segment.index.readers.forward.ChunkReaderContext;
+import 
org.apache.pinot.segment.local.segment.index.readers.forward.FixedByteChunkSVForwardIndexReader;
+import 
org.apache.pinot.segment.local.segment.index.readers.forward.FixedBytePower2ChunkSVForwardIndexReader;
+import org.apache.pinot.segment.spi.compression.ChunkCompressionType;
+import org.apache.pinot.segment.spi.index.reader.ForwardIndexReader;
+import org.apache.pinot.segment.spi.memory.PinotDataBuffer;
+import org.apache.pinot.spi.data.FieldSpec;
+import org.openjdk.jmh.annotations.Benchmark;
+import org.openjdk.jmh.annotations.Level;
+import org.openjdk.jmh.annotations.Param;
+import org.openjdk.jmh.annotations.Scope;
+import org.openjdk.jmh.annotations.Setup;
+import org.openjdk.jmh.annotations.State;
+import org.openjdk.jmh.annotations.TearDown;
+import org.openjdk.jmh.infra.Blackhole;
+
+
+@State(Scope.Benchmark)
+public class BenchmarkFixedByteSVForwardIndexReader {
+
+  private static final File INDEX_DIR =
+      new File(FileUtils.getTempDirectory(), 
"BenchmarkFixedByteSVForwardIndexReader");
+
+  @Param("10000")
+  int _blockSize;
+
+  @Param("1000")
+  int _numBlocks;
+
+  private int[] _docIds;
+  private double[] _doubleBuffer;
+  private long[] _longBuffer;
+  private FixedByteChunkSVForwardIndexReader _compressedReader;
+  private FixedBytePower2ChunkSVForwardIndexReader _compressedPow2Reader;
+
+  @Setup(Level.Trial)
+  public void setup()
+      throws IOException {
+    FileUtils.forceMkdir(INDEX_DIR);
+    File compressedIndexFile = new File(INDEX_DIR, 
UUID.randomUUID().toString());
+    File pow2CompressedIndexFile = new File(INDEX_DIR, 
UUID.randomUUID().toString());
+    _doubleBuffer = new double[_blockSize];
+    _longBuffer = new long[_blockSize];
+    try (FixedByteChunkSVForwardIndexWriter writer = new 
FixedByteChunkSVForwardIndexWriter(compressedIndexFile,
+        ChunkCompressionType.LZ4, _numBlocks * _blockSize, 1000, Long.BYTES, 
3);
+        FixedByteChunkSVForwardIndexWriter pow2Writer = new 
FixedByteChunkSVForwardIndexWriter(pow2CompressedIndexFile,
+            ChunkCompressionType.LZ4, _numBlocks * _blockSize, 1000, 
Long.BYTES, 4)) {
+      for (int i = 0; i < _numBlocks * _blockSize; i++) {
+        long next = ThreadLocalRandom.current().nextLong();
+        writer.putLong(next);
+        pow2Writer.putLong(next);
+      }
+    }
+    _compressedReader = new 
FixedByteChunkSVForwardIndexReader(PinotDataBuffer.loadBigEndianFile(compressedIndexFile),
+        FieldSpec.DataType.LONG);
+    _compressedPow2Reader =
+        new 
FixedBytePower2ChunkSVForwardIndexReader(PinotDataBuffer.loadBigEndianFile(pow2CompressedIndexFile),
+            FieldSpec.DataType.LONG);
+    _docIds = new int[_blockSize];
+  }
+
+  @TearDown(Level.Trial)
+  public void teardown()
+      throws IOException {
+    FileUtils.deleteDirectory(INDEX_DIR);
+  }
+
+  @Benchmark
+  public void readCompressedDoublesNonContiguousV3(Blackhole bh)
+      throws IOException {
+    readCompressedDoublesNonContiguous(bh, _compressedReader);
+  }
+
+  @Benchmark
+  public void readCompressedDoublesNonContiguousV4(Blackhole bh)
+      throws IOException {
+    readCompressedDoublesNonContiguous(bh, _compressedPow2Reader);
+  }
+
+  @Benchmark
+  public void readCompressedLongsNonContiguousV3(Blackhole bh)
+      throws IOException {
+    readCompressedLongsNonContiguous(bh, _compressedReader);
+  }
+
+  @Benchmark
+  public void readCompressedLongsNonContiguousV4(Blackhole bh)
+      throws IOException {
+    readCompressedLongsNonContiguous(bh, _compressedPow2Reader);
+  }
+
+  private void readCompressedLongsNonContiguous(Blackhole bh, 
ForwardIndexReader<ChunkReaderContext> reader)
+      throws IOException {
+    try (ChunkReaderContext context = reader.createContext()) {
+      for (int block = 0; block < _numBlocks / 2; block++) {
+        for (int i = 0; i < _docIds.length; i++) {
+          _docIds[i] = block * _blockSize + i * 2;
+        }
+        for (int i = 0; i < _docIds.length; i++) {
+          _longBuffer[i] = reader.getLong(_docIds[i], context);
+        }
+        bh.consume(_longBuffer);
+      }
+    }
+  }
+
+  private void readCompressedDoublesNonContiguous(Blackhole bh, 
ForwardIndexReader<ChunkReaderContext> reader)
+      throws IOException {
+    try (ChunkReaderContext context = reader.createContext()) {
+      for (int block = 0; block < _numBlocks / 2; block++) {
+        for (int i = 0; i < _docIds.length; i++) {
+          _docIds[i] = block * _blockSize + i * 2;
+        }
+        for (int i = 0; i < _docIds.length; i++) {
+          _doubleBuffer[i] = reader.getDouble(_docIds[i], context);
+        }
+        bh.consume(_doubleBuffer);
+      }
+    }
+  }
+}
diff --git a/pinot-perf/src/main/java/org/apache/pinot/perf/BenchmarkRawForwardIndexReader.java b/pinot-perf/src/main/java/org/apache/pinot/perf/BenchmarkRawForwardIndexReader.java
index 7bc3fd8..ae223e7 100644
--- a/pinot-perf/src/main/java/org/apache/pinot/perf/BenchmarkRawForwardIndexReader.java
+++ b/pinot-perf/src/main/java/org/apache/pinot/perf/BenchmarkRawForwardIndexReader.java
@@ -27,7 +27,7 @@ import java.util.function.LongSupplier;
 import org.apache.commons.io.FileUtils;
 import 
org.apache.pinot.segment.local.io.writer.impl.VarByteChunkSVForwardIndexWriter;
 import 
org.apache.pinot.segment.local.io.writer.impl.VarByteChunkSVForwardIndexWriterV4;
-import 
org.apache.pinot.segment.local.segment.index.readers.forward.BaseChunkSVForwardIndexReader;
+import 
org.apache.pinot.segment.local.segment.index.readers.forward.ChunkReaderContext;
 import 
org.apache.pinot.segment.local.segment.index.readers.forward.VarByteChunkSVForwardIndexReader;
 import 
org.apache.pinot.segment.local.segment.index.readers.forward.VarByteChunkSVForwardIndexReaderV4;
 import org.apache.pinot.segment.spi.compression.ChunkCompressionType;
@@ -200,7 +200,7 @@ public class BenchmarkRawForwardIndexReader {
     try (PinotDataBuffer buffer = 
PinotDataBuffer.loadBigEndianFile(state._file);
         VarByteChunkSVForwardIndexReader reader =
             new VarByteChunkSVForwardIndexReader(buffer, 
FieldSpec.DataType.BYTES);
-        BaseChunkSVForwardIndexReader.ChunkReaderContext context = 
reader.createContext()) {
+        ChunkReaderContext context = reader.createContext()) {
       for (int i = 0; i < state._records; i++) {
         bh.consume(reader.getBytes(i, context));
       }
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/io/writer/impl/BaseChunkSVForwardIndexWriter.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/io/writer/impl/BaseChunkSVForwardIndexWriter.java
index 11b4361..71588d6 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/io/writer/impl/BaseChunkSVForwardIndexWriter.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/io/writer/impl/BaseChunkSVForwardIndexWriter.java
@@ -65,12 +65,14 @@ public abstract class BaseChunkSVForwardIndexWriter 
implements Closeable {
    * @param chunkSize Size of chunk
    * @param sizeOfEntry Size of entry (in bytes), max size for variable byte 
implementation.
    * @param version version of File
+   * @param fixed if the data type is fixed width (required for version 
validation)
    * @throws IOException if the file isn't found or can't be mapped
    */
   protected BaseChunkSVForwardIndexWriter(File file, ChunkCompressionType 
compressionType, int totalDocs,
-      int numDocsPerChunk, int chunkSize, int sizeOfEntry, int version)
+      int numDocsPerChunk, int chunkSize, int sizeOfEntry, int version, 
boolean fixed)
       throws IOException {
-    Preconditions.checkArgument(version == DEFAULT_VERSION || version == 
CURRENT_VERSION);
+    Preconditions.checkArgument(version == DEFAULT_VERSION || version == 
CURRENT_VERSION
+        || (fixed && version == 4));
     _chunkSize = chunkSize;
     _chunkCompressor = ChunkCompressorFactory.getCompressor(compressionType);
     _headerEntryChunkOffsetSize = getHeaderEntryChunkOffsetSize(version);
@@ -87,6 +89,7 @@ public abstract class BaseChunkSVForwardIndexWriter 
implements Closeable {
       case 2:
         return FILE_HEADER_ENTRY_CHUNK_OFFSET_SIZE_V1V2;
       case 3:
+      case 4:
         return FILE_HEADER_ENTRY_CHUNK_OFFSET_SIZE_V3;
       default:
         throw new IllegalStateException("Invalid version: " + version);
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/io/writer/impl/FixedByteChunkSVForwardIndexWriter.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/io/writer/impl/FixedByteChunkSVForwardIndexWriter.java
index 8d9ad7e..7b942b1 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/io/writer/impl/FixedByteChunkSVForwardIndexWriter.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/io/writer/impl/FixedByteChunkSVForwardIndexWriter.java
@@ -71,8 +71,9 @@ public class FixedByteChunkSVForwardIndexWriter extends 
BaseChunkSVForwardIndexW
   public FixedByteChunkSVForwardIndexWriter(File file, ChunkCompressionType 
compressionType, int totalDocs,
       int numDocsPerChunk, int sizeOfEntry, int writerVersion)
       throws IOException {
-    super(file, compressionType, totalDocs, numDocsPerChunk, (sizeOfEntry * 
numDocsPerChunk), sizeOfEntry,
-        writerVersion);
+    super(file, compressionType, totalDocs, 
normalizeDocsPerChunk(writerVersion, numDocsPerChunk),
+        (sizeOfEntry * normalizeDocsPerChunk(writerVersion, numDocsPerChunk)), 
sizeOfEntry,
+        writerVersion, true);
     _chunkDataOffset = 0;
   }
 
@@ -112,4 +113,12 @@ public class FixedByteChunkSVForwardIndexWriter extends 
BaseChunkSVForwardIndexW
       writeChunk();
     }
   }
+
+  private static int normalizeDocsPerChunk(int version, int numDocsPerChunk) {
+    // V4 uses power of 2 chunk sizes for random access efficiency
+    if (version >= 4 && (numDocsPerChunk & (numDocsPerChunk - 1)) != 0) {
+      return 1 << (32 - Integer.numberOfLeadingZeros(numDocsPerChunk - 1));
+    }
+    return numDocsPerChunk;
+  }
 }
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/io/writer/impl/VarByteChunkSVForwardIndexWriter.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/io/writer/impl/VarByteChunkSVForwardIndexWriter.java
index f035a62..7e99772 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/io/writer/impl/VarByteChunkSVForwardIndexWriter.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/io/writer/impl/VarByteChunkSVForwardIndexWriter.java
@@ -80,7 +80,7 @@ public class VarByteChunkSVForwardIndexWriter extends 
BaseChunkSVForwardIndexWri
     super(file, compressionType, totalDocs, numDocsPerChunk,
         numDocsPerChunk * (CHUNK_HEADER_ENTRY_ROW_OFFSET_SIZE + 
lengthOfLongestEntry),
         // chunkSize
-        lengthOfLongestEntry, writerVersion);
+        lengthOfLongestEntry, writerVersion, false);
 
     _chunkHeaderOffset = 0;
     _chunkHeaderSize = numDocsPerChunk * CHUNK_HEADER_ENTRY_ROW_OFFSET_SIZE;
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/DefaultIndexReaderProvider.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/DefaultIndexReaderProvider.java
index 79cb55d..a342e85 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/DefaultIndexReaderProvider.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/DefaultIndexReaderProvider.java
@@ -30,6 +30,7 @@ import 
org.apache.pinot.segment.local.segment.index.readers.forward.FixedBitMVFo
 import 
org.apache.pinot.segment.local.segment.index.readers.forward.FixedBitSVForwardIndexReaderV2;
 import 
org.apache.pinot.segment.local.segment.index.readers.forward.FixedByteChunkMVForwardIndexReader;
 import 
org.apache.pinot.segment.local.segment.index.readers.forward.FixedByteChunkSVForwardIndexReader;
+import 
org.apache.pinot.segment.local.segment.index.readers.forward.FixedBytePower2ChunkSVForwardIndexReader;
 import 
org.apache.pinot.segment.local.segment.index.readers.forward.VarByteChunkMVForwardIndexReader;
 import 
org.apache.pinot.segment.local.segment.index.readers.forward.VarByteChunkSVForwardIndexReader;
 import 
org.apache.pinot.segment.local.segment.index.readers.forward.VarByteChunkSVForwardIndexReaderV4;
@@ -89,10 +90,12 @@ public class DefaultIndexReaderProvider implements 
IndexReaderProvider {
     } else {
       FieldSpec.DataType storedType = 
columnMetadata.getDataType().getStoredType();
       if (columnMetadata.isSingleValue()) {
+        int version = dataBuffer.getInt(0);
         if (storedType.isFixedWidth()) {
-          return new FixedByteChunkSVForwardIndexReader(dataBuffer, 
storedType);
+          return version >= FixedBytePower2ChunkSVForwardIndexReader.VERSION
+              ? new FixedBytePower2ChunkSVForwardIndexReader(dataBuffer, 
storedType)
+              : new FixedByteChunkSVForwardIndexReader(dataBuffer, storedType);
         }
-        int version = dataBuffer.getInt(0);
         if (version >= VarByteChunkSVForwardIndexWriterV4.VERSION) {
           return new VarByteChunkSVForwardIndexReaderV4(dataBuffer, 
storedType);
         }
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/forward/BaseChunkSVForwardIndexReader.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/forward/BaseChunkSVForwardIndexReader.java
index 56090ca..dce4a90 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/forward/BaseChunkSVForwardIndexReader.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/forward/BaseChunkSVForwardIndexReader.java
@@ -26,8 +26,6 @@ import 
org.apache.pinot.segment.local.io.writer.impl.BaseChunkSVForwardIndexWrit
 import org.apache.pinot.segment.spi.compression.ChunkCompressionType;
 import org.apache.pinot.segment.spi.compression.ChunkDecompressor;
 import org.apache.pinot.segment.spi.index.reader.ForwardIndexReader;
-import org.apache.pinot.segment.spi.index.reader.ForwardIndexReaderContext;
-import org.apache.pinot.segment.spi.memory.CleanerUtil;
 import org.apache.pinot.segment.spi.memory.PinotDataBuffer;
 import org.apache.pinot.spi.data.FieldSpec.DataType;
 import org.slf4j.Logger;
@@ -37,8 +35,7 @@ import org.slf4j.LoggerFactory;
 /**
  * Base implementation for chunk-based single-value raw 
(non-dictionary-encoded) forward index reader.
  */
-public abstract class BaseChunkSVForwardIndexReader
-    implements 
ForwardIndexReader<BaseChunkSVForwardIndexReader.ChunkReaderContext> {
+public abstract class BaseChunkSVForwardIndexReader implements 
ForwardIndexReader<ChunkReaderContext> {
   private static final Logger LOGGER = 
LoggerFactory.getLogger(BaseChunkSVForwardIndexReader.class);
 
   protected final PinotDataBuffer _dataBuffer;
@@ -114,7 +111,10 @@ public abstract class BaseChunkSVForwardIndexReader
     if (context.getChunkId() == chunkId) {
       return context.getChunkBuffer();
     }
+    return decompressChunk(chunkId, context);
+  }
 
+  protected ByteBuffer decompressChunk(int chunkId, ChunkReaderContext 
context) {
     int chunkSize;
     long chunkPosition = getChunkPosition(chunkId);
 
@@ -172,45 +172,4 @@ public abstract class BaseChunkSVForwardIndexReader
     // NOTE: DO NOT close the PinotDataBuffer here because it is tracked by 
the caller and might be reused later. The
     // caller is responsible of closing the PinotDataBuffer.
   }
-
-  /**
-   * Context for the chunk-based forward index readers.
-   * <p>Information saved in the context can be used by subsequent reads as 
cache:
-   * <ul>
-   *   <li>
-   *     Chunk Buffer from the previous read. Useful if the subsequent read is 
from the same buffer, as it avoids extra
-   *     chunk decompression.
-   *   </li>
-   *   <li>Id for the chunk</li>
-   * </ul>
-   */
-  public static class ChunkReaderContext implements ForwardIndexReaderContext {
-    private final ByteBuffer _chunkBuffer;
-    private int _chunkId;
-
-    public ChunkReaderContext(int maxChunkSize) {
-      _chunkBuffer = ByteBuffer.allocateDirect(maxChunkSize);
-      _chunkId = -1;
-    }
-
-    public ByteBuffer getChunkBuffer() {
-      return _chunkBuffer;
-    }
-
-    public int getChunkId() {
-      return _chunkId;
-    }
-
-    public void setChunkId(int chunkId) {
-      _chunkId = chunkId;
-    }
-
-    @Override
-    public void close()
-        throws IOException {
-      if (CleanerUtil.UNMAP_SUPPORTED) {
-        CleanerUtil.getCleaner().freeBuffer(_chunkBuffer);
-      }
-    }
-  }
 }
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/forward/ChunkReaderContext.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/forward/ChunkReaderContext.java
new file mode 100644
index 0000000..1adf685
--- /dev/null
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/forward/ChunkReaderContext.java
@@ -0,0 +1,66 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.segment.local.segment.index.readers.forward;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import org.apache.pinot.segment.spi.index.reader.ForwardIndexReaderContext;
+import org.apache.pinot.segment.spi.memory.CleanerUtil;
+
+
+/**
+ * Context for the chunk-based forward index readers.
+ * <p>Information saved in the context can be used by subsequent reads as 
cache:
+ * <ul>
+ *   <li>
+ *     Chunk Buffer from the previous read. Useful if the subsequent read is 
from the same buffer, as it avoids extra
+ *     chunk decompression.
+ *   </li>
+ *   <li>Id for the chunk</li>
+ * </ul>
+ */
+public class ChunkReaderContext implements ForwardIndexReaderContext {
+  private final ByteBuffer _chunkBuffer;
+  private int _chunkId;
+
+  public ChunkReaderContext(int maxChunkSize) {
+    _chunkBuffer = ByteBuffer.allocateDirect(maxChunkSize);
+    _chunkId = -1;
+  }
+
+  public ByteBuffer getChunkBuffer() {
+    return _chunkBuffer;
+  }
+
+  public int getChunkId() {
+    return _chunkId;
+  }
+
+  public void setChunkId(int chunkId) {
+    _chunkId = chunkId;
+  }
+
+  @Override
+  public void close()
+      throws IOException {
+    if (CleanerUtil.UNMAP_SUPPORTED) {
+      CleanerUtil.getCleaner().freeBuffer(_chunkBuffer);
+    }
+  }
+}
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/forward/FixedBytePower2ChunkSVForwardIndexReader.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/forward/FixedBytePower2ChunkSVForwardIndexReader.java
new file mode 100644
index 0000000..0effd62
--- /dev/null
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/forward/FixedBytePower2ChunkSVForwardIndexReader.java
@@ -0,0 +1,114 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.segment.local.segment.index.readers.forward;
+
+import java.nio.ByteBuffer;
+import javax.annotation.Nullable;
+import 
org.apache.pinot.segment.local.io.writer.impl.FixedByteChunkSVForwardIndexWriter;
+import org.apache.pinot.segment.spi.memory.PinotDataBuffer;
+import org.apache.pinot.spi.data.FieldSpec.DataType;
+
+
+/**
+ * Chunk-based single-value raw (non-dictionary-encoded) forward index reader 
for values of fixed length data type (INT,
+ * LONG, FLOAT, DOUBLE).
+ * <p>For data layout, please refer to the documentation for {@link 
FixedByteChunkSVForwardIndexWriter}
+ */
+public final class FixedBytePower2ChunkSVForwardIndexReader extends 
BaseChunkSVForwardIndexReader {
+  public static final int VERSION = 4;
+
+  private final int _shift;
+
+  public FixedBytePower2ChunkSVForwardIndexReader(PinotDataBuffer dataBuffer, 
DataType valueType) {
+    super(dataBuffer, valueType);
+    _shift = Integer.numberOfTrailingZeros(_numDocsPerChunk);
+  }
+
+  @Nullable
+  @Override
+  public ChunkReaderContext createContext() {
+    if (_isCompressed) {
+      return new ChunkReaderContext(_numDocsPerChunk * _valueType.size());
+    } else {
+      return null;
+    }
+  }
+
+  @Override
+  public int getInt(int docId, ChunkReaderContext context) {
+    if (_isCompressed) {
+      int chunkRowId = docId & (_numDocsPerChunk - 1);
+      ByteBuffer chunkBuffer = getChunkBuffer(docId, context);
+      return chunkBuffer.getInt(chunkRowId * Integer.BYTES);
+    } else {
+      return _rawData.getInt(docId * Integer.BYTES);
+    }
+  }
+
+  @Override
+  public long getLong(int docId, ChunkReaderContext context) {
+    if (_isCompressed) {
+      int chunkRowId = docId & (_numDocsPerChunk - 1);
+      ByteBuffer chunkBuffer = getChunkBuffer(docId, context);
+      return chunkBuffer.getLong(chunkRowId * Long.BYTES);
+    } else {
+      return _rawData.getLong(docId * Long.BYTES);
+    }
+  }
+
+  @Override
+  public float getFloat(int docId, ChunkReaderContext context) {
+    if (_isCompressed) {
+      int chunkRowId = docId & (_numDocsPerChunk - 1);
+      ByteBuffer chunkBuffer = getChunkBuffer(docId, context);
+      return chunkBuffer.getFloat(chunkRowId * Float.BYTES);
+    } else {
+      return _rawData.getFloat(docId * Float.BYTES);
+    }
+  }
+
+  @Override
+  public double getDouble(int docId, ChunkReaderContext context) {
+    if (_isCompressed) {
+      int chunkRowId = docId & (_numDocsPerChunk - 1);
+      ByteBuffer chunkBuffer = getChunkBuffer(docId, context);
+      return chunkBuffer.getDouble(chunkRowId * Double.BYTES);
+    } else {
+      return _rawData.getDouble(docId * Double.BYTES);
+    }
+  }
+
+  /**
+   * Helper method to return the chunk buffer that contains the value at the 
given document id.
+   * <ul>
+   *   <li> If the chunk already exists in the reader context, returns the 
same. </li>
+   *   <li> Otherwise, loads the chunk for the row, and sets it in the reader 
context. </li>
+   * </ul>
+   * @param docId Document id
+   * @param context Reader context
+   * @return Chunk for the row
+   */
+  protected ByteBuffer getChunkBuffer(int docId, ChunkReaderContext context) {
+    int chunkId = docId >>> _shift;
+    if (context.getChunkId() == chunkId) {
+      return context.getChunkBuffer();
+    }
+    return decompressChunk(chunkId, context);
+  }
+}
diff --git a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/creator/MultiValueFixedByteRawIndexCreatorTest.java b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/creator/MultiValueFixedByteRawIndexCreatorTest.java
index e4b6c15..fb49dce 100644
--- a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/creator/MultiValueFixedByteRawIndexCreatorTest.java
+++ b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/creator/MultiValueFixedByteRawIndexCreatorTest.java
@@ -30,7 +30,7 @@ import java.util.stream.Collectors;
 import java.util.stream.IntStream;
 import org.apache.commons.io.FileUtils;
 import 
org.apache.pinot.segment.local.segment.creator.impl.fwd.MultiValueFixedByteRawIndexCreator;
-import 
org.apache.pinot.segment.local.segment.index.readers.forward.BaseChunkSVForwardIndexReader.ChunkReaderContext;
+import 
org.apache.pinot.segment.local.segment.index.readers.forward.ChunkReaderContext;
 import 
org.apache.pinot.segment.local.segment.index.readers.forward.FixedByteChunkMVForwardIndexReader;
 import org.apache.pinot.segment.spi.V1Constants.Indexes;
 import org.apache.pinot.segment.spi.compression.ChunkCompressionType;
diff --git a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/creator/MultiValueVarByteRawIndexCreatorTest.java b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/creator/MultiValueVarByteRawIndexCreatorTest.java
index f5ad70d..fb645f5 100644
--- a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/creator/MultiValueVarByteRawIndexCreatorTest.java
+++ b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/creator/MultiValueVarByteRawIndexCreatorTest.java
@@ -27,7 +27,7 @@ import java.util.List;
 import java.util.Random;
 import org.apache.commons.io.FileUtils;
 import 
org.apache.pinot.segment.local.segment.creator.impl.fwd.MultiValueVarByteRawIndexCreator;
-import 
org.apache.pinot.segment.local.segment.index.readers.forward.BaseChunkSVForwardIndexReader.ChunkReaderContext;
+import 
org.apache.pinot.segment.local.segment.index.readers.forward.ChunkReaderContext;
 import 
org.apache.pinot.segment.local.segment.index.readers.forward.VarByteChunkMVForwardIndexReader;
 import org.apache.pinot.segment.spi.V1Constants.Indexes;
 import org.apache.pinot.segment.spi.compression.ChunkCompressionType;
diff --git a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/creator/RawIndexCreatorTest.java b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/creator/RawIndexCreatorTest.java
index 890292e..af99c5a 100644
--- a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/creator/RawIndexCreatorTest.java
+++ b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/creator/RawIndexCreatorTest.java
@@ -31,7 +31,7 @@ import org.apache.commons.lang3.RandomStringUtils;
 import org.apache.pinot.common.utils.StringUtil;
 import 
org.apache.pinot.segment.local.segment.creator.impl.SegmentIndexCreationDriverImpl;
 import org.apache.pinot.segment.local.segment.index.loader.IndexLoadingConfig;
-import 
org.apache.pinot.segment.local.segment.index.readers.forward.BaseChunkSVForwardIndexReader;
+import 
org.apache.pinot.segment.local.segment.index.readers.forward.ChunkReaderContext;
 import 
org.apache.pinot.segment.local.segment.index.readers.forward.FixedByteChunkSVForwardIndexReader;
 import 
org.apache.pinot.segment.local.segment.index.readers.forward.VarByteChunkMVForwardIndexReader;
 import 
org.apache.pinot.segment.local.segment.index.readers.forward.VarByteChunkSVForwardIndexReader;
@@ -169,11 +169,9 @@ public class RawIndexCreatorTest {
   public void testStringRawIndexCreator()
       throws Exception {
     PinotDataBuffer indexBuffer = getIndexBufferForColumn(STRING_COLUMN);
-    try (VarByteChunkSVForwardIndexReader rawIndexReader = new 
VarByteChunkSVForwardIndexReader(
-        indexBuffer,
+    try (VarByteChunkSVForwardIndexReader rawIndexReader = new 
VarByteChunkSVForwardIndexReader(indexBuffer,
         DataType.STRING);
-        BaseChunkSVForwardIndexReader.ChunkReaderContext readerContext = 
rawIndexReader
-            .createContext()) {
+        ChunkReaderContext readerContext = rawIndexReader.createContext()) {
       _recordReader.rewind();
       for (int row = 0; row < NUM_ROWS; row++) {
         GenericRow expectedRow = _recordReader.next();
@@ -193,9 +191,8 @@ public class RawIndexCreatorTest {
       throws Exception {
     PinotDataBuffer indexBuffer = getIndexBufferForColumn(column);
     try (FixedByteChunkSVForwardIndexReader rawIndexReader = new 
FixedByteChunkSVForwardIndexReader(
-        indexBuffer,
-        dataType); BaseChunkSVForwardIndexReader.ChunkReaderContext 
readerContext = rawIndexReader
-        .createContext()) {
+        indexBuffer, dataType);
+        ChunkReaderContext readerContext = rawIndexReader.createContext()) {
       _recordReader.rewind();
       for (int row = 0; row < NUM_ROWS; row++) {
         GenericRow expectedRow = _recordReader.next();
@@ -216,7 +213,7 @@ public class RawIndexCreatorTest {
     try (VarByteChunkMVForwardIndexReader rawIndexReader = new 
VarByteChunkMVForwardIndexReader(
         indexBuffer,
         DataType.STRING);
-        BaseChunkSVForwardIndexReader.ChunkReaderContext readerContext = 
rawIndexReader
+        ChunkReaderContext readerContext = rawIndexReader
             .createContext()) {
       _recordReader.rewind();
       int maxNumberOfMultiValues = _segmentDirectory.getSegmentMetadata()
@@ -250,7 +247,7 @@ public class RawIndexCreatorTest {
     PinotDataBuffer indexBuffer = getIndexBufferForColumn(BYTES_MV_COLUMN);
     try (VarByteChunkMVForwardIndexReader rawIndexReader = new 
VarByteChunkMVForwardIndexReader(
         indexBuffer, DataType.BYTES);
-        BaseChunkSVForwardIndexReader.ChunkReaderContext readerContext = 
rawIndexReader
+        ChunkReaderContext readerContext = rawIndexReader
             .createContext()) {
       _recordReader.rewind();
       int maxNumberOfMultiValues = _segmentDirectory.getSegmentMetadata()
@@ -373,8 +370,8 @@ public class RawIndexCreatorTest {
    * @param docId Document id
    * @return Value read from index
    */
-  private Object readValueFromIndex(FixedByteChunkSVForwardIndexReader 
rawIndexReader,
-      BaseChunkSVForwardIndexReader.ChunkReaderContext readerContext, int 
docId) {
+  private Object readValueFromIndex(FixedByteChunkSVForwardIndexReader 
rawIndexReader, ChunkReaderContext readerContext,
+      int docId) {
     switch (rawIndexReader.getValueType()) {
       case INT:
         return rawIndexReader.getInt(docId, readerContext);
diff --git 
a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/forward/FixedByteChunkSVForwardIndexTest.java
 
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/forward/FixedByteChunkSVForwardIndexTest.java
index 9afe093..0e2386b 100644
--- 
a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/forward/FixedByteChunkSVForwardIndexTest.java
+++ 
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/forward/FixedByteChunkSVForwardIndexTest.java
@@ -20,16 +20,20 @@ package 
org.apache.pinot.segment.local.segment.index.forward;
 
 import java.io.File;
 import java.net.URL;
+import java.util.Arrays;
 import java.util.Random;
+import java.util.stream.IntStream;
 import org.apache.commons.io.FileUtils;
-import 
org.apache.pinot.segment.local.io.writer.impl.BaseChunkSVForwardIndexWriter;
 import 
org.apache.pinot.segment.local.io.writer.impl.FixedByteChunkSVForwardIndexWriter;
-import 
org.apache.pinot.segment.local.segment.index.readers.forward.BaseChunkSVForwardIndexReader;
+import 
org.apache.pinot.segment.local.segment.index.readers.forward.ChunkReaderContext;
 import 
org.apache.pinot.segment.local.segment.index.readers.forward.FixedByteChunkSVForwardIndexReader;
+import 
org.apache.pinot.segment.local.segment.index.readers.forward.FixedBytePower2ChunkSVForwardIndexReader;
 import org.apache.pinot.segment.spi.compression.ChunkCompressionType;
+import org.apache.pinot.segment.spi.index.reader.ForwardIndexReader;
 import org.apache.pinot.segment.spi.memory.PinotDataBuffer;
 import org.apache.pinot.spi.data.FieldSpec.DataType;
 import org.testng.Assert;
+import org.testng.annotations.DataProvider;
 import org.testng.annotations.Test;
 
 
@@ -49,47 +53,16 @@ public class FixedByteChunkSVForwardIndexTest {
   private static final String TEST_FILE = System.getProperty("java.io.tmpdir") 
+ File.separator + "FixedByteSVRTest";
   private static final Random RANDOM = new Random();
 
-  @Test
-  public void testWithCompression()
-      throws Exception {
-    ChunkCompressionType compressionType = ChunkCompressionType.SNAPPY;
-    testInt(compressionType);
-    testLong(compressionType);
-    testFloat(compressionType);
-    testDouble(compressionType);
-  }
-
-  @Test
-  public void testWithoutCompression()
-      throws Exception {
-    ChunkCompressionType compressionType = ChunkCompressionType.PASS_THROUGH;
-    testInt(compressionType);
-    testLong(compressionType);
-    testFloat(compressionType);
-    testDouble(compressionType);
-  }
-
-  @Test
-  public void testWithZstandardCompression()
-      throws Exception {
-    ChunkCompressionType compressionType = ChunkCompressionType.ZSTANDARD;
-    testInt(compressionType);
-    testLong(compressionType);
-    testFloat(compressionType);
-    testDouble(compressionType);
-  }
-
-  @Test
-  public void testWithLZ4Compression()
-      throws Exception {
-    ChunkCompressionType compressionType = ChunkCompressionType.LZ4;
-    testInt(compressionType);
-    testLong(compressionType);
-    testFloat(compressionType);
-    testDouble(compressionType);
+  @DataProvider(name = "combinations")
+  public static Object[][] combinations() {
+    return Arrays.stream(ChunkCompressionType.values())
+        .flatMap(chunkCompressionType -> IntStream.of(2, 3, 4)
+            .mapToObj(version -> new Object[]{chunkCompressionType, version}))
+        .toArray(Object[][]::new);
   }
 
-  public void testInt(ChunkCompressionType compressionType)
+  @Test(dataProvider = "combinations")
+  public void testInt(ChunkCompressionType compressionType, int version)
       throws Exception {
     int[] expected = new int[NUM_VALUES];
     for (int i = 0; i < NUM_VALUES; i++) {
@@ -103,24 +76,28 @@ public class FixedByteChunkSVForwardIndexTest {
 
     // test both formats (4-byte chunk offsets and 8-byte chunk offsets)
     try (FixedByteChunkSVForwardIndexWriter fourByteOffsetWriter = new 
FixedByteChunkSVForwardIndexWriter(
-        outFileFourByte, compressionType, NUM_VALUES, NUM_DOCS_PER_CHUNK, 
Integer.BYTES,
-        BaseChunkSVForwardIndexWriter.DEFAULT_VERSION);
+        outFileFourByte, compressionType, NUM_VALUES, NUM_DOCS_PER_CHUNK, 
Integer.BYTES, version);
         FixedByteChunkSVForwardIndexWriter eightByteOffsetWriter = new 
FixedByteChunkSVForwardIndexWriter(
-            outFileEightByte, compressionType, NUM_VALUES, NUM_DOCS_PER_CHUNK, 
Integer.BYTES,
-            BaseChunkSVForwardIndexWriter.CURRENT_VERSION)) {
+            outFileEightByte, compressionType, NUM_VALUES, NUM_DOCS_PER_CHUNK, 
Integer.BYTES, version)) {
       for (int value : expected) {
         fourByteOffsetWriter.putInt(value);
         eightByteOffsetWriter.putInt(value);
       }
     }
 
-    try (FixedByteChunkSVForwardIndexReader fourByteOffsetReader = new 
FixedByteChunkSVForwardIndexReader(
-        PinotDataBuffer.mapReadOnlyBigEndianFile(outFileFourByte), 
DataType.INT);
-        BaseChunkSVForwardIndexReader.ChunkReaderContext 
fourByteOffsetReaderContext = fourByteOffsetReader
+    try (ForwardIndexReader<ChunkReaderContext> fourByteOffsetReader = version 
>= 4
+        ? new FixedBytePower2ChunkSVForwardIndexReader(
+        PinotDataBuffer.mapReadOnlyBigEndianFile(outFileFourByte), 
DataType.INT)
+        : new FixedByteChunkSVForwardIndexReader(
+            PinotDataBuffer.mapReadOnlyBigEndianFile(outFileFourByte), 
DataType.INT);
+        ChunkReaderContext fourByteOffsetReaderContext = fourByteOffsetReader
             .createContext();
-        FixedByteChunkSVForwardIndexReader eightByteOffsetReader = new 
FixedByteChunkSVForwardIndexReader(
-            PinotDataBuffer.mapReadOnlyBigEndianFile(outFileEightByte), 
DataType.INT);
-        BaseChunkSVForwardIndexReader.ChunkReaderContext 
eightByteOffsetReaderContext = eightByteOffsetReader
+        ForwardIndexReader<ChunkReaderContext> eightByteOffsetReader = version 
>= 4
+            ? new FixedBytePower2ChunkSVForwardIndexReader(
+            PinotDataBuffer.mapReadOnlyBigEndianFile(outFileEightByte), 
DataType.INT)
+            : new FixedByteChunkSVForwardIndexReader(
+                PinotDataBuffer.mapReadOnlyBigEndianFile(outFileEightByte), 
DataType.INT);
+        ChunkReaderContext eightByteOffsetReaderContext = eightByteOffsetReader
             .createContext()) {
       for (int i = 0; i < NUM_VALUES; i++) {
         Assert.assertEquals(fourByteOffsetReader.getInt(i, 
fourByteOffsetReaderContext), expected[i]);
@@ -132,7 +109,8 @@ public class FixedByteChunkSVForwardIndexTest {
     FileUtils.deleteQuietly(outFileEightByte);
   }
 
-  public void testLong(ChunkCompressionType compressionType)
+  @Test(dataProvider = "combinations")
+  public void testLong(ChunkCompressionType compressionType, int version)
       throws Exception {
     long[] expected = new long[NUM_VALUES];
     for (int i = 0; i < NUM_VALUES; i++) {
@@ -146,24 +124,28 @@ public class FixedByteChunkSVForwardIndexTest {
 
     // test both formats (4-byte chunk offsets and 8-byte chunk offsets)
     try (FixedByteChunkSVForwardIndexWriter fourByteOffsetWriter = new 
FixedByteChunkSVForwardIndexWriter(
-        outFileFourByte, compressionType, NUM_VALUES, NUM_DOCS_PER_CHUNK, 
Long.BYTES,
-        BaseChunkSVForwardIndexWriter.DEFAULT_VERSION);
+        outFileFourByte, compressionType, NUM_VALUES, NUM_DOCS_PER_CHUNK, 
Long.BYTES, version);
         FixedByteChunkSVForwardIndexWriter eightByteOffsetWriter = new 
FixedByteChunkSVForwardIndexWriter(
-            outFileEightByte, compressionType, NUM_VALUES, NUM_DOCS_PER_CHUNK, 
Long.BYTES,
-            BaseChunkSVForwardIndexWriter.CURRENT_VERSION)) {
+            outFileEightByte, compressionType, NUM_VALUES, NUM_DOCS_PER_CHUNK, 
Long.BYTES, version)) {
       for (long value : expected) {
         fourByteOffsetWriter.putLong(value);
         eightByteOffsetWriter.putLong(value);
       }
     }
 
-    try (FixedByteChunkSVForwardIndexReader fourByteOffsetReader = new 
FixedByteChunkSVForwardIndexReader(
-        PinotDataBuffer.mapReadOnlyBigEndianFile(outFileFourByte), 
DataType.LONG);
-        BaseChunkSVForwardIndexReader.ChunkReaderContext 
fourByteOffsetReaderContext = fourByteOffsetReader
+    try (ForwardIndexReader<ChunkReaderContext> fourByteOffsetReader = version 
>= 4
+        ? new FixedBytePower2ChunkSVForwardIndexReader(
+        PinotDataBuffer.mapReadOnlyBigEndianFile(outFileFourByte), 
DataType.LONG)
+        : new FixedByteChunkSVForwardIndexReader(
+            PinotDataBuffer.mapReadOnlyBigEndianFile(outFileFourByte), 
DataType.LONG);
+        ChunkReaderContext fourByteOffsetReaderContext = fourByteOffsetReader
             .createContext();
-        FixedByteChunkSVForwardIndexReader eightByteOffsetReader = new 
FixedByteChunkSVForwardIndexReader(
-            PinotDataBuffer.mapReadOnlyBigEndianFile(outFileEightByte), 
DataType.LONG);
-        BaseChunkSVForwardIndexReader.ChunkReaderContext 
eightByteOffsetReaderContext = eightByteOffsetReader
+        ForwardIndexReader<ChunkReaderContext> eightByteOffsetReader = version 
>= 4
+            ? new FixedBytePower2ChunkSVForwardIndexReader(
+            PinotDataBuffer.mapReadOnlyBigEndianFile(outFileEightByte), 
DataType.LONG)
+            : new FixedByteChunkSVForwardIndexReader(
+                PinotDataBuffer.mapReadOnlyBigEndianFile(outFileEightByte), 
DataType.LONG);
+        ChunkReaderContext eightByteOffsetReaderContext = eightByteOffsetReader
             .createContext()) {
       for (int i = 0; i < NUM_VALUES; i++) {
         Assert.assertEquals(fourByteOffsetReader.getLong(i, 
fourByteOffsetReaderContext), expected[i]);
@@ -175,7 +157,8 @@ public class FixedByteChunkSVForwardIndexTest {
     FileUtils.deleteQuietly(outFileEightByte);
   }
 
-  public void testFloat(ChunkCompressionType compressionType)
+  @Test(dataProvider = "combinations")
+  public void testFloat(ChunkCompressionType compressionType, int version)
       throws Exception {
     float[] expected = new float[NUM_VALUES];
     for (int i = 0; i < NUM_VALUES; i++) {
@@ -189,24 +172,28 @@ public class FixedByteChunkSVForwardIndexTest {
 
     // test both formats (4-byte chunk offsets and 8-byte chunk offsets)
     try (FixedByteChunkSVForwardIndexWriter fourByteOffsetWriter = new 
FixedByteChunkSVForwardIndexWriter(
-        outFileFourByte, compressionType, NUM_VALUES, NUM_DOCS_PER_CHUNK, 
Float.BYTES,
-        BaseChunkSVForwardIndexWriter.DEFAULT_VERSION);
+        outFileFourByte, compressionType, NUM_VALUES, NUM_DOCS_PER_CHUNK, 
Float.BYTES, version);
         FixedByteChunkSVForwardIndexWriter eightByteOffsetWriter = new 
FixedByteChunkSVForwardIndexWriter(
-            outFileEightByte, compressionType, NUM_VALUES, NUM_DOCS_PER_CHUNK, 
Float.BYTES,
-            BaseChunkSVForwardIndexWriter.CURRENT_VERSION)) {
+            outFileEightByte, compressionType, NUM_VALUES, NUM_DOCS_PER_CHUNK, 
Float.BYTES, version)) {
       for (float value : expected) {
         fourByteOffsetWriter.putFloat(value);
         eightByteOffsetWriter.putFloat(value);
       }
     }
 
-    try (FixedByteChunkSVForwardIndexReader fourByteOffsetReader = new 
FixedByteChunkSVForwardIndexReader(
-        PinotDataBuffer.mapReadOnlyBigEndianFile(outFileFourByte), 
DataType.FLOAT);
-        BaseChunkSVForwardIndexReader.ChunkReaderContext 
fourByteOffsetReaderContext = fourByteOffsetReader
+    try (ForwardIndexReader<ChunkReaderContext> fourByteOffsetReader = version 
>= 4
+        ? new FixedBytePower2ChunkSVForwardIndexReader(
+        PinotDataBuffer.mapReadOnlyBigEndianFile(outFileFourByte), 
DataType.FLOAT)
+        : new FixedByteChunkSVForwardIndexReader(
+            PinotDataBuffer.mapReadOnlyBigEndianFile(outFileFourByte), 
DataType.FLOAT);
+        ChunkReaderContext fourByteOffsetReaderContext = fourByteOffsetReader
             .createContext();
-        FixedByteChunkSVForwardIndexReader eightByteOffsetReader = new 
FixedByteChunkSVForwardIndexReader(
-            PinotDataBuffer.mapReadOnlyBigEndianFile(outFileEightByte), 
DataType.FLOAT);
-        BaseChunkSVForwardIndexReader.ChunkReaderContext 
eightByteOffsetReaderContext = eightByteOffsetReader
+        ForwardIndexReader<ChunkReaderContext> eightByteOffsetReader = version 
>= 4
+            ? new 
FixedBytePower2ChunkSVForwardIndexReader(PinotDataBuffer.mapReadOnlyBigEndianFile(outFileEightByte),
+            DataType.FLOAT)
+            : new FixedByteChunkSVForwardIndexReader(
+                PinotDataBuffer.mapReadOnlyBigEndianFile(outFileEightByte), 
DataType.FLOAT);
+        ChunkReaderContext eightByteOffsetReaderContext = eightByteOffsetReader
             .createContext()) {
       for (int i = 0; i < NUM_VALUES; i++) {
         Assert.assertEquals(fourByteOffsetReader.getFloat(i, 
fourByteOffsetReaderContext), expected[i]);
@@ -218,7 +205,8 @@ public class FixedByteChunkSVForwardIndexTest {
     FileUtils.deleteQuietly(outFileEightByte);
   }
 
-  public void testDouble(ChunkCompressionType compressionType)
+  @Test(dataProvider = "combinations")
+  public void testDouble(ChunkCompressionType compressionType, int version)
       throws Exception {
     double[] expected = new double[NUM_VALUES];
     for (int i = 0; i < NUM_VALUES; i++) {
@@ -232,24 +220,28 @@ public class FixedByteChunkSVForwardIndexTest {
 
     // test both formats (4-byte chunk offsets and 8-byte chunk offsets)
     try (FixedByteChunkSVForwardIndexWriter fourByteOffsetWriter = new 
FixedByteChunkSVForwardIndexWriter(
-        outFileFourByte, compressionType, NUM_VALUES, NUM_DOCS_PER_CHUNK, 
Double.BYTES,
-        BaseChunkSVForwardIndexWriter.DEFAULT_VERSION);
+        outFileFourByte, compressionType, NUM_VALUES, NUM_DOCS_PER_CHUNK, 
Double.BYTES, version);
         FixedByteChunkSVForwardIndexWriter eightByteOffsetWriter = new 
FixedByteChunkSVForwardIndexWriter(
-            outFileEightByte, compressionType, NUM_VALUES, NUM_DOCS_PER_CHUNK, 
Double.BYTES,
-            BaseChunkSVForwardIndexWriter.CURRENT_VERSION)) {
+            outFileEightByte, compressionType, NUM_VALUES, NUM_DOCS_PER_CHUNK, 
Double.BYTES, version)) {
       for (double value : expected) {
         fourByteOffsetWriter.putDouble(value);
         eightByteOffsetWriter.putDouble(value);
       }
     }
 
-    try (FixedByteChunkSVForwardIndexReader fourByteOffsetReader = new 
FixedByteChunkSVForwardIndexReader(
-        PinotDataBuffer.mapReadOnlyBigEndianFile(outFileFourByte), 
DataType.DOUBLE);
-        BaseChunkSVForwardIndexReader.ChunkReaderContext 
fourByteOffsetReaderContext = fourByteOffsetReader
+    try (ForwardIndexReader<ChunkReaderContext> fourByteOffsetReader = version 
>= 4
+        ? new FixedBytePower2ChunkSVForwardIndexReader(
+        PinotDataBuffer.mapReadOnlyBigEndianFile(outFileFourByte), 
DataType.DOUBLE)
+        : new FixedByteChunkSVForwardIndexReader(
+            PinotDataBuffer.mapReadOnlyBigEndianFile(outFileFourByte), 
DataType.DOUBLE);
+        ChunkReaderContext fourByteOffsetReaderContext = fourByteOffsetReader
             .createContext();
-        FixedByteChunkSVForwardIndexReader eightByteOffsetReader = new 
FixedByteChunkSVForwardIndexReader(
-            PinotDataBuffer.mapReadOnlyBigEndianFile(outFileEightByte), 
DataType.DOUBLE);
-        BaseChunkSVForwardIndexReader.ChunkReaderContext 
eightByteOffsetReaderContext = eightByteOffsetReader
+        ForwardIndexReader<ChunkReaderContext> eightByteOffsetReader = version 
>= 4
+            ? new FixedBytePower2ChunkSVForwardIndexReader(
+            PinotDataBuffer.mapReadOnlyBigEndianFile(outFileEightByte), 
DataType.DOUBLE)
+            : new FixedByteChunkSVForwardIndexReader(
+                PinotDataBuffer.mapReadOnlyBigEndianFile(outFileEightByte), 
DataType.DOUBLE);
+        ChunkReaderContext eightByteOffsetReaderContext = eightByteOffsetReader
             .createContext()) {
       for (int i = 0; i < NUM_VALUES; i++) {
         Assert.assertEquals(fourByteOffsetReader.getDouble(i, 
fourByteOffsetReaderContext), expected[i]);
@@ -290,7 +282,7 @@ public class FixedByteChunkSVForwardIndexTest {
     File file = new File(resource.getFile());
     try (FixedByteChunkSVForwardIndexReader reader = new 
FixedByteChunkSVForwardIndexReader(
         PinotDataBuffer.mapReadOnlyBigEndianFile(file), DataType.DOUBLE);
-        BaseChunkSVForwardIndexReader.ChunkReaderContext readerContext = 
reader.createContext()) {
+        ChunkReaderContext readerContext = reader.createContext()) {
       for (int i = 0; i < numDocs; i++) {
         double actual = reader.getDouble(i, readerContext);
         Assert.assertEquals(actual, i + startValue);
diff --git 
a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/forward/VarByteChunkSVForwardIndexTest.java
 
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/forward/VarByteChunkSVForwardIndexTest.java
index 3f7f1c0..f6b8f46 100644
--- 
a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/forward/VarByteChunkSVForwardIndexTest.java
+++ 
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/forward/VarByteChunkSVForwardIndexTest.java
@@ -27,7 +27,7 @@ import org.apache.commons.lang.RandomStringUtils;
 import 
org.apache.pinot.segment.local.io.writer.impl.BaseChunkSVForwardIndexWriter;
 import 
org.apache.pinot.segment.local.io.writer.impl.VarByteChunkSVForwardIndexWriter;
 import 
org.apache.pinot.segment.local.segment.creator.impl.fwd.SingleValueVarByteRawIndexCreator;
-import 
org.apache.pinot.segment.local.segment.index.readers.forward.BaseChunkSVForwardIndexReader;
+import 
org.apache.pinot.segment.local.segment.index.readers.forward.ChunkReaderContext;
 import 
org.apache.pinot.segment.local.segment.index.readers.forward.VarByteChunkSVForwardIndexReader;
 import org.apache.pinot.segment.spi.compression.ChunkCompressionType;
 import org.apache.pinot.segment.spi.memory.PinotDataBuffer;
@@ -117,11 +117,11 @@ public class VarByteChunkSVForwardIndexTest {
 
     try (VarByteChunkSVForwardIndexReader fourByteOffsetReader = new 
VarByteChunkSVForwardIndexReader(
         PinotDataBuffer.mapReadOnlyBigEndianFile(outFileFourByte), 
DataType.STRING);
-        BaseChunkSVForwardIndexReader.ChunkReaderContext 
fourByteOffsetReaderContext = fourByteOffsetReader
+        ChunkReaderContext fourByteOffsetReaderContext = fourByteOffsetReader
             .createContext();
         VarByteChunkSVForwardIndexReader eightByteOffsetReader = new 
VarByteChunkSVForwardIndexReader(
             PinotDataBuffer.mapReadOnlyBigEndianFile(outFileEightByte), 
DataType.STRING);
-        BaseChunkSVForwardIndexReader.ChunkReaderContext 
eightByteOffsetReaderContext = eightByteOffsetReader
+        ChunkReaderContext eightByteOffsetReaderContext = eightByteOffsetReader
             .createContext()) {
       for (int i = 0; i < NUM_ENTRIES; i++) {
         Assert.assertEquals(fourByteOffsetReader.getString(i, 
fourByteOffsetReaderContext), expected[i]);
@@ -164,7 +164,7 @@ public class VarByteChunkSVForwardIndexTest {
     File file = new File(resource.getFile());
     try (VarByteChunkSVForwardIndexReader reader = new 
VarByteChunkSVForwardIndexReader(
         PinotDataBuffer.mapReadOnlyBigEndianFile(file), DataType.STRING);
-        BaseChunkSVForwardIndexReader.ChunkReaderContext readerContext = 
reader.createContext()) {
+        ChunkReaderContext readerContext = reader.createContext()) {
       for (int i = 0; i < numDocs; i++) {
         String actual = reader.getString(i, readerContext);
         Assert.assertEquals(actual, data[i % data.length]);
@@ -237,7 +237,7 @@ public class VarByteChunkSVForwardIndexTest {
 
     PinotDataBuffer buffer = PinotDataBuffer.mapReadOnlyBigEndianFile(outFile);
     try (VarByteChunkSVForwardIndexReader reader = new 
VarByteChunkSVForwardIndexReader(buffer, DataType.STRING);
-        BaseChunkSVForwardIndexReader.ChunkReaderContext readerContext = 
reader.createContext()) {
+        ChunkReaderContext readerContext = reader.createContext()) {
       for (int i = 0; i < numDocs; i++) {
         Assert.assertEquals(reader.getString(i, readerContext), expected[i]);
       }
@@ -257,7 +257,7 @@ public class VarByteChunkSVForwardIndexTest {
     }
 
     try (VarByteChunkSVForwardIndexReader reader = new 
VarByteChunkSVForwardIndexReader(buffer, DataType.STRING);
-        BaseChunkSVForwardIndexReader.ChunkReaderContext readerContext = 
reader.createContext()) {
+        ChunkReaderContext readerContext = reader.createContext()) {
       for (int i = 0; i < numDocs; i++) {
         Assert.assertEquals(reader.getString(i, readerContext), expected[i]);
       }

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org

Reply via email to