This is an automated email from the ASF dual-hosted git repository.

siddteotia pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new b40dd99  Faster bit unpacking (Part 1) (#5409)
b40dd99 is described below

commit b40dd992874f9bc38b911870e041a8f6e24c3776
Author: Sidd <siddharthteo...@gmail.com>
AuthorDate: Fri May 29 14:08:03 2020 -0700

    Faster bit unpacking (Part 1) (#5409)
    
    * Faster bit unpacking
    
    * Add unit tests
    
    * new
    
    * Improved degree of vectorization and more tests
    
    * fix build
    
    * cleanup
    
    * docs
    
    * change file name
    
    * address review comments and add more benchmarks
    
    Co-authored-by: Siddharth Teotia <steo...@steotia-mn1.linkedin.biz>
---
 .../core/io/util/FixedBitIntReaderWriterV2.java    | 149 +++++
 .../pinot/core/io/util/PinotDataBitSetV2.java      | 729 +++++++++++++++++++++
 .../pinot/core/io/util/PinotDataBitSetV2Test.java  | 438 +++++++++++++
 .../pinot/perf/BenchmarkPinotDataBitSet.java       | 546 +++++++++++++++
 4 files changed, 1862 insertions(+)

diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/io/util/FixedBitIntReaderWriterV2.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/io/util/FixedBitIntReaderWriterV2.java
new file mode 100644
index 0000000..55c8c94
--- /dev/null
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/io/util/FixedBitIntReaderWriterV2.java
@@ -0,0 +1,149 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.io.util;
+
+import com.google.common.base.Preconditions;
+import java.io.Closeable;
+import java.io.IOException;
+import org.apache.pinot.core.segment.memory.PinotDataBuffer;
+
+
+public final class FixedBitIntReaderWriterV2 implements Closeable {
+  private PinotDataBitSetV2 _dataBitSet;
+
+  public FixedBitIntReaderWriterV2(PinotDataBuffer dataBuffer, int numValues, 
int numBitsPerValue) {
+    Preconditions
+        .checkState(dataBuffer.size() == (int) (((long) numValues * 
numBitsPerValue + Byte.SIZE - 1) / Byte.SIZE));
+    _dataBitSet = PinotDataBitSetV2.createBitSet(dataBuffer, numBitsPerValue);
+  }
+
+  /**
+   * Read dictionaryId for a particular docId
+   * @param index docId to get the dictionaryId for
+   * @return dictionaryId
+   */
+  public int readInt(int index) {
+    return _dataBitSet.readInt(index);
+  }
+
+  /**
+   * Array based API to read dictionaryIds for a contiguous
+   * range of docIds starting at startDocId for a given length
+   * @param startDocId docId range start
+   * @param length length of contiguous docId range
+   * @param buffer out buffer to read dictionaryIds into
+   */
+  public void readInt(int startDocId, int length, int[] buffer) {
+    _dataBitSet.readInt(startDocId, length, buffer);
+  }
+
+  /**
+   * Array based API to read dictionaryIds for an array of docIds
+   * which are monotonically increasing but not necessarily contiguous.
+   * The difference between this and previous array based API {@link 
#readInt(int, int, int[])}
+   * is that unlike the other API, we are provided an array of docIds.
+   * So even though the docIds in docIds[] array are monotonically increasing,
+   * they may not necessarily be contiguous. They can have gaps.
+   *
+   * {@link PinotDataBitSetV2} implements efficient bulk contiguous API
+   * {@link PinotDataBitSetV2#readInt(long, int, int[])}
+   * to read dictionaryIds for a contiguous range of docIds represented
+   * by startDocId and length.
+   *
+   * This API although works on docIds with gaps, it still tries to
+   * leverage the underlying bulk contiguous API as much as possible to
+   * get benefits of vectorization.
+   *
+   * For a given docIds[] array, we determine if we should use the
+   * bulk contiguous API or not by checking if the length of the array
+   * is >= 50% of actual docIdRange (lastDocId - firstDocId + 1). This
+   * sort of gives a very rough idea of the gaps in docIds. We will benefit
+   * from bulk contiguous read if the gaps are narrow implying fewer dictIds
+   * unpacked as part of contiguous read will have to be thrown away/ignored.
+   * If the gaps are wide, a higher number of dictIds will be thrown away
+   * before we construct the out array
+   *
+   * This method of determining if bulk contiguous should be used or not
+   * is inaccurate since it is solely dependent on the first and
+   * last docId. However, getting an exact idea of the gaps in docIds[]
+   * array will first require a single pass through the array to compute
+   * the deviations between each docId and then take mean/stddev of that.
+   * This will be expensive as it requires pre-processing.
+   *
+   * To increase the probability of using the bulk contiguous API, we make
+   * this decision for every fixed-size chunk of docIds[] array.
+   *
+   * @param docIds array of docIds to read the dictionaryIds for
+   * @param docIdStartIndex start index in docIds array
+   * @param docIdLength length to process in docIds array
+   * @param values out array to store the dictionaryIds into
+   * @param valuesStartIndex start index in values array
+   */
+  public void readValues(int[] docIds, int docIdStartIndex, int docIdLength, 
int[] values, int valuesStartIndex) {
+    int bulkReadChunks = docIdLength / 
PinotDataBitSetV2.MAX_VALUES_UNPACKED_SINGLE_ALIGNED_READ;
+    int remainingChunk = docIdLength % 
PinotDataBitSetV2.MAX_VALUES_UNPACKED_SINGLE_ALIGNED_READ;
+    int docIdEndIndex;
+    while (bulkReadChunks > 0) {
+      docIdEndIndex = docIdStartIndex + 
PinotDataBitSetV2.MAX_VALUES_UNPACKED_SINGLE_ALIGNED_READ - 1;
+      if (shouldBulkRead(docIds, docIdStartIndex, docIdEndIndex)) {
+        // use the bulk API. it takes care of populating the values array 
correctly
+        // by throwing away the extra dictIds
+        _dataBitSet.readInt(docIds, docIdStartIndex, 
PinotDataBitSetV2.MAX_VALUES_UNPACKED_SINGLE_ALIGNED_READ, values, 
valuesStartIndex);
+        valuesStartIndex += 
PinotDataBitSetV2.MAX_VALUES_UNPACKED_SINGLE_ALIGNED_READ;
+      } else {
+        // use the single read API
+        for (int i = docIdStartIndex; i <= docIdEndIndex; i++) {
+          values[valuesStartIndex++] = _dataBitSet.readInt(docIds[i]);
+        }
+      }
+      docIdStartIndex += 
PinotDataBitSetV2.MAX_VALUES_UNPACKED_SINGLE_ALIGNED_READ;
+      bulkReadChunks--;
+      docIdLength -= PinotDataBitSetV2.MAX_VALUES_UNPACKED_SINGLE_ALIGNED_READ;
+    }
+    if (remainingChunk > 0) {
+      // use the single read API
+      docIdEndIndex = docIdStartIndex + docIdLength - 1;
+      for (int i = docIdStartIndex; i <= docIdEndIndex; i++) {
+        values[valuesStartIndex++] = _dataBitSet.readInt(docIds[i]);
+      }
+    }
+  }
+
+  private boolean shouldBulkRead(int[] docIds, int startIndex, int endIndex) {
+    int numDocsToRead = endIndex - startIndex + 1;
+    int docIdRange = docIds[endIndex] - docIds[startIndex] + 1;
+    return numDocsToRead >= ((double)docIdRange * 0.5);
+  }
+
+  public void writeInt(int index, int value) {
+    _dataBitSet.writeInt(index, value);
+  }
+
+  public void writeInt(int startIndex, int length, int[] values) {
+    _dataBitSet.writeInt(startIndex, length, values);
+  }
+
+  @Override
+  public void close()
+      throws IOException {
+    if (_dataBitSet != null) {
+      _dataBitSet.close();
+    }
+  }
+}
\ No newline at end of file
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/io/util/PinotDataBitSetV2.java 
b/pinot-core/src/main/java/org/apache/pinot/core/io/util/PinotDataBitSetV2.java
new file mode 100644
index 0000000..978ae38
--- /dev/null
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/io/util/PinotDataBitSetV2.java
@@ -0,0 +1,729 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.io.util;
+
+import java.io.Closeable;
+import java.io.IOException;
+import org.apache.pinot.core.plan.DocIdSetPlanNode;
+import org.apache.pinot.core.segment.memory.PinotDataBuffer;
+
+
+public abstract class PinotDataBitSetV2 implements Closeable {
+  private static final int BYTE_MASK = 0xFF;
+  static final int MAX_VALUES_UNPACKED_SINGLE_ALIGNED_READ = 16; // comes from 
2-bit encoding
+
+  private static final ThreadLocal<int[]> THREAD_LOCAL_DICT_IDS =
+      ThreadLocal.withInitial(() -> new 
int[DocIdSetPlanNode.MAX_DOC_PER_CALL]);
+
+  protected PinotDataBuffer _dataBuffer;
+  protected int _numBitsPerValue;
+
+  /**
+   * Decode integers starting at a given index. This is efficient
+   * because of simplified bitmath.
+   * @param index docId
+   * @return unpacked integer
+   */
+  public abstract int readInt(long index);
+
+  /**
+   * Decode integers for a contiguous range of indexes represented by 
startIndex
+   * and length. This uses vectorization as much as possible for all the 
aligned
+   * reads and also takes care of the small byte-sized window of unaligned 
read.
+   * @param startIndex start docId
+   * @param length length
+   * @param out out array to store the unpacked integers
+   */
+  public abstract void readInt(long startIndex, int length, int[] out);
+
+  /**
+   * Decode integers for an array of indexes which is not necessarily
+   * contiguous. So there could be gaps in the array:
+   * e.g: [1, 3, 7, 9, 11, 12]
+   * The actual read is done by the previous API since that is efficient
+   * as it exploits contiguity and uses vectorization. However, since
+   * the out[] array has to be correctly populated with the unpacked integer
+   * for each index, a post-processing step is needed after the bulk contiguous
+   * read to correctly set the unpacked integer into the out array throwing 
away
+   * the unnecessary values decoded as part of contiguous read
+   * @param docIds index array
+   * @param docIdsStartIndex starting index in the docIds array
+   * @param length length to read (number of docIds to read in the array)
+   * @param out out array to store the decoded integers
+   * @param outpos starting index in the out array
+   */
+  public void readInt(int[] docIds, int docIdsStartIndex, int length, int[] 
out, int outpos) {
+    int startDocId = docIds[docIdsStartIndex];
+    int endDocId = docIds[docIdsStartIndex + length - 1];
+    int[] dictIds = THREAD_LOCAL_DICT_IDS.get();
+    // do a contiguous bulk read
+    readInt(startDocId, endDocId - startDocId + 1, dictIds);
+    out[outpos] = dictIds[0];
+    // set the unpacked integer correctly. this is needed since there could
+    // be gaps and some decoded values may have to be thrown/ignored.
+    for (int i = 1; i < length; i++) {
+      out[outpos + i] = dictIds[docIds[docIdsStartIndex + i] - startDocId];
+    }
+  }
+
+  public static PinotDataBitSetV2 createBitSet(PinotDataBuffer 
pinotDataBuffer, int numBitsPerValue) {
+    switch (numBitsPerValue) {
+      case 1:
+        return new Bit1Encoded(pinotDataBuffer, numBitsPerValue);
+      case 2:
+        return new Bit2Encoded(pinotDataBuffer, numBitsPerValue);
+      case 4:
+        return new Bit4Encoded(pinotDataBuffer, numBitsPerValue);
+      case 8:
+        return new Bit8Encoded(pinotDataBuffer, numBitsPerValue);
+      case 16:
+        return new Bit16Encoded(pinotDataBuffer, numBitsPerValue);
+      case 32:
+        return new RawInt(pinotDataBuffer, numBitsPerValue);
+      default:
+        throw new UnsupportedOperationException(numBitsPerValue + "not 
supported by PinotDataBitSetV2");
+    }
+  }
+
+  public static class Bit1Encoded extends PinotDataBitSetV2 {
+    Bit1Encoded(PinotDataBuffer dataBuffer, int numBits) {
+      _dataBuffer = dataBuffer;
+      _numBitsPerValue = numBits;
+    }
+
+    @Override
+    public int readInt(long index) {
+      long bitOffset = index * _numBitsPerValue;
+      long byteOffset = bitOffset / Byte.SIZE;
+      int val = (int)_dataBuffer.getByte(byteOffset) & 0xff;
+      bitOffset = bitOffset & 7;
+      return  (val >>> (7 - bitOffset)) & 1;
+    }
+
+    @Override
+    public void readInt(long startIndex, int length, int[] out) {
+      long bitOffset = startIndex * _numBitsPerValue;
+      long byteOffset = bitOffset / Byte.SIZE;
+      bitOffset = bitOffset & 7;
+      int packed = 0;
+      int i = 0;
+
+      // unaligned read within a byte
+      if (bitOffset != 0) {
+        packed = (int)_dataBuffer.getByte(byteOffset) & 0xff;
+        if (bitOffset == 1) {
+          // unpack 7 integers from bits 1-7
+          out[0] = (packed >>> 6) & 1;
+          out[1] = (packed >>> 5) & 1;
+          out[2] = (packed >>> 4) & 1;
+          out[3] = (packed >>> 3) & 1;
+          out[4] = (packed >>> 2) & 1;
+          out[5] = (packed >>> 1) & 1;
+          out[6] = packed & 1;
+          i = 7;
+          length -= 7;
+        }
+        else if (bitOffset == 2) {
+          // unpack 6 integers from bits 2 to 7
+          out[0] = (packed >>> 5) & 1;
+          out[1] = (packed >>> 4) & 1;
+          out[2] = (packed >>> 3) & 1;
+          out[3] = (packed >>> 2) & 1;
+          out[4] = (packed >>> 1) & 1;
+          out[5] = packed & 1;
+          i = 6;
+          length -= 6;
+        } else if (bitOffset == 3) {
+          // unpack 5 integers from bits 3 to 7
+          out[0] = (packed >>> 4) & 1;
+          out[1] = (packed >>> 3) & 1;
+          out[2] = (packed >>> 2) & 1;
+          out[3] = (packed >>> 1) & 1;
+          out[4] = packed & 1;
+          i = 5;
+          length -= 5;
+        } else if (bitOffset == 4) {
+          // unpack 4 integers from bits 4 to 7
+          out[0] = (packed >>> 3) & 1;
+          out[1] = (packed >>> 2) & 1;
+          out[2] = (packed >>> 1) & 1;
+          out[3] = packed & 1;
+          i = 4;
+          length -= 4;
+        } else if (bitOffset == 5) {
+          // unpack 3 integers from bits 5 to 7
+          out[0] = (packed >>> 2) & 1;
+          out[1] = (packed >>> 1) & 1;
+          out[2] = packed & 1;
+          i = 3;
+          length -= 3;
+        } else if (bitOffset == 6) {
+          // unpack 2 integers from bits 6 to 7
+          out[0] = (packed >>> 1) & 1;
+          out[1] = packed & 1;
+          i = 2;
+          length -= 2;
+        }
+        else {
+          // unpack integer from bit 7
+          out[0] = packed & 1;
+          i = 1;
+          length -= 1;
+        }
+        byteOffset++;
+      }
+
+      // aligned reads at 4-byte boundary to unpack 32 integers
+      while (length >= 32) {
+        packed = _dataBuffer.getInt(byteOffset);
+        out[i] = packed >>> 31;
+        out[i + 1] = (packed >>> 30) & 1;
+        out[i + 2] = (packed >>> 29) & 1;
+        out[i + 3] = (packed >>> 28) & 1;
+        out[i + 4] = (packed >>> 27) & 1;
+        out[i + 5] = (packed >>> 26) & 1;
+        out[i + 6] = (packed >>> 25) & 1;
+        out[i + 7] = (packed >>> 24) & 1;
+        out[i + 8] = (packed >>> 23) & 1;
+        out[i + 9] = (packed >>> 22) & 1;
+        out[i + 10] = (packed >>> 21) & 1;
+        out[i + 11] = (packed >>> 20) & 1;
+        out[i + 12] = (packed >>> 19) & 1;
+        out[i + 13] = (packed >>> 18) & 1;
+        out[i + 14] = (packed >>> 17) & 1;
+        out[i + 15] = (packed >>> 16) & 1;
+        out[i + 16] = (packed >>> 15) & 1;
+        out[i + 17] = (packed >>> 14) & 1;
+        out[i + 18] = (packed >>> 13) & 1;
+        out[i + 19] = (packed >>> 12) & 1;
+        out[i + 20] = (packed >>> 11) & 1;
+        out[i + 21] = (packed >>> 10) & 1;
+        out[i + 22] = (packed >>> 9) & 1;
+        out[i + 23] = (packed >>> 8) & 1;
+        out[i + 24] = (packed >>> 7) & 1;
+        out[i + 25] = (packed >>> 6) & 1;
+        out[i + 26] = (packed >>> 5) & 1;
+        out[i + 27] = (packed >>> 4) & 1;
+        out[i + 28] = (packed >>> 3) & 1;
+        out[i + 29] = (packed >>> 2) & 1;
+        out[i + 30] = (packed >>> 1) & 1;
+        out[i + 31] = packed & 1;
+        length -= 32;
+        byteOffset += 4;
+        i += 32;
+      }
+
+      // aligned reads at 2-byte boundary to unpack 16 integers
+      if (length >= 16) {
+        packed = (int)_dataBuffer.getShort(byteOffset) & 0xffff;
+        out[i] = (packed >>> 15) & 1;
+        out[i + 1] = (packed >>> 14) & 1;
+        out[i + 2] = (packed >>> 13) & 1;
+        out[i + 3] = (packed >>> 12) & 1;
+        out[i + 4] = (packed >>> 11) & 1;
+        out[i + 5] = (packed >>> 10) & 1;
+        out[i + 6] = (packed >>> 9) & 1;
+        out[i + 7] = (packed >>> 8) & 1;
+        out[i + 8] = (packed >>> 7) & 1;
+        out[i + 9] = (packed >>> 6) & 1;
+        out[i + 10] = (packed >>> 5) & 1;
+        out[i + 11] = (packed >>> 4) & 1;
+        out[i + 12] = (packed >>> 3) & 1;
+        out[i + 13] = (packed >>> 2) & 1;
+        out[i + 14] = (packed >>> 1) & 1;
+        out[i + 15] = packed & 1;
+        length -= 16;
+        byteOffset += 2;
+        i += 16;
+      }
+
+      // aligned reads at byte boundary to unpack 8 integers
+      if (length >= 8) {
+        packed = (int)_dataBuffer.getByte(byteOffset) & 0xff;
+        out[i] = (packed >>> 7) & 1;
+        out[i + 1] = (packed >>> 6) & 1;
+        out[i + 2] = (packed >>> 5) & 1;
+        out[i + 3] = (packed >>> 4) & 1;
+        out[i + 4] = (packed >>> 3) & 1;
+        out[i + 5] = (packed >>> 2) & 1;
+        out[i + 6] = (packed >>> 1) & 1;
+        out[i + 7] = packed & 1;
+        length -= 8;
+        byteOffset += 1;
+        i += 8;
+      }
+
+      // handle spill-over
+
+      if (length == 7) {
+        // unpack from bits 0-6
+        packed = (int)_dataBuffer.getByte(byteOffset) & 0xff;
+        out[i] = (packed >>> 7) & 1;
+        out[i + 1] = (packed >>> 6) & 1;
+        out[i + 2] = (packed >>> 5) & 1;
+        out[i + 3] = (packed >>> 4) & 1;
+        out[i + 4] = (packed >>> 3) & 1;
+        out[i + 5] = (packed >>> 2) & 1;
+        out[i + 6] = (packed >>> 1) & 1;
+      } else if (length == 6) {
+        // unpack from bits 0-5
+        out[i] = (packed >>> 7) & 1;
+        out[i + 1] = (packed >>> 6) & 1;
+        out[i + 2] = (packed >>> 5) & 1;
+        out[i + 3] = (packed >>> 4) & 1;
+        out[i + 4] = (packed >>> 3) & 1;
+        out[i + 5] = (packed >>> 2) & 1;
+      } else if (length == 5) {
+        // unpack from bits 0-4
+        out[i] = (packed >>> 7) & 1;
+        out[i + 1] = (packed >>> 6) & 1;
+        out[i + 2] = (packed >>> 5) & 1;
+        out[i + 3] = (packed >>> 4) & 1;
+        out[i + 4] = (packed >>> 3) & 1;
+      } else if (length == 4) {
+        // unpack from bits 0-3
+        out[i] = (packed >>> 7) & 1;
+        out[i + 1] = (packed >>> 6) & 1;
+        out[i + 2] = (packed >>> 5) & 1;
+        out[i + 3] = (packed >>> 4) & 1;
+      } else if (length == 3) {
+        // unpack from bits 0-2
+        out[i] = (packed >>> 7) & 1;
+        out[i + 1] = (packed >>> 6) & 1;
+        out[i + 2] = (packed >>> 5) & 1;
+      } else if (length == 2) {
+        // unpack from bits 0-3
+        out[i] = (packed >>> 7) & 1;
+        out[i + 1] = (packed >>> 6) & 1;
+      } else {
+        out[i] = (packed >>> 7) & 1;
+      }
+    }
+  }
+
+  public static class Bit2Encoded extends PinotDataBitSetV2 {
+    Bit2Encoded(PinotDataBuffer dataBuffer, int numBits) {
+      _dataBuffer = dataBuffer;
+      _numBitsPerValue = numBits;
+    }
+
+    @Override
+    public int readInt(long index) {
+      long bitOffset = index * _numBitsPerValue;
+      long byteOffset = bitOffset / Byte.SIZE;
+      int val = (int)_dataBuffer.getByte(byteOffset) & 0xff;
+      bitOffset = bitOffset & 7;
+      return  (val >>> (6 - bitOffset)) & 3;
+    }
+
+    @Override
+    public void readInt(long startIndex, int length, int[] out) {
+      long bitOffset = startIndex * _numBitsPerValue;
+      long byteOffset = bitOffset / Byte.SIZE;
+      bitOffset = bitOffset & 7;
+      int packed = 0;
+      int i = 0;
+
+      /*
+       * Bytes are read as follows to get maximum vectorization
+       *
+       * [1 byte] - read from either the 2nd/4th/6th bit to 7th bit to unpack 
1/2/3 integers
+       * [chunks of 4 bytes] - read 4 bytes at a time to unpack 16 integers
+       * [1 chunk of 2 bytes] - read 2 bytes to unpack 8 integers
+       * [1 byte] - read the byte to unpack 4 integers
+       * [1 byte] - unpack 1/2/3 integers from first 2/4/6 bits
+       */
+
+      // unaligned read within a byte
+      if (bitOffset != 0) {
+        packed = (int)_dataBuffer.getByte(byteOffset) & 0xff;
+        if (bitOffset == 2) {
+          // unpack 3 integers from bits 2-7
+          out[0] = (packed >>> 4) & 3;
+          out[1] = (packed >>> 2) & 3;
+          out[2] = packed & 3;
+          i = 3;
+          length -= 3;
+        }
+        else if (bitOffset == 4) {
+          // unpack 2 integers from bits 4 to 7
+          out[0] = (packed >>> 2) & 3;
+          out[1] = packed & 3;
+          i = 2;
+          length -= 2;
+        } else {
+          // unpack integer from bits 6 to 7
+          out[0] = packed & 3;
+          i = 1;
+          length -= 1;
+        }
+        byteOffset++;
+      }
+
+      // aligned reads at 4-byte boundary to unpack 16 integers
+      while (length >= 16) {
+        packed = _dataBuffer.getInt(byteOffset);
+        out[i] = packed >>> 30;
+        out[i + 1] = (packed >>> 28) & 3;
+        out[i + 2] = (packed >>> 26) & 3;
+        out[i + 3] = (packed >>> 24) & 3;
+        out[i + 4] = (packed >>> 22) & 3;
+        out[i + 5] = (packed >>> 20) & 3;
+        out[i + 6] = (packed >>> 18) & 3;
+        out[i + 7] = (packed >>> 16) & 3;
+        out[i + 8] = (packed >>> 14) & 3;
+        out[i + 9] = (packed >>> 12) & 3;
+        out[i + 10] = (packed >>> 10) & 3;
+        out[i + 11] = (packed >>> 8) & 3;
+        out[i + 12] = (packed >>> 6) & 3;
+        out[i + 13] = (packed >>> 4) & 3;
+        out[i + 14] = (packed >>> 2) & 3;
+        out[i + 15] = packed & 3;
+        length -= 16;
+        byteOffset += 4;
+        i += 16;
+      }
+
+      if (length >= 8) {
+        packed = (int)_dataBuffer.getShort(byteOffset) & 0xffff;
+        out[i] = (packed >>> 14) & 3;
+        out[i + 1] = (packed >>> 12) & 3;
+        out[i + 2] = (packed >>> 10) & 3;
+        out[i + 3] = (packed >>> 8) & 3;
+        out[i + 4] = (packed >>> 6) & 3;
+        out[i + 5] = (packed >>> 4) & 3;
+        out[i + 6] = (packed >>> 2) & 3;
+        out[i + 7] = packed & 3;
+        length -= 8;
+        byteOffset += 2;
+        i += 8;
+      }
+
+      // aligned read at byte boundary to unpack 4 integers
+      if (length >= 4) {
+        packed = (int)_dataBuffer.getByte(byteOffset) & 0xff;
+        out[i] = packed >>> 6;
+        out[i + 1] = (packed >>> 4) & 3;
+        out[i + 2] = (packed >>> 2) & 3;
+        out[i + 3] = packed & 3;
+        length -= 4;
+        byteOffset++;
+        i += 4;
+      }
+
+      // handle spill-over
+
+      if (length > 0) {
+        // unpack from bits 0-1
+        packed = (int)_dataBuffer.getByte(byteOffset) & 0xff;
+        out[i] = packed >>> 6;
+        length--;
+      }
+
+      if (length > 0) {
+        // unpack from bits 2-3
+        out[i + 1] = (packed >>> 4) & 3;
+        length--;
+      }
+
+      if (length > 0) {
+        // unpack from bits 4-5
+        out[i + 2] = (packed >>> 2) & 3;
+      }
+    }
+  }
+
+  public static class Bit4Encoded extends PinotDataBitSetV2 {
+    Bit4Encoded(PinotDataBuffer dataBuffer, int numBits) {
+      _dataBuffer = dataBuffer;
+      _numBitsPerValue = numBits;
+    }
+
+    @Override
+    public int readInt(long index) {
+      long bitOffset = index * _numBitsPerValue;
+      long byteOffset = bitOffset / Byte.SIZE;
+      int val = (int)_dataBuffer.getByte(byteOffset) & 0xff;
+      bitOffset = bitOffset & 7;
+      return (bitOffset == 0) ? val >>> 4 : val & 0xf;
+    }
+
+    @Override
+    public void readInt(long startIndex, int length, int[] out) {
+      long bitOffset = startIndex * _numBitsPerValue;
+      long byteOffset = bitOffset / Byte.SIZE;
+      bitOffset = bitOffset & 7;
+      int packed = 0;
+      int i = 0;
+
+      /*
+       * Bytes are read as follows to get maximum vectorization
+       *
+       * [1 byte] - read from the 4th bit to 7th bit to unpack 1 integer
+       * [chunks of 4 bytes] - read 4 bytes at a time to unpack 8 integers
+       * [1 chunk of 2 bytes] - read 2 bytes to unpack 4 integers
+       * [1 byte] - unpack 1 integer from first 4 bits
+       */
+
+      // unaligned read within a byte from bits 4-7
+      if (bitOffset != 0) {
+        packed = (int)_dataBuffer.getByte(byteOffset) & 0xff;
+        out[0] = packed & 0xf;
+        i = 1;
+        byteOffset++;
+        length--;
+      }
+
+      // aligned read at 4-byte boundary to unpack 8 integers
+      while (length >= 8) {
+        packed = _dataBuffer.getInt(byteOffset);
+        out[i] = packed >>> 28;
+        out[i + 1] = (packed >>> 24) & 0xf;
+        out[i + 2] = (packed >>> 20) & 0xf;
+        out[i + 3] = (packed >>> 16) & 0xf;
+        out[i + 4] = (packed >>> 12) & 0xf;
+        out[i + 5] = (packed >>> 8) & 0xf;
+        out[i + 6] = (packed >>> 4) & 0xf;
+        out[i + 7] = packed & 0xf;
+        length -= 8;
+        i += 8;
+        byteOffset += 4;
+      }
+
+      // aligned read at 2-byte boundary to unpack 4 integers
+      if (length >= 4) {
+        packed = (int)_dataBuffer.getShort(byteOffset) & 0xffff;
+        out[i] = (packed >>> 12) & 0xf;
+        out[i + 1] = (packed >>> 8) & 0xf;
+        out[i + 2] = (packed >>> 4) & 0xf;
+        out[i + 3] = packed & 0xf;
+        length -= 4;
+        i += 4;
+        byteOffset += 2;
+      }
+
+      // aligned read at byte boundary to unpack 2 integers
+      if (length >= 2) {
+        packed = (int)_dataBuffer.getByte(byteOffset) & 0xff;
+        out[i] = packed >>> 4;
+        out[i + 1] = packed & 0xf;
+        length -= 2;
+        i += 2;
+        byteOffset++;
+      }
+
+      // handle spill over -- unpack from bits 0-3
+      if (length > 0) {
+        packed = (int)_dataBuffer.getByte(byteOffset) & 0xff;
+        out[i] = packed >>> 4;
+      }
+    }
+  }
+
+  public static class Bit8Encoded extends PinotDataBitSetV2 {
+    Bit8Encoded(PinotDataBuffer dataBuffer, int numBits) {
+      _dataBuffer = dataBuffer;
+      _numBitsPerValue = numBits;
+    }
+
+    @Override
+    public int readInt(long index) {
+      long bitOffset = index * _numBitsPerValue;
+      long byteOffset = bitOffset / Byte.SIZE;
+      return ((int)_dataBuffer.getByte(byteOffset)) & 0xff;
+    }
+
+    @Override
+    public void readInt(long startIndex, int length, int[] out) {
+      long bitOffset = startIndex * _numBitsPerValue;
+      long byteOffset = bitOffset / Byte.SIZE;
+      int i = 0;
+      int packed = 0;
+
+      /*
+       * Bytes are read as follows to get maximum vectorization
+       *
+       * [chunks of 4 bytes] - read 4 bytes at a time to unpack 4 integers
+       * [1 chunk of 2 bytes] - read 2 bytes to unpack 4 integers
+       * [1 byte] - unpack 1 integer from first 4 bits
+       */
+
+      // aligned read at 4-byte boundary to unpack 4 integers
+      while (length >= 4) {
+        packed = _dataBuffer.getInt(byteOffset);
+        out[i] = packed >>> 24;
+        out[i + 1] = (packed >>> 16) & 0xff;
+        out[i + 2] = (packed >>> 8) & 0xff;
+        out[i + 3] = packed & 0xff;
+        length -= 4;
+        byteOffset += 4;
+        i += 4;
+      }
+
+      // aligned read at 2-byte boundary to unpack 2 integers
+      if (length >= 2) {
+        packed = (int)_dataBuffer.getShort(byteOffset) & 0xffff;
+        out[i] = (packed >>> 8) & 0xff;
+        out[i + 1] = packed & 0xff;
+        length -= 2;
+        byteOffset += 2;
+        i += 2;
+      }
+
+      // handle spill over at byte boundary to unpack 1 integer
+      if (length > 0) {
+        out[i] = (int)_dataBuffer.getByte(byteOffset) & 0xff;
+      }
+    }
+  }
+
+  public static class Bit16Encoded extends PinotDataBitSetV2 {
+    Bit16Encoded(PinotDataBuffer dataBuffer, int numBits) {
+      _dataBuffer = dataBuffer;
+      _numBitsPerValue = numBits;
+    }
+
+    @Override
+    public int readInt(long index) {
+      long bitOffset = index * _numBitsPerValue;
+      long byteOffset = bitOffset / Byte.SIZE;
+      return ((int)_dataBuffer.getShort(byteOffset)) & 0xffff;
+    }
+
+    @Override
+    public void readInt(long startIndex, int length, int[] out) {
+      long bitOffset = startIndex * _numBitsPerValue;
+      long byteOffset = bitOffset / Byte.SIZE;
+      int i = 0;
+      int packed;
+
+      /*
+       * Bytes are read as follows to get maximum vectorization
+       *
+       * [chunks of 4 bytes] - read 4 bytes at a time to unpack 2 integers
+       * [1 chunk of 2 bytes] - read 2 bytes to unpack 1 integer
+       */
+
+      // aligned reads at 4-byte boundary to unpack 2 integers
+      while (length >= 2) {
+        packed = _dataBuffer.getInt(byteOffset);
+        out[i] = packed >>> 16;
+        out[i + 1] = packed & 0xffff;
+        length -= 2;
+        i += 2;
+        byteOffset += 4;
+      }
+
+      // handle spill over at 2-byte boundary to unpack 1 integer
+      if (length > 0) {
+        out[i] = (int)_dataBuffer.getShort(byteOffset) & 0xffff;
+      }
+    }
+  }
+
+  public static class RawInt extends PinotDataBitSetV2 {
+    RawInt(PinotDataBuffer dataBuffer, int numBits) {
+      _dataBuffer = dataBuffer;
+      _numBitsPerValue = numBits;
+    }
+
+    @Override
+    public int readInt(long index) {
+      return _dataBuffer.getInt(index * Integer.BYTES);
+    }
+
+    @Override
+    public void readInt(long startIndex, int length, int[] out) {
+      long byteOffset = startIndex * Integer.BYTES;
+      for (int i = 0; i < length; i++) {
+        out[i] = _dataBuffer.getInt(byteOffset);
+        byteOffset += 4;
+      }
+    }
+  }
+
+  protected void writeInt(int index, int value) {
+    long bitOffset = (long) index * _numBitsPerValue;
+    int byteOffset = (int) (bitOffset / Byte.SIZE);
+    int bitOffsetInFirstByte = (int) (bitOffset % Byte.SIZE);
+
+    int firstByte = _dataBuffer.getByte(byteOffset);
+
+    int firstByteMask = BYTE_MASK >>> bitOffsetInFirstByte;
+    int numBitsLeft = _numBitsPerValue - (Byte.SIZE - bitOffsetInFirstByte);
+    if (numBitsLeft <= 0) {
+      // The value is inside the first byte
+      firstByteMask &= BYTE_MASK << -numBitsLeft;
+      _dataBuffer.putByte(byteOffset, (byte) ((firstByte & ~firstByteMask) | 
(value << -numBitsLeft)));
+    } else {
+      // The value is in multiple bytes
+      _dataBuffer
+          .putByte(byteOffset, (byte) ((firstByte & ~firstByteMask) | ((value 
>>> numBitsLeft) & firstByteMask)));
+      while (numBitsLeft > Byte.SIZE) {
+        numBitsLeft -= Byte.SIZE;
+        _dataBuffer.putByte(++byteOffset, (byte) (value >> numBitsLeft));
+      }
+      int lastByte = _dataBuffer.getByte(++byteOffset);
+      _dataBuffer.putByte(byteOffset,
+          (byte) ((lastByte & (BYTE_MASK >>> numBitsLeft)) | (value << 
(Byte.SIZE - numBitsLeft))));
+    }
+  }
+
+  public void writeInt(int startIndex, int length, int[] values) {
+    long startBitOffset = (long) startIndex * _numBitsPerValue;
+    int byteOffset = (int) (startBitOffset / Byte.SIZE);
+    int bitOffsetInFirstByte = (int) (startBitOffset % Byte.SIZE);
+
+    int firstByte = _dataBuffer.getByte(byteOffset);
+
+    for (int i = 0; i < length; i++) {
+      int value = values[i];
+      if (bitOffsetInFirstByte == Byte.SIZE) {
+        bitOffsetInFirstByte = 0;
+        firstByte = _dataBuffer.getByte(++byteOffset);
+      }
+      int firstByteMask = BYTE_MASK >>> bitOffsetInFirstByte;
+      int numBitsLeft = _numBitsPerValue - (Byte.SIZE - bitOffsetInFirstByte);
+      if (numBitsLeft <= 0) {
+        // The value is inside the first byte
+        firstByteMask &= BYTE_MASK << -numBitsLeft;
+        firstByte = ((firstByte & ~firstByteMask) | (value << -numBitsLeft));
+        _dataBuffer.putByte(byteOffset, (byte) firstByte);
+        bitOffsetInFirstByte = Byte.SIZE + numBitsLeft;
+      } else {
+        // The value is in multiple bytes
+        _dataBuffer
+            .putByte(byteOffset, (byte) ((firstByte & ~firstByteMask) | 
((value >>> numBitsLeft) & firstByteMask)));
+        while (numBitsLeft > Byte.SIZE) {
+          numBitsLeft -= Byte.SIZE;
+          _dataBuffer.putByte(++byteOffset, (byte) (value >> numBitsLeft));
+        }
+        int lastByte = _dataBuffer.getByte(++byteOffset);
+        firstByte = (lastByte & (0xFF >>> numBitsLeft)) | (value << (Byte.SIZE 
- numBitsLeft));
+        _dataBuffer.putByte(byteOffset, (byte) firstByte);
+        bitOffsetInFirstByte = numBitsLeft;
+      }
+    }
+  }
+
+  @Override
+  public void close()
+      throws IOException {
+  }
+}
\ No newline at end of file
diff --git 
a/pinot-core/src/test/java/org/apache/pinot/core/io/util/PinotDataBitSetV2Test.java
 
b/pinot-core/src/test/java/org/apache/pinot/core/io/util/PinotDataBitSetV2Test.java
new file mode 100644
index 0000000..b8786ea
--- /dev/null
+++ 
b/pinot-core/src/test/java/org/apache/pinot/core/io/util/PinotDataBitSetV2Test.java
@@ -0,0 +1,438 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.io.util;
+
+import java.nio.ByteOrder;
+import java.util.Random;
+import org.apache.pinot.core.segment.memory.PinotDataBuffer;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+
+public class PinotDataBitSetV2Test {
+
+  private void batchRead(PinotDataBitSetV2 bitset, int startDocId, int 
batchLength, int[] unpacked,
+      int[] forwardIndex) {
+    bitset.readInt(startDocId, batchLength, unpacked);
+    for (int i = 0; i < batchLength; i++) {
+      Assert.assertEquals(forwardIndex[startDocId + i], unpacked[i]);
+    }
+  }
+
+  @Test
+  public void testBit2Encoded() throws Exception {
+    int cardinality = 3;
+    int rows = 10000;
+    int[] forwardIndex = new int[rows];
+    Random random = new Random();
+
+    for (int i = 0; i < rows; i++) {
+      forwardIndex[i] = random.nextInt(cardinality);
+    }
+
+    int numBitsPerValue = PinotDataBitSet.getNumBitsPerValue(cardinality - 1);
+    int bitPackedBufferSize = (rows * numBitsPerValue + Byte.SIZE - 1) / 
Byte.SIZE;
+    PinotDataBitSetV2 bitSet = getEmptyBitSet(bitPackedBufferSize, 
numBitsPerValue);
+
+    Assert.assertEquals(2, numBitsPerValue);
+    Assert.assertTrue(bitSet instanceof PinotDataBitSetV2.Bit2Encoded);
+
+    for (int i = 0; i < rows; i++) {
+      bitSet.writeInt(i, forwardIndex[i]);
+    }
+
+    // test single read API for sequential consecutive
+    for (int i = 0; i < rows; i++) {
+      int unpacked = bitSet.readInt(i);
+      Assert.assertEquals(forwardIndex[i], unpacked);
+    }
+
+    // for each batch:
+    // 3 aligned reads at 4-byte boundary to unpack 16 integers after each 
read -- 48 integers unpacked
+    // followed by reading the next byte to unpack 2 integers from first 4 bits
+    int batchLength = 50;
+    int[] unpacked = new int[batchLength];
+    int startDocId;
+    for (startDocId = 0; startDocId < rows; startDocId += 50) {
+      bitSet.readInt(startDocId, batchLength, unpacked);
+      for (int i = 0; i < batchLength; i++) {
+        Assert.assertEquals(forwardIndex[startDocId + i], unpacked[i]);
+      }
+    }
+
+    // 3 aligned reads at 4-byte boundary to unpack 16 integers after each 
read -- 48 integers unpacked
+    // followed by reading the next 2 bytes to unpack 8 integers
+    batchLength = 56;
+    unpacked = new int[batchLength];
+    startDocId = 1;
+    batchRead(bitSet, startDocId, batchLength, unpacked, forwardIndex);
+
+    // 3 aligned reads at 4-byte boundary to unpack 16 integers after each 
read -- 48 integers unpacked
+    // followed by reading the next 2 bytes to unpack 8 integers
+    // followed by reading the next byte to unpack 4 integers
+    batchLength = 60;
+    unpacked = new int[batchLength];
+    startDocId = 20;
+    batchRead(bitSet, startDocId, batchLength, unpacked, forwardIndex);
+
+    // 3 aligned reads at 4-byte boundary to unpack 16 integers after each 
read -- 48 integers unpacked
+    // followed by reading the next 2 bytes to unpack 8 integers
+    // followed by reading the next byte to unpack 4 integers
+    // followed by reading the next byte to unpack 1 integer from first 2 bits
+    batchLength = 61;
+    unpacked = new int[batchLength];
+    startDocId = 20;
+    batchRead(bitSet, startDocId, batchLength, unpacked, forwardIndex);
+
+    // 3 aligned reads at 4-byte boundary to unpack 16 integers after each 
read -- 48 integers unpacked
+    // followed by reading the next 2 bytes to unpack 8 integers
+    // followed by reading the next byte to unpack 4 integers
+    // followed by reading the next byte to unpack 2 integers from first 4 bits
+    batchLength = 62;
+    unpacked = new int[batchLength];
+    startDocId = 20;
+    batchRead(bitSet, startDocId, batchLength, unpacked, forwardIndex);
+
+    // 3 aligned reads at 4-byte boundary to unpack 16 integers after each 
read -- 48 integers unpacked
+    // followed by reading the next 2 bytes to unpack 8 integers
+    // followed by reading the next byte to unpack 4 integers
+    // followed by reading the next byte to unpack 6 integers from first 6 bits
+    batchLength = 63;
+    unpacked = new int[batchLength];
+    startDocId = 20;
+    batchRead(bitSet, startDocId, batchLength, unpacked, forwardIndex);
+
+    // for each batch:
+    // unaligned read on the first byte to unpack 3 integers
+    // followed by 3 aligned reads at byte boundary to unpack 4 integers after 
each read -- 12 integers unpacked
+    // followed by reading the next byte to unpack 2 integer from first 4 bits
+    // 3 + 12 + 2  = 17 unpacked integers
+    batchLength = 17;
+    unpacked = new int[batchLength];
+    startDocId = 1;
+    bitSet.readInt(startDocId, batchLength, unpacked);
+    for (int i = 0; i < batchLength; i++) {
+      Assert.assertEquals(forwardIndex[startDocId + i], unpacked[i]);
+    }
+
+    // unaligned read on the first byte to unpack 3 integers (bits 2 to 7)
+    // followed by 3 aligned reads at 4-byte boundary to unpack 16 integers 
after each read -- 48 integers unpacked
+    // followed by 1 aligned read at byte boundary to unpack 4 integers -- 4 
integers unpacked
+    // followed by reading the next byte to unpack 3 integers from first 6 bits
+    // 3 + 48 + 4 + 3 = 58 unpacked integers
+    batchLength = 58;
+    unpacked = new int[batchLength];
+    startDocId = 1;
+    bitSet.readInt(startDocId, batchLength, unpacked);
+    for (int i = 0; i < batchLength; i++) {
+      Assert.assertEquals(forwardIndex[startDocId + i], unpacked[i]);
+    }
+
+    // for each batch:
+    // unaligned read on the first byte to unpack 2 integers (bits 4 to 7)
+    // followed by 3 aligned reads at 4-byte boundary to unpack 16 integers 
after each read -- 48 integers unpacked
+    // followed by 2 aligned reads at byte boundary to unpack 4 integers after 
each read -- 8 integers unpacked
+    // 3 + 48 + 8 = 58 unpacked integers
+    startDocId = 2;
+    unpacked = new int[batchLength];
+    bitSet.readInt(startDocId, batchLength, unpacked);
+    for (int i = 0; i < batchLength; i++) {
+      Assert.assertEquals(forwardIndex[startDocId + i], unpacked[i]);
+    }
+
+    // for each batch:
+    // unaligned read on the first byte to unpack 1 integers (bits 6 to 7)
+    // followed by 3 aligned reads at 4-byte boundary to unpack 16 integers 
after each read -- 48 integers unpacked
+    // followed by 2 aligned reads at byte boundary to unpack 4 integers after 
each read -- 8 integers unpacked
+    // followed by reading the next byte to unpack 1 integer from first 2 bits
+    // 1 + 48 + 8 + 1 = 58 unpacked integers
+    startDocId = 3;
+    unpacked = new int[batchLength];
+    bitSet.readInt(startDocId, batchLength, unpacked);
+    for (int i = 0; i < batchLength; i++) {
+      Assert.assertEquals(forwardIndex[startDocId + i], unpacked[i]);
+    }
+
+    bitSet.close();
+  }
+
+  @Test
+  public void testBit4Encoded() throws Exception {
+    int cardinality = 11;
+    int rows = 10000;
+    int[] forwardIndex = new int[rows];
+    Random random = new Random();
+
+    for (int i = 0; i < rows; i++) {
+      forwardIndex[i] = random.nextInt(cardinality);
+    }
+
+    int numBitsPerValue = PinotDataBitSet.getNumBitsPerValue(cardinality - 1);
+    int bitPackedBufferSize = (rows * numBitsPerValue + Byte.SIZE - 1) / 
Byte.SIZE;
+    PinotDataBitSetV2 bitSet = getEmptyBitSet(bitPackedBufferSize, 
numBitsPerValue);
+
+    Assert.assertEquals(4, numBitsPerValue);
+    Assert.assertTrue(bitSet instanceof PinotDataBitSetV2.Bit4Encoded);
+
+    for (int i = 0; i < rows; i++) {
+      bitSet.writeInt(i, forwardIndex[i]);
+    }
+
+    // test single read API for sequential consecutive
+    for (int i = 0; i < rows; i++) {
+      int unpacked = bitSet.readInt(i);
+      Assert.assertEquals(forwardIndex[i], unpacked);
+    }
+
+    // test array API for sequential consecutive
+
+    // for each batch: do a combination of aligned and unaligned reads
+    // 6 aligned reads at 4-byte boundary to unpack 8 integers after each read 
-- 48 integers unpacked
+    // followed by reading the next byte to unpack 2 integers
+    int batchLength = 50;
+    int[] unpacked = new int[batchLength];
+    int startDocId;
+    for (startDocId = 0; startDocId < rows; startDocId += batchLength) {
+      bitSet.readInt(startDocId, batchLength, unpacked);
+      for (int i = 0; i < batchLength; i++) {
+        Assert.assertEquals(forwardIndex[startDocId + i], unpacked[i]);
+      }
+    }
+
+    // 12 aligned reads at 4-byte boundary to unpack 8 integers after each 
read -- 96 integers unpacked
+    batchLength = 96;
+    unpacked = new int[batchLength];
+    startDocId = 19;
+    bitSet.readInt(startDocId, batchLength, unpacked);
+    for (int i = 0; i < batchLength; i++) {
+      Assert.assertEquals(forwardIndex[startDocId + i], unpacked[i]);
+    }
+
+    // only a single unaligned read from the middle of a byte (44th bit)
+    batchLength = 1;
+    startDocId = 21;
+    bitSet.readInt(startDocId, batchLength, unpacked);
+    Assert.assertEquals(forwardIndex[startDocId], unpacked[0]);
+
+    // unaligned read within a byte to unpack an integer from bits 4 to 7
+    // followed by 2 aligned reads at 4-byte boundary to unpack 8 integers 
after each read -- unpacked 16 integers
+    // followed by 1 aligned read at 2-byte boundary to unpack 4 integers
+    // followed by 1 aligned read at byte boundary to unpack 2 integers
+    // followed by reading the next byte to unpack integer from first 4 bits
+    // 1 + 16 + 4 + 2 + 1 = 24 unpacked integers
+    startDocId = 1;
+    batchLength = 24;
+    unpacked = new int[batchLength];
+    bitSet.readInt(startDocId, batchLength, unpacked);
+    for (int i = 0; i < batchLength; i++) {
+      Assert.assertEquals(forwardIndex[startDocId + i], unpacked[i]);
+    }
+
+    // unaligned read within a byte to unpack an integer from bits 4 to 7
+    // 1 aligned read at 2-byte boundary to unpack 4 integers
+    // followed by 1 aligned read at byte boundary to unpack 2 integers
+    // followed by reading the next byte to unpack integer from first 4 bits
+    // 1 + 4 + 2 + 1 = 8 unpacked integers
+    startDocId = 1;
+    batchLength = 8;
+    unpacked = new int[batchLength];
+    bitSet.readInt(startDocId, batchLength, unpacked);
+    for (int i = 0; i < batchLength; i++) {
+      Assert.assertEquals(forwardIndex[startDocId + i], unpacked[i]);
+    }
+
+    // 1 aligned read at 2-byte boundary to unpack 4 integers
+    // followed by 1 aligned read at byte boundary to unpack 2 integers
+    // followed by reading the next byte to unpack integer from first 4 bits
+    // 4 + 2 + 1 = 7 unpacked integers
+    startDocId = 4;
+    batchLength = 7;
+    unpacked = new int[batchLength];
+    bitSet.readInt(startDocId, batchLength, unpacked);
+    for (int i = 0; i < batchLength; i++) {
+      Assert.assertEquals(forwardIndex[startDocId + i], unpacked[i]);
+    }
+
+    // test bulk API for sequential but not necessarily consecutive
+    testBulkSequentialWithGaps(bitSet, 1, 50, -1, forwardIndex);
+    testBulkSequentialWithGaps(bitSet, 5, 57, 4, forwardIndex);
+    testBulkSequentialWithGaps(bitSet, 17, 109, 19, forwardIndex);
+    testBulkSequentialWithGaps(bitSet, 17, 1, 19, forwardIndex);
+
+    bitSet.close();
+  }
+
+  @Test
+  public void testBit8Encoded() throws Exception {
+    int cardinality = 190;
+    int rows = 10000;
+    int[] forwardIndex = new int[rows];
+    Random random = new Random();
+
+    for (int i = 0; i < rows; i++) {
+      forwardIndex[i] = random.nextInt(cardinality);
+    }
+
+    int numBitsPerValue = PinotDataBitSet.getNumBitsPerValue(cardinality - 1);
+    int bitPackedBufferSize = (rows * numBitsPerValue + Byte.SIZE - 1) / 
Byte.SIZE;
+    PinotDataBitSetV2 bitSet = getEmptyBitSet(bitPackedBufferSize, 
numBitsPerValue);
+
+    Assert.assertEquals(8, numBitsPerValue);
+    Assert.assertTrue(bitSet instanceof PinotDataBitSetV2.Bit8Encoded);
+
+    for (int i = 0; i < rows; i++) {
+      bitSet.writeInt(i, forwardIndex[i]);
+    }
+
+    // test single read API for sequential consecutive
+    for (int i = 0; i < rows; i++) {
+      int unpacked = bitSet.readInt(i);
+      Assert.assertEquals(forwardIndex[i], unpacked);
+    }
+
+    // test array API for sequential consecutive
+
+    // for each batch:
+    // 12 aligned reads at 4-byte boundary to unpack 4 integers after each 
read -- 48 integers unpacked
+    // followed by reading the next 2 bytes to unpack 2 integers
+    int batchLength = 50;
+    int[] unpacked = new int[batchLength];
+    int startDocId;
+    for (startDocId = 0; startDocId < rows; startDocId += batchLength) {
+      bitSet.readInt(startDocId, batchLength, unpacked);
+      for (int i = 0; i < batchLength; i++) {
+        Assert.assertEquals(forwardIndex[startDocId + i], unpacked[i]);
+      }
+    }
+
+    // for each batch:
+    // 24 aligned reads at 4-byte boundary to unpack 4 integers after each 
read -- 96 integers unpacked
+    batchLength = 96;
+    unpacked = new int[batchLength];
+    startDocId = 7;
+    bitSet.readInt(startDocId, batchLength, unpacked);
+    for (int i = 0; i < batchLength; i++) {
+      Assert.assertEquals(forwardIndex[startDocId + i], unpacked[i]);
+    }
+
+    // unaligned spill over
+    startDocId = 19;
+    batchLength = 3;
+    bitSet.readInt(startDocId, batchLength, unpacked);
+    Assert.assertEquals(forwardIndex[startDocId], unpacked[0]);
+
+    // test bulk API for sequential but not necessarily consecutive
+    testBulkSequentialWithGaps(bitSet, 1, 50, -1, forwardIndex);
+    testBulkSequentialWithGaps(bitSet, 5, 57, 4, forwardIndex);
+    testBulkSequentialWithGaps(bitSet, 17, 109, 19, forwardIndex);
+    testBulkSequentialWithGaps(bitSet, 17, 1, 19, forwardIndex);
+
+    bitSet.close();
+  }
+
+  @Test
+  public void testBit16Encoded() throws Exception {
+    int cardinality = 40000;
+    int rows = 100000;
+    int[] forwardIndex = new int[rows];
+    Random random = new Random();
+
+    for (int i = 0; i < rows; i++) {
+      forwardIndex[i] = random.nextInt(cardinality);
+    }
+
+    int numBitsPerValue = PinotDataBitSet.getNumBitsPerValue(cardinality - 1);
+    int bitPackedBufferSize = (rows * numBitsPerValue + Byte.SIZE - 1) / 
Byte.SIZE;
+    PinotDataBitSetV2 bitSet = getEmptyBitSet(bitPackedBufferSize, 
numBitsPerValue);
+
+    Assert.assertEquals(16, numBitsPerValue);
+    Assert.assertTrue(bitSet instanceof PinotDataBitSetV2.Bit16Encoded);
+
+    for (int i = 0; i < rows; i++) {
+      bitSet.writeInt(i, forwardIndex[i]);
+    }
+
+    // test single read API for sequential consecutive
+    for (int i = 0; i < rows; i++) {
+      int unpacked = bitSet.readInt(i);
+      Assert.assertEquals(forwardIndex[i], unpacked);
+    }
+
+    // test array API for sequential consecutive
+
+    // for each batch:
+    // 25 aligned reads at 4-byte boundary to unpack 2 integers after each 
read -- 50 integers unpacked
+    int batchLength = 50;
+    int[] unpacked = new int[batchLength];
+    int startDocId;
+    for (startDocId = 0; startDocId < rows; startDocId += batchLength) {
+      bitSet.readInt(startDocId, batchLength, unpacked);
+      for (int i = 0; i < batchLength; i++) {
+        Assert.assertEquals(forwardIndex[startDocId + i], unpacked[i]);
+      }
+    }
+
+    // 25 aligned reads at 4-byte boundary to unpack 2 integers after each 
read -- 50 integers unpacked
+    // followed by unpacking 1 integer from the next 2 bytes
+    batchLength = 51;
+    startDocId = 3;
+    unpacked = new int[batchLength];
+    bitSet.readInt(startDocId, batchLength, unpacked);
+    for (int i = 0; i < batchLength; i++) {
+      Assert.assertEquals(forwardIndex[startDocId + i], unpacked[i]);
+    }
+
+    // unaligned spill over
+    startDocId = 7;
+    batchLength = 1;
+    bitSet.readInt(startDocId, batchLength, unpacked);
+    Assert.assertEquals(forwardIndex[startDocId], unpacked[0]);
+
+    // test array API for sequential but not necessarily consecutive
+    testBulkSequentialWithGaps(bitSet, 1, 50, -1, forwardIndex);
+    testBulkSequentialWithGaps(bitSet, 5, 57, 4, forwardIndex);
+    testBulkSequentialWithGaps(bitSet, 17, 109, 19, forwardIndex);
+    testBulkSequentialWithGaps(bitSet, 17, 1, 19, forwardIndex);
+
+    bitSet.close();
+  }
+
+  private void testBulkSequentialWithGaps(PinotDataBitSetV2 bitset, int gaps, 
int batchLength, int startDocId, int[] forwardIndex) {
+    int docId = startDocId;
+    int[] docIds = new int[batchLength];
+    Random random = new Random();
+    for (int i = 0; i < batchLength; i++) {
+      docId = docId + 1 + random.nextInt(gaps);
+      docIds[i] = docId;
+    }
+    int[] unpacked = new int[batchLength];
+    bitset.readInt(docIds, 0, batchLength, unpacked, 0);
+    for (int i = 0; i < batchLength; i++) {
+      Assert.assertEquals(forwardIndex[docIds[i]], unpacked[i]);
+    }
+  }
+
+  private PinotDataBitSetV2 getEmptyBitSet(int size, int numBitsPerValue) {
+    PinotDataBuffer bitPackedBuffer = PinotDataBuffer.allocateDirect(size, 
ByteOrder.BIG_ENDIAN, null);
+    for (int i = 0; i < size; i++) {
+      bitPackedBuffer.readFrom(0, new byte[size]);
+    }
+    return PinotDataBitSetV2.createBitSet(bitPackedBuffer, numBitsPerValue);
+  }
+}
diff --git 
a/pinot-perf/src/main/java/org/apache/pinot/perf/BenchmarkPinotDataBitSet.java 
b/pinot-perf/src/main/java/org/apache/pinot/perf/BenchmarkPinotDataBitSet.java
new file mode 100644
index 0000000..81d3812
--- /dev/null
+++ 
b/pinot-perf/src/main/java/org/apache/pinot/perf/BenchmarkPinotDataBitSet.java
@@ -0,0 +1,546 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.perf;
+
+import java.io.BufferedReader;
+import java.io.BufferedWriter;
+import java.io.File;
+import java.io.FileReader;
+import java.io.FileWriter;
+import java.io.IOException;
+import java.util.Random;
+import java.util.concurrent.TimeUnit;
+import org.apache.pinot.core.io.reader.impl.v1.FixedBitSingleValueReader;
+import org.apache.pinot.core.io.util.FixedBitIntReaderWriter;
+import org.apache.pinot.core.io.util.FixedBitIntReaderWriterV2;
+import org.apache.pinot.core.io.util.PinotDataBitSet;
+import org.apache.pinot.core.io.util.PinotDataBitSetV2;
+import org.apache.pinot.core.io.writer.impl.v1.FixedBitSingleValueWriter;
+import org.apache.pinot.core.segment.memory.PinotDataBuffer;
+import org.openjdk.jmh.annotations.Benchmark;
+import org.openjdk.jmh.annotations.BenchmarkMode;
+import org.openjdk.jmh.annotations.Fork;
+import org.openjdk.jmh.annotations.Level;
+import org.openjdk.jmh.annotations.Mode;
+import org.openjdk.jmh.annotations.OutputTimeUnit;
+import org.openjdk.jmh.annotations.Scope;
+import org.openjdk.jmh.annotations.Setup;
+import org.openjdk.jmh.annotations.State;
+import org.openjdk.jmh.annotations.TearDown;
+import org.openjdk.jmh.runner.Runner;
+import org.openjdk.jmh.runner.options.ChainedOptionsBuilder;
+import org.openjdk.jmh.runner.options.OptionsBuilder;
+
+@Fork(1)
+@State(Scope.Benchmark)
+public class BenchmarkPinotDataBitSet {
+
+  private static final int CARDINALITY_2_BITS = 3;
+  private static final int CARDINALITY_4_BITS = 12;
+  private static final int CARDINALITY_8_BITS = 190;
+  private static final int CARDINALITY_16_BITS = 40000;
+
+  private File twoBitEncodedFile = new File("/Users/steotia/two-bit-encoded");
+  private File fourBitEncodedFile = new 
File("/Users/steotia/four-bit-encoded");
+  private File eightBitEncodedFile = new 
File("/Users/steotia/eight-bit-encoded");
+  private File sixteenBitEncodedFile = new 
File("/Users/steotia/sixteen-bit-encoded");
+
+  private File rawFile2 = new File("/Users/steotia/raw2");
+  private File rawFile4 = new File("/Users/steotia/raw4");
+  private File rawFile8 = new File("/Users/steotia/raw8");
+  private File rawFile16 = new File("/Users/steotia/raw16");
+
+  static int ROWS = 20_000_000;
+  static int NUM_DOCIDS_WITH_GAPS = 9000;
+
+  private int[] rawValues2 = new int[ROWS];
+  private int[] rawValues4 = new int[ROWS];
+  private int[] rawValues8 = new int[ROWS];
+  private int[] rawValues16 = new int[ROWS];
+
+  PinotDataBuffer _pinotDataBuffer2;
+  private PinotDataBitSet _bitSet2;
+  private PinotDataBitSetV2 _bitSet2Fast;
+  private FixedBitSingleValueReader _bit2Reader;
+  private FixedBitIntReaderWriterV2 _bit2ReaderFast;
+
+  PinotDataBuffer _pinotDataBuffer4;
+  private PinotDataBitSet _bitSet4;
+  private PinotDataBitSetV2 _bitSet4Fast;
+  private FixedBitSingleValueReader _bit4Reader;
+  private FixedBitIntReaderWriterV2 _bit4ReaderFast;
+
+  PinotDataBuffer _pinotDataBuffer8;
+  private PinotDataBitSet _bitSet8;
+  private PinotDataBitSetV2 _bitSet8Fast;
+  private FixedBitSingleValueReader _bit8Reader;
+  private FixedBitIntReaderWriterV2 _bit8ReaderFast;
+
+  PinotDataBuffer _pinotDataBuffer16;
+  private PinotDataBitSet _bitSet16;
+  private PinotDataBitSetV2 _bitSet16Fast;
+  private FixedBitSingleValueReader _bit16Reader;
+  private FixedBitIntReaderWriterV2 _bit16ReaderFast;
+
+  int[] unpacked = new int[32];
+  int[] unalignedUnpacked2Bit = new int[50];
+  int[] unalignedUnpacked4Bit = new int[48];
+  int[] unalignedUnpacked8Bit = new int[51];
+  int[] unalignedUnpacked16Bit = new int[49];
+
+  int[] docIdsWithGaps = new int[NUM_DOCIDS_WITH_GAPS];
+  int[] unpackedWithGaps = new int[NUM_DOCIDS_WITH_GAPS];
+
+  @Setup(Level.Trial)
+  public void setUp() throws Exception {
+    generateRawFile();
+    generateBitEncodedFwdIndex();
+  }
+
+  @TearDown(Level.Trial)
+  public void tearDown() throws Exception {
+    rawFile2.delete();
+    rawFile4.delete();
+    rawFile8.delete();
+    rawFile16.delete();
+
+    twoBitEncodedFile.delete();
+    fourBitEncodedFile.delete();
+    eightBitEncodedFile.delete();
+    sixteenBitEncodedFile.delete();
+
+    _pinotDataBuffer2.close();
+    _pinotDataBuffer4.close();
+    _pinotDataBuffer8.close();
+    _pinotDataBuffer16.close();
+  }
+
+  private void generateRawFile()
+      throws IOException {
+    BufferedWriter bw2 = new BufferedWriter(new FileWriter(rawFile2));
+    BufferedWriter bw4 = new BufferedWriter(new FileWriter(rawFile4));
+    BufferedWriter bw8 = new BufferedWriter(new FileWriter(rawFile8));
+    BufferedWriter bw16 = new BufferedWriter(new FileWriter(rawFile16));
+    Random r = new Random();
+    for (int i = 0; i < ROWS; i++) {
+      rawValues2[i] =  r.nextInt(CARDINALITY_2_BITS);
+      rawValues4[i] =  r.nextInt(CARDINALITY_4_BITS);
+      rawValues8[i] =  r.nextInt(CARDINALITY_8_BITS);
+      rawValues16[i] =  r.nextInt(CARDINALITY_16_BITS);
+      bw2.write("" + rawValues2[i]);
+      bw2.write("\n");
+      bw4.write("" + rawValues4[i]);
+      bw4.write("\n");
+      bw8.write("" + rawValues8[i]);
+      bw8.write("\n");
+      bw16.write("" + rawValues16[i]);
+      bw16.write("\n");
+    }
+    bw2.close();
+    bw4.close();
+    bw8.close();
+    bw16.close();
+  }
+
+  private void generateBitEncodedFwdIndex() throws Exception {
+    generateFwdIndexHelper(rawFile2, twoBitEncodedFile, 2);
+    generateFwdIndexHelper(rawFile4, fourBitEncodedFile, 4);
+    generateFwdIndexHelper(rawFile8, eightBitEncodedFile, 8);
+    generateFwdIndexHelper(rawFile16, sixteenBitEncodedFile, 16);
+
+    _pinotDataBuffer2 = PinotDataBuffer.loadBigEndianFile(twoBitEncodedFile);
+    _bitSet2Fast = PinotDataBitSetV2.createBitSet(_pinotDataBuffer2, 2);
+    _bit2ReaderFast = new FixedBitIntReaderWriterV2(_pinotDataBuffer2, ROWS, 
2);
+    _bitSet2 = new PinotDataBitSet(_pinotDataBuffer2);
+    _bit2Reader = new FixedBitSingleValueReader(_pinotDataBuffer2, ROWS, 2);
+
+    _pinotDataBuffer4 = PinotDataBuffer.loadBigEndianFile(fourBitEncodedFile);
+    _bitSet4Fast = PinotDataBitSetV2.createBitSet(_pinotDataBuffer4, 4);
+    _bit4ReaderFast = new FixedBitIntReaderWriterV2(_pinotDataBuffer4, ROWS, 
4);
+    _bitSet4 = new PinotDataBitSet(_pinotDataBuffer4);
+    _bit4Reader = new FixedBitSingleValueReader(_pinotDataBuffer4, ROWS, 4);
+
+    _pinotDataBuffer8 = PinotDataBuffer.loadBigEndianFile(eightBitEncodedFile);
+    _bitSet8Fast = PinotDataBitSetV2.createBitSet(_pinotDataBuffer8, 8);
+    _bit8ReaderFast = new FixedBitIntReaderWriterV2(_pinotDataBuffer8, ROWS, 
8);
+    _bitSet8 = new PinotDataBitSet(_pinotDataBuffer8);
+    _bit8Reader = new FixedBitSingleValueReader(_pinotDataBuffer8, ROWS, 8);
+
+    _pinotDataBuffer16 = 
PinotDataBuffer.loadBigEndianFile(sixteenBitEncodedFile);
+    _bitSet16Fast = PinotDataBitSetV2.createBitSet(_pinotDataBuffer16, 16);
+    _bit16ReaderFast = new FixedBitIntReaderWriterV2(_pinotDataBuffer16, ROWS, 
16);
+    _bitSet16 = new PinotDataBitSet(_pinotDataBuffer16);
+    _bit16Reader = new FixedBitSingleValueReader(_pinotDataBuffer16, ROWS, 16);
+
+    docIdsWithGaps[0] = 0;
+    for (int i = 1; i < NUM_DOCIDS_WITH_GAPS; i++) {
+      docIdsWithGaps[i] = docIdsWithGaps[i - 1] + 2;
+    }
+  }
+
+  private void generateFwdIndexHelper(File rawFile, File encodedFile, int 
numBits) throws Exception {
+    BufferedReader bfr = new BufferedReader(new FileReader(rawFile));
+    FixedBitSingleValueWriter fixedBitSingleValueWriter = new 
FixedBitSingleValueWriter(encodedFile, ROWS, numBits);
+    String line;
+    int rowId = 0;
+    while ((line = bfr.readLine()) != null) {
+      fixedBitSingleValueWriter.setInt(rowId++, Integer.parseInt(line));
+    }
+    fixedBitSingleValueWriter.close();
+  }
+
+  // 2-bit: test single integer decode in a contiguous manner one docId at a 
time
+  @Benchmark
+  @BenchmarkMode(Mode.AverageTime)
+  @OutputTimeUnit(TimeUnit.MILLISECONDS)
+  public void twoBitContiguous() {
+    for (int startIndex = 0; startIndex < ROWS; startIndex++) {
+      _bitSet2.readInt(startIndex, 2);
+    }
+  }
+
+  // 2-bit: test single integer decode in a contiguous manner one docId at a 
time
+  // with optimized API
+  @Benchmark
+  @BenchmarkMode(Mode.AverageTime)
+  @OutputTimeUnit(TimeUnit.MILLISECONDS)
+  public void twoBitContiguousFast() {
+    for (int startIndex = 0; startIndex < ROWS; startIndex++) {
+      _bitSet2Fast.readInt(startIndex);
+    }
+  }
+
+  // 2-bit: test multi integer decode for a range of contiguous docIds
+  @Benchmark
+  @BenchmarkMode(Mode.AverageTime)
+  @OutputTimeUnit(TimeUnit.MILLISECONDS)
+  public void twoBitBulkContiguous() {
+    for (int startIndex = 0; startIndex < ROWS; startIndex += 32) {
+      _bitSet2.readInt(startIndex, 2, 32, unpacked);
+    }
+  }
+
+  // 2-bit: test multi integer decode for a range of contiguous docIds
+  // with optimized API
+  @Benchmark
+  @BenchmarkMode(Mode.AverageTime)
+  @OutputTimeUnit(TimeUnit.MILLISECONDS)
+  public void twoBitBulkContiguousFast() {
+    for (int startIndex = 0; startIndex < ROWS; startIndex += 32) {
+      _bitSet2Fast.readInt(startIndex, 32, unpacked);
+    }
+  }
+
+  // 2-bit: test multi integer decode for a set of monotonically
+  // increasing docIds with gaps
+  @Benchmark
+  @BenchmarkMode(Mode.Throughput)
+  @OutputTimeUnit(TimeUnit.MILLISECONDS)
+  public void twoBitBulkWithGaps() {
+    _bit2Reader.readValues(docIdsWithGaps, 0, NUM_DOCIDS_WITH_GAPS, 
unpackedWithGaps, 0);
+  }
+
+  // 2-bit: test multi integer decode for a set of monotonically
+  // increasing docIds with gaps with optimized API
+  @Benchmark
+  @BenchmarkMode(Mode.Throughput)
+  @OutputTimeUnit(TimeUnit.MILLISECONDS)
+  public void twoBitBulkWithGapsFast() {
+    _bit2ReaderFast.readValues(docIdsWithGaps, 0, NUM_DOCIDS_WITH_GAPS, 
unpackedWithGaps, 0);
+  }
+
+  // 2-bit: test multi integer decode for a range of contiguous docIds
+  // starting at unaligned boundary and spilling over to unaligned boundary
+  @Benchmark
+  @BenchmarkMode(Mode.AverageTime)
+  @OutputTimeUnit(TimeUnit.MILLISECONDS)
+  public void twoBitBulkContiguousUnaligned() {
+    for (int startIndex = 1; startIndex < ROWS - 50; startIndex += 50) {
+      _bitSet2.readInt(startIndex, 2, 50, unalignedUnpacked2Bit);
+    }
+  }
+
+  // 2-bit: test multi integer decode for a range of contiguous docIds
+  // starting at unaligned boundary and spilling over to unaligned boundary
+  // with optimized API
+  @Benchmark
+  @BenchmarkMode(Mode.AverageTime)
+  @OutputTimeUnit(TimeUnit.MILLISECONDS)
+  public void twoBitBulkContiguousUnalignedFast() {
+    for (int startIndex = 1; startIndex < ROWS - 50; startIndex += 50) {
+      _bitSet2Fast.readInt(startIndex, 50, unalignedUnpacked2Bit);
+    }
+  }
+
+  // 4-bit: test single integer decode in a contiguous manner one docId at a 
time
+  @Benchmark
+  @BenchmarkMode(Mode.AverageTime)
+  @OutputTimeUnit(TimeUnit.MILLISECONDS)
+  public void fourBitContiguous() {
+    for (int startIndex = 0; startIndex < ROWS; startIndex++) {
+      _bitSet4.readInt(startIndex, 4);
+    }
+  }
+
+  // 4-bit: test single integer decode in a contiguous manner one docId at a 
time
+  // with optimized API
+  @Benchmark
+  @BenchmarkMode(Mode.AverageTime)
+  @OutputTimeUnit(TimeUnit.MILLISECONDS)
+  public void fourBitContiguousFast() {
+    for (int startIndex = 0; startIndex < ROWS; startIndex++) {
+      _bitSet4Fast.readInt(startIndex);
+    }
+  }
+
+  // 4-bit: test multi integer decode for a range of contiguous docIds
+  @Benchmark
+  @BenchmarkMode(Mode.AverageTime)
+  @OutputTimeUnit(TimeUnit.MILLISECONDS)
+  public void fourBitBulkContiguous() {
+    for (int startIndex = 0; startIndex < ROWS; startIndex += 32) {
+      _bitSet4.readInt(startIndex, 4, 32, unpacked);
+    }
+  }
+
+  // 4-bit: test multi integer decode for a range of contiguous docIds
+  // with optimized API
+  @Benchmark
+  @BenchmarkMode(Mode.AverageTime)
+  @OutputTimeUnit(TimeUnit.MILLISECONDS)
+  public void fourBitBulkContiguousFast() {
+    for (int startIndex = 0; startIndex < ROWS; startIndex += 32) {
+      _bitSet4Fast.readInt(startIndex, 32, unpacked);
+    }
+  }
+
+  // 4-bit: test multi integer decode for a set of monotonically
+  // increasing docIds with gaps
+  @Benchmark
+  @BenchmarkMode(Mode.Throughput)
+  @OutputTimeUnit(TimeUnit.MILLISECONDS)
+  public void fourBitBulkWithGaps() {
+    _bit4Reader.readValues(docIdsWithGaps, 0, NUM_DOCIDS_WITH_GAPS, 
unpackedWithGaps, 0);
+  }
+
+  // 4-bit: test multi integer decode for a set of monotonically
+  // increasing docIds with gaps with optimized API
+  @Benchmark
+  @BenchmarkMode(Mode.Throughput)
+  @OutputTimeUnit(TimeUnit.MILLISECONDS)
+  public void fourBitBulkWithGapsFast() {
+    _bit4ReaderFast.readValues(docIdsWithGaps, 0, NUM_DOCIDS_WITH_GAPS, 
unpackedWithGaps, 0);
+  }
+
+  // 4-bit: test multi integer decode for a range of contiguous docIds
+  // starting at unaligned boundary and spilling over to unaligned boundary
+  @Benchmark
+  @BenchmarkMode(Mode.AverageTime)
+  @OutputTimeUnit(TimeUnit.MILLISECONDS)
+  public void fourBitBulkContiguousUnaligned() {
+    for (int startIndex = 1; startIndex < ROWS - 48; startIndex += 48) {
+      _bitSet4.readInt(startIndex, 4, 48, unalignedUnpacked4Bit);
+    }
+  }
+
+  // 4-bit: test multi integer decode for a range of contiguous docIds
+  // starting at unaligned boundary and spilling over to unaligned boundary
+  // with optimized API
+  @Benchmark
+  @BenchmarkMode(Mode.AverageTime)
+  @OutputTimeUnit(TimeUnit.MILLISECONDS)
+  public void fourBitBulkContiguousUnalignedFast() {
+    for (int startIndex = 1; startIndex < ROWS - 48; startIndex += 48) {
+      _bitSet4Fast.readInt(startIndex, 48, unalignedUnpacked8Bit);
+    }
+  }
+
+  // 8-bit: test single integer decode in a contiguous manner one docId at a 
time
+  @Benchmark
+  @BenchmarkMode(Mode.AverageTime)
+  @OutputTimeUnit(TimeUnit.MILLISECONDS)
+  public void eightBitContiguous() {
+    for (int startIndex = 0; startIndex < ROWS; startIndex++) {
+      _bitSet8.readInt(startIndex, 8);
+    }
+  }
+
+  // 8-bit: test single integer decode in a contiguous manner one docId at a 
time
+  // with optimized API
+  @Benchmark
+  @BenchmarkMode(Mode.AverageTime)
+  @OutputTimeUnit(TimeUnit.MILLISECONDS)
+  public void eightBitContiguousFast() {
+    for (int startIndex = 0; startIndex < ROWS; startIndex++) {
+      _bitSet8Fast.readInt(startIndex);
+    }
+  }
+
+  // 8-bit: test multi integer decode for a range of contiguous docIds
+  @Benchmark
+  @BenchmarkMode(Mode.AverageTime)
+  @OutputTimeUnit(TimeUnit.MILLISECONDS)
+  public void eightBitBulkContiguous() {
+    for (int startIndex = 0; startIndex < ROWS; startIndex += 32) {
+      _bitSet8.readInt(startIndex, 8, 32, unpacked);
+    }
+  }
+
+  // 8-bit: test multi integer decode for a range of contiguous docIds
+  // with optimized API
+  @Benchmark
+  @BenchmarkMode(Mode.AverageTime)
+  @OutputTimeUnit(TimeUnit.MILLISECONDS)
+  public void eightBitBulkContiguousFast() {
+    for (int startIndex = 0; startIndex < ROWS; startIndex += 32) {
+      _bitSet8Fast.readInt(startIndex, 32, unpacked);
+    }
+  }
+
+  // 8-bit: test multi integer decode for a set of monotonically
+  // increasing docIds with gaps
+  @Benchmark
+  @BenchmarkMode(Mode.Throughput)
+  @OutputTimeUnit(TimeUnit.MILLISECONDS)
+  public void eightBitBulkWithGaps() {
+    _bit8Reader.readValues(docIdsWithGaps, 0, NUM_DOCIDS_WITH_GAPS, 
unpackedWithGaps, 0);
+  }
+
+  // 8-bit: test multi integer decode for a set of monotonically
+  // increasing docIds with gaps with optimized API
+  @Benchmark
+  @BenchmarkMode(Mode.Throughput)
+  @OutputTimeUnit(TimeUnit.MILLISECONDS)
+  public void eightBitBulkWithGapsFast() {
+    _bit8ReaderFast.readValues(docIdsWithGaps, 0, NUM_DOCIDS_WITH_GAPS, 
unpackedWithGaps, 0);
+  }
+
+  // 8-bit: test multi integer decode for a range of contiguous docIds
+  // starting at unaligned boundary and spilling over to unaligned boundary
+  @Benchmark
+  @BenchmarkMode(Mode.AverageTime)
+  @OutputTimeUnit(TimeUnit.MILLISECONDS)
+  public void eightBitBulkContiguousUnaligned() {
+    for (int startIndex = 1; startIndex < ROWS - 51; startIndex += 51) {
+      _bitSet8.readInt(startIndex, 8, 51, unalignedUnpacked8Bit);
+    }
+  }
+
+  // 8-bit: test multi integer decode for a range of contiguous docIds
+  // starting at unaligned boundary and spilling over to unaligned boundary
+  // with optimized API
+  @Benchmark
+  @BenchmarkMode(Mode.AverageTime)
+  @OutputTimeUnit(TimeUnit.MILLISECONDS)
+  public void eightBitBulkContiguousUnalignedFast() {
+    for (int startIndex = 1; startIndex < ROWS - 51; startIndex += 51) {
+      _bitSet8Fast.readInt(startIndex, 51, unalignedUnpacked8Bit);
+    }
+  }
+
+  // 16-bit: test single integer decode in a contiguous manner one docId at a 
time
+  @Benchmark
+  @BenchmarkMode(Mode.AverageTime)
+  @OutputTimeUnit(TimeUnit.MILLISECONDS)
+  public void sixteenBitContiguous() {
+    for (int startIndex = 0; startIndex < ROWS; startIndex++) {
+      _bitSet16.readInt(startIndex, 16);
+    }
+  }
+
+  // 16-bit: test single integer decode in a contiguous manner one docId at a 
time
+  // with optimized API
+  @Benchmark
+  @BenchmarkMode(Mode.AverageTime)
+  @OutputTimeUnit(TimeUnit.MILLISECONDS)
+  public void sixteenBitContiguousFast() {
+    for (int startIndex = 0; startIndex < ROWS; startIndex++) {
+      _bitSet16Fast.readInt(startIndex);
+    }
+  }
+
+  // 16-bit: test multi integer decode for a range of contiguous docIds
+  @Benchmark
+  @BenchmarkMode(Mode.AverageTime)
+  @OutputTimeUnit(TimeUnit.MILLISECONDS)
+  public void sixteenBitBulkContiguous() {
+    for (int startIndex = 0; startIndex < ROWS; startIndex += 32) {
+      _bitSet16.readInt(startIndex, 16, 32, unpacked);
+    }
+  }
+
+  // 2-bit: test multi integer decode for a range of contiguous docIds
+  // with optimized API
+  @Benchmark
+  @BenchmarkMode(Mode.AverageTime)
+  @OutputTimeUnit(TimeUnit.MILLISECONDS)
+  public void sixteenBitBulkContiguousFast() {
+    for (int startIndex = 0; startIndex < ROWS; startIndex += 32) {
+      _bitSet16Fast.readInt(startIndex, 32, unpacked);
+    }
+  }
+
+  // 16-bit: test multi integer decode for a set of monotonically
+  // increasing docIds with gaps
+  @Benchmark
+  @BenchmarkMode(Mode.Throughput)
+  @OutputTimeUnit(TimeUnit.MILLISECONDS)
+  public void sixteenBitBulkWithGaps() {
+    _bit16Reader.readValues(docIdsWithGaps, 0, NUM_DOCIDS_WITH_GAPS, 
unpackedWithGaps, 0);
+  }
+
+  // 16-bit: test multi integer decode for a set of monotonically
+  // increasing docIds with gaps with optimized API
+  @Benchmark
+  @BenchmarkMode(Mode.Throughput)
+  @OutputTimeUnit(TimeUnit.MILLISECONDS)
+  public void sixteenBitBulkWithGapsFast() {
+    _bit16ReaderFast.readValues(docIdsWithGaps, 0, NUM_DOCIDS_WITH_GAPS, 
unpackedWithGaps, 0);
+  }
+
+  // 16-bit: test multi integer decode for a range of contiguous docIds
+  // starting at unaligned boundary and spilling over to unaligned boundary
+  @Benchmark
+  @BenchmarkMode(Mode.AverageTime)
+  @OutputTimeUnit(TimeUnit.MILLISECONDS)
+  public void sixteenBitBulkContiguousUnaligned() {
+    for (int startIndex = 1; startIndex < ROWS - 49; startIndex += 49) {
+      _bitSet16.readInt(startIndex, 16, 51, unalignedUnpacked16Bit);
+    }
+  }
+
+  // 16-bit: test multi integer decode for a range of contiguous docIds
+  // starting at unaligned boundary and spilling over to unaligned boundary
+  // with optimized API
+  @Benchmark
+  @BenchmarkMode(Mode.AverageTime)
+  @OutputTimeUnit(TimeUnit.MILLISECONDS)
+  public void sixteenBitBulkContiguousUnalignedFast() {
+    for (int startIndex = 1; startIndex < ROWS - 49; startIndex += 49) {
+      _bitSet16Fast.readInt(startIndex, 49, unalignedUnpacked16Bit);
+    }
+  }
+
+  public static void main(String[] args)
+      throws Exception {
+    ChainedOptionsBuilder opt = new 
OptionsBuilder().include(BenchmarkPinotDataBitSet.class.getSimpleName())
+        .warmupIterations(1).measurementIterations(3);
+    new Runner(opt.build()).run();
+  }
+}
\ No newline at end of file


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org

Reply via email to