This is an automated email from the ASF dual-hosted git repository.

kishoreg pushed a commit to branch mv-fwd-index
in repository https://gitbox.apache.org/repos/asf/pinot.git
commit 10b8d0ac8c8a4e911c5355335ed1e182d0c38543
Author: kishoreg <g.kish...@gmail.com>
AuthorDate: Sun Oct 17 00:19:04 2021 -0700

    Initial code for MultiValue forward Index
---
 .../fwd/MultiValueFixedByteRawIndexCreator.java    | 179 +++++++++++++++++
 .../impl/fwd/MultiValueVarByteRawIndexCreator.java | 214 +++++++++++++++++++++
 .../stats/AbstractColumnStatisticsCollector.java   |   5 +
 .../forward/VarByteChunkMVForwardIndexReader.java  | 197 +++++++++++++++++++
 .../MultiValueVarByteRawIndexCreatorTest.java      |  81 ++++++++
 .../org/apache/pinot/segment/spi/V1Constants.java  |   1 +
 .../spi/index/creator/ForwardIndexCreator.java     |   9 +
 .../spi/index/reader/ForwardIndexReader.java       |  19 ++
 8 files changed, 705 insertions(+)

diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/fwd/MultiValueFixedByteRawIndexCreator.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/fwd/MultiValueFixedByteRawIndexCreator.java
new file mode 100644
index 0000000..d608a65
--- /dev/null
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/fwd/MultiValueFixedByteRawIndexCreator.java
@@ -0,0 +1,179 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.segment.local.segment.creator.impl.fwd;
+
+import com.google.common.annotations.VisibleForTesting;
+import java.io.File;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import org.apache.commons.io.FileUtils;
+import org.apache.pinot.segment.local.io.writer.impl.BaseChunkSVForwardIndexWriter;
+import org.apache.pinot.segment.local.io.writer.impl.VarByteChunkSVForwardIndexWriter;
+import org.apache.pinot.segment.spi.V1Constants;
+import org.apache.pinot.segment.spi.compression.ChunkCompressionType;
+import org.apache.pinot.segment.spi.index.creator.ForwardIndexCreator;
+import org.apache.pinot.spi.data.FieldSpec.DataType;
+
+/**
+ * Forward index creator for raw (non-dictionary-encoded) multi-value columns of fixed-length
+ * data types (INT, LONG, FLOAT, DOUBLE).
+ */
+public class MultiValueFixedByteRawIndexCreator implements ForwardIndexCreator {
+
+  private static final int DEFAULT_NUM_DOCS_PER_CHUNK = 1000;
+  private static final int TARGET_MAX_CHUNK_SIZE = 1024 * 1024;
+
+  private final VarByteChunkSVForwardIndexWriter _indexWriter;
+  private final DataType _valueType;
+
+  /**
+   * Create a raw index creator for the given multi-value column
+   *
+   * @param baseIndexDir Index directory
+   * @param compressionType Type of compression to use
+   * @param column Name of column to index
+   * @param totalDocs Total number of documents to index
+   * @param valueType Type of the values
+   * @param maxLength length of longest entry (in bytes)
+   */
+  public MultiValueFixedByteRawIndexCreator(File baseIndexDir, ChunkCompressionType compressionType,
+      String column, int totalDocs, DataType valueType, int maxLength)
+      throws IOException {
+    this(baseIndexDir, compressionType, column, totalDocs, valueType, maxLength, false,
+        BaseChunkSVForwardIndexWriter.DEFAULT_VERSION);
+  }
+
+  /**
+   * Create a raw index creator for the given multi-value column
+   *
+   * @param baseIndexDir Index directory
+   * @param compressionType Type of compression to use
+   * @param column Name of column to index
+   * @param totalDocs Total number of documents to index
+   * @param valueType Type of the values
+   * @param maxLength length of longest entry (in bytes)
+   * @param deriveNumDocsPerChunk true if writer should auto-derive the number of rows per chunk
+   * @param writerVersion writer format version
+   */
+  public MultiValueFixedByteRawIndexCreator(File baseIndexDir, ChunkCompressionType compressionType,
+      String column, int totalDocs, DataType valueType, int maxLength,
+      boolean deriveNumDocsPerChunk, int writerVersion)
+      throws IOException {
+    File file = new File(baseIndexDir,
+        column + V1Constants.Indexes.RAW_MV_FORWARD_INDEX_FILE_EXTENSION);
+    FileUtils.deleteQuietly(file);
+    int numDocsPerChunk =
+        deriveNumDocsPerChunk ? getNumDocsPerChunk(maxLength) : DEFAULT_NUM_DOCS_PER_CHUNK;
+    _indexWriter = new VarByteChunkSVForwardIndexWriter(file, compressionType, totalDocs,
+        numDocsPerChunk, maxLength, writerVersion);
+    _valueType = valueType;
+  }
+
+  @VisibleForTesting
+  public static int getNumDocsPerChunk(int lengthOfLongestEntry) {
+    int overheadPerEntry =
+        lengthOfLongestEntry + VarByteChunkSVForwardIndexWriter.CHUNK_HEADER_ENTRY_ROW_OFFSET_SIZE;
+    return Math.max(TARGET_MAX_CHUNK_SIZE / overheadPerEntry, 1);
+  }
+
+  @Override
+  public boolean isDictionaryEncoded() {
+    return false;
+  }
+
+  @Override
+  public boolean isSingleValue() {
+    return false;
+  }
+
+  @Override
+  public DataType getValueType() {
+    return _valueType;
+  }
+
+  @Override
+  public void putIntMV(final int[] values) {
+    // Layout: numValues followed by the fixed-width values
+    byte[] bytes = new byte[Integer.BYTES + values.length * Integer.BYTES];
+    ByteBuffer byteBuffer = ByteBuffer.wrap(bytes);
+    // write the number of values
+    byteBuffer.putInt(values.length);
+    // write the content of each element
+    for (final int value : values) {
+      byteBuffer.putInt(value);
+    }
+    _indexWriter.putBytes(bytes);
+  }
+
+  @Override
+  public void putLongMV(final long[] values) {
+    // Layout: numValues followed by the fixed-width values
+    byte[] bytes = new byte[Integer.BYTES + values.length * Long.BYTES];
+    ByteBuffer byteBuffer = ByteBuffer.wrap(bytes);
+    // write the number of values
+    byteBuffer.putInt(values.length);
+    // write the content of each element
+    for (final long value : values) {
+      byteBuffer.putLong(value);
+    }
+    _indexWriter.putBytes(bytes);
+  }
+
+  @Override
+  public void putFloatMV(final float[] values) {
+    // Layout: numValues followed by the fixed-width values
+    byte[] bytes = new byte[Integer.BYTES + values.length * Float.BYTES];
+    ByteBuffer byteBuffer = ByteBuffer.wrap(bytes);
+    // write the number of values
+    byteBuffer.putInt(values.length);
+    // write the content of each element
+    for (final float value : values) {
+      byteBuffer.putFloat(value);
+    }
+    _indexWriter.putBytes(bytes);
+  }
+
+  @Override
+  public void putDoubleMV(final double[] values) {
+    // Layout: numValues followed by the fixed-width values
+    byte[] bytes = new byte[Integer.BYTES + values.length * Double.BYTES];
+    ByteBuffer byteBuffer = ByteBuffer.wrap(bytes);
+    // write the number of values
+    byteBuffer.putInt(values.length);
+    // write the content of each element
+    for (final double value : values) {
+      byteBuffer.putDouble(value);
+    }
+    _indexWriter.putBytes(bytes);
+  }
+
+  @Override
+  public void close()
+      throws IOException {
+    _indexWriter.close();
+  }
+}
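A note on the row layout used by the put*MV methods above: each multi-value row is serialized
as a 4-byte count followed by the fixed-width values, and the resulting byte[] is handed to the
var-byte chunk writer as a single entry. A minimal standalone sketch of that row layout (plain
Java, independent of the Pinot classes above; class and method names here are illustrative, not
part of the commit):

import java.nio.ByteBuffer;

public class FixedByteMvRowDemo {
  // Mirrors putIntMV: a 4-byte count followed by the fixed-width values.
  static byte[] encode(int[] values) {
    ByteBuffer buf = ByteBuffer.allocate(Integer.BYTES + values.length * Integer.BYTES);
    buf.putInt(values.length);
    for (int v : values) {
      buf.putInt(v);
    }
    return buf.array();
  }

  // Inverse of encode: read the count, then that many fixed-width values.
  static int[] decode(byte[] bytes) {
    ByteBuffer buf = ByteBuffer.wrap(bytes);
    int[] values = new int[buf.getInt()];
    for (int i = 0; i < values.length; i++) {
      values[i] = buf.getInt();
    }
    return values;
  }

  public static void main(String[] args) {
    byte[] row = encode(new int[]{7, 11, 13});
    System.out.println(java.util.Arrays.toString(decode(row))); // [7, 11, 13]
  }
}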
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/fwd/MultiValueVarByteRawIndexCreator.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/fwd/MultiValueVarByteRawIndexCreator.java
new file mode 100644
index 0000000..465b5f7
--- /dev/null
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/fwd/MultiValueVarByteRawIndexCreator.java
@@ -0,0 +1,214 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.segment.local.segment.creator.impl.fwd;
+
+import com.google.common.annotations.VisibleForTesting;
+import java.io.File;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+import java.nio.charset.StandardCharsets;
+import java.util.Arrays;
+import java.util.Random;
+import org.apache.pinot.segment.local.io.writer.impl.BaseChunkSVForwardIndexWriter;
+import org.apache.pinot.segment.local.io.writer.impl.VarByteChunkSVForwardIndexWriter;
+import org.apache.pinot.segment.local.segment.index.readers.forward.BaseChunkSVForwardIndexReader.ChunkReaderContext;
+import org.apache.pinot.segment.local.segment.index.readers.forward.VarByteChunkSVForwardIndexReader;
+import org.apache.pinot.segment.spi.V1Constants.Indexes;
+import org.apache.pinot.segment.spi.compression.ChunkCompressionType;
+import org.apache.pinot.segment.spi.index.creator.ForwardIndexCreator;
+import org.apache.pinot.segment.spi.memory.PinotDataBuffer;
+import org.apache.pinot.spi.data.FieldSpec.DataType;
+
+/**
+ * Forward index creator for raw (non-dictionary-encoded) multi-value columns of variable-length
+ * data types (STRING, BYTES).
+ */
+public class MultiValueVarByteRawIndexCreator implements ForwardIndexCreator {
+
+  private static final int DEFAULT_NUM_DOCS_PER_CHUNK = 1000;
+  private static final int TARGET_MAX_CHUNK_SIZE = 1024 * 1024;
+
+  private final VarByteChunkSVForwardIndexWriter _indexWriter;
+  private final DataType _valueType;
+
+  /**
+   * Create a var-byte raw index creator for the given multi-value column
+   *
+   * @param baseIndexDir Index directory
+   * @param compressionType Type of compression to use
+   * @param column Name of column to index
+   * @param totalDocs Total number of documents to index
+   * @param valueType Type of the values
+   * @param maxTotalContentLength max total content length
+   * @param maxElements max number of elements
+   */
+  public MultiValueVarByteRawIndexCreator(File baseIndexDir, ChunkCompressionType compressionType,
+      String column, int totalDocs, DataType valueType, int maxTotalContentLength, int maxElements)
+      throws IOException {
+    this(baseIndexDir, compressionType, column, totalDocs, valueType, false, maxTotalContentLength,
+        maxElements, BaseChunkSVForwardIndexWriter.DEFAULT_VERSION);
+  }
+
+  /**
+   * Create a var-byte raw index creator for the given multi-value column
+   *
+   * @param baseIndexDir Index directory
+   * @param compressionType Type of compression to use
+   * @param column Name of column to index
+   * @param totalDocs Total number of documents to index
+   * @param valueType Type of the values
+   * @param deriveNumDocsPerChunk true if writer should auto-derive the number of rows per chunk
+   * @param maxTotalContentLength max total content length
+   * @param maxElements max number of elements
+   * @param writerVersion writer format version
+   */
+  public MultiValueVarByteRawIndexCreator(File baseIndexDir, ChunkCompressionType compressionType,
+      String column, int totalDocs, DataType valueType, boolean deriveNumDocsPerChunk,
+      int maxTotalContentLength, int maxElements, int writerVersion)
+      throws IOException {
+    // The actual content is prepended with the number of elements and a length array
+    // containing the length of each element
+    int maxLength = Integer.BYTES + maxElements * Integer.BYTES + maxTotalContentLength;
+    File file = new File(baseIndexDir, column + Indexes.RAW_MV_FORWARD_INDEX_FILE_EXTENSION);
+    int numDocsPerChunk =
+        deriveNumDocsPerChunk ? getNumDocsPerChunk(maxLength) : DEFAULT_NUM_DOCS_PER_CHUNK;
+    _indexWriter = new VarByteChunkSVForwardIndexWriter(file, compressionType, totalDocs,
+        numDocsPerChunk, maxLength, writerVersion);
+    _valueType = valueType;
+  }
+
+  @VisibleForTesting
+  public static int getNumDocsPerChunk(int lengthOfLongestEntry) {
+    int overheadPerEntry =
+        lengthOfLongestEntry + VarByteChunkSVForwardIndexWriter.CHUNK_HEADER_ENTRY_ROW_OFFSET_SIZE;
+    return Math.max(TARGET_MAX_CHUNK_SIZE / overheadPerEntry, 1);
+  }
+
+  @Override
+  public boolean isDictionaryEncoded() {
+    return false;
+  }
+
+  @Override
+  public boolean isSingleValue() {
+    return false;
+  }
+
+  @Override
+  public DataType getValueType() {
+    return _valueType;
+  }
+
+  @Override
+  public void putStringMV(final String[] values) {
+    // Serialize each value once (UTF-8) and reuse the bytes for both the length array
+    // and the concatenated content
+    byte[][] serialized = new byte[values.length][];
+    int totalBytes = 0;
+    for (int i = 0; i < values.length; i++) {
+      serialized[i] = values[i].getBytes(StandardCharsets.UTF_8);
+      totalBytes += serialized[i].length;
+    }
+    // Layout: numValues, length of each element, concatenated element bytes
+    byte[] bytes = new byte[Integer.BYTES + Integer.BYTES * values.length + totalBytes];
+    ByteBuffer byteBuffer = ByteBuffer.wrap(bytes);
+    // write the number of values
+    byteBuffer.putInt(values.length);
+    // write the length of each element
+    for (final byte[] value : serialized) {
+      byteBuffer.putInt(value.length);
+    }
+    // write the content of each element
+    for (final byte[] value : serialized) {
+      byteBuffer.put(value);
+    }
+    _indexWriter.putBytes(bytes);
+  }
+
+  @Override
+  public void putBytesMV(final byte[][] values) {
+    int totalBytes = 0;
+    for (final byte[] value : values) {
+      totalBytes += value.length;
+    }
+    // Layout: numValues, length of each element, concatenated element bytes
+    byte[] bytes = new byte[Integer.BYTES + Integer.BYTES * values.length + totalBytes];
+    ByteBuffer byteBuffer = ByteBuffer.wrap(bytes);
+    // write the number of values
+    byteBuffer.putInt(values.length);
+    // write the length of each element
+    for (final byte[] value : values) {
+      byteBuffer.putInt(value.length);
+    }
+    // write the content of each element
+    for (final byte[] value : values) {
+      byteBuffer.put(value);
+    }
+    _indexWriter.putBytes(bytes);
+  }
+
+  @Override
+  public void close()
+      throws IOException {
+    _indexWriter.close();
+  }
+
+  private static void testSV()
+      throws IOException {
+    final File dir = new File(System.getProperty("java.io.tmpdir"));
+
+    String column = "testCol";
+    int numDocs = 10000;
+    int maxLength = 100;
+    File file = new File(dir, column + Indexes.RAW_SV_FORWARD_INDEX_FILE_EXTENSION);
+    file.delete();
+    SingleValueVarByteRawIndexCreator creator = new SingleValueVarByteRawIndexCreator(dir,
+        ChunkCompressionType.SNAPPY, column, numDocs, DataType.STRING, maxLength, true,
+        BaseChunkSVForwardIndexWriter.DEFAULT_VERSION);
+    Random random = new Random();
+    for (int i = 0; i < numDocs; i++) {
+      int length = random.nextInt(maxLength);
+      char[] value = new char[length];
+      Arrays.fill(value, 'a');
+      creator.putString(new String(value));
+    }
+    creator.close();
+
+    // read
+    final PinotDataBuffer buffer = PinotDataBuffer
+        .mapFile(file, true, 0, file.length(), ByteOrder.BIG_ENDIAN, "");
+    VarByteChunkSVForwardIndexReader reader = new VarByteChunkSVForwardIndexReader(buffer,
+        DataType.STRING);
+    final ChunkReaderContext context = reader.createContext();
+    for (int i = 0; i < numDocs; i++) {
+      String value = reader.getString(i, context);
+      System.out.println("value = " + value);
+    }
+  }
+}
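The per-row budget in the constructor above is worth a worked example. With the limits used by
the test later in this commit (maxElements = 50, maxTotalContentLength = 500), the writer
reserves maxLength = 4 + 50 * 4 + 500 = 704 bytes per row, and with deriveNumDocsPerChunk
enabled a chunk holds max(1 MiB / (704 + offset entry), 1) rows. A standalone sketch of that
arithmetic, assuming the chunk header's per-row offset entry
(CHUNK_HEADER_ENTRY_ROW_OFFSET_SIZE) is 4 bytes:

public class ChunkSizingDemo {
  public static void main(String[] args) {
    int maxElements = 50;
    int maxTotalContentLength = 500;
    // Worst-case serialized row: count + one length per element + the content itself.
    int maxLength = Integer.BYTES + maxElements * Integer.BYTES + maxTotalContentLength; // 704
    int overheadPerEntry = maxLength + 4; // assumed 4-byte chunk-header row offset entry
    int numDocsPerChunk = Math.max((1024 * 1024) / overheadPerEntry, 1);
    System.out.println(numDocsPerChunk); // 1481
  }
}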
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/stats/AbstractColumnStatisticsCollector.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/stats/AbstractColumnStatisticsCollector.java
index 284bf69..6407b55 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/stats/AbstractColumnStatisticsCollector.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/stats/AbstractColumnStatisticsCollector.java
@@ -47,6 +47,7 @@ public abstract class AbstractColumnStatisticsCollector implements ColumnStatist
   protected int _totalNumberOfEntries = 0;
   protected int _maxNumberOfMultiValues = 0;
+  protected int _maxLengthOfMultiValues = 0;
   private PartitionFunction _partitionFunction;
   private final int _numPartitions;
   private final Set<Integer> _partitions;
@@ -72,6 +73,10 @@ public abstract class AbstractColumnStatisticsCollector implements ColumnStatist
     return _maxNumberOfMultiValues;
   }
 
+  public int getMaxLengthOfMultiValues() {
+    return _maxLengthOfMultiValues;
+  }
+
   void addressSorted(Object entry) {
     if (_isSorted) {
       if (_previousValue != null) {
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/forward/VarByteChunkMVForwardIndexReader.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/forward/VarByteChunkMVForwardIndexReader.java
new file mode 100644
index 0000000..eef396c
--- /dev/null
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/forward/VarByteChunkMVForwardIndexReader.java
@@ -0,0 +1,197 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.segment.local.segment.index.readers.forward;
+
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
+import javax.annotation.Nullable;
+import org.apache.pinot.segment.local.io.writer.impl.VarByteChunkSVForwardIndexWriter;
+import org.apache.pinot.segment.spi.memory.PinotDataBuffer;
+import org.apache.pinot.spi.data.FieldSpec.DataType;
+
+/**
+ * Chunk-based multi-value raw (non-dictionary-encoded) forward index reader for values of
+ * variable-length data types (STRING, BYTES).
+ * <p>For data layout, please refer to the documentation for {@link VarByteChunkSVForwardIndexWriter}
+ */
+public final class VarByteChunkMVForwardIndexReader extends BaseChunkSVForwardIndexReader {
+
+  private static final int ROW_OFFSET_SIZE = VarByteChunkSVForwardIndexWriter.CHUNK_HEADER_ENTRY_ROW_OFFSET_SIZE;
+
+  private final int _maxChunkSize;
+
+  public VarByteChunkMVForwardIndexReader(PinotDataBuffer dataBuffer, DataType valueType) {
+    super(dataBuffer, valueType);
+    _maxChunkSize = _numDocsPerChunk * (ROW_OFFSET_SIZE + _lengthOfLongestEntry);
+  }
+
+  @Nullable
+  @Override
+  public ChunkReaderContext createContext() {
+    if (_isCompressed) {
+      return new ChunkReaderContext(_maxChunkSize);
+    } else {
+      return null;
+    }
+  }
+
+  @Override
+  public int getStringMV(final int docId, final String[] valueBuffer,
+      final ChunkReaderContext context) {
+    byte[] rowBytes;
+    if (_isCompressed) {
+      rowBytes = getBytesCompressed(docId, context);
+    } else {
+      rowBytes = getBytesUncompressed(docId);
+    }
+    ByteBuffer byteBuffer = ByteBuffer.wrap(rowBytes);
+    int numValues = byteBuffer.getInt();
+    // Content starts after the count and the per-element length array
+    int contentOffset = (numValues + 1) * Integer.BYTES;
+    for (int i = 0; i < numValues; i++) {
+      int length = byteBuffer.getInt((i + 1) * Integer.BYTES);
+      byte[] bytes = new byte[length];
+      byteBuffer.position(contentOffset);
+      byteBuffer.get(bytes, 0, length);
+      valueBuffer[i] = new String(bytes, StandardCharsets.UTF_8);
+      contentOffset += length;
+    }
+    return numValues;
+  }
+
+  @Override
+  public int getBytesMV(final int docId, final byte[][] valueBuffer,
+      final ChunkReaderContext context) {
+    byte[] rowBytes;
+    if (_isCompressed) {
+      rowBytes = getBytesCompressed(docId, context);
+    } else {
+      rowBytes = getBytesUncompressed(docId);
+    }
+    ByteBuffer byteBuffer = ByteBuffer.wrap(rowBytes);
+    int numValues = byteBuffer.getInt();
+    // Content starts after the count and the per-element length array
+    int contentOffset = (numValues + 1) * Integer.BYTES;
+    for (int i = 0; i < numValues; i++) {
+      int length = byteBuffer.getInt((i + 1) * Integer.BYTES);
+      byte[] bytes = new byte[length];
+      byteBuffer.position(contentOffset);
+      byteBuffer.get(bytes, 0, length);
+      valueBuffer[i] = bytes;
+      contentOffset += length;
+    }
+    return numValues;
+  }
+
+  @Override
+  public byte[] getBytes(int docId, ChunkReaderContext context) {
+    if (_isCompressed) {
+      return getBytesCompressed(docId, context);
+    } else {
+      return getBytesUncompressed(docId);
+    }
+  }
+
+  /**
+   * Helper method to read the serialized multi-value entry from the compressed index.
+   */
+  private byte[] getBytesCompressed(int docId, ChunkReaderContext context) {
+    int chunkRowId = docId % _numDocsPerChunk;
+    ByteBuffer chunkBuffer = getChunkBuffer(docId, context);
+
+    // These offsets are offsets in the chunk buffer
+    int valueStartOffset = chunkBuffer.getInt(chunkRowId * ROW_OFFSET_SIZE);
+    int valueEndOffset = getValueEndOffset(chunkRowId, chunkBuffer);
+
+    byte[] bytes = new byte[valueEndOffset - valueStartOffset];
+    chunkBuffer.position(valueStartOffset);
+    chunkBuffer.get(bytes);
+    return bytes;
+  }
+
+  /**
+   * Helper method to read the serialized multi-value entry from the uncompressed index.
+   */
+  private byte[] getBytesUncompressed(int docId) {
+    int chunkId = docId / _numDocsPerChunk;
+    int chunkRowId = docId % _numDocsPerChunk;
+
+    // These offsets are offsets in the data buffer
+    long chunkStartOffset = getChunkPosition(chunkId);
+    long valueStartOffset =
+        chunkStartOffset + _dataBuffer.getInt(chunkStartOffset + chunkRowId * ROW_OFFSET_SIZE);
+    long valueEndOffset = getValueEndOffset(chunkId, chunkRowId, chunkStartOffset);
+
+    byte[] bytes = new byte[(int) (valueEndOffset - valueStartOffset)];
+    _dataBuffer.copyTo(valueStartOffset, bytes);
+    return bytes;
+  }
+
+  /**
+   * Helper method to compute the end offset of the value in the chunk buffer.
+   */
+  private int getValueEndOffset(int rowId, ByteBuffer chunkBuffer) {
+    if (rowId == _numDocsPerChunk - 1) {
+      // Last row in the chunk
+      return chunkBuffer.limit();
+    } else {
+      int valueEndOffset = chunkBuffer.getInt((rowId + 1) * ROW_OFFSET_SIZE);
+      if (valueEndOffset == 0) {
+        // Last row in the last chunk (chunk is incomplete, which stores 0 as the offset for the absent rows)
+        return chunkBuffer.limit();
+      } else {
+        return valueEndOffset;
+      }
+    }
+  }
+
+  /**
+   * Helper method to compute the end offset of the value in the data buffer.
+   */
+  private long getValueEndOffset(int chunkId, int chunkRowId, long chunkStartOffset) {
+    if (chunkId == _numChunks - 1) {
+      // Last chunk
+      if (chunkRowId == _numDocsPerChunk - 1) {
+        // Last row in the last chunk
+        return _dataBuffer.size();
+      } else {
+        int valueEndOffsetInChunk = _dataBuffer
+            .getInt(chunkStartOffset + (chunkRowId + 1) * ROW_OFFSET_SIZE);
+        if (valueEndOffsetInChunk == 0) {
+          // Last row in the last chunk (chunk is incomplete, which stores 0 as the offset for the absent rows)
+          return _dataBuffer.size();
+        } else {
+          return chunkStartOffset + valueEndOffsetInChunk;
+        }
+      }
+    } else {
+      if (chunkRowId == _numDocsPerChunk - 1) {
+        // Last row in the chunk
+        return getChunkPosition(chunkId + 1);
+      } else {
+        return chunkStartOffset + _dataBuffer
            .getInt(chunkStartOffset + (chunkRowId + 1) * ROW_OFFSET_SIZE);
+      }
+    }
+  }
+}
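The subtlest part of the reader is the end-offset resolution in getValueEndOffset: each chunk
header stores one start offset per row, rows that were never written in a trailing incomplete
chunk keep offset 0, and a 0 end-offset therefore means "read to the end of the chunk". A toy,
self-contained illustration of that convention (deliberately simplified: one uncompressed chunk
with 4-byte offsets, not the full Pinot chunk format, which also carries compression and
multiple chunks):

import java.nio.ByteBuffer;

public class ChunkOffsetDemo {
  public static void main(String[] args) {
    int numDocsPerChunk = 4;
    int headerSize = numDocsPerChunk * Integer.BYTES; // one offset slot per row
    // Two rows written: "ab" (2 bytes) and "cde" (3 bytes); rows 2 and 3 are absent.
    ByteBuffer chunk = ByteBuffer.allocate(headerSize + 5);
    chunk.putInt(headerSize);     // row 0 starts right after the header (16)
    chunk.putInt(headerSize + 2); // row 1 starts at 18
    chunk.putInt(0);              // row 2 absent -> 0
    chunk.putInt(0);              // row 3 absent -> 0
    chunk.put(new byte[]{'a', 'b', 'c', 'd', 'e'});
    chunk.flip();

    for (int rowId = 0; rowId < 2; rowId++) {
      int start = chunk.getInt(rowId * Integer.BYTES);
      int end = (rowId == numDocsPerChunk - 1) ? chunk.limit()
          : chunk.getInt((rowId + 1) * Integer.BYTES);
      if (end == 0) {
        end = chunk.limit(); // incomplete chunk: 0 marks an absent row
      }
      System.out.println("row " + rowId + ": [" + start + ", " + end + ")");
      // row 0: [16, 18)  row 1: [18, 21)
    }
  }
}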
diff --git a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/creator/MultiValueVarByteRawIndexCreatorTest.java b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/creator/MultiValueVarByteRawIndexCreatorTest.java
new file mode 100644
index 0000000..373c3a9
--- /dev/null
+++ b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/creator/MultiValueVarByteRawIndexCreatorTest.java
@@ -0,0 +1,81 @@
+package org.apache.pinot.segment.local.segment.index.creator;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.ByteOrder;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Random;
+import org.apache.commons.io.FileUtils;
+import org.apache.pinot.segment.local.segment.creator.impl.fwd.MultiValueVarByteRawIndexCreator;
+import org.apache.pinot.segment.local.segment.index.readers.forward.BaseChunkSVForwardIndexReader.ChunkReaderContext;
+import org.apache.pinot.segment.local.segment.index.readers.forward.VarByteChunkMVForwardIndexReader;
+import org.apache.pinot.segment.spi.V1Constants.Indexes;
+import org.apache.pinot.segment.spi.compression.ChunkCompressionType;
+import org.apache.pinot.segment.spi.memory.PinotDataBuffer;
+import org.apache.pinot.spi.data.FieldSpec.DataType;
+import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+public class MultiValueVarByteRawIndexCreatorTest {
+
+  private static final String OUTPUT_DIR =
+      System.getProperty("java.io.tmpdir") + File.separator + "mvVarRawTest";
+
+  @BeforeClass
+  public void setup()
+      throws Exception {
+    FileUtils.forceMkdir(new File(OUTPUT_DIR));
+  }
+
+  /**
+   * Clean up after test
+   */
+  @AfterClass
+  public void cleanup() {
+    FileUtils.deleteQuietly(new File(OUTPUT_DIR));
+  }
+
+  @Test
+  public void testMV()
+      throws IOException {
+    String column = "testCol";
+    int numDocs = 1000;
+    int maxElements = 50;
+    int maxTotalLength = 500;
+    File file = new File(OUTPUT_DIR, column + Indexes.RAW_MV_FORWARD_INDEX_FILE_EXTENSION);
+    FileUtils.deleteQuietly(file);
+    MultiValueVarByteRawIndexCreator creator = new MultiValueVarByteRawIndexCreator(
+        new File(OUTPUT_DIR), ChunkCompressionType.SNAPPY, column, numDocs, DataType.STRING,
+        maxTotalLength, maxElements);
+    List<String[]> inputs = new ArrayList<>();
+    Random random = new Random();
+    for (int i = 0; i < numDocs; i++) {
+      int length = random.nextInt(10);
+      String[] values = new String[length];
+      for (int j = 0; j < length; j++) {
+        char[] value = new char[length];
+        Arrays.fill(value, 'a');
+        values[j] = new String(value);
+      }
+      inputs.add(values);
+      creator.putStringMV(values);
+    }
+    creator.close();
+
+    // read
+    final PinotDataBuffer buffer = PinotDataBuffer
+        .mapFile(file, true, 0, file.length(), ByteOrder.BIG_ENDIAN, "");
+    VarByteChunkMVForwardIndexReader reader = new VarByteChunkMVForwardIndexReader(buffer,
+        DataType.STRING);
+    final ChunkReaderContext context = reader.createContext();
+    String[] values = new String[maxElements];
+    for (int i = 0; i < numDocs; i++) {
+      int length = reader.getStringMV(i, values, context);
+      String[] readValue = Arrays.copyOf(values, length);
+      Assert.assertEquals(readValue, inputs.get(i));
+    }
+  }
+}
diff --git a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/V1Constants.java b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/V1Constants.java
index ac7b704..e037544 100644
--- a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/V1Constants.java
+++ b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/V1Constants.java
@@ -39,6 +39,7 @@ public class V1Constants {
     public static final String UNSORTED_SV_FORWARD_INDEX_FILE_EXTENSION = ".sv.unsorted.fwd";
     public static final String SORTED_SV_FORWARD_INDEX_FILE_EXTENSION = ".sv.sorted.fwd";
     public static final String RAW_SV_FORWARD_INDEX_FILE_EXTENSION = ".sv.raw.fwd";
+    public static final String RAW_MV_FORWARD_INDEX_FILE_EXTENSION = ".mv.raw.fwd";
     public static final String UNSORTED_MV_FORWARD_INDEX_FILE_EXTENSION = ".mv.fwd";
     public static final String BITMAP_INVERTED_INDEX_FILE_EXTENSION = ".bitmap.inv";
     public static final String BITMAP_RANGE_INDEX_FILE_EXTENSION = ".bitmap.range";
diff --git a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/index/creator/ForwardIndexCreator.java b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/index/creator/ForwardIndexCreator.java
index dee4db1..e5a21e9 100644
--- a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/index/creator/ForwardIndexCreator.java
+++ b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/index/creator/ForwardIndexCreator.java
@@ -173,4 +173,13 @@ public interface ForwardIndexCreator extends Closeable {
   default void putStringMV(String[] values) {
     throw new UnsupportedOperationException();
   }
+
+  /**
+   * Writes the next byte[] type multi-value into the forward index.
+   *
+   * @param values Values to write
+   */
+  default void putBytesMV(byte[][] values) {
+    throw new UnsupportedOperationException();
+  }
 }
diff --git a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/index/reader/ForwardIndexReader.java b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/index/reader/ForwardIndexReader.java
index fb92bec..6393aaf 100644
--- a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/index/reader/ForwardIndexReader.java
+++ b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/index/reader/ForwardIndexReader.java
@@ -242,4 +242,23 @@ public interface ForwardIndexReader<T extends ForwardIndexReaderContext> extends
   default int getStringMV(int docId, String[] valueBuffer, T context) {
     throw new UnsupportedOperationException();
   }
+
+  /**
+   * Reads the bytes type multi-value at the given document id into the passed in value buffer
+   * (the buffer size must be enough to hold all the values for the multi-value entry) and
+   * returns the number of values within the multi-value entry.
+   *
+   * @param docId Document id
+   * @param valueBuffer Value buffer
+   * @param context Reader context
+   * @return Number of values within the multi-value entry
+   */
+  default int getBytesMV(int docId, byte[][] valueBuffer, T context) {
+    throw new UnsupportedOperationException();
+  }
+
+  default int getFloatMV(int docId, float[] valueBuffer, T context, int[] parentIndices) {
+    throw new UnsupportedOperationException();
+  }
 }
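Since the new getStringMV/getBytesMV SPI methods fill a caller-owned buffer and return a count,
a consumer allocates the buffer once for the worst case and reuses it across documents. A sketch
of such a read loop against the new SPI (the scan method and its parameters are illustrative and
not part of this commit; maxNumValues would typically come from the segment's
max-number-of-multi-values stat):

import org.apache.pinot.segment.spi.index.reader.ForwardIndexReader;
import org.apache.pinot.segment.spi.index.reader.ForwardIndexReaderContext;

public class MvReadLoop {
  // Scans every MV entry of a STRING column; the buffer is sized once for the
  // worst case, and the returned count says how many leading slots are valid.
  static <T extends ForwardIndexReaderContext> void scan(
      ForwardIndexReader<T> reader, T context, int numDocs, int maxNumValues) {
    String[] valueBuffer = new String[maxNumValues];
    for (int docId = 0; docId < numDocs; docId++) {
      int numValues = reader.getStringMV(docId, valueBuffer, context);
      for (int i = 0; i < numValues; i++) {
        // consume valueBuffer[i] here; slots beyond numValues are stale
      }
    }
  }
}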
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org