This is an automated email from the ASF dual-hosted git repository.

jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push: new d4d6be4 implement FixedByteChunkMVForwardIndexReader, address post-review comments (#7629) d4d6be4 is described below commit d4d6be4e4cc9c84a8da9722b001e8f70ba3781ad Author: Richard Startin <rich...@startree.ai> AuthorDate: Tue Oct 26 00:25:13 2021 +0100 implement FixedByteChunkMVForwardIndexReader, address post-review comments (#7629) --- .../creator/impl/SegmentColumnarIndexCreator.java | 42 ++--- .../fwd/MultiValueFixedByteRawIndexCreator.java | 32 ++-- .../impl/fwd/MultiValueVarByteRawIndexCreator.java | 31 ++-- .../index/column/PhysicalColumnIndexContainer.java | 13 +- .../FixedByteChunkMVForwardIndexReader.java | 178 +++++++++++++++++++++ .../MultiValueFixedByteRawIndexCreatorTest.java | 178 +++++++++++++++++++++ .../MultiValueVarByteRawIndexCreatorTest.java | 6 +- .../spi/index/reader/ForwardIndexReader.java | 4 - 8 files changed, 404 insertions(+), 80 deletions(-) diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/SegmentColumnarIndexCreator.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/SegmentColumnarIndexCreator.java index dcc0ea2..0ea31d5 100644 --- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/SegmentColumnarIndexCreator.java +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/SegmentColumnarIndexCreator.java @@ -23,7 +23,6 @@ import com.google.common.base.Preconditions; import com.google.common.collect.Iterables; import java.io.File; import java.io.IOException; -import java.util.Arrays; import java.util.Collection; import java.util.HashMap; import java.util.HashSet; @@ -82,6 +81,7 @@ import org.joda.time.format.DateTimeFormatter; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import static java.nio.charset.StandardCharsets.UTF_8; import static org.apache.pinot.segment.spi.V1Constants.MetadataKeys.Column.*; import static org.apache.pinot.segment.spi.V1Constants.MetadataKeys.Segment.*; @@ -476,23 +476,16 @@ public class SegmentColumnarIndexCreator implements SegmentCreator { value = FieldConfig.TEXT_INDEX_DEFAULT_RAW_VALUE; } if (forwardIndexCreator.getValueType().getStoredType() == DataType.STRING) { - value = String.valueOf(value); - int length = ((String[]) columnValueToIndex).length; - columnValueToIndex = new String[length]; - Arrays.fill((String[]) columnValueToIndex, value); + columnValueToIndex = new String[] {String.valueOf(value)}; } else if (forwardIndexCreator.getValueType().getStoredType() == DataType.BYTES) { - int length = ((byte[][]) columnValueToIndex).length; - columnValueToIndex = new byte[length][]; - Arrays.fill((byte[][]) columnValueToIndex, String.valueOf(value).getBytes()); + columnValueToIndex = new byte[][] {String.valueOf(value).getBytes(UTF_8)}; } else { throw new RuntimeException("Text Index is only supported for STRING and BYTES stored type"); } } switch (forwardIndexCreator.getValueType()) { case INT: - if (columnValueToIndex instanceof int[]) { - forwardIndexCreator.putIntMV((int[]) columnValueToIndex); - } else if (columnValueToIndex instanceof Object[]) { + if (columnValueToIndex instanceof Object[]) { int[] array = new int[((Object[]) columnValueToIndex).length]; for (int i = 0; i < array.length; i++) { array[i] = (Integer) ((Object[]) columnValueToIndex)[i]; @@ -501,9 +494,7 @@ public class SegmentColumnarIndexCreator implements SegmentCreator { } break; case LONG: - if 
(columnValueToIndex instanceof long[]) { - forwardIndexCreator.putLongMV((long[]) columnValueToIndex); - } else if (columnValueToIndex instanceof Object[]) { + if (columnValueToIndex instanceof Object[]) { long[] array = new long[((Object[]) columnValueToIndex).length]; for (int i = 0; i < array.length; i++) { array[i] = (Long) ((Object[]) columnValueToIndex)[i]; @@ -512,9 +503,7 @@ public class SegmentColumnarIndexCreator implements SegmentCreator { } break; case FLOAT: - if (columnValueToIndex instanceof float[]) { - forwardIndexCreator.putFloatMV((float[]) columnValueToIndex); - } else if (columnValueToIndex instanceof Object[]) { + if (columnValueToIndex instanceof Object[]) { float[] array = new float[((Object[]) columnValueToIndex).length]; for (int i = 0; i < array.length; i++) { array[i] = (Float) ((Object[]) columnValueToIndex)[i]; @@ -523,9 +512,7 @@ public class SegmentColumnarIndexCreator implements SegmentCreator { } break; case DOUBLE: - if (columnValueToIndex instanceof double[]) { - forwardIndexCreator.putDoubleMV((double[]) columnValueToIndex); - } else if (columnValueToIndex instanceof Object[]) { + if (columnValueToIndex instanceof Object[]) { double[] array = new double[((Object[]) columnValueToIndex).length]; for (int i = 0; i < array.length; i++) { array[i] = (Double) ((Object[]) columnValueToIndex)[i]; @@ -835,10 +822,8 @@ public class SegmentColumnarIndexCreator implements SegmentCreator { * @param writerVersion version to use for the raw index writer * @return raw index creator */ - public static ForwardIndexCreator getRawIndexCreatorForSVColumn(File file, - ChunkCompressionType compressionType, - String column, DataType dataType, int totalDocs, int lengthOfLongestEntry, - boolean deriveNumDocsPerChunk, + public static ForwardIndexCreator getRawIndexCreatorForSVColumn(File file, ChunkCompressionType compressionType, + String column, DataType dataType, int totalDocs, int lengthOfLongestEntry, boolean deriveNumDocsPerChunk, int writerVersion) throws IOException { switch (dataType.getStoredType()) { @@ -871,9 +856,8 @@ public class SegmentColumnarIndexCreator implements SegmentCreator { * @return raw index creator */ public static ForwardIndexCreator getRawIndexCreatorForMVColumn(File file, ChunkCompressionType compressionType, - String column, DataType dataType, final int totalDocs, - final int maxNumberOfMultiValueElements, boolean deriveNumDocsPerChunk, int writerVersion, - int maxRowLengthInBytes) + String column, DataType dataType, final int totalDocs, int maxNumberOfMultiValueElements, + boolean deriveNumDocsPerChunk, int writerVersion, int maxRowLengthInBytes) throws IOException { switch (dataType.getStoredType()) { case INT: @@ -881,11 +865,11 @@ public class SegmentColumnarIndexCreator implements SegmentCreator { case FLOAT: case DOUBLE: return new MultiValueFixedByteRawIndexCreator(file, compressionType, column, totalDocs, dataType, - dataType.getStoredType().size(), maxNumberOfMultiValueElements, deriveNumDocsPerChunk, writerVersion); + maxNumberOfMultiValueElements, deriveNumDocsPerChunk, writerVersion); case STRING: case BYTES: return new MultiValueVarByteRawIndexCreator(file, compressionType, column, totalDocs, dataType, writerVersion, - maxRowLengthInBytes); + maxRowLengthInBytes, maxNumberOfMultiValueElements); default: throw new UnsupportedOperationException( "Data type not supported for raw indexing: " + dataType); diff --git 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/fwd/MultiValueFixedByteRawIndexCreator.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/fwd/MultiValueFixedByteRawIndexCreator.java index 572c793..c43f8b7 100644 --- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/fwd/MultiValueFixedByteRawIndexCreator.java +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/fwd/MultiValueFixedByteRawIndexCreator.java @@ -18,11 +18,9 @@ */ package org.apache.pinot.segment.local.segment.creator.impl.fwd; -import com.google.common.annotations.VisibleForTesting; import java.io.File; import java.io.IOException; import java.nio.ByteBuffer; -import org.apache.commons.io.FileUtils; import org.apache.pinot.segment.local.io.writer.impl.BaseChunkSVForwardIndexWriter; import org.apache.pinot.segment.local.io.writer.impl.VarByteChunkSVForwardIndexWriter; import org.apache.pinot.segment.spi.V1Constants.Indexes; @@ -53,12 +51,10 @@ public class MultiValueFixedByteRawIndexCreator implements ForwardIndexCreator { * @param totalDocs Total number of documents to index * @param valueType Type of the values */ - public MultiValueFixedByteRawIndexCreator(File baseIndexDir, ChunkCompressionType compressionType, - String column, - int totalDocs, DataType valueType, final int maxLengthOfEachEntry, - final int maxNumberOfMultiValueElements) + public MultiValueFixedByteRawIndexCreator(File baseIndexDir, ChunkCompressionType compressionType, String column, + int totalDocs, DataType valueType, int maxNumberOfMultiValueElements) throws IOException { - this(baseIndexDir, compressionType, column, totalDocs, valueType, maxLengthOfEachEntry, + this(baseIndexDir, compressionType, column, totalDocs, valueType, maxNumberOfMultiValueElements, false, BaseChunkSVForwardIndexWriter.DEFAULT_VERSION); } @@ -71,33 +67,23 @@ public class MultiValueFixedByteRawIndexCreator implements ForwardIndexCreator { * @param column Name of column to index * @param totalDocs Total number of documents to index * @param valueType Type of the values - * @param maxLengthOfEachEntry length of longest entry (in bytes) * @param deriveNumDocsPerChunk true if writer should auto-derive the number of rows per chunk * @param writerVersion writer format version */ public MultiValueFixedByteRawIndexCreator(File baseIndexDir, ChunkCompressionType compressionType, - String column, int totalDocs, DataType valueType, final int maxLengthOfEachEntry, - final int maxNumberOfMultiValueElements, boolean deriveNumDocsPerChunk, - int writerVersion) + String column, int totalDocs, DataType valueType, int maxNumberOfMultiValueElements, + boolean deriveNumDocsPerChunk, int writerVersion) throws IOException { - File file = new File(baseIndexDir, - column + Indexes.RAW_MV_FORWARD_INDEX_FILE_EXTENSION); - FileUtils.deleteQuietly(file); - int totalMaxLength = maxNumberOfMultiValueElements * maxLengthOfEachEntry; + File file = new File(baseIndexDir, column + Indexes.RAW_MV_FORWARD_INDEX_FILE_EXTENSION); + int totalMaxLength = maxNumberOfMultiValueElements * valueType.getStoredType().size(); int numDocsPerChunk = - deriveNumDocsPerChunk ? getNumDocsPerChunk(totalMaxLength) : DEFAULT_NUM_DOCS_PER_CHUNK; + deriveNumDocsPerChunk ? 
Math.max(TARGET_MAX_CHUNK_SIZE / (totalMaxLength + + VarByteChunkSVForwardIndexWriter.CHUNK_HEADER_ENTRY_ROW_OFFSET_SIZE), 1) : DEFAULT_NUM_DOCS_PER_CHUNK; _indexWriter = new VarByteChunkSVForwardIndexWriter(file, compressionType, totalDocs, numDocsPerChunk, totalMaxLength, writerVersion); _valueType = valueType; } - @VisibleForTesting - public static int getNumDocsPerChunk(int lengthOfLongestEntry) { - int overheadPerEntry = - lengthOfLongestEntry + VarByteChunkSVForwardIndexWriter.CHUNK_HEADER_ENTRY_ROW_OFFSET_SIZE; - return Math.max(TARGET_MAX_CHUNK_SIZE / overheadPerEntry, 1); - } - @Override public boolean isDictionaryEncoded() { return false; diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/fwd/MultiValueVarByteRawIndexCreator.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/fwd/MultiValueVarByteRawIndexCreator.java index 5d5b3cf..abfc436 100644 --- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/fwd/MultiValueVarByteRawIndexCreator.java +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/fwd/MultiValueVarByteRawIndexCreator.java @@ -49,13 +49,13 @@ public class MultiValueVarByteRawIndexCreator implements ForwardIndexCreator { * @param totalDocs Total number of documents to index * @param valueType Type of the values * @param maxRowLengthInBytes the length in bytes of the largest row + * @param maxNumberOfElements the maximum number of elements in a row */ - public MultiValueVarByteRawIndexCreator(File baseIndexDir, ChunkCompressionType compressionType, - String column, - int totalDocs, DataType valueType, int maxRowLengthInBytes) + public MultiValueVarByteRawIndexCreator(File baseIndexDir, ChunkCompressionType compressionType, String column, + int totalDocs, DataType valueType, int maxRowLengthInBytes, int maxNumberOfElements) throws IOException { this(baseIndexDir, compressionType, column, totalDocs, valueType, - BaseChunkSVForwardIndexWriter.DEFAULT_VERSION, maxRowLengthInBytes); + BaseChunkSVForwardIndexWriter.DEFAULT_VERSION, maxRowLengthInBytes, maxNumberOfElements); } /** @@ -67,28 +67,25 @@ public class MultiValueVarByteRawIndexCreator implements ForwardIndexCreator { * @param totalDocs Total number of documents to index * @param valueType Type of the values * @param maxRowLengthInBytes the size in bytes of the largest row, the chunk size cannot be smaller than this + * @param maxNumberOfElements the maximum number of elements in a row * @param writerVersion writer format version */ - public MultiValueVarByteRawIndexCreator(File baseIndexDir, ChunkCompressionType compressionType, - String column, int totalDocs, DataType valueType, int writerVersion, int maxRowLengthInBytes) + public MultiValueVarByteRawIndexCreator(File baseIndexDir, ChunkCompressionType compressionType, String column, + int totalDocs, DataType valueType, int writerVersion, int maxRowLengthInBytes, int maxNumberOfElements) throws IOException { //we will prepend the actual content with numElements and length array containing length of each element - int totalMaxLength = Integer.BYTES + Math.max(maxRowLengthInBytes, TARGET_MAX_CHUNK_SIZE); + int maxLengthPrefixes = Integer.BYTES * maxNumberOfElements; + int totalMaxLength = Integer.BYTES + maxRowLengthInBytes + maxLengthPrefixes; File file = new File(baseIndexDir, column + Indexes.RAW_MV_FORWARD_INDEX_FILE_EXTENSION); - int numDocsPerChunk = getNumDocsPerChunk(totalMaxLength); - 
_indexWriter = new VarByteChunkSVForwardIndexWriter(file, compressionType, totalDocs, - numDocsPerChunk, totalMaxLength, - writerVersion); + int numDocsPerChunk = Math.max( + TARGET_MAX_CHUNK_SIZE / (totalMaxLength + VarByteChunkSVForwardIndexWriter.CHUNK_HEADER_ENTRY_ROW_OFFSET_SIZE), + 1); + _indexWriter = new VarByteChunkSVForwardIndexWriter(file, compressionType, totalDocs, numDocsPerChunk, + totalMaxLength, writerVersion); _valueType = valueType; } - private static int getNumDocsPerChunk(int lengthOfLongestEntry) { - int overheadPerEntry = - lengthOfLongestEntry + VarByteChunkSVForwardIndexWriter.CHUNK_HEADER_ENTRY_ROW_OFFSET_SIZE; - return Math.max(TARGET_MAX_CHUNK_SIZE / overheadPerEntry, 1); - } - @Override public boolean isDictionaryEncoded() { return false; diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/column/PhysicalColumnIndexContainer.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/column/PhysicalColumnIndexContainer.java index 9ab1e7d..5d8657b 100644 --- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/column/PhysicalColumnIndexContainer.java +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/column/PhysicalColumnIndexContainer.java @@ -45,7 +45,9 @@ import org.apache.pinot.segment.local.segment.index.readers.StringDictionary; import org.apache.pinot.segment.local.segment.index.readers.bloom.BloomFilterReaderFactory; import org.apache.pinot.segment.local.segment.index.readers.forward.FixedBitMVForwardIndexReader; import org.apache.pinot.segment.local.segment.index.readers.forward.FixedBitSVForwardIndexReaderV2; +import org.apache.pinot.segment.local.segment.index.readers.forward.FixedByteChunkMVForwardIndexReader; import org.apache.pinot.segment.local.segment.index.readers.forward.FixedByteChunkSVForwardIndexReader; +import org.apache.pinot.segment.local.segment.index.readers.forward.VarByteChunkMVForwardIndexReader; import org.apache.pinot.segment.local.segment.index.readers.forward.VarByteChunkSVForwardIndexReader; import org.apache.pinot.segment.local.segment.index.readers.geospatial.ImmutableH3IndexReader; import org.apache.pinot.segment.local.segment.index.readers.json.ImmutableJsonIndexReader; @@ -194,7 +196,7 @@ public final class PhysicalColumnIndexContainer implements ColumnIndexContainer } } else { // Raw index - _forwardIndex = loadRawForwardIndex(fwdIndexBuffer, metadata.getDataType()); + _forwardIndex = loadRawForwardIndex(fwdIndexBuffer, metadata.getDataType(), metadata.isSingleValue()); _dictionary = null; _rangeIndex = null; _invertedIndex = null; @@ -294,17 +296,20 @@ public final class PhysicalColumnIndexContainer implements ColumnIndexContainer } } - private static ForwardIndexReader<?> loadRawForwardIndex(PinotDataBuffer forwardIndexBuffer, DataType dataType) { + private static ForwardIndexReader<?> loadRawForwardIndex(PinotDataBuffer forwardIndexBuffer, DataType dataType, + boolean isSingleValue) { DataType storedType = dataType.getStoredType(); switch (storedType) { case INT: case LONG: case FLOAT: case DOUBLE: - return new FixedByteChunkSVForwardIndexReader(forwardIndexBuffer, storedType); + return isSingleValue ? new FixedByteChunkSVForwardIndexReader(forwardIndexBuffer, storedType) + : new FixedByteChunkMVForwardIndexReader(forwardIndexBuffer, storedType); case STRING: case BYTES: - return new VarByteChunkSVForwardIndexReader(forwardIndexBuffer, storedType); + return isSingleValue ? 
new VarByteChunkSVForwardIndexReader(forwardIndexBuffer, storedType) + : new VarByteChunkMVForwardIndexReader(forwardIndexBuffer, storedType); default: throw new IllegalStateException("Illegal data type for raw forward index: " + dataType); } diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/forward/FixedByteChunkMVForwardIndexReader.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/forward/FixedByteChunkMVForwardIndexReader.java new file mode 100644 index 0000000..b2e745d1 --- /dev/null +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/forward/FixedByteChunkMVForwardIndexReader.java @@ -0,0 +1,178 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.segment.local.segment.index.readers.forward; + +import java.nio.ByteBuffer; +import javax.annotation.Nullable; +import org.apache.pinot.segment.local.io.writer.impl.VarByteChunkSVForwardIndexWriter; +import org.apache.pinot.segment.spi.memory.PinotDataBuffer; +import org.apache.pinot.spi.data.FieldSpec.DataType; + + +/** + * Chunk-based multi-value raw (non-dictionary-encoded) forward index reader for values of + * fixed length data type (INT, LONG, FLOAT, DOUBLE). 
+ * <p>For data layout, please refer to the documentation for {@link VarByteChunkSVForwardIndexWriter} + */ +public final class FixedByteChunkMVForwardIndexReader extends BaseChunkSVForwardIndexReader { + + private static final int ROW_OFFSET_SIZE = VarByteChunkSVForwardIndexWriter.CHUNK_HEADER_ENTRY_ROW_OFFSET_SIZE; + + private final int _maxChunkSize; + + public FixedByteChunkMVForwardIndexReader(PinotDataBuffer dataBuffer, DataType valueType) { + super(dataBuffer, valueType); + _maxChunkSize = _numDocsPerChunk * (ROW_OFFSET_SIZE + _lengthOfLongestEntry); + } + + @Nullable + @Override + public ChunkReaderContext createContext() { + if (_isCompressed) { + return new ChunkReaderContext(_maxChunkSize); + } else { + return null; + } + } + + @Override + public int getIntMV(int docId, int[] valueBuffer, ChunkReaderContext context) { + ByteBuffer byteBuffer = slice(docId, context); + int numValues = byteBuffer.getInt(); + for (int i = 0; i < numValues; i++) { + valueBuffer[i] = byteBuffer.getInt(); + } + return numValues; + } + + @Override + public int getLongMV(int docId, long[] valueBuffer, ChunkReaderContext context) { + ByteBuffer byteBuffer = slice(docId, context); + int numValues = byteBuffer.getInt(); + for (int i = 0; i < numValues; i++) { + valueBuffer[i] = byteBuffer.getLong(); + } + return numValues; + } + + @Override + public int getFloatMV(int docId, float[] valueBuffer, ChunkReaderContext context) { + ByteBuffer byteBuffer = slice(docId, context); + int numValues = byteBuffer.getInt(); + for (int i = 0; i < numValues; i++) { + valueBuffer[i] = byteBuffer.getFloat(); + } + return numValues; + } + + @Override + public int getDoubleMV(int docId, double[] valueBuffer, ChunkReaderContext context) { + ByteBuffer byteBuffer = slice(docId, context); + int numValues = byteBuffer.getInt(); + for (int i = 0; i < numValues; i++) { + valueBuffer[i] = byteBuffer.getDouble(); + } + return numValues; + } + + private ByteBuffer slice(int docId, ChunkReaderContext context) { + if (_isCompressed) { + return sliceBytesCompressed(docId, context); + } else { + return sliceBytesUncompressed(docId); + } + } + + /** + * Helper method to read BYTES value from the compressed index. + */ + private ByteBuffer sliceBytesCompressed(int docId, ChunkReaderContext context) { + int chunkRowId = docId % _numDocsPerChunk; + ByteBuffer chunkBuffer = getChunkBuffer(docId, context); + + // These offsets are offset in the chunk buffer + int valueStartOffset = chunkBuffer.getInt(chunkRowId * ROW_OFFSET_SIZE); + int valueEndOffset = getValueEndOffset(chunkRowId, chunkBuffer); + // cast only for JDK8 compilation profile + return (ByteBuffer) chunkBuffer.duplicate().position(valueStartOffset).limit(valueEndOffset); + } + + /** + * Helper method to read BYTES value from the uncompressed index. + */ + private ByteBuffer sliceBytesUncompressed(int docId) { + int chunkId = docId / _numDocsPerChunk; + int chunkRowId = docId % _numDocsPerChunk; + + // These offsets are offset in the data buffer + long chunkStartOffset = getChunkPosition(chunkId); + long valueStartOffset = + chunkStartOffset + _dataBuffer.getInt(chunkStartOffset + (long) chunkRowId * ROW_OFFSET_SIZE); + long valueEndOffset = getValueEndOffset(chunkId, chunkRowId, chunkStartOffset); + return _dataBuffer.toDirectByteBuffer(valueStartOffset, (int) (valueEndOffset - valueStartOffset)); + } + + /** + * Helper method to compute the end offset of the value in the chunk buffer. 
+ */ + private int getValueEndOffset(int rowId, ByteBuffer chunkBuffer) { + if (rowId == _numDocsPerChunk - 1) { + // Last row in the chunk + return chunkBuffer.limit(); + } else { + int valueEndOffset = chunkBuffer.getInt((rowId + 1) * ROW_OFFSET_SIZE); + if (valueEndOffset == 0) { + // Last row in the last chunk (chunk is incomplete, which stores 0 as the offset for the absent rows) + return chunkBuffer.limit(); + } else { + return valueEndOffset; + } + } + } + + /** + * Helper method to compute the end offset of the value in the data buffer. + */ + private long getValueEndOffset(int chunkId, int chunkRowId, long chunkStartOffset) { + if (chunkId == _numChunks - 1) { + // Last chunk + if (chunkRowId == _numDocsPerChunk - 1) { + // Last row in the last chunk + return _dataBuffer.size(); + } else { + int valueEndOffsetInChunk = _dataBuffer + .getInt(chunkStartOffset + (long) (chunkRowId + 1) * ROW_OFFSET_SIZE); + if (valueEndOffsetInChunk == 0) { + // Last row in the last chunk (chunk is incomplete, which stores 0 as the offset for the absent rows) + return _dataBuffer.size(); + } else { + return chunkStartOffset + valueEndOffsetInChunk; + } + } + } else { + if (chunkRowId == _numDocsPerChunk - 1) { + // Last row in the chunk + return getChunkPosition(chunkId + 1); + } else { + return chunkStartOffset + _dataBuffer + .getInt(chunkStartOffset + (long) (chunkRowId + 1) * ROW_OFFSET_SIZE); + } + } + } +} diff --git a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/creator/MultiValueFixedByteRawIndexCreatorTest.java b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/creator/MultiValueFixedByteRawIndexCreatorTest.java new file mode 100644 index 0000000..99b4c3f --- /dev/null +++ b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/creator/MultiValueFixedByteRawIndexCreatorTest.java @@ -0,0 +1,178 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.pinot.segment.local.segment.index.creator; + +import java.io.File; +import java.io.IOException; +import java.nio.ByteOrder; +import java.util.Arrays; +import java.util.List; +import java.util.Random; +import java.util.function.IntFunction; +import java.util.function.ToIntFunction; +import java.util.stream.Collectors; +import java.util.stream.IntStream; +import org.apache.commons.io.FileUtils; +import org.apache.pinot.segment.local.segment.creator.impl.fwd.MultiValueFixedByteRawIndexCreator; +import org.apache.pinot.segment.local.segment.index.readers.forward.BaseChunkSVForwardIndexReader.ChunkReaderContext; +import org.apache.pinot.segment.local.segment.index.readers.forward.FixedByteChunkMVForwardIndexReader; +import org.apache.pinot.segment.spi.V1Constants.Indexes; +import org.apache.pinot.segment.spi.compression.ChunkCompressionType; +import org.apache.pinot.segment.spi.memory.PinotDataBuffer; +import org.apache.pinot.spi.data.FieldSpec.DataType; +import org.testng.Assert; +import org.testng.annotations.AfterClass; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.Test; + + +public class MultiValueFixedByteRawIndexCreatorTest { + + private static final String OUTPUT_DIR = + System.getProperty("java.io.tmpdir") + File.separator + "mvFixedRawTest"; + + private static final Random RANDOM = new Random(); + + @BeforeClass + public void setup() throws Exception { + FileUtils.forceMkdir(new File(OUTPUT_DIR)); + } + + /** + * Clean up after test + */ + @AfterClass + public void cleanup() { + FileUtils.deleteQuietly(new File(OUTPUT_DIR)); + } + + @Test + public void testMVInt() throws IOException { + testMV(DataType.INT, ints(), x -> x.length, int[]::new, MultiValueFixedByteRawIndexCreator::putIntMV, + (reader, context, docId, buffer) -> { + int length = reader.getIntMV(docId, buffer, context); + return Arrays.copyOf(buffer, length); + }); + } + + @Test + public void testMVLong() throws IOException { + testMV(DataType.LONG, longs(), x -> x.length, long[]::new, MultiValueFixedByteRawIndexCreator::putLongMV, + (reader, context, docId, buffer) -> { + int length = reader.getLongMV(docId, buffer, context); + return Arrays.copyOf(buffer, length); + }); + } + + @Test + public void testMVFloat() throws IOException { + testMV(DataType.FLOAT, floats(), x -> x.length, float[]::new, MultiValueFixedByteRawIndexCreator::putFloatMV, + (reader, context, docId, buffer) -> { + int length = reader.getFloatMV(docId, buffer, context); + return Arrays.copyOf(buffer, length); + }); + } + + @Test + public void testMVDouble() throws IOException { + testMV(DataType.DOUBLE, doubles(), x -> x.length, double[]::new, MultiValueFixedByteRawIndexCreator::putDoubleMV, + (reader, context, docId, buffer) -> { + int length = reader.getDoubleMV(docId, buffer, context); + return Arrays.copyOf(buffer, length); + }); + } + + + public <T> void testMV(DataType dataType, List<T> inputs, ToIntFunction<T> sizeof, IntFunction<T> constructor, + Injector<T> injector, Extractor<T> extractor) + throws IOException { + String column = "testCol_" + dataType; + int numDocs = inputs.size(); + int maxElements = inputs.stream().mapToInt(sizeof).max().orElseThrow(RuntimeException::new); + File file = new File(OUTPUT_DIR, column + Indexes.RAW_MV_FORWARD_INDEX_FILE_EXTENSION); + file.delete(); + MultiValueFixedByteRawIndexCreator creator = new MultiValueFixedByteRawIndexCreator(new File(OUTPUT_DIR), + ChunkCompressionType.SNAPPY, column, numDocs, dataType, maxElements); + inputs.forEach(input -> 
injector.inject(creator, input)); + creator.close(); + + //read + final PinotDataBuffer buffer = PinotDataBuffer + .mapFile(file, true, 0, file.length(), ByteOrder.BIG_ENDIAN, ""); + FixedByteChunkMVForwardIndexReader reader = new FixedByteChunkMVForwardIndexReader(buffer, DataType.BYTES); + final ChunkReaderContext context = reader.createContext(); + T valueBuffer = constructor.apply(maxElements); + for (int i = 0; i < numDocs; i++) { + Assert.assertEquals(inputs.get(i), extractor.extract(reader, context, i, valueBuffer)); + } + } + + interface Extractor<T> { + T extract(FixedByteChunkMVForwardIndexReader reader, ChunkReaderContext context, int offset, T buffer); + } + + interface Injector<T> { + void inject(MultiValueFixedByteRawIndexCreator creator, T input); + } + + private static List<int[]> ints() { + return IntStream.range(0, 1000) + .mapToObj(i -> new int[RANDOM.nextInt(50)]) + .peek(array -> { + for (int i = 0; i < array.length; i++) { + array[i] = RANDOM.nextInt(); + } + }) + .collect(Collectors.toList()); + } + + private static List<long[]> longs() { + return IntStream.range(0, 1000) + .mapToObj(i -> new long[RANDOM.nextInt(50)]) + .peek(array -> { + for (int i = 0; i < array.length; i++) { + array[i] = RANDOM.nextLong(); + } + }) + .collect(Collectors.toList()); + } + + private static List<float[]> floats() { + return IntStream.range(0, 1000) + .mapToObj(i -> new float[RANDOM.nextInt(50)]) + .peek(array -> { + for (int i = 0; i < array.length; i++) { + array[i] = RANDOM.nextFloat(); + } + }) + .collect(Collectors.toList()); + } + + private static List<double[]> doubles() { + return IntStream.range(0, 1000) + .mapToObj(i -> new double[RANDOM.nextInt(50)]) + .peek(array -> { + for (int i = 0; i < array.length; i++) { + array[i] = RANDOM.nextDouble(); + } + }) + .collect(Collectors.toList()); + } + +} diff --git a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/creator/MultiValueVarByteRawIndexCreatorTest.java b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/creator/MultiValueVarByteRawIndexCreatorTest.java index a1f6e2c..c496e91 100644 --- a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/creator/MultiValueVarByteRawIndexCreatorTest.java +++ b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/creator/MultiValueVarByteRawIndexCreatorTest.java @@ -64,8 +64,8 @@ public class MultiValueVarByteRawIndexCreatorTest { int maxTotalLength = 500; File file = new File(OUTPUT_DIR, column + Indexes.RAW_MV_FORWARD_INDEX_FILE_EXTENSION); file.delete(); - MultiValueVarByteRawIndexCreator creator = new MultiValueVarByteRawIndexCreator( - new File(OUTPUT_DIR), ChunkCompressionType.SNAPPY, column, numDocs, DataType.STRING, maxTotalLength); + MultiValueVarByteRawIndexCreator creator = new MultiValueVarByteRawIndexCreator(new File(OUTPUT_DIR), + ChunkCompressionType.SNAPPY, column, numDocs, DataType.STRING, maxTotalLength, maxElements); List<String[]> inputs = new ArrayList<>(); Random random = new Random(); for (int i = 0; i < numDocs; i++) { @@ -106,7 +106,7 @@ public class MultiValueVarByteRawIndexCreatorTest { file.delete(); MultiValueVarByteRawIndexCreator creator = new MultiValueVarByteRawIndexCreator( new File(OUTPUT_DIR), ChunkCompressionType.SNAPPY, column, numDocs, DataType.BYTES, - maxTotalLength); + maxTotalLength, maxElements); List<byte[][]> inputs = new ArrayList<>(); Random random = new Random(); for (int i = 0; i < numDocs; i++) { diff --git 
a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/index/reader/ForwardIndexReader.java b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/index/reader/ForwardIndexReader.java index 6393aaf..941522c 100644 --- a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/index/reader/ForwardIndexReader.java +++ b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/index/reader/ForwardIndexReader.java @@ -257,8 +257,4 @@ public interface ForwardIndexReader<T extends ForwardIndexReaderContext> extends throw new UnsupportedOperationException(); } - default int getFloatMV(int docId, float[] valueBuffer, T context, int[] parentIndices) { - throw new UnsupportedOperationException(); - } - } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org
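A note on the row layout the new reader decodes (a minimal sketch, not part of the patch): each multi-value row in the raw forward index is stored as an int element count followed by the fixed-width values, which is why getIntMV/getLongMV/getFloatMV/getDoubleMV first read the count from the sliced row and then fill the caller-supplied buffer. Class and method names below are illustrative.

import java.nio.ByteBuffer;

public class MvRowLayoutSketch {

  // Encode one multi-value row as [numValues][v0][v1]...[vN-1].
  static byte[] encodeIntRow(int[] values) {
    ByteBuffer buffer = ByteBuffer.allocate(Integer.BYTES + values.length * Integer.BYTES);
    buffer.putInt(values.length);   // element count prefix
    for (int value : values) {
      buffer.putInt(value);         // fixed-width payload
    }
    return buffer.array();
  }

  // Decode mirrors FixedByteChunkMVForwardIndexReader.getIntMV: read the count, then the values.
  static int decodeIntRow(byte[] row, int[] valueBuffer) {
    ByteBuffer buffer = ByteBuffer.wrap(row);
    int numValues = buffer.getInt();
    for (int i = 0; i < numValues; i++) {
      valueBuffer[i] = buffer.getInt();
    }
    return numValues;
  }

  public static void main(String[] args) {
    int[] out = new int[8];
    int n = decodeIntRow(encodeIntRow(new int[]{3, 1, 4, 1, 5}), out);
    System.out.println(n + " values decoded"); // prints "5 values decoded"
  }
}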
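Both creators size their chunks from the same worst-case arithmetic: the fixed-byte creator takes totalMaxLength = maxNumberOfMultiValueElements * storedType.size(), while the var-byte creator adds an int element-count prefix and one int length prefix per element on top of maxRowLengthInBytes; the number of docs per chunk is then TARGET_MAX_CHUNK_SIZE divided by the per-row cost plus the chunk-header row offset (always for the var-byte creator, and when deriveNumDocsPerChunk is set for the fixed-byte one). A hedged sketch of that arithmetic, with the two constants treated as assumptions (the real values live in BaseChunkSVForwardIndexWriter and VarByteChunkSVForwardIndexWriter):

public class ChunkSizingSketch {

  // Assumed values for illustration only; the actual constants come from the chunk writer classes.
  private static final int TARGET_MAX_CHUNK_SIZE = 1024 * 1024;
  private static final int CHUNK_HEADER_ENTRY_ROW_OFFSET_SIZE = Integer.BYTES;

  // Fixed-byte MV: worst-case row = max elements * width of the stored type (4 bytes for INT/FLOAT, 8 for LONG/DOUBLE).
  static int fixedByteDocsPerChunk(int maxNumberOfMultiValueElements, int storedTypeSize) {
    int totalMaxLength = maxNumberOfMultiValueElements * storedTypeSize;
    return Math.max(TARGET_MAX_CHUNK_SIZE / (totalMaxLength + CHUNK_HEADER_ENTRY_ROW_OFFSET_SIZE), 1);
  }

  // Var-byte MV: worst-case row = element-count prefix + one length prefix per element + payload bytes.
  static int varByteWorstCaseRowLength(int maxRowLengthInBytes, int maxNumberOfElements) {
    return Integer.BYTES + maxRowLengthInBytes + Integer.BYTES * maxNumberOfElements;
  }

  public static void main(String[] args) {
    // e.g. at most 50 INT values per row -> 200-byte rows -> roughly 5,000 docs per 1MB chunk.
    System.out.println(fixedByteDocsPerChunk(50, Integer.BYTES));
    System.out.println(varByteWorstCaseRowLength(500, 50));
  }
}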
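For the read path, the new MultiValueFixedByteRawIndexCreatorTest above is the reference; a condensed, hedged sketch of the same steps, with the index file path treated as illustrative and the valueType chosen the way PhysicalColumnIndexContainer constructs the reader for an INT column:

import java.io.File;
import java.nio.ByteOrder;
import org.apache.pinot.segment.local.segment.index.readers.forward.BaseChunkSVForwardIndexReader.ChunkReaderContext;
import org.apache.pinot.segment.local.segment.index.readers.forward.FixedByteChunkMVForwardIndexReader;
import org.apache.pinot.segment.spi.memory.PinotDataBuffer;
import org.apache.pinot.spi.data.FieldSpec.DataType;

public class MvReaderUsageSketch {

  public static void main(String[] args) throws Exception {
    File indexFile = new File("/tmp/mvFixedRawTest/testCol_INT.mv.fwd"); // illustrative path and extension
    PinotDataBuffer dataBuffer =
        PinotDataBuffer.mapFile(indexFile, true, 0, indexFile.length(), ByteOrder.BIG_ENDIAN, "mv-usage-sketch");
    FixedByteChunkMVForwardIndexReader reader = new FixedByteChunkMVForwardIndexReader(dataBuffer, DataType.INT);
    // createContext() returns null when the chunks are uncompressed; the read methods accept either.
    ChunkReaderContext context = reader.createContext();
    int[] valueBuffer = new int[50]; // sized to the maximum number of elements per row
    int numValues = reader.getIntMV(0, valueBuffer, context); // read doc 0; returns the element count
    System.out.println("doc 0 has " + numValues + " values");
  }
}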