This is an automated email from the ASF dual-hosted git repository.

kishoreg pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
     new aed1307  Add MV raw forward index and MV `BYTES` data type (#7595)
aed1307 is described below

commit aed13072dac0d8dae29056fac77f0f457be7adba
Author: Richard Startin <rich...@startree.ai>
AuthorDate: Fri Oct 22 15:50:36 2021 +0100

    Add MV raw forward index and MV `BYTES` data type (#7595)

    * Initial code for MultiValue forward Index
    * Wiring in the segment creation driver Impl
    * cleanup
    * finish off adding BYTES_ARRAY type
    * use less memory and fewer passes during encoding
    * reduce memory requirement for forwardindexwriter
    * track size in bytes of largest row so chunks can be sized to accommodate it
    * remove TODOs
    * force derivation of number of docs for raw MV columns
    * specify character encoding
    * leave changes to integration tests to MV TEXT index implementation
    * fix javadoc
    * don't use StringUtils
    * fix formatting after rebase
    * fix javadoc formatting again
    * use zstd's compress bound

    Co-authored-by: kishoreg <g.kish...@gmail.com>
---
 .../org/apache/pinot/common/utils/DataSchema.java | 9 +-
 .../apache/pinot/common/utils/PinotDataType.java | 37 +++-
 .../apache/pinot/common/utils/DataSchemaTest.java | 19 +-
 .../pinot/common/utils/PinotDataTypeTest.java | 6 +-
 .../pinot/core/minion/RawIndexConverter.java | 2 +-
 .../local/io/compression/LZ4Compressor.java | 5 +
 .../io/compression/PassThroughCompressor.java | 5 +
 .../local/io/compression/SnappyCompressor.java | 5 +
 .../local/io/compression/ZstandardCompressor.java | 5 +
 .../writer/impl/BaseChunkSVForwardIndexWriter.java | 60 ++++---
 .../impl/FixedByteChunkSVForwardIndexWriter.java | 4 +-
 .../impl/VarByteChunkSVForwardIndexWriter.java | 77 ++++++--
 .../creator/impl/SegmentColumnarIndexCreator.java | 192 +++++++++++++++++---
 .../fwd/MultiValueFixedByteRawIndexCreator.java | 181 +++++++++++++++++++
 .../impl/fwd/MultiValueVarByteRawIndexCreator.java | 122 +++++++++++++
 .../stats/AbstractColumnStatisticsCollector.java | 5 +
 .../stats/BytesColumnPredIndexStatsCollector.java | 44 +++--
 .../stats/StringColumnPreIndexStatsCollector.java | 10 ++
 .../forward/VarByteChunkMVForwardIndexReader.java | 193 +++++++++++++++++++++
 .../local/segment/store/FilePerIndexDirectory.java | 6 +-
 .../MultiValueVarByteRawIndexCreatorTest.java | 141 +++++++++++++++
 .../segment/index/creator/RawIndexCreatorTest.java | 135 +++++++++++---
 .../org/apache/pinot/segment/spi/V1Constants.java | 1 +
 .../segment/spi/compression/ChunkCompressor.java | 2 +
 .../spi/creator/ColumnIndexCreationInfo.java | 4 +
 .../segment/spi/creator/ColumnStatistics.java | 7 +
 .../spi/index/creator/ForwardIndexCreator.java | 9 +
 .../spi/index/reader/ForwardIndexReader.java | 19 ++
 .../converter/DictionaryToRawIndexConverter.java | 2 +-
 29 files changed, 1187 insertions(+), 120 deletions(-)
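To see the effect of these type-system changes end to end, here is a minimal illustrative sketch (not part of the patch; the column name is hypothetical, and the DimensionFieldSpec constructor from pinot-spi is assumed):

    import org.apache.pinot.common.utils.PinotDataType;
    import org.apache.pinot.spi.data.DimensionFieldSpec;
    import org.apache.pinot.spi.data.FieldSpec;

    public class MvBytesSketch {
      public static void main(String[] args) {
        // A multi-value BYTES dimension: isSingleValueField == false. Before this commit,
        // getPinotDataTypeForIngestion rejected this with "There is no multi-value type for BYTES".
        FieldSpec fieldSpec = new DimensionFieldSpec("payloads", FieldSpec.DataType.BYTES, false);
        System.out.println(PinotDataType.getPinotDataTypeForIngestion(fieldSpec)); // BYTES_ARRAY

        // BYTES_ARRAY.convert delegates to sourceType.toBytesArray: a byte[][] passes through
        // unchanged, and other multi-value sources are converted element-wise via toBytes.
        byte[][] converted = (byte[][]) PinotDataType.BYTES_ARRAY
            .convert(new byte[][]{{1, 2}, {3}}, PinotDataType.BYTES_ARRAY);
        System.out.println(converted.length); // 2
      }
    }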
diff --git a/pinot-common/src/main/java/org/apache/pinot/common/utils/DataSchema.java b/pinot-common/src/main/java/org/apache/pinot/common/utils/DataSchema.java index 6b61cfc..37fb392 100644 --- a/pinot-common/src/main/java/org/apache/pinot/common/utils/DataSchema.java +++ b/pinot-common/src/main/java/org/apache/pinot/common/utils/DataSchema.java @@ -255,12 +255,13 @@ public class DataSchema { DOUBLE_ARRAY, BOOLEAN_ARRAY /* Stored as INT_ARRAY */, TIMESTAMP_ARRAY /* Stored as LONG_ARRAY */, + BYTES_ARRAY, STRING_ARRAY; private static final EnumSet<ColumnDataType> NUMERIC_TYPES = EnumSet.of(INT, LONG, FLOAT, DOUBLE); private static final EnumSet<ColumnDataType> INTEGRAL_TYPES = EnumSet.of(INT, LONG); private static final EnumSet<ColumnDataType> ARRAY_TYPES = EnumSet.of(INT_ARRAY, LONG_ARRAY, FLOAT_ARRAY, - DOUBLE_ARRAY, STRING_ARRAY, BOOLEAN_ARRAY, TIMESTAMP_ARRAY); + DOUBLE_ARRAY, STRING_ARRAY, BOOLEAN_ARRAY, TIMESTAMP_ARRAY, BYTES_ARRAY); private static final EnumSet<ColumnDataType> NUMERIC_ARRAY_TYPES = EnumSet.of(INT_ARRAY, LONG_ARRAY, FLOAT_ARRAY, DOUBLE_ARRAY); private static final EnumSet<ColumnDataType> INTEGRAL_ARRAY_TYPES = EnumSet.of(INT_ARRAY, LONG_ARRAY); @@ -368,6 +369,8 @@ public class DataSchema { return toBooleanArray(value); case TIMESTAMP_ARRAY: return toTimestampArray(value); + case BYTES_ARRAY: + return (byte[][]) value; default: throw new IllegalStateException(String.format("Cannot convert: '%s' to type: %s", value, this)); } @@ -424,6 +427,8 @@ public class DataSchema { return toBooleanArray(value); case TIMESTAMP_ARRAY: return formatTimestampArray(value); + case BYTES_ARRAY: + return (byte[][]) value; default: throw new IllegalStateException(String.format("Cannot convert and format: '%s' to type: %s", value, this)); } @@ -541,6 +546,8 @@ public class DataSchema { return BOOLEAN_ARRAY; case TIMESTAMP: return TIMESTAMP_ARRAY; + case BYTES: + return BYTES_ARRAY; default: throw new IllegalStateException("Unsupported data type: " + dataType); } diff --git a/pinot-common/src/main/java/org/apache/pinot/common/utils/PinotDataType.java b/pinot-common/src/main/java/org/apache/pinot/common/utils/PinotDataType.java index f64bf4a..5352820 100644 --- a/pinot-common/src/main/java/org/apache/pinot/common/utils/PinotDataType.java +++ b/pinot-common/src/main/java/org/apache/pinot/common/utils/PinotDataType.java @@ -784,6 +784,13 @@ public enum PinotDataType { } }, + BYTES_ARRAY { + @Override + public byte[][] convert(Object value, PinotDataType sourceType) { + return sourceType.toBytesArray(value); + } + }, + OBJECT_ARRAY; /** @@ -1034,6 +1041,24 @@ public enum PinotDataType { } } + public byte[][] toBytesArray(Object value) { + if (value instanceof byte[][]) { + return (byte[][]) value; + } + if (isSingleValue()) { + return new byte[][]{toBytes(value)}; + } else { + Object[] valueArray = toObjectArray(value); + int length = valueArray.length; + byte[][] bytesArray = new byte[length][]; + PinotDataType singleValueType = getSingleValueType(); + for (int i = 0; i < length; i++) { + bytesArray[i] = singleValueType.toBytes(valueArray[i]); + } + return bytesArray; + } + } + private static Object[] toObjectArray(Object array) { Class<?> componentType = array.getClass().getComponentType(); if (componentType.isPrimitive()) { @@ -1132,6 +1157,8 @@ public enum PinotDataType { return DOUBLE; case STRING_ARRAY: return STRING; + case BYTES_ARRAY: + return BYTES; case OBJECT_ARRAY: return OBJECT; case BOOLEAN_ARRAY: @@ -1205,6 +1232,9 @@ public enum PinotDataType { if (cls == Short.class) { return SHORT_ARRAY; } + if (cls == byte[].class) { + return BYTES_ARRAY; + } if (cls == Boolean.class) { return BOOLEAN_ARRAY; } @@ -1233,7 +1263,6 @@ public enum PinotDataType { /** * Returns the {@link PinotDataType} for the given {@link FieldSpec} for data ingestion purpose. Returns object array * type for multi-valued types. - * TODO: Add MV support for BYTES */ public static PinotDataType getPinotDataTypeForIngestion(FieldSpec fieldSpec) { DataType dataType = fieldSpec.getDataType(); @@ -1259,11 +1288,7 @@ public enum PinotDataType { case STRING: return fieldSpec.isSingleValueField() ?
STRING : STRING_ARRAY; case BYTES: - if (fieldSpec.isSingleValueField()) { - return BYTES; - } else { - throw new IllegalStateException("There is no multi-value type for BYTES"); - } + return fieldSpec.isSingleValueField() ? BYTES : BYTES_ARRAY; default: throw new UnsupportedOperationException( "Unsupported data type: " + dataType + " in field: " + fieldSpec.getName()); diff --git a/pinot-common/src/test/java/org/apache/pinot/common/utils/DataSchemaTest.java b/pinot-common/src/test/java/org/apache/pinot/common/utils/DataSchemaTest.java index 421e2ea..04355b8 100644 --- a/pinot-common/src/test/java/org/apache/pinot/common/utils/DataSchemaTest.java +++ b/pinot-common/src/test/java/org/apache/pinot/common/utils/DataSchemaTest.java @@ -29,18 +29,18 @@ import static org.apache.pinot.common.utils.DataSchema.ColumnDataType.*; public class DataSchemaTest { private static final String[] COLUMN_NAMES = { "int", "long", "float", "double", "string", "object", "int_array", "long_array", "float_array", "double_array", - "string_array", "boolean_array", "timestamp_array" + "string_array", "boolean_array", "timestamp_array", "bytes_array" }; private static final int NUM_COLUMNS = COLUMN_NAMES.length; private static final DataSchema.ColumnDataType[] COLUMN_DATA_TYPES = {INT, LONG, FLOAT, DOUBLE, STRING, OBJECT, INT_ARRAY, LONG_ARRAY, FLOAT_ARRAY, DOUBLE_ARRAY, STRING_ARRAY, - BOOLEAN_ARRAY, TIMESTAMP_ARRAY}; + BOOLEAN_ARRAY, TIMESTAMP_ARRAY, BYTES_ARRAY}; private static final DataSchema.ColumnDataType[] COMPATIBLE_COLUMN_DATA_TYPES = {LONG, FLOAT, DOUBLE, INT, STRING, OBJECT, LONG_ARRAY, FLOAT_ARRAY, DOUBLE_ARRAY, INT_ARRAY, STRING_ARRAY, - BOOLEAN_ARRAY, TIMESTAMP_ARRAY}; + BOOLEAN_ARRAY, TIMESTAMP_ARRAY, BYTES_ARRAY}; private static final DataSchema.ColumnDataType[] UPGRADED_COLUMN_DATA_TYPES = { LONG, DOUBLE, DOUBLE, DOUBLE, STRING, OBJECT, LONG_ARRAY, DOUBLE_ARRAY, DOUBLE_ARRAY, DOUBLE_ARRAY, STRING_ARRAY, - BOOLEAN_ARRAY, TIMESTAMP_ARRAY + BOOLEAN_ARRAY, TIMESTAMP_ARRAY, BYTES_ARRAY }; @Test @@ -92,7 +92,7 @@ public class DataSchemaTest { Assert.assertEquals(dataSchema.toString(), "[int(INT),long(LONG),float(FLOAT),double(DOUBLE),string(STRING),object(OBJECT),int_array(INT_ARRAY)," + "long_array(LONG_ARRAY),float_array(FLOAT_ARRAY),double_array(DOUBLE_ARRAY),string_array(STRING_ARRAY)," - + "boolean_array(BOOLEAN_ARRAY),timestamp_array(TIMESTAMP_ARRAY)]"); + + "boolean_array(BOOLEAN_ARRAY),timestamp_array(TIMESTAMP_ARRAY),bytes_array(BYTES_ARRAY)]"); } @Test @@ -107,6 +107,7 @@ public class DataSchemaTest { Assert.assertFalse(columnDataType.isCompatible(STRING)); Assert.assertFalse(columnDataType.isCompatible(DOUBLE_ARRAY)); Assert.assertFalse(columnDataType.isCompatible(STRING_ARRAY)); + Assert.assertFalse(columnDataType.isCompatible(BYTES_ARRAY)); } for (DataSchema.ColumnDataType columnDataType : new DataSchema.ColumnDataType[]{FLOAT, DOUBLE}) { @@ -119,6 +120,7 @@ public class DataSchemaTest { Assert.assertFalse(columnDataType.isCompatible(STRING)); Assert.assertFalse(columnDataType.isCompatible(LONG_ARRAY)); Assert.assertFalse(columnDataType.isCompatible(STRING_ARRAY)); + Assert.assertFalse(columnDataType.isCompatible(BYTES_ARRAY)); } Assert.assertFalse(STRING.isNumber()); @@ -130,6 +132,7 @@ public class DataSchemaTest { Assert.assertTrue(STRING.isCompatible(STRING)); Assert.assertFalse(STRING.isCompatible(DOUBLE_ARRAY)); Assert.assertFalse(STRING.isCompatible(STRING_ARRAY)); + Assert.assertFalse(STRING.isCompatible(BYTES_ARRAY)); Assert.assertFalse(OBJECT.isNumber()); 
Assert.assertFalse(OBJECT.isWholeNumber()); @@ -140,6 +143,7 @@ public class DataSchemaTest { Assert.assertFalse(OBJECT.isCompatible(STRING)); Assert.assertFalse(OBJECT.isCompatible(DOUBLE_ARRAY)); Assert.assertFalse(OBJECT.isCompatible(STRING_ARRAY)); + Assert.assertFalse(OBJECT.isCompatible(BYTES_ARRAY)); Assert.assertTrue(OBJECT.isCompatible(OBJECT)); for (DataSchema.ColumnDataType columnDataType : new DataSchema.ColumnDataType[]{INT_ARRAY, LONG_ARRAY}) { @@ -152,6 +156,7 @@ public class DataSchemaTest { Assert.assertFalse(columnDataType.isCompatible(STRING)); Assert.assertTrue(columnDataType.isCompatible(DOUBLE_ARRAY)); Assert.assertFalse(columnDataType.isCompatible(STRING_ARRAY)); + Assert.assertFalse(columnDataType.isCompatible(BYTES_ARRAY)); } for (DataSchema.ColumnDataType columnDataType : new DataSchema.ColumnDataType[]{FLOAT_ARRAY, DOUBLE_ARRAY}) { @@ -164,10 +169,11 @@ public class DataSchemaTest { Assert.assertFalse(columnDataType.isCompatible(STRING)); Assert.assertTrue(columnDataType.isCompatible(LONG_ARRAY)); Assert.assertFalse(columnDataType.isCompatible(STRING_ARRAY)); + Assert.assertFalse(columnDataType.isCompatible(BYTES_ARRAY)); } for (DataSchema.ColumnDataType columnDataType : new DataSchema.ColumnDataType[]{STRING_ARRAY, BOOLEAN_ARRAY, - TIMESTAMP_ARRAY}) { + TIMESTAMP_ARRAY, BYTES_ARRAY}) { Assert.assertFalse(columnDataType.isNumber()); Assert.assertFalse(columnDataType.isWholeNumber()); Assert.assertTrue(columnDataType.isArray()); @@ -192,5 +198,6 @@ public class DataSchemaTest { Assert.assertEquals(fromDataType(FieldSpec.DataType.STRING, false), STRING_ARRAY); Assert.assertEquals(fromDataType(FieldSpec.DataType.BOOLEAN, false), BOOLEAN_ARRAY); Assert.assertEquals(fromDataType(FieldSpec.DataType.TIMESTAMP, false), TIMESTAMP_ARRAY); + Assert.assertEquals(fromDataType(FieldSpec.DataType.BYTES, false), BYTES_ARRAY); } } diff --git a/pinot-common/src/test/java/org/apache/pinot/common/utils/PinotDataTypeTest.java b/pinot-common/src/test/java/org/apache/pinot/common/utils/PinotDataTypeTest.java index 649bb9e..1ddb766 100644 --- a/pinot-common/src/test/java/org/apache/pinot/common/utils/PinotDataTypeTest.java +++ b/pinot-common/src/test/java/org/apache/pinot/common/utils/PinotDataTypeTest.java @@ -25,6 +25,7 @@ import java.util.Map; import org.testng.annotations.DataProvider; import org.testng.annotations.Test; +import static java.nio.charset.StandardCharsets.UTF_8; import static org.apache.pinot.common.utils.PinotDataType.*; import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertFalse; @@ -130,7 +131,9 @@ public class PinotDataTypeTest { {LONG_ARRAY, TIMESTAMP_ARRAY, new long[] {1000000L, 2000000L}, new Timestamp[] { new Timestamp(1000000L), new Timestamp(2000000L) }}, {TIMESTAMP_ARRAY, TIMESTAMP_ARRAY, new Timestamp[] { new Timestamp(1000000L), new Timestamp(2000000L) }, - new Timestamp[] { new Timestamp(1000000L), new Timestamp(2000000L) }} + new Timestamp[] { new Timestamp(1000000L), new Timestamp(2000000L) }}, + {BYTES_ARRAY, BYTES_ARRAY, new byte[][] { "foo".getBytes(UTF_8), "bar".getBytes(UTF_8) }, + new byte[][] { "foo".getBytes(UTF_8), "bar".getBytes(UTF_8) }} }; } @@ -257,6 +260,7 @@ public class PinotDataTypeTest { testCases.put(String.class, STRING_ARRAY); testCases.put(Boolean.class, BOOLEAN_ARRAY); testCases.put(Timestamp.class, TIMESTAMP_ARRAY); + testCases.put(byte[].class, BYTES_ARRAY); for (Map.Entry<Class<?>, PinotDataType> tc : testCases.entrySet()) { assertEquals(getMultiValueType(tc.getKey()), tc.getValue()); diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/minion/RawIndexConverter.java b/pinot-core/src/main/java/org/apache/pinot/core/minion/RawIndexConverter.java index f1c7fbb..a3b2e24 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/minion/RawIndexConverter.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/minion/RawIndexConverter.java @@ -207,7 +207,7 @@ public class RawIndexConverter { int numDocs = _originalSegmentMetadata.getTotalDocs(); int lengthOfLongestEntry = _originalSegmentMetadata.getColumnMetadataFor(columnName).getColumnMaxLength(); try (ForwardIndexCreator rawIndexCreator = SegmentColumnarIndexCreator - .getRawIndexCreatorForColumn(_convertedIndexDir, ChunkCompressionType.SNAPPY, columnName, storedType, numDocs, + .getRawIndexCreatorForSVColumn(_convertedIndexDir, ChunkCompressionType.SNAPPY, columnName, storedType, numDocs, lengthOfLongestEntry, false, BaseChunkSVForwardIndexWriter.DEFAULT_VERSION); ForwardIndexReaderContext readerContext = reader.createContext()) { switch (storedType) { diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/io/compression/LZ4Compressor.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/io/compression/LZ4Compressor.java index e0198b1..bc9de7a 100644 --- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/io/compression/LZ4Compressor.java +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/io/compression/LZ4Compressor.java @@ -48,4 +48,9 @@ public class LZ4Compressor implements ChunkCompressor { outCompressed.flip(); return outCompressed.limit(); } + + @Override + public int maxCompressedSize(int uncompressedSize) { + return _lz4Factory.fastCompressor().maxCompressedLength(uncompressedSize); + } } diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/io/compression/PassThroughCompressor.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/io/compression/PassThroughCompressor.java index 30c69c8..b7d876b 100644 --- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/io/compression/PassThroughCompressor.java +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/io/compression/PassThroughCompressor.java @@ -39,4 +39,9 @@ public class PassThroughCompressor implements ChunkCompressor { outCompressed.flip(); return outCompressed.limit(); } + + @Override + public int maxCompressedSize(int uncompressedSize) { + return uncompressedSize; + } } diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/io/compression/SnappyCompressor.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/io/compression/SnappyCompressor.java index e183db7..0b87afe 100644 --- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/io/compression/SnappyCompressor.java +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/io/compression/SnappyCompressor.java @@ -34,4 +34,9 @@ public class SnappyCompressor implements ChunkCompressor { throws IOException { return Snappy.compress(inDecompressed, outCompressed); } + + @Override + public int maxCompressedSize(int uncompressedSize) { + return Snappy.maxCompressedLength(uncompressedSize); + } } diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/io/compression/ZstandardCompressor.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/io/compression/ZstandardCompressor.java index 33c607c..931f969 100644 --- 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/io/compression/ZstandardCompressor.java +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/io/compression/ZstandardCompressor.java @@ -40,4 +40,9 @@ public class ZstandardCompressor implements ChunkCompressor { outCompressed.flip(); return compressedSize; } + + @Override + public int maxCompressedSize(int uncompressedSize) { + return (int) Zstd.compressBound(uncompressedSize); + } }
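Each compressor now reports a worst-case output bound via maxCompressedSize, which the chunk writer below uses to size its mapped output instead of guessing a fixed multiple of the chunk size. An illustrative usage sketch (not part of the patch; direct buffers are assumed, since the underlying codec bindings generally require them):

    import java.io.IOException;
    import java.nio.ByteBuffer;
    import java.nio.charset.StandardCharsets;
    import org.apache.pinot.segment.local.io.compression.ChunkCompressorFactory;
    import org.apache.pinot.segment.spi.compression.ChunkCompressionType;
    import org.apache.pinot.segment.spi.compression.ChunkCompressor;

    public class CompressBoundSketch {
      public static void main(String[] args) throws IOException {
        ChunkCompressor compressor = ChunkCompressorFactory.getCompressor(ChunkCompressionType.SNAPPY);
        byte[] payload = "some chunk content".getBytes(StandardCharsets.UTF_8);
        ByteBuffer input = ByteBuffer.allocateDirect(payload.length);
        input.put(payload).flip();
        // Worst-case output size for this input (e.g. Snappy.maxCompressedLength or
        // Zstd.compressBound); sizing the output this way can never under-allocate.
        int bound = compressor.maxCompressedSize(input.remaining());
        ByteBuffer output = ByteBuffer.allocateDirect(bound);
        int compressedSize = compressor.compress(input, output);
        System.out.println("bound=" + bound + " actual=" + compressedSize);
      }
    }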
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/io/writer/impl/BaseChunkSVForwardIndexWriter.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/io/writer/impl/BaseChunkSVForwardIndexWriter.java index 5be72b0..1e92e1f 100644 --- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/io/writer/impl/BaseChunkSVForwardIndexWriter.java +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/io/writer/impl/BaseChunkSVForwardIndexWriter.java @@ -21,14 +21,16 @@ package org.apache.pinot.segment.local.io.writer.impl; import com.google.common.base.Preconditions; import java.io.Closeable; import java.io.File; -import java.io.FileNotFoundException; import java.io.IOException; import java.io.RandomAccessFile; import java.nio.ByteBuffer; +import java.nio.ByteOrder; import java.nio.channels.FileChannel; import org.apache.pinot.segment.local.io.compression.ChunkCompressorFactory; import org.apache.pinot.segment.spi.compression.ChunkCompressionType; import org.apache.pinot.segment.spi.compression.ChunkCompressor; +import org.apache.pinot.segment.spi.memory.CleanerUtil; +import org.apache.pinot.segment.spi.memory.PinotDataBuffer; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -45,10 +47,10 @@ public abstract class BaseChunkSVForwardIndexWriter implements Closeable { private static final int FILE_HEADER_ENTRY_CHUNK_OFFSET_SIZE_V1V2 = Integer.BYTES; private static final int FILE_HEADER_ENTRY_CHUNK_OFFSET_SIZE_V3 = Long.BYTES; - protected final FileChannel _dataFile; - protected ByteBuffer _header; + private final File _file; + private final FileChannel _dataChannel; + private final ByteBuffer _header; protected final ByteBuffer _chunkBuffer; - protected final ByteBuffer _compressedBuffer; protected final ChunkCompressor _chunkCompressor; protected int _chunkSize; @@ -66,19 +68,21 @@ public abstract class BaseChunkSVForwardIndexWriter implements Closeable { * @param chunkSize Size of chunk * @param sizeOfEntry Size of entry (in bytes), max size for variable byte implementation. * @param version version of File - * @throws FileNotFoundException + * @throws IOException if the file isn't found or can't be mapped */ protected BaseChunkSVForwardIndexWriter(File file, ChunkCompressionType compressionType, int totalDocs, int numDocsPerChunk, int chunkSize, int sizeOfEntry, int version) - throws FileNotFoundException { + throws IOException { Preconditions.checkArgument(version == DEFAULT_VERSION || version == CURRENT_VERSION); + _file = file; + _headerEntryChunkOffsetSize = getHeaderEntryChunkOffsetSize(version); + _dataOffset = headerSize(totalDocs, numDocsPerChunk, _headerEntryChunkOffsetSize); _chunkSize = chunkSize; _chunkCompressor = ChunkCompressorFactory.getCompressor(compressionType); - _headerEntryChunkOffsetSize = getHeaderEntryChunkOffsetSize(version); - _dataOffset = writeHeader(compressionType, totalDocs, numDocsPerChunk, sizeOfEntry, version); _chunkBuffer = ByteBuffer.allocateDirect(chunkSize); - _compressedBuffer = ByteBuffer.allocateDirect(chunkSize * 2); - _dataFile = new RandomAccessFile(file, "rw").getChannel(); + _dataChannel = new RandomAccessFile(file, "rw").getChannel(); + _header = _dataChannel.map(FileChannel.MapMode.READ_WRITE, 0, _dataOffset); + writeHeader(compressionType, totalDocs, numDocsPerChunk, sizeOfEntry, version); } public static int getHeaderEntryChunkOffsetSize(int version) { @@ -102,10 +106,13 @@ public abstract class BaseChunkSVForwardIndexWriter implements Closeable { writeChunk(); } - // Write the header and close the file. - _header.flip(); - _dataFile.write(_header, 0); - _dataFile.close(); + if (CleanerUtil.UNMAP_SUPPORTED) { + CleanerUtil.getCleaner().freeBuffer(_header); + } + + // we will have overmapped by (maxCompressedSize - actualCompressedSize) for the most recent chunk + _dataChannel.truncate(_dataOffset); + _dataChannel.close(); } /** @@ -116,14 +123,10 @@ * @param numDocsPerChunk Number of documents per chunk * @param sizeOfEntry Size of each entry * @param version Version of file - * @return Size of header */ - private int writeHeader(ChunkCompressionType compressionType, int totalDocs, int numDocsPerChunk, int sizeOfEntry, + private void writeHeader(ChunkCompressionType compressionType, int totalDocs, int numDocsPerChunk, int sizeOfEntry, int version) { int numChunks = (totalDocs + numDocsPerChunk - 1) / numDocsPerChunk; - int headerSize = (7 * Integer.BYTES) + (numChunks * _headerEntryChunkOffsetSize); - - _header = ByteBuffer.allocateDirect(headerSize); int offset = 0; _header.putInt(version); @@ -151,8 +154,11 @@ int dataHeaderStart = offset + Integer.BYTES; _header.putInt(dataHeaderStart); } + } - return headerSize; + private static int headerSize(int totalDocs, int numDocsPerChunk, int headerEntryChunkOffsetSize) { + int numChunks = (totalDocs + numDocsPerChunk - 1) / numDocsPerChunk; + return (7 * Integer.BYTES) + (numChunks * headerEntryChunkOffsetSize); } /** @@ -166,13 +172,15 @@ * */ protected void writeChunk() { - int sizeToWrite; + int sizeWritten; _chunkBuffer.flip(); - try { - sizeToWrite = _chunkCompressor.compress(_chunkBuffer, _compressedBuffer); - _dataFile.write(_compressedBuffer, _dataOffset); - _compressedBuffer.clear(); + int maxCompressedSize = _chunkCompressor.maxCompressedSize(_chunkBuffer.limit()); + // compress directly into the mapped output rather than keeping a large
buffer to compress into + try (PinotDataBuffer compressedBuffer = PinotDataBuffer.mapFile(_file, false, _dataOffset, + maxCompressedSize, ByteOrder.BIG_ENDIAN, "forward index chunk")) { + ByteBuffer view = compressedBuffer.toDirectByteBuffer(0, maxCompressedSize); + sizeWritten = _chunkCompressor.compress(_chunkBuffer, view); } catch (IOException e) { LOGGER.error("Exception caught while compressing/writing data chunk", e); throw new RuntimeException(e); @@ -184,7 +192,7 @@ public abstract class BaseChunkSVForwardIndexWriter implements Closeable { _header.putLong(_dataOffset); } - _dataOffset += sizeToWrite; + _dataOffset += sizeWritten; _chunkBuffer.clear(); } diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/io/writer/impl/FixedByteChunkSVForwardIndexWriter.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/io/writer/impl/FixedByteChunkSVForwardIndexWriter.java index 359c48e..8d9ad7e 100644 --- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/io/writer/impl/FixedByteChunkSVForwardIndexWriter.java +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/io/writer/impl/FixedByteChunkSVForwardIndexWriter.java @@ -20,6 +20,7 @@ package org.apache.pinot.segment.local.io.writer.impl; import java.io.File; import java.io.FileNotFoundException; +import java.io.IOException; import javax.annotation.concurrent.NotThreadSafe; import org.apache.pinot.segment.spi.compression.ChunkCompressionType; @@ -65,10 +66,11 @@ public class FixedByteChunkSVForwardIndexWriter extends BaseChunkSVForwardIndexW * @param sizeOfEntry Size of entry (in bytes) * @param writerVersion writer format version * @throws FileNotFoundException Throws {@link FileNotFoundException} if the specified file is not found. + * @throws IOException Throws {@link IOException} if there are any errors mapping the underlying ByteBuffer. */ public FixedByteChunkSVForwardIndexWriter(File file, ChunkCompressionType compressionType, int totalDocs, int numDocsPerChunk, int sizeOfEntry, int writerVersion) - throws FileNotFoundException { + throws IOException { super(file, compressionType, totalDocs, numDocsPerChunk, (sizeOfEntry * numDocsPerChunk), sizeOfEntry, writerVersion); _chunkDataOffset = 0; diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/io/writer/impl/VarByteChunkSVForwardIndexWriter.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/io/writer/impl/VarByteChunkSVForwardIndexWriter.java index c06e528..fed1200 100644 --- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/io/writer/impl/VarByteChunkSVForwardIndexWriter.java +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/io/writer/impl/VarByteChunkSVForwardIndexWriter.java @@ -54,6 +54,7 @@ import static java.nio.charset.StandardCharsets.UTF_8; */ @NotThreadSafe public class VarByteChunkSVForwardIndexWriter extends BaseChunkSVForwardIndexWriter { + public static final int CHUNK_HEADER_ENTRY_ROW_OFFSET_SIZE = Integer.BYTES; private final int _chunkHeaderSize; @@ -69,11 +70,13 @@ public class VarByteChunkSVForwardIndexWriter extends BaseChunkSVForwardIndexWri * @param numDocsPerChunk Number of documents per chunk. * @param lengthOfLongestEntry Length of longest entry (in bytes) * @param writerVersion writer format version - * @throws FileNotFoundException Throws {@link FileNotFoundException} if the specified file is not found. 
+ * @throws FileNotFoundException Throws {@link FileNotFoundException} if the specified file is + * not found. */ - public VarByteChunkSVForwardIndexWriter(File file, ChunkCompressionType compressionType, int totalDocs, + public VarByteChunkSVForwardIndexWriter(File file, ChunkCompressionType compressionType, + int totalDocs, int numDocsPerChunk, int lengthOfLongestEntry, int writerVersion) - throws FileNotFoundException { + throws IOException { super(file, compressionType, totalDocs, numDocsPerChunk, numDocsPerChunk * (CHUNK_HEADER_ENTRY_ROW_OFFSET_SIZE + lengthOfLongestEntry), // chunkSize @@ -96,25 +99,66 @@ public class VarByteChunkSVForwardIndexWri _chunkBuffer.put(value); _chunkDataOffSet += value.length; - // If buffer filled, then compress and write to file. - if (_chunkHeaderOffset == _chunkHeaderSize) { - writeChunk(); + writeChunkIfNecessary(); + } + + // Note: some duplication is tolerated between these overloads for the sake of memory efficiency + + public void putStrings(String[] values) { + // the entire String[] will be encoded as a single string, write the header here + _chunkBuffer.putInt(_chunkHeaderOffset, _chunkDataOffSet); + _chunkHeaderOffset += CHUNK_HEADER_ENTRY_ROW_OFFSET_SIZE; + // write all the strings into the data buffer as if it's a single string, + // but with its own embedded header so offsets to strings within the body + // can be located + int headerPosition = _chunkDataOffSet; + int headerSize = Integer.BYTES + Integer.BYTES * values.length; + int bodyPosition = headerPosition + headerSize; + _chunkBuffer.position(bodyPosition); + int bodySize = 0; + for (int i = 0, h = headerPosition + Integer.BYTES; i < values.length; i++, h += Integer.BYTES) { + byte[] utf8 = values[i].getBytes(UTF_8); + _chunkBuffer.putInt(h, utf8.length); + _chunkBuffer.put(utf8); + bodySize += utf8.length; } + _chunkDataOffSet += headerSize + bodySize; + // go back to write the number of strings embedded in the big string + _chunkBuffer.putInt(headerPosition, values.length); + + writeChunkIfNecessary(); } - @Override - public void close() - throws IOException { + public void putByteArrays(byte[][] values) { + // the entire byte[][] will be encoded as a single big byte[], write the header here + _chunkBuffer.putInt(_chunkHeaderOffset, _chunkDataOffSet); + _chunkHeaderOffset += CHUNK_HEADER_ENTRY_ROW_OFFSET_SIZE; + // write all the byte[]s into the data buffer as if it's a single byte[], + // but with its own embedded header so offsets to byte[]s within the body + // can be located + int headerPosition = _chunkDataOffSet; + int headerSize = Integer.BYTES + Integer.BYTES * values.length; + int bodyPosition = headerPosition + headerSize; + _chunkBuffer.position(bodyPosition); + int bodySize = 0; + for (int i = 0, h = headerPosition + Integer.BYTES; i < values.length; i++, h += Integer.BYTES) { + byte[] bytes = values[i]; + _chunkBuffer.putInt(h, bytes.length); + _chunkBuffer.put(bytes); + bodySize += bytes.length; + } + _chunkDataOffSet += headerSize + bodySize; + // go back to write the number of byte[]s embedded in the big byte[] + _chunkBuffer.putInt(headerPosition, values.length); + + writeChunkIfNecessary(); + } - // Write the chunk if it is non-empty. - if (_chunkBuffer.position() > 0) { + private void writeChunkIfNecessary() { + // If buffer filled, then compress and write to file. + if (_chunkHeaderOffset == _chunkHeaderSize) { writeChunk(); } - - // Write the header and close the file. - _header.flip(); - _dataFile.write(_header, 0); - _dataFile.close(); } /** @@ -125,7 +169,6 @@ public class VarByteChunkSVForwardIndexWri * <li> Updates the header with the current chunks offset. </li> * <li> Clears up the buffers, so that they can be reused. </li> * </ul> - * */ protected void writeChunk() { // For partially filled chunks, we still need to clear the offsets for remaining rows, as we reuse this buffer.
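The two overloads above give every multi-value row a small embedded header inside a single var-byte entry: a 4-byte element count, then one 4-byte length per element, then the concatenated element bytes. An illustrative round-trip of that row layout in plain JDK code (not part of the patch; class and method names are made up), mirroring the reader added later in this commit:

    import java.nio.ByteBuffer;
    import java.nio.charset.StandardCharsets;

    public class MvRowLayoutSketch {
      // Decodes one multi-value row as written by putStrings/putByteArrays:
      // [numValues][length_0 .. length_{n-1}][value_0 bytes .. value_{n-1} bytes]
      static String[] decodeStrings(byte[] row) {
        ByteBuffer buffer = ByteBuffer.wrap(row);
        int numValues = buffer.getInt();
        String[] values = new String[numValues];
        // the content starts right after the count and the n length slots
        int contentOffset = (numValues + 1) * Integer.BYTES;
        for (int i = 0; i < numValues; i++) {
          int length = buffer.getInt((i + 1) * Integer.BYTES);
          values[i] = new String(row, contentOffset, length, StandardCharsets.UTF_8);
          contentOffset += length;
        }
        return values;
      }

      public static void main(String[] args) {
        // encode the row ["foo", "ba"] by hand to show the layout
        ByteBuffer buffer = ByteBuffer.allocate(3 * Integer.BYTES + 5);
        buffer.putInt(2).putInt(3).putInt(2)
            .put("foo".getBytes(StandardCharsets.UTF_8))
            .put("ba".getBytes(StandardCharsets.UTF_8));
        System.out.println(String.join(",", decodeStrings(buffer.array()))); // foo,ba
      }
    }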
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/SegmentColumnarIndexCreator.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/SegmentColumnarIndexCreator.java index 2829e8e..dcc0ea2 100644 --- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/SegmentColumnarIndexCreator.java +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/SegmentColumnarIndexCreator.java @@ -23,6 +23,7 @@ import com.google.common.base.Preconditions; import com.google.common.collect.Iterables; import java.io.File; import java.io.IOException; +import java.util.Arrays; import java.util.Collection; import java.util.HashMap; import java.util.HashSet; @@ -34,7 +35,9 @@ import org.apache.commons.configuration.PropertiesConfiguration; import org.apache.pinot.common.utils.FileUtils; import org.apache.pinot.segment.local.io.util.PinotDataBitSet; import org.apache.pinot.segment.local.io.writer.impl.BaseChunkSVForwardIndexWriter; +import org.apache.pinot.segment.local.segment.creator.impl.fwd.MultiValueFixedByteRawIndexCreator; import org.apache.pinot.segment.local.segment.creator.impl.fwd.MultiValueUnsortedForwardIndexCreator; +import org.apache.pinot.segment.local.segment.creator.impl.fwd.MultiValueVarByteRawIndexCreator; import org.apache.pinot.segment.local.segment.creator.impl.fwd.SingleValueFixedByteRawIndexCreator; import org.apache.pinot.segment.local.segment.creator.impl.fwd.SingleValueSortedForwardIndexCreator; import org.apache.pinot.segment.local.segment.creator.impl.fwd.SingleValueUnsortedForwardIndexCreator; @@ -228,10 +231,6 @@ public class SegmentColumnarIndexCreator implements SegmentCreator { } } else { // Create raw index - - // TODO: add support to multi-value column and inverted index - Preconditions.checkState(fieldSpec.isSingleValueField(), "Cannot create raw index for multi-value column: %s", - columnName); Preconditions.checkState(!invertedIndexColumns.contains(columnName), "Cannot create inverted index for raw index column: %s", columnName); @@ -241,9 +240,16 @@ public class SegmentColumnarIndexCreator implements SegmentCreator { boolean deriveNumDocsPerChunk = shouldDeriveNumDocsPerChunk(columnName, segmentCreationSpec.getColumnProperties()); int writerVersion = rawIndexWriterVersion(columnName, segmentCreationSpec.getColumnProperties()); - _forwardIndexCreatorMap.put(columnName, - getRawIndexCreatorForColumn(_indexDir, compressionType, columnName, storedType, _totalDocs, - indexCreationInfo.getLengthOfLongestEntry(), deriveNumDocsPerChunk, writerVersion)); + if (fieldSpec.isSingleValueField()) { + _forwardIndexCreatorMap.put(columnName, + getRawIndexCreatorForSVColumn(_indexDir, compressionType, columnName, storedType, _totalDocs, + indexCreationInfo.getLengthOfLongestEntry(), deriveNumDocsPerChunk, writerVersion)); + } else { + _forwardIndexCreatorMap.put(columnName, + getRawIndexCreatorForMVColumn(_indexDir, compressionType, columnName, storedType, _totalDocs, +
indexCreationInfo.getMaxNumberOfMultiValueElements(), deriveNumDocsPerChunk, writerVersion, + indexCreationInfo.getMaxRowLengthInBytes())); + } } if (textIndexColumns.contains(columnName)) { @@ -366,10 +372,6 @@ public class SegmentColumnarIndexCreator implements SegmentCreator { String column = spec.getName(); if (config.getRawIndexCreationColumns().contains(column) || config.getRawIndexCompressionType() .containsKey(column)) { - if (!spec.isSingleValueField()) { - throw new RuntimeException( - "Creation of indices without dictionaries is supported for single valued columns only."); - } return false; } return info.isCreateDictionary(); @@ -387,16 +389,19 @@ public class SegmentColumnarIndexCreator implements SegmentCreator { throw new RuntimeException("Null value for column:" + columnName); } - boolean isSingleValue = _schema.getFieldSpecFor(columnName).isSingleValueField(); + FieldSpec fieldSpec = _schema.getFieldSpecFor(columnName); + + //get dictionaryCreator, will be null if column is not dictionaryEncoded SegmentDictionaryCreator dictionaryCreator = _dictionaryCreatorMap.get(columnName); - if (isSingleValue) { - // SV column - // text-index enabled SV column - TextIndexCreator textIndexCreator = _textIndexCreatorMap.get(columnName); - if (textIndexCreator != null) { - textIndexCreator.add((String) columnValueToIndex); - } + // text-index + TextIndexCreator textIndexCreator = _textIndexCreatorMap.get(columnName); + if (textIndexCreator != null) { + textIndexCreator.add((String) columnValueToIndex); + } + + if (fieldSpec.isSingleValueField()) { + // Single Value column JsonIndexCreator jsonIndexCreator = _jsonIndexCreatorMap.get(columnName); if (jsonIndexCreator != null) { jsonIndexCreator.add((String) columnValueToIndex); @@ -452,12 +457,107 @@ public class SegmentColumnarIndexCreator implements SegmentCreator { } } } else { - // MV column (always dictionary encoded) - int[] dictIds = dictionaryCreator.indexOfMV(columnValueToIndex); - forwardIndexCreator.putDictIdMV(dictIds); - DictionaryBasedInvertedIndexCreator invertedIndexCreator = _invertedIndexCreatorMap.get(columnName); - if (invertedIndexCreator != null) { - invertedIndexCreator.add(dictIds, dictIds.length); + if (dictionaryCreator != null) { + //dictionary encoded + int[] dictIds = dictionaryCreator.indexOfMV(columnValueToIndex); + forwardIndexCreator.putDictIdMV(dictIds); + DictionaryBasedInvertedIndexCreator invertedIndexCreator = _invertedIndexCreatorMap + .get(columnName); + if (invertedIndexCreator != null) { + invertedIndexCreator.add(dictIds, dictIds.length); + } + } else { + // for text index on raw columns, check the config to determine if actual raw value should + // be stored or not + if (textIndexCreator != null && !shouldStoreRawValueForTextIndex(columnName)) { + Object value = _columnProperties.get(columnName) + .get(FieldConfig.TEXT_INDEX_RAW_VALUE); + if (value == null) { + value = FieldConfig.TEXT_INDEX_DEFAULT_RAW_VALUE; + } + if (forwardIndexCreator.getValueType().getStoredType() == DataType.STRING) { + value = String.valueOf(value); + int length = ((String[]) columnValueToIndex).length; + columnValueToIndex = new String[length]; + Arrays.fill((String[]) columnValueToIndex, value); + } else if (forwardIndexCreator.getValueType().getStoredType() == DataType.BYTES) { + int length = ((byte[][]) columnValueToIndex).length; + columnValueToIndex = new byte[length][]; + Arrays.fill((byte[][]) columnValueToIndex, String.valueOf(value).getBytes()); + } else { + throw new RuntimeException("Text Index is only 
supported for STRING and BYTES stored type"); + } + } + switch (forwardIndexCreator.getValueType()) { + case INT: + if (columnValueToIndex instanceof int[]) { + forwardIndexCreator.putIntMV((int[]) columnValueToIndex); + } else if (columnValueToIndex instanceof Object[]) { + int[] array = new int[((Object[]) columnValueToIndex).length]; + for (int i = 0; i < array.length; i++) { + array[i] = (Integer) ((Object[]) columnValueToIndex)[i]; + } + forwardIndexCreator.putIntMV(array); + } + break; + case LONG: + if (columnValueToIndex instanceof long[]) { + forwardIndexCreator.putLongMV((long[]) columnValueToIndex); + } else if (columnValueToIndex instanceof Object[]) { + long[] array = new long[((Object[]) columnValueToIndex).length]; + for (int i = 0; i < array.length; i++) { + array[i] = (Long) ((Object[]) columnValueToIndex)[i]; + } + forwardIndexCreator.putLongMV(array); + } + break; + case FLOAT: + if (columnValueToIndex instanceof float[]) { + forwardIndexCreator.putFloatMV((float[]) columnValueToIndex); + } else if (columnValueToIndex instanceof Object[]) { + float[] array = new float[((Object[]) columnValueToIndex).length]; + for (int i = 0; i < array.length; i++) { + array[i] = (Float) ((Object[]) columnValueToIndex)[i]; + } + forwardIndexCreator.putFloatMV(array); + } + break; + case DOUBLE: + if (columnValueToIndex instanceof double[]) { + forwardIndexCreator.putDoubleMV((double[]) columnValueToIndex); + } else if (columnValueToIndex instanceof Object[]) { + double[] array = new double[((Object[]) columnValueToIndex).length]; + for (int i = 0; i < array.length; i++) { + array[i] = (Double) ((Object[]) columnValueToIndex)[i]; + } + forwardIndexCreator.putDoubleMV(array); + } + break; + case STRING: + if (columnValueToIndex instanceof String[]) { + forwardIndexCreator.putStringMV((String[]) columnValueToIndex); + } else if (columnValueToIndex instanceof Object[]) { + String[] array = new String[((Object[]) columnValueToIndex).length]; + for (int i = 0; i < array.length; i++) { + array[i] = (String) ((Object[]) columnValueToIndex)[i]; + } + forwardIndexCreator.putStringMV(array); + } + break; + case BYTES: + if (columnValueToIndex instanceof byte[][]) { + forwardIndexCreator.putBytesMV((byte[][]) columnValueToIndex); + } else if (columnValueToIndex instanceof Object[]) { + byte[][] array = new byte[((Object[]) columnValueToIndex).length][]; + for (int i = 0; i < array.length; i++) { + array[i] = (byte[]) ((Object[]) columnValueToIndex)[i]; + } + forwardIndexCreator.putBytesMV(array); + } + break; + default: + throw new IllegalStateException(); + } } } @@ -734,10 +834,11 @@ public class SegmentColumnarIndexCreator implements SegmentCreator { * @param deriveNumDocsPerChunk true if varbyte writer should auto-derive the number of rows per chunk * @param writerVersion version to use for the raw index writer * @return raw index creator - * @throws IOException */ - public static ForwardIndexCreator getRawIndexCreatorForColumn(File file, ChunkCompressionType compressionType, - String column, DataType dataType, int totalDocs, int lengthOfLongestEntry, boolean deriveNumDocsPerChunk, + public static ForwardIndexCreator getRawIndexCreatorForSVColumn(File file, + ChunkCompressionType compressionType, + String column, DataType dataType, int totalDocs, int lengthOfLongestEntry, + boolean deriveNumDocsPerChunk, int writerVersion) throws IOException { switch (dataType.getStoredType()) { @@ -756,6 +857,41 @@ public class SegmentColumnarIndexCreator implements SegmentCreator { } } + /** + * Helper method 
to build the raw index creator for the column. + * Assumes that the column to be indexed is multi-valued. + * + * @param file Output index file + * @param column Column name + * @param totalDocs Total number of documents to index + * @param deriveNumDocsPerChunk true if varbyte writer should auto-derive the number of rows + * per chunk + * @param writerVersion version to use for the raw index writer + * @param maxRowLengthInBytes the length of the longest row in bytes + * @return raw index creator + */ + public static ForwardIndexCreator getRawIndexCreatorForMVColumn(File file, ChunkCompressionType compressionType, + String column, DataType dataType, final int totalDocs, + final int maxNumberOfMultiValueElements, boolean deriveNumDocsPerChunk, int writerVersion, + int maxRowLengthInBytes) + throws IOException { + switch (dataType.getStoredType()) { + case INT: + case LONG: + case FLOAT: + case DOUBLE: + return new MultiValueFixedByteRawIndexCreator(file, compressionType, column, totalDocs, dataType, + dataType.getStoredType().size(), maxNumberOfMultiValueElements, deriveNumDocsPerChunk, writerVersion); + case STRING: + case BYTES: + return new MultiValueVarByteRawIndexCreator(file, compressionType, column, totalDocs, dataType, writerVersion, + maxRowLengthInBytes); + default: + throw new UnsupportedOperationException( + "Data type not supported for raw indexing: " + dataType); + } + } + @Override public void close() throws IOException { diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/fwd/MultiValueFixedByteRawIndexCreator.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/fwd/MultiValueFixedByteRawIndexCreator.java new file mode 100644 index 0000000..572c793 --- /dev/null +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/fwd/MultiValueFixedByteRawIndexCreator.java @@ -0,0 +1,181 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License.
+ */ +package org.apache.pinot.segment.local.segment.creator.impl.fwd; + +import com.google.common.annotations.VisibleForTesting; +import java.io.File; +import java.io.IOException; +import java.nio.ByteBuffer; +import org.apache.commons.io.FileUtils; +import org.apache.pinot.segment.local.io.writer.impl.BaseChunkSVForwardIndexWriter; +import org.apache.pinot.segment.local.io.writer.impl.VarByteChunkSVForwardIndexWriter; +import org.apache.pinot.segment.spi.V1Constants.Indexes; +import org.apache.pinot.segment.spi.compression.ChunkCompressionType; +import org.apache.pinot.segment.spi.index.creator.ForwardIndexCreator; +import org.apache.pinot.spi.data.FieldSpec.DataType; + + +/** + * Forward index creator for raw (non-dictionary-encoded) multi-value columns of fixed length + * data types (INT, LONG, + * FLOAT, DOUBLE). + */ +public class MultiValueFixedByteRawIndexCreator implements ForwardIndexCreator { + + private static final int DEFAULT_NUM_DOCS_PER_CHUNK = 1000; + private static final int TARGET_MAX_CHUNK_SIZE = 1024 * 1024; + + private final VarByteChunkSVForwardIndexWriter _indexWriter; + private final DataType _valueType; + + /** + * Create a multi-value fixed-byte raw index creator for the given column + * + * @param baseIndexDir Index directory + * @param compressionType Type of compression to use + * @param column Name of column to index + * @param totalDocs Total number of documents to index + * @param valueType Type of the values + */ + public MultiValueFixedByteRawIndexCreator(File baseIndexDir, ChunkCompressionType compressionType, + String column, + int totalDocs, DataType valueType, final int maxLengthOfEachEntry, + final int maxNumberOfMultiValueElements) + throws IOException { + this(baseIndexDir, compressionType, column, totalDocs, valueType, maxLengthOfEachEntry, + maxNumberOfMultiValueElements, false, + BaseChunkSVForwardIndexWriter.DEFAULT_VERSION); + } + + /** + * Create a multi-value fixed-byte raw index creator for the given column + * + * @param baseIndexDir Index directory + * @param compressionType Type of compression to use + * @param column Name of column to index + * @param totalDocs Total number of documents to index + * @param valueType Type of the values + * @param maxLengthOfEachEntry length of longest entry (in bytes) + * @param deriveNumDocsPerChunk true if writer should auto-derive the number of rows per chunk + * @param writerVersion writer format version + */ + public MultiValueFixedByteRawIndexCreator(File baseIndexDir, ChunkCompressionType compressionType, + String column, int totalDocs, DataType valueType, final int maxLengthOfEachEntry, + final int maxNumberOfMultiValueElements, boolean deriveNumDocsPerChunk, + int writerVersion) + throws IOException { + File file = new File(baseIndexDir, + column + Indexes.RAW_MV_FORWARD_INDEX_FILE_EXTENSION); + FileUtils.deleteQuietly(file); + int totalMaxLength = maxNumberOfMultiValueElements * maxLengthOfEachEntry; + int numDocsPerChunk = + deriveNumDocsPerChunk ?
getNumDocsPerChunk(totalMaxLength) : DEFAULT_NUM_DOCS_PER_CHUNK; + _indexWriter = new VarByteChunkSVForwardIndexWriter(file, compressionType, totalDocs, + numDocsPerChunk, totalMaxLength, writerVersion); + _valueType = valueType; + } + + @VisibleForTesting + public static int getNumDocsPerChunk(int lengthOfLongestEntry) { + int overheadPerEntry = + lengthOfLongestEntry + VarByteChunkSVForwardIndexWriter.CHUNK_HEADER_ENTRY_ROW_OFFSET_SIZE; + return Math.max(TARGET_MAX_CHUNK_SIZE / overheadPerEntry, 1); + } + + @Override + public boolean isDictionaryEncoded() { + return false; + } + + @Override + public boolean isSingleValue() { + return false; + } + + @Override + public DataType getValueType() { + return _valueType; + } + + @Override + public void putIntMV(final int[] values) { + + byte[] bytes = new byte[Integer.BYTES + + values.length * Integer.BYTES]; //numValues, bytes required to store the content + ByteBuffer byteBuffer = ByteBuffer.wrap(bytes); + //write the length + byteBuffer.putInt(values.length); + //write the content of each element + for (final int value : values) { + byteBuffer.putInt(value); + } + _indexWriter.putBytes(bytes); + } + + @Override + public void putLongMV(final long[] values) { + + byte[] bytes = new byte[Integer.BYTES + + values.length * Long.BYTES]; //numValues, bytes required to store the content + ByteBuffer byteBuffer = ByteBuffer.wrap(bytes); + //write the length + byteBuffer.putInt(values.length); + //write the content of each element + for (final long value : values) { + byteBuffer.putLong(value); + } + _indexWriter.putBytes(bytes); + } + + @Override + public void putFloatMV(final float[] values) { + + byte[] bytes = new byte[Integer.BYTES + + values.length * Float.BYTES]; //numValues, bytes required to store the content + ByteBuffer byteBuffer = ByteBuffer.wrap(bytes); + //write the length + byteBuffer.putInt(values.length); + //write the content of each element + for (final float value : values) { + byteBuffer.putFloat(value); + } + _indexWriter.putBytes(bytes); + } + + @Override + public void putDoubleMV(final double[] values) { + + byte[] bytes = new byte[Integer.BYTES + + values.length * Double.BYTES]; //numValues, bytes required to store the content + ByteBuffer byteBuffer = ByteBuffer.wrap(bytes); + //write the length + byteBuffer.putInt(values.length); + //write the content of each element + for (final double value : values) { + byteBuffer.putDouble(value); + } + _indexWriter.putBytes(bytes); + } + + @Override + public void close() + throws IOException { + _indexWriter.close(); + } +}
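Each put*MV call above packs one row as a 4-byte element count followed by the fixed-width values, and hands the resulting byte[] to the var-byte writer. An illustrative round-trip of that row format (not part of the patch; names are made up):

    import java.nio.ByteBuffer;
    import java.util.Arrays;

    public class FixedByteMvRowSketch {
      // Encode an int[] row the way putIntMV does: element count, then the values.
      static byte[] encode(int[] values) {
        ByteBuffer buffer = ByteBuffer.allocate(Integer.BYTES + values.length * Integer.BYTES);
        buffer.putInt(values.length);
        for (int value : values) {
          buffer.putInt(value);
        }
        return buffer.array();
      }

      // Decode it back: read the count, then that many fixed-width values.
      static int[] decode(byte[] row) {
        ByteBuffer buffer = ByteBuffer.wrap(row);
        int[] values = new int[buffer.getInt()];
        for (int i = 0; i < values.length; i++) {
          values[i] = buffer.getInt();
        }
        return values;
      }

      public static void main(String[] args) {
        System.out.println(Arrays.toString(decode(encode(new int[]{7, 11, 13})))); // [7, 11, 13]
      }
    }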
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/fwd/MultiValueVarByteRawIndexCreator.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/fwd/MultiValueVarByteRawIndexCreator.java new file mode 100644 index 0000000..5d5b3cf --- /dev/null +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/fwd/MultiValueVarByteRawIndexCreator.java @@ -0,0 +1,122 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.segment.local.segment.creator.impl.fwd; + +import java.io.File; +import java.io.IOException; +import org.apache.pinot.segment.local.io.writer.impl.BaseChunkSVForwardIndexWriter; +import org.apache.pinot.segment.local.io.writer.impl.VarByteChunkSVForwardIndexWriter; +import org.apache.pinot.segment.spi.V1Constants.Indexes; +import org.apache.pinot.segment.spi.compression.ChunkCompressionType; +import org.apache.pinot.segment.spi.index.creator.ForwardIndexCreator; +import org.apache.pinot.spi.data.FieldSpec.DataType; + + +/** + * Forward index creator for raw (non-dictionary-encoded) multi-value columns of variable length + * data types (STRING, + * BYTES). + */ +public class MultiValueVarByteRawIndexCreator implements ForwardIndexCreator { + + private static final int TARGET_MAX_CHUNK_SIZE = 1024 * 1024; + + private final VarByteChunkSVForwardIndexWriter _indexWriter; + private final DataType _valueType; + + /** + * Create a var-byte raw index creator for the given column + * + * @param baseIndexDir Index directory + * @param compressionType Type of compression to use + * @param column Name of column to index + * @param totalDocs Total number of documents to index + * @param valueType Type of the values + * @param maxRowLengthInBytes the length in bytes of the largest row + */ + public MultiValueVarByteRawIndexCreator(File baseIndexDir, ChunkCompressionType compressionType, + String column, + int totalDocs, DataType valueType, int maxRowLengthInBytes) + throws IOException { + this(baseIndexDir, compressionType, column, totalDocs, valueType, + BaseChunkSVForwardIndexWriter.DEFAULT_VERSION, maxRowLengthInBytes); + } + + /** + * Create a var-byte raw index creator for the given column + * + * @param baseIndexDir Index directory + * @param compressionType Type of compression to use + * @param column Name of column to index + * @param totalDocs Total number of documents to index + * @param valueType Type of the values + * @param maxRowLengthInBytes the size in bytes of the largest row, the chunk size cannot be smaller than this + * @param writerVersion writer format version + */ + public MultiValueVarByteRawIndexCreator(File baseIndexDir, ChunkCompressionType compressionType, + String column, int totalDocs, DataType valueType, int writerVersion, int maxRowLengthInBytes) + throws IOException { + //we will prepend the actual content with numElements and length array containing length of each element + int totalMaxLength = Integer.BYTES + Math.max(maxRowLengthInBytes, TARGET_MAX_CHUNK_SIZE); + File file = new File(baseIndexDir, + column + Indexes.RAW_MV_FORWARD_INDEX_FILE_EXTENSION); + int numDocsPerChunk = getNumDocsPerChunk(totalMaxLength); + _indexWriter = new VarByteChunkSVForwardIndexWriter(file, compressionType, totalDocs, + numDocsPerChunk, totalMaxLength, + writerVersion); + _valueType = valueType; + } + + private static int getNumDocsPerChunk(int lengthOfLongestEntry) { + int overheadPerEntry = + lengthOfLongestEntry + VarByteChunkSVForwardIndexWriter.CHUNK_HEADER_ENTRY_ROW_OFFSET_SIZE; + return Math.max(TARGET_MAX_CHUNK_SIZE /
overheadPerEntry, 1); + } + + @Override + public boolean isDictionaryEncoded() { + return false; + } + + @Override + public boolean isSingleValue() { + return false; + } + + @Override + public DataType getValueType() { + return _valueType; + } + + @Override + public void putStringMV(final String[] values) { + _indexWriter.putStrings(values); + } + + @Override + public void putBytesMV(final byte[][] values) { + _indexWriter.putByteArrays(values); + } + + @Override + public void close() + throws IOException { + _indexWriter.close(); + } +} diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/stats/AbstractColumnStatisticsCollector.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/stats/AbstractColumnStatisticsCollector.java index 284bf69..6407b55 100644 --- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/stats/AbstractColumnStatisticsCollector.java +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/stats/AbstractColumnStatisticsCollector.java @@ -47,6 +47,7 @@ public abstract class AbstractColumnStatisticsCollector implements ColumnStatist protected int _totalNumberOfEntries = 0; protected int _maxNumberOfMultiValues = 0; + protected int _maxLengthOfMultiValues = 0; private PartitionFunction _partitionFunction; private final int _numPartitions; private final Set<Integer> _partitions; @@ -72,6 +73,10 @@ public abstract class AbstractColumnStatisticsCollector implements ColumnStatist return _maxNumberOfMultiValues; } + public int getMaxLengthOfMultiValues() { + return _maxLengthOfMultiValues; + } + void addressSorted(Object entry) { if (_isSorted) { if (_previousValue != null) { diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/stats/BytesColumnPredIndexStatsCollector.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/stats/BytesColumnPredIndexStatsCollector.java index a0cfd66..411238d 100644 --- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/stats/BytesColumnPredIndexStatsCollector.java +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/stats/BytesColumnPredIndexStatsCollector.java @@ -33,6 +33,7 @@ public class BytesColumnPredIndexStatsCollector extends AbstractColumnStatistics private int _minLength = Integer.MAX_VALUE; private int _maxLength = 0; + private int _maxRowLength = 0; private ByteArray[] _sortedValues; private boolean _sealed = false; @@ -42,16 +43,32 @@ public class BytesColumnPredIndexStatsCollector extends AbstractColumnStatistics @Override public void collect(Object entry) { - ByteArray value = new ByteArray((byte[]) entry); - addressSorted(value); - updatePartition(value); - _values.add(value); - - int length = value.length(); - _minLength = Math.min(_minLength, length); - _maxLength = Math.max(_maxLength, length); - - _totalNumberOfEntries++; + if (entry instanceof Object[]) { + Object[] values = (Object[]) entry; + int rowLength = 0; + for (Object obj : values) { + ByteArray value = new ByteArray((byte[]) obj); + _values.add(value); + int length = value.length(); + _minLength = Math.min(_minLength, length); + _maxLength = Math.max(_maxLength, length); + rowLength += length; + } + _maxNumberOfMultiValues = Math.max(_maxNumberOfMultiValues, values.length); + _maxRowLength = Math.max(_maxRowLength, rowLength); + 
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/stats/StringColumnPreIndexStatsCollector.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/stats/StringColumnPreIndexStatsCollector.java
index 068a11e..1350677 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/stats/StringColumnPreIndexStatsCollector.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/stats/StringColumnPreIndexStatsCollector.java
@@ -31,6 +31,7 @@ public class StringColumnPreIndexStatsCollector extends AbstractColumnStatistics
 
   private int _minLength = Integer.MAX_VALUE;
   private int _maxLength = 0;
+  private int _maxRowLength = 0;
   private String[] _sortedValues;
   private boolean _sealed = false;
@@ -42,6 +43,7 @@ public class StringColumnPreIndexStatsCollector extends AbstractColumnStatistics
   public void collect(Object entry) {
     if (entry instanceof Object[]) {
       Object[] values = (Object[]) entry;
+      int rowLength = 0;
       for (Object obj : values) {
         String value = (String) obj;
         _values.add(value);
@@ -49,9 +51,11 @@ public class StringColumnPreIndexStatsCollector extends AbstractColumnStatistics
         int length = value.getBytes(UTF_8).length;
         _minLength = Math.min(_minLength, length);
         _maxLength = Math.max(_maxLength, length);
+        rowLength += length;
       }
 
       _maxNumberOfMultiValues = Math.max(_maxNumberOfMultiValues, values.length);
+      _maxRowLength = Math.max(_maxRowLength, rowLength);
       updateTotalNumberOfEntries(values);
     } else {
       String value = (String) entry;
@@ -62,6 +66,7 @@ public class StringColumnPreIndexStatsCollector extends AbstractColumnStatistics
       int valueLength = value.getBytes(UTF_8).length;
       _minLength = Math.min(_minLength, valueLength);
       _maxLength = Math.max(_maxLength, valueLength);
+      _maxRowLength = _maxLength;
 
       _totalNumberOfEntries++;
     }
@@ -100,6 +105,11 @@ public class StringColumnPreIndexStatsCollector extends AbstractColumnStatistics
   }
 
   @Override
+  public int getMaxRowLengthInBytes() {
+    return _maxRowLength;
+  }
+
+  @Override
   public int getCardinality() {
     if (_sealed) {
       return _sortedValues.length;
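Both collectors above define the row length of a multi-value entry as the sum of its elements' byte lengths (UTF-8 bytes for strings) and track the maximum across all rows; that maximum is what later reaches MultiValueVarByteRawIndexCreator as maxRowLengthInBytes. A toy standalone illustration (hypothetical class and values, not part of this patch):

    import java.nio.charset.StandardCharsets;

    final class MaxRowLengthSketch {
      // Mirrors the collector logic: a row's length is the sum of its elements' byte lengths.
      static int rowLengthInBytes(String[] row) {
        int total = 0;
        for (String value : row) {
          total += value.getBytes(StandardCharsets.UTF_8).length;
        }
        return total;
      }

      public static void main(String[] args) {
        String[] row1 = {"hello", "foo"};  // 5 + 3 bytes
        String[] row2 = {"pino"};          // 4 bytes
        int maxRowLength = Math.max(rowLengthInBytes(row1), rowLengthInBytes(row2));
        System.out.println("maxRowLengthInBytes = " + maxRowLength);  // prints 8
      }
    }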
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/forward/VarByteChunkMVForwardIndexReader.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/forward/VarByteChunkMVForwardIndexReader.java
new file mode 100644
index 0000000..1844957
--- /dev/null
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/forward/VarByteChunkMVForwardIndexReader.java
@@ -0,0 +1,193 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.segment.local.segment.index.readers.forward;
+
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
+import javax.annotation.Nullable;
+import org.apache.pinot.segment.local.io.writer.impl.VarByteChunkSVForwardIndexWriter;
+import org.apache.pinot.segment.spi.memory.PinotDataBuffer;
+import org.apache.pinot.spi.data.FieldSpec.DataType;
+
+/**
+ * Chunk-based multi-value raw (non-dictionary-encoded) forward index reader for values of
+ * variable length data types (STRING, BYTES).
+ * <p>For data layout, please refer to the documentation for {@link VarByteChunkSVForwardIndexWriter}
+ */
+public final class VarByteChunkMVForwardIndexReader extends BaseChunkSVForwardIndexReader {
+
+  private static final int ROW_OFFSET_SIZE = VarByteChunkSVForwardIndexWriter.CHUNK_HEADER_ENTRY_ROW_OFFSET_SIZE;
+
+  private final int _maxChunkSize;
+
+  public VarByteChunkMVForwardIndexReader(PinotDataBuffer dataBuffer, DataType valueType) {
+    super(dataBuffer, valueType);
+    _maxChunkSize = _numDocsPerChunk * (ROW_OFFSET_SIZE + _lengthOfLongestEntry);
+  }
+
+  @Nullable
+  @Override
+  public ChunkReaderContext createContext() {
+    if (_isCompressed) {
+      return new ChunkReaderContext(_maxChunkSize);
+    } else {
+      return null;
+    }
+  }
+
+  @Override
+  public int getStringMV(final int docId, final String[] valueBuffer,
+      final ChunkReaderContext context) {
+    byte[] compressedBytes;
+    if (_isCompressed) {
+      compressedBytes = getBytesCompressed(docId, context);
+    } else {
+      compressedBytes = getBytesUncompressed(docId);
+    }
+    ByteBuffer byteBuffer = ByteBuffer.wrap(compressedBytes);
+    int numValues = byteBuffer.getInt();
+    int contentOffset = (numValues + 1) * Integer.BYTES;
+    for (int i = 0; i < numValues; i++) {
+      int length = byteBuffer.getInt((i + 1) * Integer.BYTES);
+      byte[] bytes = new byte[length];
+      byteBuffer.position(contentOffset);
+      byteBuffer.get(bytes, 0, length);
+      valueBuffer[i] = new String(bytes, StandardCharsets.UTF_8);
+      contentOffset += length;
+    }
+    return numValues;
+  }
+
+  @Override
+  public int getBytesMV(final int docId, final byte[][] valueBuffer,
+      final ChunkReaderContext context) {
+    byte[] compressedBytes;
+    if (_isCompressed) {
+      compressedBytes = getBytesCompressed(docId, context);
+    } else {
+      compressedBytes = getBytesUncompressed(docId);
+    }
+    ByteBuffer byteBuffer = ByteBuffer.wrap(compressedBytes);
+    int numValues = byteBuffer.getInt();
+    int contentOffset = (numValues + 1) * Integer.BYTES;
+    for (int i = 0; i < numValues; i++) {
+      int length = byteBuffer.getInt((i + 1) * Integer.BYTES);
+      byte[] bytes = new byte[length];
+      byteBuffer.position(contentOffset);
+      byteBuffer.get(bytes, 0, length);
+      valueBuffer[i] = bytes;
+      contentOffset += length;
+    }
+    return numValues;
+  }
+
+  @Override
+  public byte[] getBytes(int docId, ChunkReaderContext context) {
+    if (_isCompressed) {
+      return getBytesCompressed(docId, context);
+    } else {
+      return getBytesUncompressed(docId);
+    }
+  }
+
+  /**
+   * Helper method to read BYTES value from the compressed index.
+   */
+  private byte[] getBytesCompressed(int docId, ChunkReaderContext context) {
+    int chunkRowId = docId % _numDocsPerChunk;
+    ByteBuffer chunkBuffer = getChunkBuffer(docId, context);
+
+    // These offsets are offsets within the chunk buffer
+    int valueStartOffset = chunkBuffer.getInt(chunkRowId * ROW_OFFSET_SIZE);
+    int valueEndOffset = getValueEndOffset(chunkRowId, chunkBuffer);
+
+    byte[] bytes = new byte[valueEndOffset - valueStartOffset];
+    chunkBuffer.position(valueStartOffset);
+    chunkBuffer.get(bytes);
+    return bytes;
+  }
+
+  /**
+   * Helper method to read BYTES value from the uncompressed index.
+   */
+  private byte[] getBytesUncompressed(int docId) {
+    int chunkId = docId / _numDocsPerChunk;
+    int chunkRowId = docId % _numDocsPerChunk;
+
+    // These offsets are offsets within the data buffer
+    long chunkStartOffset = getChunkPosition(chunkId);
+    long valueStartOffset =
+        chunkStartOffset + _dataBuffer.getInt(chunkStartOffset + (long) chunkRowId * ROW_OFFSET_SIZE);
+    long valueEndOffset = getValueEndOffset(chunkId, chunkRowId, chunkStartOffset);
+
+    byte[] bytes = new byte[(int) (valueEndOffset - valueStartOffset)];
+    _dataBuffer.copyTo(valueStartOffset, bytes);
+    return bytes;
+  }
+
+  /**
+   * Helper method to compute the end offset of the value in the chunk buffer.
+   */
+  private int getValueEndOffset(int rowId, ByteBuffer chunkBuffer) {
+    if (rowId == _numDocsPerChunk - 1) {
+      // Last row in the chunk
+      return chunkBuffer.limit();
+    } else {
+      int valueEndOffset = chunkBuffer.getInt((rowId + 1) * ROW_OFFSET_SIZE);
+      if (valueEndOffset == 0) {
+        // Last row in the last chunk (chunk is incomplete, which stores 0 as the offset for the absent rows)
+        return chunkBuffer.limit();
+      } else {
+        return valueEndOffset;
+      }
+    }
+  }
+
+  /**
+   * Helper method to compute the end offset of the value in the data buffer.
+   */
+  private long getValueEndOffset(int chunkId, int chunkRowId, long chunkStartOffset) {
+    if (chunkId == _numChunks - 1) {
+      // Last chunk
+      if (chunkRowId == _numDocsPerChunk - 1) {
+        // Last row in the last chunk
+        return _dataBuffer.size();
+      } else {
+        int valueEndOffsetInChunk = _dataBuffer
+            .getInt(chunkStartOffset + (long) (chunkRowId + 1) * ROW_OFFSET_SIZE);
+        if (valueEndOffsetInChunk == 0) {
+          // Last row in the last chunk (chunk is incomplete, which stores 0 as the offset for the absent rows)
+          return _dataBuffer.size();
+        } else {
+          return chunkStartOffset + valueEndOffsetInChunk;
+        }
+      }
+    } else {
+      if (chunkRowId == _numDocsPerChunk - 1) {
+        // Last row in the chunk
+        return getChunkPosition(chunkId + 1);
+      } else {
+        return chunkStartOffset + _dataBuffer
+            .getInt(chunkStartOffset + (long) (chunkRowId + 1) * ROW_OFFSET_SIZE);
+      }
+    }
+  }
+}
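For symmetry with the writer-side sketch earlier, this is the decode half of the row layout, mirroring the loop bodies of getStringMV and getBytesMV above (again a hypothetical standalone helper, not part of this patch):

    import java.nio.ByteBuffer;

    final class MvRowDecodeSketch {
      // Decodes [numValues][len_0 .. len_{n-1}][payload] back into its elements;
      // round-trips with MvRowLayoutSketch.encode from the earlier sketch.
      static byte[][] decode(byte[] row) {
        ByteBuffer buffer = ByteBuffer.wrap(row);
        int numValues = buffer.getInt();
        byte[][] values = new byte[numValues][];
        int contentOffset = (numValues + 1) * Integer.BYTES;
        for (int i = 0; i < numValues; i++) {
          int length = buffer.getInt((i + 1) * Integer.BYTES);  // absolute read from the length array
          byte[] bytes = new byte[length];
          buffer.position(contentOffset);
          buffer.get(bytes, 0, length);
          values[i] = bytes;
          contentOffset += length;
        }
        return values;
      }
    }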
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/store/FilePerIndexDirectory.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/store/FilePerIndexDirectory.java
index aba55ca..15b02d6 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/store/FilePerIndexDirectory.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/store/FilePerIndexDirectory.java
@@ -167,7 +167,11 @@ class FilePerIndexDirectory extends ColumnIndexDirectory {
           fileExtension = V1Constants.Indexes.UNSORTED_SV_FORWARD_INDEX_FILE_EXTENSION;
         }
       } else {
-        fileExtension = V1Constants.Indexes.UNSORTED_MV_FORWARD_INDEX_FILE_EXTENSION;
+        if (!columnMetadata.hasDictionary()) {
+          fileExtension = V1Constants.Indexes.RAW_MV_FORWARD_INDEX_FILE_EXTENSION;
+        } else {
+          fileExtension = V1Constants.Indexes.UNSORTED_MV_FORWARD_INDEX_FILE_EXTENSION;
+        }
       }
       break;
     case INVERTED_INDEX:
diff --git a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/creator/MultiValueVarByteRawIndexCreatorTest.java b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/creator/MultiValueVarByteRawIndexCreatorTest.java
new file mode 100644
index 0000000..a1f6e2c
--- /dev/null
+++ b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/creator/MultiValueVarByteRawIndexCreatorTest.java
@@ -0,0 +1,141 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.segment.local.segment.index.creator;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.ByteOrder;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Random;
+import org.apache.commons.io.FileUtils;
+import org.apache.pinot.segment.local.segment.creator.impl.fwd.MultiValueVarByteRawIndexCreator;
+import org.apache.pinot.segment.local.segment.index.readers.forward.BaseChunkSVForwardIndexReader.ChunkReaderContext;
+import org.apache.pinot.segment.local.segment.index.readers.forward.VarByteChunkMVForwardIndexReader;
+import org.apache.pinot.segment.spi.V1Constants.Indexes;
+import org.apache.pinot.segment.spi.compression.ChunkCompressionType;
+import org.apache.pinot.segment.spi.memory.PinotDataBuffer;
+import org.apache.pinot.spi.data.FieldSpec.DataType;
+import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+public class MultiValueVarByteRawIndexCreatorTest {
+
+  private static final String OUTPUT_DIR =
+      System.getProperty("java.io.tmpdir") + File.separator + "mvVarRawTest";
+
+  @BeforeClass
+  public void setup() throws Exception {
+    FileUtils.forceMkdir(new File(OUTPUT_DIR));
+  }
+
+  /**
+   * Clean up after test
+   */
+  @AfterClass
+  public void cleanup() {
+    FileUtils.deleteQuietly(new File(OUTPUT_DIR));
+  }
+
+  @Test
+  public void testMVString() throws IOException {
+    String column = "testCol";
+    int numDocs = 1000;
+    int maxElements = 50;
+    int maxTotalLength = 500;
+    File file = new File(OUTPUT_DIR, column + Indexes.RAW_MV_FORWARD_INDEX_FILE_EXTENSION);
+    file.delete();
+    MultiValueVarByteRawIndexCreator creator = new MultiValueVarByteRawIndexCreator(
+        new File(OUTPUT_DIR), ChunkCompressionType.SNAPPY, column, numDocs, DataType.STRING, maxTotalLength);
+    List<String[]> inputs = new ArrayList<>();
+    Random random = new Random();
+    for (int i = 0; i < numDocs; i++) {
+      int length = random.nextInt(10);
+      String[] values = new String[length];
+      for (int j = 0; j < length; j++) {
+        char[] value = new char[length];
+        Arrays.fill(value, 'a');
+        values[j] = new String(value);
+      }
+      inputs.add(values);
+      creator.putStringMV(values);
+    }
+    creator.close();
+
+    //read
+    final PinotDataBuffer buffer = PinotDataBuffer
+        .mapFile(file, true, 0, file.length(), ByteOrder.BIG_ENDIAN, "");
+    VarByteChunkMVForwardIndexReader reader = new VarByteChunkMVForwardIndexReader(buffer,
+        DataType.STRING);
+    final ChunkReaderContext context = reader.createContext();
+    String[] values = new String[maxElements];
+    for (int i = 0; i < numDocs; i++) {
+      int length = reader.getStringMV(i, values, context);
+      String[] readValue = Arrays.copyOf(values, length);
+      Assert.assertEquals(inputs.get(i), readValue);
+    }
+  }
+
+  @Test
+  public void testMVBytes() throws IOException {
+    String column = "testCol";
+    int numDocs = 1000;
+    int maxElements = 50;
+    int maxTotalLength = 500;
+    File file = new File(OUTPUT_DIR, column + Indexes.RAW_MV_FORWARD_INDEX_FILE_EXTENSION);
+    file.delete();
+    MultiValueVarByteRawIndexCreator creator = new MultiValueVarByteRawIndexCreator(
+        new File(OUTPUT_DIR), ChunkCompressionType.SNAPPY, column, numDocs, DataType.BYTES,
+        maxTotalLength);
+    List<byte[][]> inputs = new ArrayList<>();
+    Random random = new Random();
+    for (int i = 0; i < numDocs; i++) {
+      int length = random.nextInt(10);
+      byte[][] values = new byte[length][];
+      for (int j = 0; j < length; j++) {
+        char[] value = new char[length];
+        Arrays.fill(value, 'a');
+        values[j] = new String(value).getBytes();
+      }
+      inputs.add(values);
+      creator.putBytesMV(values);
+    }
+    creator.close();
+
+    //read
+    final PinotDataBuffer buffer = PinotDataBuffer
+        .mapFile(file, true, 0, file.length(), ByteOrder.BIG_ENDIAN, "");
+    VarByteChunkMVForwardIndexReader reader = new VarByteChunkMVForwardIndexReader(buffer,
+        DataType.BYTES);
+    final ChunkReaderContext context = reader.createContext();
+    byte[][] values = new byte[maxElements][];
+    for (int i = 0; i < numDocs; i++) {
+      int length = reader.getBytesMV(i, values, context);
+      byte[][] readValue = Arrays.copyOf(values, length);
+      for (int j = 0; j < length; j++) {
+        Assert.assertTrue(Arrays.equals(inputs.get(i)[j], readValue[j]));
+      }
+    }
+  }
+}
diff --git a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/creator/RawIndexCreatorTest.java b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/creator/RawIndexCreatorTest.java
index 9f515e8..d8cafdc 100644
--- a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/creator/RawIndexCreatorTest.java
+++ b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/creator/RawIndexCreatorTest.java
@@ -21,6 +21,7 @@ package org.apache.pinot.segment.local.segment.index.creator;
 import java.io.File;
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -32,6 +33,7 @@ import org.apache.pinot.segment.local.loader.LocalSegmentDirectoryLoader;
 import org.apache.pinot.segment.local.segment.creator.impl.SegmentIndexCreationDriverImpl;
 import org.apache.pinot.segment.local.segment.index.readers.forward.BaseChunkSVForwardIndexReader;
 import org.apache.pinot.segment.local.segment.index.readers.forward.FixedByteChunkSVForwardIndexReader;
+import org.apache.pinot.segment.local.segment.index.readers.forward.VarByteChunkMVForwardIndexReader;
 import org.apache.pinot.segment.local.segment.index.readers.forward.VarByteChunkSVForwardIndexReader;
 import org.apache.pinot.segment.local.segment.readers.GenericRowRecordReader;
 import org.apache.pinot.segment.spi.creator.SegmentGeneratorConfig;
@@ -60,6 +62,7 @@ import org.testng.annotations.Test;
  * Class for testing Raw index creators.
  */
 public class RawIndexCreatorTest {
+
   private static final int NUM_ROWS = 10009;
 
   private static final int MAX_STRING_LENGTH = 101;
@@ -71,6 +74,12 @@ public class RawIndexCreatorTest {
   private static final String FLOAT_COLUMN = "floatColumn";
   private static final String DOUBLE_COLUMN = "doubleColumn";
   private static final String STRING_COLUMN = "stringColumn";
+  private static final String INT_MV_COLUMN = "intMVColumn";
+  private static final String LONG_MV_COLUMN = "longMVColumn";
+  private static final String FLOAT_MV_COLUMN = "floatMVColumn";
+  private static final String DOUBLE_MV_COLUMN = "doubleMVColumn";
+  private static final String STRING_MV_COLUMN = "stringMVColumn";
+  private static final String BYTES_MV_COLUMN = "bytesMVColumn";
 
   Random _random;
   private RecordReader _recordReader;
@@ -79,8 +88,6 @@ public class RawIndexCreatorTest {
 
   /**
    * Setup to build a segment with raw indexes (no-dictionary) of various data types.
-   *
-   * @throws Exception
    */
   @BeforeClass
   public void setup()
@@ -91,8 +98,15 @@ public class RawIndexCreatorTest {
     schema.addField(new DimensionFieldSpec(FLOAT_COLUMN, DataType.FLOAT, true));
     schema.addField(new DimensionFieldSpec(DOUBLE_COLUMN, DataType.DOUBLE, true));
     schema.addField(new DimensionFieldSpec(STRING_COLUMN, DataType.STRING, true));
+    schema.addField(new DimensionFieldSpec(INT_MV_COLUMN, DataType.INT, false));
+    schema.addField(new DimensionFieldSpec(LONG_MV_COLUMN, DataType.LONG, false));
+    schema.addField(new DimensionFieldSpec(FLOAT_MV_COLUMN, DataType.FLOAT, false));
+    schema.addField(new DimensionFieldSpec(DOUBLE_MV_COLUMN, DataType.DOUBLE, false));
+    schema.addField(new DimensionFieldSpec(STRING_MV_COLUMN, DataType.STRING, false));
+    schema.addField(new DimensionFieldSpec(BYTES_MV_COLUMN, DataType.BYTES, false));
 
-    TableConfig tableConfig = new TableConfigBuilder(TableType.OFFLINE).setTableName("test").build();
+    TableConfig tableConfig = new TableConfigBuilder(TableType.OFFLINE).setTableName("test")
+        .build();
 
     _random = new Random(System.nanoTime());
     _recordReader = buildIndex(tableConfig, schema);
@@ -109,7 +123,6 @@ public class RawIndexCreatorTest {
   /**
    * Test for int raw index creator.
    * Compares values read from the raw index against expected value.
-   * @throws Exception
    */
   @Test
   public void testIntRawIndexCreator()
@@ -120,7 +133,6 @@ public class RawIndexCreatorTest {
   /**
    * Test for long raw index creator.
    * Compares values read from the raw index against expected value.
-   * @throws Exception
    */
   @Test
   public void testLongRawIndexCreator()
@@ -131,7 +143,6 @@ public class RawIndexCreatorTest {
   /**
    * Test for float raw index creator.
    * Compares values read from the raw index against expected value.
-   * @throws Exception
    */
   @Test
   public void testFloatRawIndexCreator()
@@ -142,7 +153,6 @@ public class RawIndexCreatorTest {
   /**
    * Test for double raw index creator.
    * Compares values read from the raw index against expected value.
-   * @throws Exception
    */
   @Test
   public void testDoubleRawIndexCreator()
@@ -153,19 +163,21 @@ public class RawIndexCreatorTest {
   /**
    * Test for string raw index creator.
    * Compares values read from the raw index against expected value.
-   * @throws Exception
    */
   @Test
   public void testStringRawIndexCreator()
       throws Exception {
     PinotDataBuffer indexBuffer = getIndexBufferForColumn(STRING_COLUMN);
-    try (VarByteChunkSVForwardIndexReader rawIndexReader = new VarByteChunkSVForwardIndexReader(indexBuffer,
+    try (VarByteChunkSVForwardIndexReader rawIndexReader = new VarByteChunkSVForwardIndexReader(
+        indexBuffer,
         DataType.STRING);
-        BaseChunkSVForwardIndexReader.ChunkReaderContext readerContext = rawIndexReader.createContext()) {
+        BaseChunkSVForwardIndexReader.ChunkReaderContext readerContext = rawIndexReader
+            .createContext()) {
       _recordReader.rewind();
       for (int row = 0; row < NUM_ROWS; row++) {
         GenericRow expectedRow = _recordReader.next();
-        Assert.assertEquals(rawIndexReader.getString(row, readerContext), expectedRow.getValue(STRING_COLUMN));
+        Assert.assertEquals(rawIndexReader.getString(row, readerContext),
+            expectedRow.getValue(STRING_COLUMN));
       }
     }
   }
@@ -175,17 +187,88 @@ public class RawIndexCreatorTest {
    *
    * @param column Column for which to perform the test
    * @param dataType Data type of the column
-   * @throws Exception
    */
   private void testFixedLengthRawIndexCreator(String column, DataType dataType)
       throws Exception {
     PinotDataBuffer indexBuffer = getIndexBufferForColumn(column);
-    try (FixedByteChunkSVForwardIndexReader rawIndexReader = new FixedByteChunkSVForwardIndexReader(indexBuffer,
-        dataType); BaseChunkSVForwardIndexReader.ChunkReaderContext readerContext = rawIndexReader.createContext()) {
+    try (FixedByteChunkSVForwardIndexReader rawIndexReader = new FixedByteChunkSVForwardIndexReader(
+        indexBuffer,
+        dataType); BaseChunkSVForwardIndexReader.ChunkReaderContext readerContext = rawIndexReader
+        .createContext()) {
+      _recordReader.rewind();
+      for (int row = 0; row < NUM_ROWS; row++) {
+        GenericRow expectedRow = _recordReader.next();
+        Assert.assertEquals(readValueFromIndex(rawIndexReader, readerContext, row),
+            expectedRow.getValue(column));
+      }
+    }
+  }
+
+  /**
+   * Test for multi-value string raw index creator.
+   * Compares values read from the raw index against expected value.
+   */
+  @Test
+  public void testStringMVRawIndexCreator()
+      throws Exception {
+    PinotDataBuffer indexBuffer = getIndexBufferForColumn(STRING_MV_COLUMN);
+    try (VarByteChunkMVForwardIndexReader rawIndexReader = new VarByteChunkMVForwardIndexReader(
+        indexBuffer,
+        DataType.STRING);
+        BaseChunkSVForwardIndexReader.ChunkReaderContext readerContext = rawIndexReader
+            .createContext()) {
       _recordReader.rewind();
+      int maxNumberOfMultiValues = _segmentDirectory.getSegmentMetadata()
+          .getColumnMetadataFor(STRING_MV_COLUMN).getMaxNumberOfMultiValues();
+      final String[] valueBuffer = new String[maxNumberOfMultiValues];
       for (int row = 0; row < NUM_ROWS; row++) {
         GenericRow expectedRow = _recordReader.next();
-        Assert.assertEquals(readValueFromIndex(rawIndexReader, readerContext, row), expectedRow.getValue(column));
+
+        int length = rawIndexReader.getStringMV(row, valueBuffer, readerContext);
+        String[] readValue = Arrays.copyOf(valueBuffer, length);
+        Object[] writtenValue = (Object[]) expectedRow.getValue(STRING_MV_COLUMN);
+        if (writtenValue == null || writtenValue.length == 0) {
+          writtenValue = new String[]{FieldSpec.DEFAULT_DIMENSION_NULL_VALUE_OF_STRING};
+        }
+        for (int i = 0; i < length; i++) {
+          Object expected = writtenValue[i];
+          Object actual = readValue[i];
+          Assert.assertEquals(expected, actual);
+        }
+      }
+    }
+  }
+
+  /**
+   * Test for multi-value bytes raw index creator.
+   * Compares values read from the raw index against expected value.
+   */
+  @Test
+  public void testBytesMVRawIndexCreator()
+      throws Exception {
+    PinotDataBuffer indexBuffer = getIndexBufferForColumn(BYTES_MV_COLUMN);
+    try (VarByteChunkMVForwardIndexReader rawIndexReader = new VarByteChunkMVForwardIndexReader(
+        indexBuffer, DataType.BYTES);
+        BaseChunkSVForwardIndexReader.ChunkReaderContext readerContext = rawIndexReader
+            .createContext()) {
+      _recordReader.rewind();
+      int maxNumberOfMultiValues = _segmentDirectory.getSegmentMetadata()
+          .getColumnMetadataFor(BYTES_MV_COLUMN).getMaxNumberOfMultiValues();
+      final byte[][] valueBuffer = new byte[maxNumberOfMultiValues][];
+      for (int row = 0; row < NUM_ROWS; row++) {
+        GenericRow expectedRow = _recordReader.next();
+
+        int length = rawIndexReader.getBytesMV(row, valueBuffer, readerContext);
+        byte[][] readValue = Arrays.copyOf(valueBuffer, length);
+        Object[] writtenValue = (Object[]) expectedRow.getValue(BYTES_MV_COLUMN);
+        if (writtenValue == null || writtenValue.length == 0) {
+          writtenValue = new byte[][]{FieldSpec.DEFAULT_DIMENSION_NULL_VALUE_OF_BYTES};
+        }
+        for (int i = 0; i < length; i++) {
+          Object expected = writtenValue[i];
+          Object actual = readValue[i];
+          Assert.assertTrue(Arrays.equals((byte[]) expected, (byte[]) actual));
+        }
      }
    }
  }
@@ -205,7 +288,6 @@ public class RawIndexCreatorTest {
    * Helper method to build a segment containing a single valued string column with RAW (no-dictionary) index.
    *
    * @return Array of string values for the rows in the generated index.
-   * @throws Exception
    */
   private RecordReader buildIndex(TableConfig tableConfig, Schema schema)
       throws Exception {
@@ -221,9 +303,17 @@ public class RawIndexCreatorTest {
 
     for (FieldSpec fieldSpec : schema.getAllFieldSpecs()) {
       Object value;
-
-      value = getRandomValue(_random, fieldSpec.getDataType());
-      map.put(fieldSpec.getName(), value);
+      if (fieldSpec.isSingleValueField()) {
+        value = getRandomValue(_random, fieldSpec.getDataType());
+        map.put(fieldSpec.getName(), value);
+      } else {
+        int length = _random.nextInt(50);
+        Object[] values = new Object[length];
+        for (int j = 0; j < length; j++) {
+          values[j] = getRandomValue(_random, fieldSpec.getDataType());
+        }
+        map.put(fieldSpec.getName(), values);
+      }
     }
 
     GenericRow genericRow = new GenericRow();
@@ -263,8 +353,13 @@ public class RawIndexCreatorTest {
       case STRING:
         return StringUtil
             .sanitizeStringValue(RandomStringUtils.random(random.nextInt(MAX_STRING_LENGTH)), Integer.MAX_VALUE);
+      case BYTES:
+        return StringUtil
+            .sanitizeStringValue(RandomStringUtils.random(random.nextInt(MAX_STRING_LENGTH)), Integer.MAX_VALUE)
+            .getBytes();
       default:
-        throw new UnsupportedOperationException("Unsupported data type for random value generator: " + dataType);
+        throw new UnsupportedOperationException(
+            "Unsupported data type for random value generator: " + dataType);
     }
   }
diff --git a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/V1Constants.java b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/V1Constants.java
index 1d1a8a5..744f0bc 100644
--- a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/V1Constants.java
+++ b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/V1Constants.java
@@ -39,6 +39,7 @@ public class V1Constants {
     public static final String UNSORTED_SV_FORWARD_INDEX_FILE_EXTENSION = ".sv.unsorted.fwd";
     public static final String SORTED_SV_FORWARD_INDEX_FILE_EXTENSION = ".sv.sorted.fwd";
     public static final String RAW_SV_FORWARD_INDEX_FILE_EXTENSION = ".sv.raw.fwd";
+    public static final String RAW_MV_FORWARD_INDEX_FILE_EXTENSION = ".mv.raw.fwd";
     public static final String UNSORTED_MV_FORWARD_INDEX_FILE_EXTENSION = ".mv.fwd";
     public static final String BITMAP_INVERTED_INDEX_FILE_EXTENSION = ".bitmap.inv";
     public static final String BITMAP_RANGE_INDEX_FILE_EXTENSION = ".bitmap.range";
diff --git a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/compression/ChunkCompressor.java b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/compression/ChunkCompressor.java
index 9415de8..d636fb1 100644
--- a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/compression/ChunkCompressor.java
+++ b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/compression/ChunkCompressor.java
@@ -38,4 +38,6 @@ public interface ChunkCompressor {
    */
   int compress(ByteBuffer inUncompressed, ByteBuffer outCompressed)
       throws IOException;
+
+  int maxCompressedSize(int uncompressedSize);
 }
diff --git a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/creator/ColumnIndexCreationInfo.java b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/creator/ColumnIndexCreationInfo.java
index da30237..9707107 100644
--- a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/creator/ColumnIndexCreationInfo.java
+++ b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/creator/ColumnIndexCreationInfo.java
@@ -85,6 +85,10 @@ public class ColumnIndexCreationInfo implements Serializable {
     return _columnStatistics.getMaxNumberOfMultiValues();
   }
 
+  public int getMaxRowLengthInBytes() {
+    return _columnStatistics.getMaxRowLengthInBytes();
+  }
+
   public boolean isAutoGenerated() {
     return _isAutoGenerated;
   }
diff --git a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/creator/ColumnStatistics.java b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/creator/ColumnStatistics.java
index 825c331..70bd584 100644
--- a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/creator/ColumnStatistics.java
+++ b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/creator/ColumnStatistics.java
@@ -83,6 +83,13 @@ public interface ColumnStatistics extends Serializable {
   int getMaxNumberOfMultiValues();
 
   /**
+   * @return The length in bytes of the largest row, for variable-length types
+   */
+  default int getMaxRowLengthInBytes() {
+    return -1;
+  }
+
+  /**
    * @return Returns if any of the values have nulls in the segments.
    */
   boolean hasNull();
diff --git a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/index/creator/ForwardIndexCreator.java b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/index/creator/ForwardIndexCreator.java
index dee4db1..e5a21e9 100644
--- a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/index/creator/ForwardIndexCreator.java
+++ b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/index/creator/ForwardIndexCreator.java
@@ -173,4 +173,13 @@ public interface ForwardIndexCreator extends Closeable {
   default void putStringMV(String[] values) {
     throw new UnsupportedOperationException();
   }
+
+  /**
+   * Writes the next byte[] type multi-value into the forward index.
+   *
+   * @param values Values to write
+   */
+  default void putBytesMV(byte[][] values) {
+    throw new UnsupportedOperationException();
+  }
 }
diff --git a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/index/reader/ForwardIndexReader.java b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/index/reader/ForwardIndexReader.java
index fb92bec..6393aaf 100644
--- a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/index/reader/ForwardIndexReader.java
+++ b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/index/reader/ForwardIndexReader.java
@@ -242,4 +242,23 @@ public interface ForwardIndexReader<T extends ForwardIndexReaderContext> extends
   default int getStringMV(int docId, String[] valueBuffer, T context) {
     throw new UnsupportedOperationException();
   }
+
+  /**
+   * Reads the bytes type multi-value at the given document id into the passed in value buffer (the buffer must be
+   * large enough to hold all the values for the multi-value entry) and returns the number of values within the
+   * multi-value entry.
+   *
+   * @param docId Document id
+   * @param valueBuffer Value buffer
+   * @param context Reader context
+   * @return Number of values within the multi-value entry
+   */
+  default int getBytesMV(int docId, byte[][] valueBuffer, T context) {
+    throw new UnsupportedOperationException();
+  }
+
+  default int getFloatMV(int docId, float[] valueBuffer, T context, int[] parentIndices) {
+    throw new UnsupportedOperationException();
+  }
+
 }
diff --git a/pinot-tools/src/main/java/org/apache/pinot/tools/segment/converter/DictionaryToRawIndexConverter.java b/pinot-tools/src/main/java/org/apache/pinot/tools/segment/converter/DictionaryToRawIndexConverter.java
index b506106..aa41810 100644
--- a/pinot-tools/src/main/java/org/apache/pinot/tools/segment/converter/DictionaryToRawIndexConverter.java
+++ b/pinot-tools/src/main/java/org/apache/pinot/tools/segment/converter/DictionaryToRawIndexConverter.java
@@ -309,7 +309,7 @@ public class DictionaryToRawIndexConverter {
       int lengthOfLongestEntry = (storedType == DataType.STRING) ? getLengthOfLongestEntry(dictionary) : -1;
       try (ForwardIndexCreator rawIndexCreator = SegmentColumnarIndexCreator
-          .getRawIndexCreatorForColumn(newSegment, compressionType, column, storedType, numDocs, lengthOfLongestEntry,
+          .getRawIndexCreatorForSVColumn(newSegment, compressionType, column, storedType, numDocs, lengthOfLongestEntry,
               false, BaseChunkSVForwardIndexWriter.DEFAULT_VERSION);
           ForwardIndexReaderContext readerContext = reader.createContext()) {
         switch (storedType) {

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org