Jackie-Jiang commented on a change in pull request #5816:
URL: https://github.com/apache/incubator-pinot/pull/5816#discussion_r466085122
##########
File path: pinot-core/src/main/java/org/apache/pinot/core/segment/index/readers/forward/VarByteChunkSVForwardIndexReader.java
##########
@@ -19,92 +19,172 @@ package org.apache.pinot.core.segment.index.readers.forward;
 
 import java.nio.ByteBuffer;
+import javax.annotation.Nullable;
 import org.apache.pinot.common.utils.StringUtil;
 import org.apache.pinot.core.io.writer.impl.VarByteChunkSVForwardIndexWriter;
 import org.apache.pinot.core.segment.memory.PinotDataBuffer;
 import org.apache.pinot.spi.data.FieldSpec.DataType;
 
 /**
- * Chunk-based single-value raw (non-dictionary-encoded) forward index reader for values of of variable length data
- * type (STRING, BYTES).
+ * Chunk-based single-value raw (non-dictionary-encoded) forward index reader for values of variable length data type
+ * (STRING, BYTES).
  * <p>For data layout, please refer to the documentation for {@link VarByteChunkSVForwardIndexWriter}
  */
 public final class VarByteChunkSVForwardIndexReader extends BaseChunkSVForwardIndexReader {
+  private static final int ROW_OFFSET_SIZE = VarByteChunkSVForwardIndexWriter.CHUNK_HEADER_ENTRY_ROW_OFFSET_SIZE;
+
   private final int _maxChunkSize;
 
   // Thread local (reusable) byte[] to read bytes from data file.
   private final ThreadLocal<byte[]> _reusableBytes = ThreadLocal.withInitial(() -> new byte[_lengthOfLongestEntry]);
 
   public VarByteChunkSVForwardIndexReader(PinotDataBuffer dataBuffer, DataType valueType) {
     super(dataBuffer, valueType);
-    _maxChunkSize = _numDocsPerChunk * (VarByteChunkSVForwardIndexWriter.CHUNK_HEADER_ENTRY_ROW_OFFSET_SIZE
-        + _lengthOfLongestEntry);
+    _maxChunkSize = _numDocsPerChunk * (ROW_OFFSET_SIZE + _lengthOfLongestEntry);
   }
 
+  @Nullable
   @Override
   public ChunkReaderContext createContext() {
-    return new ChunkReaderContext(_maxChunkSize);
+    if (_isCompressed) {
+      return new ChunkReaderContext(_maxChunkSize);
+    } else {
+      return null;
+    }
   }
 
   @Override
   public String getString(int docId, ChunkReaderContext context) {
+    if (_isCompressed) {
+      return getStringCompressed(docId, context);
+    } else {
+      return getStringUncompressed(docId);
+    }
+  }
+
+  /**
+   * Helper method to read a STRING value from the compressed index.
+   */
+  private String getStringCompressed(int docId, ChunkReaderContext context) {
     int chunkRowId = docId % _numDocsPerChunk;
     ByteBuffer chunkBuffer = getChunkBuffer(docId, context);
 
-    int rowOffset =
-        chunkBuffer.getInt(chunkRowId * VarByteChunkSVForwardIndexWriter.CHUNK_HEADER_ENTRY_ROW_OFFSET_SIZE);
-    int nextRowOffset = getNextRowOffset(chunkRowId, chunkBuffer);
+    // These offsets are offsets in the chunk buffer
+    int valueStartOffset = chunkBuffer.getInt(chunkRowId * ROW_OFFSET_SIZE);
+    int valueEndOffset = getValueEndOffset(chunkRowId, chunkBuffer);
 
-    int length = nextRowOffset - rowOffset;
+    int length = valueEndOffset - valueStartOffset;
     byte[] bytes = _reusableBytes.get();
-
-    chunkBuffer.position(rowOffset);
+    chunkBuffer.position(valueStartOffset);
     chunkBuffer.get(bytes, 0, length);
+    return StringUtil.decodeUtf8(bytes, 0, length);
+  }
+
+  /**
+   * Helper method to read a STRING value from the uncompressed index.
+   */
+  private String getStringUncompressed(int docId) {
+    int chunkId = docId / _numDocsPerChunk;
+    int chunkRowId = docId % _numDocsPerChunk;
+
+    // These offsets are offsets in the data buffer
+    long chunkStartOffset = getChunkPosition(chunkId);
+    long valueStartOffset = chunkStartOffset + _dataBuffer.getInt(chunkStartOffset + chunkRowId * ROW_OFFSET_SIZE);
+    long valueEndOffset = getValueEndOffset(chunkId, chunkRowId, chunkStartOffset);
 
+    int length = (int) (valueEndOffset - valueStartOffset);
+    byte[] bytes = _reusableBytes.get();
+    _dataBuffer.copyTo(valueStartOffset, bytes, 0, length);
     return StringUtil.decodeUtf8(bytes, 0, length);
   }
 
   @Override
   public byte[] getBytes(int docId, ChunkReaderContext context) {
+    if (_isCompressed) {
+      return getBytesCompressed(docId, context);
+    } else {
+      return getBytesUncompressed(docId);
+    }
+  }
+
+  /**
+   * Helper method to read a BYTES value from the compressed index.
+   */
+  private byte[] getBytesCompressed(int docId, ChunkReaderContext context) {
     int chunkRowId = docId % _numDocsPerChunk;
     ByteBuffer chunkBuffer = getChunkBuffer(docId, context);
 
-    int rowOffset =
-        chunkBuffer.getInt(chunkRowId * VarByteChunkSVForwardIndexWriter.CHUNK_HEADER_ENTRY_ROW_OFFSET_SIZE);
-    int nextRowOffset = getNextRowOffset(chunkRowId, chunkBuffer);
+    // These offsets are offsets in the chunk buffer
+    int valueStartOffset = chunkBuffer.getInt(chunkRowId * ROW_OFFSET_SIZE);
+    int valueEndOffset = getValueEndOffset(chunkRowId, chunkBuffer);
 
-    int length = nextRowOffset - rowOffset;
-    byte[] bytes = new byte[length];
+    byte[] bytes = new byte[valueEndOffset - valueStartOffset];
+    chunkBuffer.position(valueStartOffset);
+    chunkBuffer.get(bytes);
+    return bytes;
+  }
 
-    chunkBuffer.position(rowOffset);
-    chunkBuffer.get(bytes, 0, length);
+  /**
+   * Helper method to read a BYTES value from the uncompressed index.
+   */
+  private byte[] getBytesUncompressed(int docId) {
+    int chunkId = docId / _numDocsPerChunk;
+    int chunkRowId = docId % _numDocsPerChunk;
+
+    // These offsets are offsets in the data buffer
+    long chunkStartOffset = getChunkPosition(chunkId);
+    long valueStartOffset = chunkStartOffset + _dataBuffer.getInt(chunkStartOffset + chunkRowId * ROW_OFFSET_SIZE);
+    long valueEndOffset = getValueEndOffset(chunkId, chunkRowId, chunkStartOffset);
+
+    byte[] bytes = new byte[(int) (valueEndOffset - valueStartOffset)];
+    _dataBuffer.copyTo(valueStartOffset, bytes);
     return bytes;
   }
 
   /**
-   * Helper method to compute the offset of next row in the chunk buffer.
-   *
-   * @param currentRowId Current row id within the chunk buffer.
-   * @param chunkBuffer Chunk buffer containing the rows.
-   *
-   * @return Offset of next row within the chunk buffer. If current row is the last one,
-   *         chunkBuffer.limit() is returned.
+   * Helper method to compute the end offset of the value in the chunk buffer.
    */
-  private int getNextRowOffset(int currentRowId, ByteBuffer chunkBuffer) {
-    int nextRowOffset;
+  private int getValueEndOffset(int rowId, ByteBuffer chunkBuffer) {
+    if (rowId == _numDocsPerChunk - 1) {
+      // Last row in the chunk
+      return chunkBuffer.limit();
+    } else {
+      int valueEndOffset = chunkBuffer.getInt((rowId + 1) * ROW_OFFSET_SIZE);
+      if (valueEndOffset == 0) {
+        // Last row in the last chunk (the chunk is incomplete and stores 0 as the offset for the absent rows)
+        return chunkBuffer.limit();
+      } else {
+        return valueEndOffset;
+      }
+    }
+  }
 
-    if (currentRowId == _numDocsPerChunk - 1) {
-      // Last row in this chunk.
-      nextRowOffset = chunkBuffer.limit();
+  /**
+   * Helper method to compute the end offset of the value in the data buffer.
+   */
+  private long getValueEndOffset(int chunkId, int chunkRowId, long chunkStartOffset) {

Review comment:
       The algorithm is slightly different because with the `chunkBuffer` we can directly get the `chunkEndOffset` via `chunkBuffer.limit()`, which is not the case for the uncompressed one. That is why we have a branch on the last chunk.
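The hunk shown above ends right at the signature being discussed, so for context, here is a minimal sketch of how the uncompressed-path `getValueEndOffset` could implement the branch on the last chunk that the comment describes. This is an illustration, not the PR's actual body: `_numChunks` (the total number of chunks known to the base reader) is an assumed field, while `getChunkPosition`, `_dataBuffer`, `_numDocsPerChunk`, and `ROW_OFFSET_SIZE` all appear in the diff.

```java
/**
 * Sketch only: computes the end offset of the value in the data buffer for the
 * uncompressed path. _numChunks is an assumed field holding the total chunk count.
 */
private long getValueEndOffset(int chunkId, int chunkRowId, long chunkStartOffset) {
  if (chunkId == _numChunks - 1) {
    // Last chunk: there is no next chunk whose start position can bound this one,
    // so fall back to the end of the data buffer where needed.
    if (chunkRowId == _numDocsPerChunk - 1) {
      // Last row in the last chunk
      return _dataBuffer.size();
    }
    int valueEndOffsetInChunk = _dataBuffer.getInt(chunkStartOffset + (chunkRowId + 1) * ROW_OFFSET_SIZE);
    if (valueEndOffsetInChunk == 0) {
      // The last chunk can be incomplete; absent rows store 0 as their offset
      return _dataBuffer.size();
    }
    return chunkStartOffset + valueEndOffsetInChunk;
  }
  if (chunkRowId == _numDocsPerChunk - 1) {
    // Last row in a complete chunk: the value ends where the next chunk starts
    return getChunkPosition(chunkId + 1);
  }
  return chunkStartOffset + _dataBuffer.getInt(chunkStartOffset + (chunkRowId + 1) * ROW_OFFSET_SIZE);
}
```

Under these assumptions the sketch mirrors the compressed-path helper above: where the compressed path uses `chunkBuffer.limit()` as the bound, the uncompressed path substitutes the next chunk's start position, or the end of the data buffer for the last chunk, which is exactly the extra branch the comment calls out.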