Jackie-Jiang commented on a change in pull request #7661:
URL: https://github.com/apache/pinot/pull/7661#discussion_r745184469



##########
File path: 
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/forward/VarByteChunkSVForwardIndexReaderV4.java
##########
@@ -0,0 +1,299 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.segment.local.segment.index.readers.forward;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+import java.nio.charset.StandardCharsets;
+import javax.annotation.Nullable;
+import org.apache.pinot.segment.local.io.compression.ChunkCompressorFactory;
+import 
org.apache.pinot.segment.local.io.writer.impl.VarByteChunkSVForwardIndexWriterV4;
+import org.apache.pinot.segment.spi.compression.ChunkCompressionType;
+import org.apache.pinot.segment.spi.compression.ChunkDecompressor;
+import org.apache.pinot.segment.spi.index.reader.ForwardIndexReader;
+import org.apache.pinot.segment.spi.index.reader.ForwardIndexReaderContext;
+import org.apache.pinot.segment.spi.memory.CleanerUtil;
+import org.apache.pinot.segment.spi.memory.PinotDataBuffer;
+import org.apache.pinot.spi.data.FieldSpec;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+public class VarByteChunkSVForwardIndexReaderV4
+    implements 
ForwardIndexReader<VarByteChunkSVForwardIndexReaderV4.ReaderContext> {
+
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(VarByteChunkSVForwardIndexReaderV4.class);
+
+  private static final int METADATA_ENTRY_SIZE = 8;
+
+  private final FieldSpec.DataType _valueType;
+  private final int _targetDecompressedChunkSize;
+  private final ChunkDecompressor _chunkDecompressor;
+  private final ChunkCompressionType _chunkCompressionType;
+
+  private final PinotDataBuffer _metadata;
+  private final PinotDataBuffer _chunks;
+
+  public VarByteChunkSVForwardIndexReaderV4(PinotDataBuffer dataBuffer, 
FieldSpec.DataType valueType) {
+    if (dataBuffer.getInt(0) < VarByteChunkSVForwardIndexWriterV4.VERSION) {
+      throw new IllegalStateException("version " + dataBuffer.getInt(0) + " < "
+          + VarByteChunkSVForwardIndexWriterV4.VERSION);
+    }
+    _valueType = valueType;
+    _targetDecompressedChunkSize = dataBuffer.getInt(4);
+    _chunkCompressionType = ChunkCompressionType.valueOf(dataBuffer.getInt(8));
+    _chunkDecompressor = 
ChunkCompressorFactory.getDecompressor(_chunkCompressionType);
+    int chunksOffset = dataBuffer.getInt(12);
+    // the file has a BE header for compatability reasons (version selection) 
but the content is LE
+    _metadata = dataBuffer.view(16, chunksOffset, ByteOrder.LITTLE_ENDIAN);
+    _chunks = dataBuffer.view(chunksOffset, dataBuffer.size(), 
ByteOrder.LITTLE_ENDIAN);
+  }
+
+  @Override
+  public boolean isDictionaryEncoded() {
+    return false;
+  }
+
+  @Override
+  public boolean isSingleValue() {
+    return true;
+  }
+
+  @Override
+  public FieldSpec.DataType getValueType() {
+    return _valueType;
+  }
+
+  @Override
+  public String getString(int docId, ReaderContext context) {
+    return new String(context.getValue(docId), StandardCharsets.UTF_8);
+  }
+
+  @Override
+  public byte[] getBytes(int docId, ReaderContext context) {
+    return context.getValue(docId);
+  }
+
+  @Nullable
+  @Override
+  public ReaderContext createContext() {
+    return _chunkCompressionType == ChunkCompressionType.PASS_THROUGH
+        ? new UncompressedReaderContext(_chunks, _metadata)
+        : new CompressedReaderContext(_metadata, _chunks, _chunkDecompressor, 
_chunkCompressionType,
+            _targetDecompressedChunkSize);
+  }
+
+  @Override
+  public void close()
+      throws IOException {
+  }
+
+  public static abstract class ReaderContext implements 
ForwardIndexReaderContext {
+
+    protected final PinotDataBuffer _chunks;
+    protected final PinotDataBuffer _metadata;
+    protected int _docIdOffset;
+    protected int _nextDocIdOffset;
+    protected boolean _regularChunk;
+    protected int _numDocsInCurrentChunk;
+
+    protected ReaderContext(PinotDataBuffer metadata, PinotDataBuffer chunks) {
+      _chunks = chunks;
+      _metadata = metadata;
+    }
+
+    public byte[] getValue(int docId) {
+      if (docId >= _docIdOffset && docId < _nextDocIdOffset) {
+        return readSmallUncompressedValue(docId);
+      } else {
+        try {
+          return decompressAndRead(docId);
+        } catch (IOException e) {
+          LOGGER.error("Exception caught while decompressing data chunk", e);
+          throw new RuntimeException(e);
+        }
+      }
+    }
+
+    protected long chunkIndexFor(int docId) {
+      long low = 0;
+      long high = (_metadata.size() / METADATA_ENTRY_SIZE) - 1;
+      while (low <= high) {
+        long mid = (low + high) >>> 1;
+        long position = mid * METADATA_ENTRY_SIZE;
+        int midDocId = _metadata.getInt(position) & 0x7FFFFFFF;
+        if (midDocId < docId) {
+          low = mid + 1;
+        } else if (midDocId > docId) {
+          high = mid - 1;
+        } else {
+          return position;
+        }
+      }
+      return (low - 1) * METADATA_ENTRY_SIZE;
+    }
+
+    protected abstract byte[] processChunkAndReadFirstValue(int docId, long 
offset, long limit)
+        throws IOException;
+
+    protected abstract byte[] readSmallUncompressedValue(int docId);
+
+    private byte[] decompressAndRead(int docId)
+        throws IOException {
+      long metadataEntry = chunkIndexFor(docId);
+      int info = _metadata.getInt(metadataEntry);
+      _docIdOffset = info & 0x7FFFFFFF;
+      _regularChunk = _docIdOffset == info;
+      long offset = _metadata.getInt(metadataEntry + Integer.BYTES) & 
0xFFFFFFFFL;
+      long limit;
+      if (_metadata.size() - METADATA_ENTRY_SIZE > metadataEntry) {
+        _nextDocIdOffset = _metadata.getInt(metadataEntry + 
METADATA_ENTRY_SIZE) & 0x7FFFFFFF;
+        limit = _metadata.getInt(metadataEntry + METADATA_ENTRY_SIZE + 
Integer.BYTES) & 0xFFFFFFFFL;
+      } else {
+        _nextDocIdOffset = Integer.MAX_VALUE;
+        limit = _chunks.size();
+      }
+      return processChunkAndReadFirstValue(docId, offset, limit);
+    }
+  }
+
+  private static final class UncompressedReaderContext extends ReaderContext {
+
+    private ByteBuffer _chunk;
+
+    UncompressedReaderContext(PinotDataBuffer metadata, PinotDataBuffer 
chunks) {
+      super(chunks, metadata);
+    }
+
+    @Override
+    protected byte[] processChunkAndReadFirstValue(int docId, long offset, 
long limit) {
+      _chunk = _chunks.toDirectByteBuffer(offset, (int) (limit - offset));
+      if (!_regularChunk) {
+        return readHugeValue();
+      }
+      _numDocsInCurrentChunk = _chunk.getInt(0);
+      return readSmallUncompressedValue(docId);
+    }
+
+    private byte[] readHugeValue() {
+      byte[] value = new byte[_chunk.capacity()];
+      _chunk.get(value);
+      return value;
+    }
+
+    @Override
+    protected byte[] readSmallUncompressedValue(int docId) {
+      int index = docId - _docIdOffset;
+      int offset = _chunk.getInt((index + 1) * Integer.BYTES);
+      int nextOffset = index == _numDocsInCurrentChunk - 1
+          ? _chunk.limit()
+          : _chunk.getInt((index + 2) * Integer.BYTES);
+      ByteBuffer view = _chunk.duplicate();
+      view.position(offset);
+      view.order(ByteOrder.LITTLE_ENDIAN);
+      byte[] bytes = new byte[nextOffset - offset];
+      view.get(bytes);
+      return bytes;
+    }
+
+    @Override
+    public void close()
+        throws IOException {
+    }
+  }
+
+  private static final class CompressedReaderContext extends ReaderContext {
+
+    private final ByteBuffer _decompressedBuffer;
+    private final ChunkDecompressor _chunkDecompressor;
+    private final ChunkCompressionType _chunkCompressionType;
+
+    CompressedReaderContext(PinotDataBuffer metadata, PinotDataBuffer chunks, 
ChunkDecompressor chunkDecompressor,
+        ChunkCompressionType chunkCompressionType, int targetChunkSize) {
+      super(metadata, chunks);
+      _chunkDecompressor = chunkDecompressor;
+      _chunkCompressionType = chunkCompressionType;
+      _decompressedBuffer = 
ByteBuffer.allocateDirect(targetChunkSize).order(ByteOrder.LITTLE_ENDIAN);
+    }
+
+    @Override
+    protected byte[] processChunkAndReadFirstValue(int docId, long offset, 
long limit)
+        throws IOException {
+      _decompressedBuffer.clear();
+      ByteBuffer compressed = _chunks.toDirectByteBuffer(offset, (int) (limit 
- offset));
+      int decompressedLength = 
_chunkDecompressor.decompressedLength(compressed);

Review comment:
       `decompressedLength` is only needed for huge values, suggest moving it 
after the if check (not sure if java compiler is able to skip this line for 
regular chunk)

##########
File path: 
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/forward/VarByteChunkSVForwardIndexReaderV4.java
##########
@@ -0,0 +1,299 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.segment.local.segment.index.readers.forward;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+import java.nio.charset.StandardCharsets;
+import javax.annotation.Nullable;
+import org.apache.pinot.segment.local.io.compression.ChunkCompressorFactory;
+import 
org.apache.pinot.segment.local.io.writer.impl.VarByteChunkSVForwardIndexWriterV4;
+import org.apache.pinot.segment.spi.compression.ChunkCompressionType;
+import org.apache.pinot.segment.spi.compression.ChunkDecompressor;
+import org.apache.pinot.segment.spi.index.reader.ForwardIndexReader;
+import org.apache.pinot.segment.spi.index.reader.ForwardIndexReaderContext;
+import org.apache.pinot.segment.spi.memory.CleanerUtil;
+import org.apache.pinot.segment.spi.memory.PinotDataBuffer;
+import org.apache.pinot.spi.data.FieldSpec;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+public class VarByteChunkSVForwardIndexReaderV4
+    implements 
ForwardIndexReader<VarByteChunkSVForwardIndexReaderV4.ReaderContext> {
+
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(VarByteChunkSVForwardIndexReaderV4.class);
+
+  private static final int METADATA_ENTRY_SIZE = 8;
+
+  private final FieldSpec.DataType _valueType;
+  private final int _targetDecompressedChunkSize;
+  private final ChunkDecompressor _chunkDecompressor;
+  private final ChunkCompressionType _chunkCompressionType;
+
+  private final PinotDataBuffer _metadata;
+  private final PinotDataBuffer _chunks;
+
+  public VarByteChunkSVForwardIndexReaderV4(PinotDataBuffer dataBuffer, 
FieldSpec.DataType valueType) {
+    if (dataBuffer.getInt(0) < VarByteChunkSVForwardIndexWriterV4.VERSION) {
+      throw new IllegalStateException("version " + dataBuffer.getInt(0) + " < "
+          + VarByteChunkSVForwardIndexWriterV4.VERSION);
+    }
+    _valueType = valueType;
+    _targetDecompressedChunkSize = dataBuffer.getInt(4);
+    _chunkCompressionType = ChunkCompressionType.valueOf(dataBuffer.getInt(8));
+    _chunkDecompressor = 
ChunkCompressorFactory.getDecompressor(_chunkCompressionType);
+    int chunksOffset = dataBuffer.getInt(12);
+    // the file has a BE header for compatability reasons (version selection) 
but the content is LE
+    _metadata = dataBuffer.view(16, chunksOffset, ByteOrder.LITTLE_ENDIAN);
+    _chunks = dataBuffer.view(chunksOffset, dataBuffer.size(), 
ByteOrder.LITTLE_ENDIAN);
+  }
+
+  @Override
+  public boolean isDictionaryEncoded() {
+    return false;
+  }
+
+  @Override
+  public boolean isSingleValue() {
+    return true;
+  }
+
+  @Override
+  public FieldSpec.DataType getValueType() {
+    return _valueType;
+  }
+
+  @Override
+  public String getString(int docId, ReaderContext context) {
+    return new String(context.getValue(docId), StandardCharsets.UTF_8);
+  }
+
+  @Override
+  public byte[] getBytes(int docId, ReaderContext context) {
+    return context.getValue(docId);
+  }
+
+  @Nullable
+  @Override
+  public ReaderContext createContext() {
+    return _chunkCompressionType == ChunkCompressionType.PASS_THROUGH
+        ? new UncompressedReaderContext(_chunks, _metadata)
+        : new CompressedReaderContext(_metadata, _chunks, _chunkDecompressor, 
_chunkCompressionType,
+            _targetDecompressedChunkSize);
+  }
+
+  @Override
+  public void close()
+      throws IOException {
+  }
+
+  public static abstract class ReaderContext implements 
ForwardIndexReaderContext {
+
+    protected final PinotDataBuffer _chunks;
+    protected final PinotDataBuffer _metadata;
+    protected int _docIdOffset;
+    protected int _nextDocIdOffset;
+    protected boolean _regularChunk;
+    protected int _numDocsInCurrentChunk;
+
+    protected ReaderContext(PinotDataBuffer metadata, PinotDataBuffer chunks) {
+      _chunks = chunks;
+      _metadata = metadata;
+    }
+
+    public byte[] getValue(int docId) {
+      if (docId >= _docIdOffset && docId < _nextDocIdOffset) {
+        return readSmallUncompressedValue(docId);
+      } else {
+        try {
+          return decompressAndRead(docId);
+        } catch (IOException e) {
+          LOGGER.error("Exception caught while decompressing data chunk", e);
+          throw new RuntimeException(e);
+        }
+      }
+    }
+
+    protected long chunkIndexFor(int docId) {
+      long low = 0;
+      long high = (_metadata.size() / METADATA_ENTRY_SIZE) - 1;
+      while (low <= high) {
+        long mid = (low + high) >>> 1;
+        long position = mid * METADATA_ENTRY_SIZE;
+        int midDocId = _metadata.getInt(position) & 0x7FFFFFFF;
+        if (midDocId < docId) {
+          low = mid + 1;
+        } else if (midDocId > docId) {
+          high = mid - 1;
+        } else {
+          return position;
+        }
+      }
+      return (low - 1) * METADATA_ENTRY_SIZE;
+    }
+
+    protected abstract byte[] processChunkAndReadFirstValue(int docId, long 
offset, long limit)
+        throws IOException;
+
+    protected abstract byte[] readSmallUncompressedValue(int docId);
+
+    private byte[] decompressAndRead(int docId)
+        throws IOException {
+      long metadataEntry = chunkIndexFor(docId);
+      int info = _metadata.getInt(metadataEntry);
+      _docIdOffset = info & 0x7FFFFFFF;
+      _regularChunk = _docIdOffset == info;
+      long offset = _metadata.getInt(metadataEntry + Integer.BYTES) & 
0xFFFFFFFFL;
+      long limit;
+      if (_metadata.size() - METADATA_ENTRY_SIZE > metadataEntry) {
+        _nextDocIdOffset = _metadata.getInt(metadataEntry + 
METADATA_ENTRY_SIZE) & 0x7FFFFFFF;
+        limit = _metadata.getInt(metadataEntry + METADATA_ENTRY_SIZE + 
Integer.BYTES) & 0xFFFFFFFFL;
+      } else {
+        _nextDocIdOffset = Integer.MAX_VALUE;
+        limit = _chunks.size();
+      }
+      return processChunkAndReadFirstValue(docId, offset, limit);
+    }
+  }
+
+  private static final class UncompressedReaderContext extends ReaderContext {
+
+    private ByteBuffer _chunk;
+
+    UncompressedReaderContext(PinotDataBuffer metadata, PinotDataBuffer 
chunks) {
+      super(chunks, metadata);
+    }
+
+    @Override
+    protected byte[] processChunkAndReadFirstValue(int docId, long offset, 
long limit) {
+      _chunk = _chunks.toDirectByteBuffer(offset, (int) (limit - offset));
+      if (!_regularChunk) {
+        return readHugeValue();
+      }
+      _numDocsInCurrentChunk = _chunk.getInt(0);
+      return readSmallUncompressedValue(docId);
+    }
+
+    private byte[] readHugeValue() {
+      byte[] value = new byte[_chunk.capacity()];
+      _chunk.get(value);
+      return value;
+    }
+
+    @Override
+    protected byte[] readSmallUncompressedValue(int docId) {
+      int index = docId - _docIdOffset;
+      int offset = _chunk.getInt((index + 1) * Integer.BYTES);
+      int nextOffset = index == _numDocsInCurrentChunk - 1
+          ? _chunk.limit()
+          : _chunk.getInt((index + 2) * Integer.BYTES);
+      ByteBuffer view = _chunk.duplicate();

Review comment:
       Since it is always single-threaded, we can directly read from the 
`_chunk` without duplicating it

##########
File path: 
pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/creator/VarByteChunkV4Test.java
##########
@@ -0,0 +1,168 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.segment.local.segment.index.creator;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.Arrays;
+import java.util.List;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.function.BiConsumer;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+import java.util.stream.Stream;
+import org.apache.commons.io.FileUtils;
+import 
org.apache.pinot.segment.local.io.writer.impl.VarByteChunkSVForwardIndexWriterV4;
+import 
org.apache.pinot.segment.local.segment.index.readers.forward.VarByteChunkSVForwardIndexReaderV4;
+import org.apache.pinot.segment.spi.compression.ChunkCompressionType;
+import org.apache.pinot.segment.spi.memory.PinotDataBuffer;
+import org.apache.pinot.spi.data.FieldSpec;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.DataProvider;
+import org.testng.annotations.Test;
+
+import static org.testng.Assert.assertEquals;
+
+
+public class VarByteChunkV4Test {
+
+  private static final String DIR_NAME = System.getProperty("java.io.tmpdir") 
+ File.separator
+      + "VarByteChunkV4Test";

Review comment:
       Consistent with other tests
   ```suggestion
     private static final String DIR_NAME = new 
File(FileUtils.getTempDirectory(), "VarByteChunkV4Test");
   ```

##########
File path: 
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/forward/VarByteChunkSVForwardIndexReaderV4.java
##########
@@ -0,0 +1,299 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.segment.local.segment.index.readers.forward;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+import java.nio.charset.StandardCharsets;
+import javax.annotation.Nullable;
+import org.apache.pinot.segment.local.io.compression.ChunkCompressorFactory;
+import 
org.apache.pinot.segment.local.io.writer.impl.VarByteChunkSVForwardIndexWriterV4;
+import org.apache.pinot.segment.spi.compression.ChunkCompressionType;
+import org.apache.pinot.segment.spi.compression.ChunkDecompressor;
+import org.apache.pinot.segment.spi.index.reader.ForwardIndexReader;
+import org.apache.pinot.segment.spi.index.reader.ForwardIndexReaderContext;
+import org.apache.pinot.segment.spi.memory.CleanerUtil;
+import org.apache.pinot.segment.spi.memory.PinotDataBuffer;
+import org.apache.pinot.spi.data.FieldSpec;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+public class VarByteChunkSVForwardIndexReaderV4
+    implements 
ForwardIndexReader<VarByteChunkSVForwardIndexReaderV4.ReaderContext> {
+
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(VarByteChunkSVForwardIndexReaderV4.class);
+
+  private static final int METADATA_ENTRY_SIZE = 8;
+
+  private final FieldSpec.DataType _valueType;
+  private final int _targetDecompressedChunkSize;
+  private final ChunkDecompressor _chunkDecompressor;
+  private final ChunkCompressionType _chunkCompressionType;
+
+  private final PinotDataBuffer _metadata;
+  private final PinotDataBuffer _chunks;
+
+  public VarByteChunkSVForwardIndexReaderV4(PinotDataBuffer dataBuffer, 
FieldSpec.DataType valueType) {
+    if (dataBuffer.getInt(0) < VarByteChunkSVForwardIndexWriterV4.VERSION) {
+      throw new IllegalStateException("version " + dataBuffer.getInt(0) + " < "
+          + VarByteChunkSVForwardIndexWriterV4.VERSION);
+    }
+    _valueType = valueType;
+    _targetDecompressedChunkSize = dataBuffer.getInt(4);
+    _chunkCompressionType = ChunkCompressionType.valueOf(dataBuffer.getInt(8));
+    _chunkDecompressor = 
ChunkCompressorFactory.getDecompressor(_chunkCompressionType);
+    int chunksOffset = dataBuffer.getInt(12);
+    // the file has a BE header for compatability reasons (version selection) 
but the content is LE
+    _metadata = dataBuffer.view(16, chunksOffset, ByteOrder.LITTLE_ENDIAN);
+    _chunks = dataBuffer.view(chunksOffset, dataBuffer.size(), 
ByteOrder.LITTLE_ENDIAN);
+  }
+
+  @Override
+  public boolean isDictionaryEncoded() {
+    return false;
+  }
+
+  @Override
+  public boolean isSingleValue() {
+    return true;
+  }
+
+  @Override
+  public FieldSpec.DataType getValueType() {
+    return _valueType;
+  }
+
+  @Override
+  public String getString(int docId, ReaderContext context) {
+    return new String(context.getValue(docId), StandardCharsets.UTF_8);

Review comment:
       When reading string values, we can reuse the bytes (see 
`VarByteChunkSVForwardIndexReader` for details).
   
   There are pros and cons of reusing the bytes. I'm not sure how well the 
modern JVM handles allocation and collection of these short-lived objects. 
Suggest adding some comments if you intentionally not reuse the bytes.

##########
File path: 
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/forward/VarByteChunkSVForwardIndexReaderV4.java
##########
@@ -0,0 +1,299 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.segment.local.segment.index.readers.forward;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+import java.nio.charset.StandardCharsets;
+import javax.annotation.Nullable;
+import org.apache.pinot.segment.local.io.compression.ChunkCompressorFactory;
+import 
org.apache.pinot.segment.local.io.writer.impl.VarByteChunkSVForwardIndexWriterV4;
+import org.apache.pinot.segment.spi.compression.ChunkCompressionType;
+import org.apache.pinot.segment.spi.compression.ChunkDecompressor;
+import org.apache.pinot.segment.spi.index.reader.ForwardIndexReader;
+import org.apache.pinot.segment.spi.index.reader.ForwardIndexReaderContext;
+import org.apache.pinot.segment.spi.memory.CleanerUtil;
+import org.apache.pinot.segment.spi.memory.PinotDataBuffer;
+import org.apache.pinot.spi.data.FieldSpec;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+public class VarByteChunkSVForwardIndexReaderV4
+    implements 
ForwardIndexReader<VarByteChunkSVForwardIndexReaderV4.ReaderContext> {
+
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(VarByteChunkSVForwardIndexReaderV4.class);
+
+  private static final int METADATA_ENTRY_SIZE = 8;
+
+  private final FieldSpec.DataType _valueType;
+  private final int _targetDecompressedChunkSize;
+  private final ChunkDecompressor _chunkDecompressor;
+  private final ChunkCompressionType _chunkCompressionType;
+
+  private final PinotDataBuffer _metadata;
+  private final PinotDataBuffer _chunks;
+
+  public VarByteChunkSVForwardIndexReaderV4(PinotDataBuffer dataBuffer, 
FieldSpec.DataType valueType) {
+    if (dataBuffer.getInt(0) < VarByteChunkSVForwardIndexWriterV4.VERSION) {
+      throw new IllegalStateException("version " + dataBuffer.getInt(0) + " < "
+          + VarByteChunkSVForwardIndexWriterV4.VERSION);
+    }
+    _valueType = valueType;
+    _targetDecompressedChunkSize = dataBuffer.getInt(4);
+    _chunkCompressionType = ChunkCompressionType.valueOf(dataBuffer.getInt(8));
+    _chunkDecompressor = 
ChunkCompressorFactory.getDecompressor(_chunkCompressionType);
+    int chunksOffset = dataBuffer.getInt(12);
+    // the file has a BE header for compatability reasons (version selection) 
but the content is LE
+    _metadata = dataBuffer.view(16, chunksOffset, ByteOrder.LITTLE_ENDIAN);
+    _chunks = dataBuffer.view(chunksOffset, dataBuffer.size(), 
ByteOrder.LITTLE_ENDIAN);
+  }
+
+  @Override
+  public boolean isDictionaryEncoded() {
+    return false;
+  }
+
+  @Override
+  public boolean isSingleValue() {
+    return true;
+  }
+
+  @Override
+  public FieldSpec.DataType getValueType() {
+    return _valueType;
+  }
+
+  @Override
+  public String getString(int docId, ReaderContext context) {
+    return new String(context.getValue(docId), StandardCharsets.UTF_8);
+  }
+
+  @Override
+  public byte[] getBytes(int docId, ReaderContext context) {
+    return context.getValue(docId);
+  }
+
+  @Nullable
+  @Override
+  public ReaderContext createContext() {
+    return _chunkCompressionType == ChunkCompressionType.PASS_THROUGH
+        ? new UncompressedReaderContext(_chunks, _metadata)
+        : new CompressedReaderContext(_metadata, _chunks, _chunkDecompressor, 
_chunkCompressionType,
+            _targetDecompressedChunkSize);
+  }
+
+  @Override
+  public void close()
+      throws IOException {
+  }
+
+  public static abstract class ReaderContext implements 
ForwardIndexReaderContext {
+
+    protected final PinotDataBuffer _chunks;
+    protected final PinotDataBuffer _metadata;
+    protected int _docIdOffset;
+    protected int _nextDocIdOffset;
+    protected boolean _regularChunk;
+    protected int _numDocsInCurrentChunk;
+
+    protected ReaderContext(PinotDataBuffer metadata, PinotDataBuffer chunks) {
+      _chunks = chunks;
+      _metadata = metadata;
+    }
+
+    public byte[] getValue(int docId) {
+      if (docId >= _docIdOffset && docId < _nextDocIdOffset) {
+        return readSmallUncompressedValue(docId);
+      } else {
+        try {
+          return decompressAndRead(docId);
+        } catch (IOException e) {
+          LOGGER.error("Exception caught while decompressing data chunk", e);
+          throw new RuntimeException(e);
+        }
+      }
+    }
+
+    protected long chunkIndexFor(int docId) {
+      long low = 0;
+      long high = (_metadata.size() / METADATA_ENTRY_SIZE) - 1;
+      while (low <= high) {
+        long mid = (low + high) >>> 1;
+        long position = mid * METADATA_ENTRY_SIZE;
+        int midDocId = _metadata.getInt(position) & 0x7FFFFFFF;
+        if (midDocId < docId) {
+          low = mid + 1;
+        } else if (midDocId > docId) {
+          high = mid - 1;
+        } else {
+          return position;
+        }
+      }
+      return (low - 1) * METADATA_ENTRY_SIZE;
+    }
+
+    protected abstract byte[] processChunkAndReadFirstValue(int docId, long 
offset, long limit)
+        throws IOException;
+
+    protected abstract byte[] readSmallUncompressedValue(int docId);
+
+    private byte[] decompressAndRead(int docId)
+        throws IOException {
+      long metadataEntry = chunkIndexFor(docId);
+      int info = _metadata.getInt(metadataEntry);
+      _docIdOffset = info & 0x7FFFFFFF;
+      _regularChunk = _docIdOffset == info;
+      long offset = _metadata.getInt(metadataEntry + Integer.BYTES) & 
0xFFFFFFFFL;
+      long limit;
+      if (_metadata.size() - METADATA_ENTRY_SIZE > metadataEntry) {
+        _nextDocIdOffset = _metadata.getInt(metadataEntry + 
METADATA_ENTRY_SIZE) & 0x7FFFFFFF;
+        limit = _metadata.getInt(metadataEntry + METADATA_ENTRY_SIZE + 
Integer.BYTES) & 0xFFFFFFFFL;
+      } else {
+        _nextDocIdOffset = Integer.MAX_VALUE;
+        limit = _chunks.size();
+      }
+      return processChunkAndReadFirstValue(docId, offset, limit);
+    }
+  }
+
+  private static final class UncompressedReaderContext extends ReaderContext {
+
+    private ByteBuffer _chunk;
+
+    UncompressedReaderContext(PinotDataBuffer metadata, PinotDataBuffer 
chunks) {
+      super(chunks, metadata);
+    }
+
+    @Override
+    protected byte[] processChunkAndReadFirstValue(int docId, long offset, 
long limit) {
+      _chunk = _chunks.toDirectByteBuffer(offset, (int) (limit - offset));
+      if (!_regularChunk) {
+        return readHugeValue();
+      }
+      _numDocsInCurrentChunk = _chunk.getInt(0);
+      return readSmallUncompressedValue(docId);
+    }
+
+    private byte[] readHugeValue() {
+      byte[] value = new byte[_chunk.capacity()];
+      _chunk.get(value);
+      return value;
+    }
+
+    @Override
+    protected byte[] readSmallUncompressedValue(int docId) {
+      int index = docId - _docIdOffset;
+      int offset = _chunk.getInt((index + 1) * Integer.BYTES);
+      int nextOffset = index == _numDocsInCurrentChunk - 1
+          ? _chunk.limit()
+          : _chunk.getInt((index + 2) * Integer.BYTES);
+      ByteBuffer view = _chunk.duplicate();
+      view.position(offset);
+      view.order(ByteOrder.LITTLE_ENDIAN);
+      byte[] bytes = new byte[nextOffset - offset];
+      view.get(bytes);
+      return bytes;
+    }
+
+    @Override
+    public void close()
+        throws IOException {
+    }
+  }
+
+  private static final class CompressedReaderContext extends ReaderContext {
+
+    private final ByteBuffer _decompressedBuffer;
+    private final ChunkDecompressor _chunkDecompressor;
+    private final ChunkCompressionType _chunkCompressionType;
+
+    CompressedReaderContext(PinotDataBuffer metadata, PinotDataBuffer chunks, 
ChunkDecompressor chunkDecompressor,
+        ChunkCompressionType chunkCompressionType, int targetChunkSize) {
+      super(metadata, chunks);
+      _chunkDecompressor = chunkDecompressor;
+      _chunkCompressionType = chunkCompressionType;
+      _decompressedBuffer = 
ByteBuffer.allocateDirect(targetChunkSize).order(ByteOrder.LITTLE_ENDIAN);
+    }
+
+    @Override
+    protected byte[] processChunkAndReadFirstValue(int docId, long offset, 
long limit)
+        throws IOException {
+      _decompressedBuffer.clear();
+      ByteBuffer compressed = _chunks.toDirectByteBuffer(offset, (int) (limit 
- offset));
+      int decompressedLength = 
_chunkDecompressor.decompressedLength(compressed);
+      if (_regularChunk) {
+        _chunkDecompressor.decompress(compressed, _decompressedBuffer);
+        _numDocsInCurrentChunk = _decompressedBuffer.getInt(0);
+        return readSmallUncompressedValue(docId);
+      }
+      // huge value, no benefit from buffering, return the whole thing
+      return readHugeCompressedValue(compressed, decompressedLength);
+    }
+
+    @Override
+    protected byte[] readSmallUncompressedValue(int docId) {
+      int index = docId - _docIdOffset;
+      int offset = _decompressedBuffer.getInt((index + 1) * Integer.BYTES);
+      int nextOffset = index == _numDocsInCurrentChunk - 1
+          ? _decompressedBuffer.limit()
+          : _decompressedBuffer.getInt((index + 2) * Integer.BYTES);
+      ByteBuffer view = _decompressedBuffer.duplicate();

Review comment:
       Since it is always single-threaded, we can directly read from the 
`_chunk` without duplicating it




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org

Reply via email to