This is an automated email from the ASF dual-hosted git repository. jackie pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push: new d654cc9db2 Add a new MV forward index to only store unique MV values (#11993) d654cc9db2 is described below commit d654cc9db2c8561d8b7856c316c6a00c73cc7ff5 Author: Xiaotian (Jackie) Jiang <17555551+jackie-ji...@users.noreply.github.com> AuthorDate: Tue Nov 28 15:40:42 2023 -0800 Add a new MV forward index to only store unique MV values (#11993) --- .../pinot/core/startree/v2/BaseStarTreeV2Test.java | 7 +- .../tests/OfflineClusterIntegrationTest.java | 48 +++- .../local/io/util/FixedBitIntReaderWriter.java | 8 + .../segment/local/io/util/PinotDataBitSet.java | 6 + .../FixedBitMVEntryDictForwardIndexWriter.java | 126 +++++++++ .../MultiValueEntryDictForwardIndexCreator.java | 67 +++++ .../index/forward/ForwardIndexCreatorFactory.java | 10 +- .../index/forward/ForwardIndexReaderFactory.java | 11 +- .../segment/index/forward/ForwardIndexType.java | 78 ++---- .../segment/index/loader/ForwardIndexHandler.java | 284 ++++++++++----------- .../segment/index/loader/IndexLoadingConfig.java | 15 +- .../defaultcolumn/BaseDefaultColumnHandler.java | 19 +- .../FixedBitMVEntryDictForwardIndexReader.java | 155 +++++++++++ .../segment/local/utils/TableConfigUtils.java | 106 ++++---- .../FixedBitMVEntryDictForwardIndexTest.java | 123 +++++++++ .../index/forward/ForwardIndexTypeTest.java | 56 ++-- .../index/loader/ForwardIndexHandlerTest.java | 151 ++++++++--- .../index/loader/SegmentPreProcessorTest.java | 27 +- .../segment/local/utils/TableConfigUtilsTest.java | 60 +++-- .../spi/compression/DictIdCompressionType.java | 47 ++++ .../segment/spi/index/ForwardIndexConfig.java | 81 ++++-- .../spi/index/reader/ForwardIndexReader.java | 11 +- .../apache/pinot/spi/config/table/FieldConfig.java | 24 +- 23 files changed, 1121 insertions(+), 399 deletions(-) diff --git a/pinot-core/src/test/java/org/apache/pinot/core/startree/v2/BaseStarTreeV2Test.java b/pinot-core/src/test/java/org/apache/pinot/core/startree/v2/BaseStarTreeV2Test.java index 3787363d17..63cc2c2bda 100644 --- a/pinot-core/src/test/java/org/apache/pinot/core/startree/v2/BaseStarTreeV2Test.java +++ b/pinot-core/src/test/java/org/apache/pinot/core/startree/v2/BaseStarTreeV2Test.java @@ -462,7 +462,12 @@ abstract class BaseStarTreeV2Test<R, A> { */ CompressionCodec getCompressionCodec() { CompressionCodec[] compressionCodecs = CompressionCodec.values(); - return compressionCodecs[RANDOM.nextInt(compressionCodecs.length)]; + while (true) { + CompressionCodec compressionCodec = compressionCodecs[RANDOM.nextInt(compressionCodecs.length)]; + if (compressionCodec.isApplicableToRawIndex()) { + return compressionCodec; + } + } } abstract ValueAggregator<R, A> getValueAggregator(); diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java index 824991d86f..86bafbd2f0 100644 --- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java +++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java @@ -64,6 +64,8 @@ import org.apache.pinot.core.operator.query.NonScanBasedAggregationOperator; import org.apache.pinot.segment.spi.index.StandardIndexes; import org.apache.pinot.segment.spi.index.startree.AggregationFunctionColumnPair; import org.apache.pinot.spi.config.instance.InstanceType; +import org.apache.pinot.spi.config.table.FieldConfig; +import org.apache.pinot.spi.config.table.FieldConfig.CompressionCodec; import org.apache.pinot.spi.config.table.IndexingConfig; import org.apache.pinot.spi.config.table.QueryConfig; import org.apache.pinot.spi.config.table.StarTreeIndexConfig; @@ -154,7 +156,7 @@ public class OfflineClusterIntegrationTest extends BaseClusterIntegrationTestSet private static final String COLUMN_LENGTH_MAP_KEY = "columnLengthMap"; private static final String COLUMN_CARDINALITY_MAP_KEY = "columnCardinalityMap"; private static final String MAX_NUM_MULTI_VALUES_MAP_KEY = "maxNumMultiValuesMap"; - private static final int DISK_SIZE_IN_BYTES = 20798784; + private static final int DISK_SIZE_IN_BYTES = 20277762; private static final int NUM_ROWS = 115545; private final List<ServiceStatus.ServiceStatusCallback> _serviceStatusCallbacks = @@ -178,6 +180,13 @@ public class OfflineClusterIntegrationTest extends BaseClusterIntegrationTestSet return _schemaFileName; } + @Override + protected List<FieldConfig> getFieldConfigs() { + return Collections.singletonList( + new FieldConfig("DivAirports", FieldConfig.EncodingType.DICTIONARY, Collections.emptyList(), + CompressionCodec.MV_ENTRY_DICT, null)); + } + @BeforeClass public void setUp() throws Exception { @@ -645,7 +654,8 @@ public class OfflineClusterIntegrationTest extends BaseClusterIntegrationTestSet } @Test - public void testMaxQueryResponseSizeTableConfig() throws Exception { + public void testMaxQueryResponseSizeTableConfig() + throws Exception { TableConfig tableConfig = getOfflineTableConfig(); tableConfig.setQueryConfig(new QueryConfig(null, false, null, null, 1000L, null)); updateTableConfig(tableConfig); @@ -677,7 +687,8 @@ public class OfflineClusterIntegrationTest extends BaseClusterIntegrationTestSet } @Test - public void testMaxServerResponseSizeTableConfig() throws Exception { + public void testMaxServerResponseSizeTableConfig() + throws Exception { TableConfig tableConfig = getOfflineTableConfig(); tableConfig.setQueryConfig(new QueryConfig(null, false, null, null, null, 1000L)); updateTableConfig(tableConfig); @@ -709,7 +720,8 @@ public class OfflineClusterIntegrationTest extends BaseClusterIntegrationTestSet } @Test - public void testMaxResponseSizeTableConfigOrdering() throws Exception { + public void testMaxResponseSizeTableConfigOrdering() + throws Exception { TableConfig tableConfig = getOfflineTableConfig(); tableConfig.setQueryConfig(new QueryConfig(null, false, null, null, 1000000L, 1000L)); updateTableConfig(tableConfig); @@ -1569,12 +1581,39 @@ public class OfflineClusterIntegrationTest extends BaseClusterIntegrationTestSet IngestionConfig ingestionConfig = new IngestionConfig(); ingestionConfig.setTransformConfigs(transformConfigs); tableConfig.setIngestionConfig(ingestionConfig); + List<FieldConfig> fieldConfigList = tableConfig.getFieldConfigList(); + assertNotNull(fieldConfigList); + fieldConfigList.add( + new FieldConfig("NewAddedDerivedDivAirportSeqIDs", FieldConfig.EncodingType.DICTIONARY, Collections.emptyList(), + CompressionCodec.MV_ENTRY_DICT, null)); + fieldConfigList.add(new FieldConfig("NewAddedDerivedDivAirportSeqIDsString", FieldConfig.EncodingType.DICTIONARY, + Collections.emptyList(), CompressionCodec.MV_ENTRY_DICT, null)); updateTableConfig(tableConfig); // Trigger reload reloadAllSegments(TEST_EXTRA_COLUMNS_QUERY, false, numTotalDocs); assertEquals(postQuery(TEST_EXTRA_COLUMNS_QUERY).get("resultTable").get("rows").get(0).get(0).asLong(), numTotalDocs); + + // Verify the index sizes + JsonNode columnIndexSizeMap = JsonUtils.stringToJsonNode(sendGetRequest( + getControllerBaseApiUrl() + "/tables/mytable/metadata?columns=DivAirportSeqIDs" + + "&columns=NewAddedDerivedDivAirportSeqIDs&columns=NewAddedDerivedDivAirportSeqIDsString")) + .get("columnIndexSizeMap"); + assertEquals(columnIndexSizeMap.size(), 3); + JsonNode originalColumnIndexSizes = columnIndexSizeMap.get("DivAirportSeqIDs"); + JsonNode derivedColumnIndexSizes = columnIndexSizeMap.get("NewAddedDerivedDivAirportSeqIDs"); + JsonNode derivedStringColumnIndexSizes = columnIndexSizeMap.get("NewAddedDerivedDivAirportSeqIDsString"); + + // Derived int column should have the same dictionary size as the original column + double originalColumnDictionarySize = originalColumnIndexSizes.get("dictionary").asDouble(); + assertEquals(derivedColumnIndexSizes.get("dictionary").asDouble(), originalColumnDictionarySize); + // Derived string column should have larger dictionary size than the original column + assertTrue(derivedStringColumnIndexSizes.get("dictionary").asDouble() > originalColumnDictionarySize); + // Both derived columns should have smaller forward index size than the original column because of compression + double derivedColumnForwardIndexSize = derivedColumnIndexSizes.get("forward_index").asDouble(); + assertTrue(derivedColumnForwardIndexSize < originalColumnIndexSizes.get("forward_index").asDouble()); + assertEquals(derivedStringColumnIndexSizes.get("forward_index").asDouble(), derivedColumnForwardIndexSize); } private void reloadWithMissingColumns() @@ -1582,6 +1621,7 @@ public class OfflineClusterIntegrationTest extends BaseClusterIntegrationTestSet // Remove columns from the table config first to pass the validation of the table config TableConfig tableConfig = getOfflineTableConfig(); tableConfig.setIngestionConfig(null); + tableConfig.setFieldConfigList(getFieldConfigs()); updateTableConfig(tableConfig); // Need to first delete then add the schema because removing columns is backward-incompatible change diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/io/util/FixedBitIntReaderWriter.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/io/util/FixedBitIntReaderWriter.java index 76e1a154c8..3ef165740f 100644 --- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/io/util/FixedBitIntReaderWriter.java +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/io/util/FixedBitIntReaderWriter.java @@ -50,6 +50,14 @@ public final class FixedBitIntReaderWriter implements Closeable { _dataBitSet.writeInt(startIndex, _numBitsPerValue, length, values); } + public int getStartByteOffset(int index) { + return (int) (((long) index * _numBitsPerValue) / Byte.SIZE); + } + + public int getEndByteOffset(int index) { + return (int) (((long) (index + 1) * _numBitsPerValue - 1) / Byte.SIZE) + 1; + } + @Override public void close() { _dataBitSet.close(); diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/io/util/PinotDataBitSet.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/io/util/PinotDataBitSet.java index 4ed84a2c0c..a42665e941 100644 --- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/io/util/PinotDataBitSet.java +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/io/util/PinotDataBitSet.java @@ -102,6 +102,9 @@ public final class PinotDataBitSet implements Closeable { } public void readInt(int startIndex, int numBitsPerValue, int length, int[] buffer) { + if (length == 0) { + return; + } long startBitOffset = (long) startIndex * numBitsPerValue; int byteOffset = (int) (startBitOffset / Byte.SIZE); int bitOffsetInFirstByte = (int) (startBitOffset % Byte.SIZE); @@ -167,6 +170,9 @@ public final class PinotDataBitSet implements Closeable { } public void writeInt(int startIndex, int numBitsPerValue, int length, int[] values) { + if (length == 0) { + return; + } long startBitOffset = (long) startIndex * numBitsPerValue; int byteOffset = (int) (startBitOffset / Byte.SIZE); int bitOffsetInFirstByte = (int) (startBitOffset % Byte.SIZE); diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/io/writer/impl/FixedBitMVEntryDictForwardIndexWriter.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/io/writer/impl/FixedBitMVEntryDictForwardIndexWriter.java new file mode 100644 index 0000000000..80a9a3dfa3 --- /dev/null +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/io/writer/impl/FixedBitMVEntryDictForwardIndexWriter.java @@ -0,0 +1,126 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.segment.local.io.writer.impl; + +import it.unimi.dsi.fastutil.ints.IntArrayList; +import it.unimi.dsi.fastutil.objects.Object2IntMap; +import it.unimi.dsi.fastutil.objects.Object2IntOpenHashMap; +import java.io.Closeable; +import java.io.File; +import java.io.IOException; +import java.nio.ByteOrder; +import org.apache.pinot.segment.local.io.util.FixedBitIntReaderWriter; +import org.apache.pinot.segment.local.io.util.PinotDataBitSet; +import org.apache.pinot.segment.spi.memory.PinotDataBuffer; + + +/** + * Bit-compressed dictionary-encoded forward index writer for multi-value columns, where a second level dictionary + * encoding for multi-value entries (instead of individual values within the entry) are maintained within the forward + * index. + * + * Index layout: + * - Index header (24 bytes) + * - ID buffer (stores the multi-value entry id for each doc id) + * - Offset buffer (stores the start offset of each multi-value entry, followed by end offset of the last value) + * - Value buffer (stores the individual values) + * + * Header layout: + * - Magic marker (4 bytes) + * - Version (2 bytes) + * - Bits per value (1 byte) + * - Bits per id (1 byte) + * - Number of unique MV entries (4 bytes) + * - Number of total values (4 bytes) + * - Start offset of offset buffer (4 bytes) + * - Start offset of value buffer (4 bytes) + */ +public class FixedBitMVEntryDictForwardIndexWriter implements Closeable { + public static final int MAGIC_MARKER = 0xffabcdef; + public static final short VERSION = 1; + public static final int HEADER_SIZE = 24; + + private final Object2IntOpenHashMap<IntArrayList> _entryToIdMap = new Object2IntOpenHashMap<>(); + private final File _file; + private final int _numBitsPerValue; + private final IntArrayList _ids; + + public FixedBitMVEntryDictForwardIndexWriter(File file, int numDocs, int numBitsPerValue) { + _file = file; + _numBitsPerValue = numBitsPerValue; + _ids = new IntArrayList(numDocs); + } + + public void putDictIds(int[] dictIds) { + // Lookup the map, and create a new id when the entry is not found. + _ids.add(_entryToIdMap.computeIntIfAbsent(IntArrayList.wrap(dictIds), k -> _entryToIdMap.size())); + } + + @Override + public void close() + throws IOException { + int numUniqueEntries = _entryToIdMap.size(); + int[][] idToDictIdsMap = new int[numUniqueEntries][]; + int numTotalValues = 0; + for (Object2IntMap.Entry<IntArrayList> entry : _entryToIdMap.object2IntEntrySet()) { + int id = entry.getIntValue(); + int[] dictIds = entry.getKey().elements(); + idToDictIdsMap[id] = dictIds; + numTotalValues += dictIds.length; + } + int[] ids = _ids.elements(); + int numBitsPerId = PinotDataBitSet.getNumBitsPerValue(numUniqueEntries - 1); + int idBufferSize = (int) (((long) ids.length * numBitsPerId + 7) / 8); + int numBitsPerOffset = PinotDataBitSet.getNumBitsPerValue(numTotalValues); + int offsetBufferSize = (int) (((long) (numUniqueEntries + 1) * numBitsPerOffset + 7) / 8); + int valueBufferSize = (int) (((long) numTotalValues * _numBitsPerValue + 7) / 8); + int offsetBufferOffset = HEADER_SIZE + idBufferSize; + int valueBufferOffset = offsetBufferOffset + offsetBufferSize; + int indexSize = valueBufferOffset + valueBufferSize; + try (PinotDataBuffer indexBuffer = PinotDataBuffer.mapFile(_file, false, 0, indexSize, ByteOrder.BIG_ENDIAN, + getClass().getSimpleName())) { + indexBuffer.putInt(0, MAGIC_MARKER); + indexBuffer.putShort(4, VERSION); + indexBuffer.putByte(6, (byte) _numBitsPerValue); + indexBuffer.putByte(7, (byte) numBitsPerId); + indexBuffer.putInt(8, numUniqueEntries); + indexBuffer.putInt(12, numTotalValues); + indexBuffer.putInt(16, offsetBufferOffset); + indexBuffer.putInt(20, valueBufferOffset); + + try (FixedBitIntReaderWriter idWriter = new FixedBitIntReaderWriter( + indexBuffer.view(HEADER_SIZE, offsetBufferOffset), ids.length, numBitsPerId)) { + idWriter.writeInt(0, ids.length, ids); + } + try (FixedBitIntReaderWriter offsetWriter = new FixedBitIntReaderWriter( + indexBuffer.view(offsetBufferOffset, valueBufferOffset), numUniqueEntries + 1, numBitsPerOffset); + FixedBitIntReaderWriter valueWriter = new FixedBitIntReaderWriter( + indexBuffer.view(valueBufferOffset, indexSize), numTotalValues, _numBitsPerValue)) { + int startOffset = 0; + for (int i = 0; i < numUniqueEntries; i++) { + offsetWriter.writeInt(i, startOffset); + int[] dictIds = idToDictIdsMap[i]; + valueWriter.writeInt(startOffset, dictIds.length, dictIds); + startOffset += dictIds.length; + } + offsetWriter.writeInt(numUniqueEntries, startOffset); + } + } + } +} diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/fwd/MultiValueEntryDictForwardIndexCreator.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/fwd/MultiValueEntryDictForwardIndexCreator.java new file mode 100644 index 0000000000..2db269f038 --- /dev/null +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/fwd/MultiValueEntryDictForwardIndexCreator.java @@ -0,0 +1,67 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.segment.local.segment.creator.impl.fwd; + +import java.io.File; +import java.io.IOException; +import org.apache.pinot.segment.local.io.util.PinotDataBitSet; +import org.apache.pinot.segment.local.io.writer.impl.FixedBitMVEntryDictForwardIndexWriter; +import org.apache.pinot.segment.spi.V1Constants; +import org.apache.pinot.segment.spi.index.creator.ForwardIndexCreator; +import org.apache.pinot.spi.data.FieldSpec.DataType; + + +/** + * Forward index creator for dictionary-encoded multi-value column, where multi-value entries are dictionary encoded. + */ +public class MultiValueEntryDictForwardIndexCreator implements ForwardIndexCreator { + private final FixedBitMVEntryDictForwardIndexWriter _writer; + + public MultiValueEntryDictForwardIndexCreator(File outputDir, String column, int cardinality, int numDocs) { + File indexFile = new File(outputDir, column + V1Constants.Indexes.UNSORTED_MV_FORWARD_INDEX_FILE_EXTENSION); + int numBitsPerValue = PinotDataBitSet.getNumBitsPerValue(cardinality - 1); + _writer = new FixedBitMVEntryDictForwardIndexWriter(indexFile, numDocs, numBitsPerValue); + } + + @Override + public boolean isDictionaryEncoded() { + return true; + } + + @Override + public boolean isSingleValue() { + return false; + } + + @Override + public DataType getValueType() { + return DataType.INT; + } + + @Override + public void putDictIdMV(int[] dictIds) { + _writer.putDictIds(dictIds); + } + + @Override + public void close() + throws IOException { + _writer.close(); + } +} diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/forward/ForwardIndexCreatorFactory.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/forward/ForwardIndexCreatorFactory.java index a57158b44f..ded5000b0c 100644 --- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/forward/ForwardIndexCreatorFactory.java +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/forward/ForwardIndexCreatorFactory.java @@ -21,6 +21,7 @@ package org.apache.pinot.segment.local.segment.index.forward; import java.io.File; import java.io.IOException; +import org.apache.pinot.segment.local.segment.creator.impl.fwd.MultiValueEntryDictForwardIndexCreator; import org.apache.pinot.segment.local.segment.creator.impl.fwd.MultiValueFixedByteRawIndexCreator; import org.apache.pinot.segment.local.segment.creator.impl.fwd.MultiValueUnsortedForwardIndexCreator; import org.apache.pinot.segment.local.segment.creator.impl.fwd.MultiValueVarByteRawIndexCreator; @@ -29,6 +30,7 @@ import org.apache.pinot.segment.local.segment.creator.impl.fwd.SingleValueSorted import org.apache.pinot.segment.local.segment.creator.impl.fwd.SingleValueUnsortedForwardIndexCreator; import org.apache.pinot.segment.local.segment.creator.impl.fwd.SingleValueVarByteRawIndexCreator; import org.apache.pinot.segment.spi.compression.ChunkCompressionType; +import org.apache.pinot.segment.spi.compression.DictIdCompressionType; import org.apache.pinot.segment.spi.creator.IndexCreationContext; import org.apache.pinot.segment.spi.index.ForwardIndexConfig; import org.apache.pinot.segment.spi.index.creator.ForwardIndexCreator; @@ -57,8 +59,12 @@ public class ForwardIndexCreatorFactory { return new SingleValueUnsortedForwardIndexCreator(indexDir, columnName, cardinality, numTotalDocs); } } else { - return new MultiValueUnsortedForwardIndexCreator(indexDir, columnName, cardinality, numTotalDocs, - context.getTotalNumberOfEntries()); + if (indexConfig.getDictIdCompressionType() == DictIdCompressionType.MV_ENTRY_DICT) { + return new MultiValueEntryDictForwardIndexCreator(indexDir, columnName, cardinality, numTotalDocs); + } else { + return new MultiValueUnsortedForwardIndexCreator(indexDir, columnName, cardinality, numTotalDocs, + context.getTotalNumberOfEntries()); + } } } else { // Dictionary disabled columns diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/forward/ForwardIndexReaderFactory.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/forward/ForwardIndexReaderFactory.java index 8408a9026d..b5ca2b83c1 100644 --- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/forward/ForwardIndexReaderFactory.java +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/forward/ForwardIndexReaderFactory.java @@ -20,6 +20,7 @@ package org.apache.pinot.segment.local.segment.index.forward; import org.apache.pinot.segment.local.io.writer.impl.VarByteChunkForwardIndexWriterV4; +import org.apache.pinot.segment.local.segment.index.readers.forward.FixedBitMVEntryDictForwardIndexReader; import org.apache.pinot.segment.local.segment.index.readers.forward.FixedBitMVForwardIndexReader; import org.apache.pinot.segment.local.segment.index.readers.forward.FixedBitSVForwardIndexReaderV2; import org.apache.pinot.segment.local.segment.index.readers.forward.FixedByteChunkMVForwardIndexReader; @@ -72,8 +73,14 @@ public class ForwardIndexReaderFactory extends IndexReaderFactory.Default<Forwar return new FixedBitSVForwardIndexReaderV2(dataBuffer, metadata.getTotalDocs(), metadata.getBitsPerElement()); } } else { - return new FixedBitMVForwardIndexReader(dataBuffer, metadata.getTotalDocs(), metadata.getTotalNumberOfEntries(), - metadata.getBitsPerElement()); + if (dataBuffer.size() > Integer.BYTES + && dataBuffer.getInt(0) == FixedBitMVEntryDictForwardIndexReader.MAGIC_MARKER) { + return new FixedBitMVEntryDictForwardIndexReader(dataBuffer, metadata.getTotalDocs(), + metadata.getBitsPerElement()); + } else { + return new FixedBitMVForwardIndexReader(dataBuffer, metadata.getTotalDocs(), + metadata.getTotalNumberOfEntries(), metadata.getBitsPerElement()); + } } } else { return createRawIndexReader(dataBuffer, metadata.getDataType().getStoredType(), metadata.isSingleValue()); diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/forward/ForwardIndexType.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/forward/ForwardIndexType.java index 369341be6c..a454bf9cfb 100644 --- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/forward/ForwardIndexType.java +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/forward/ForwardIndexType.java @@ -21,7 +21,6 @@ package org.apache.pinot.segment.local.segment.index.forward; import com.google.common.collect.Lists; import com.google.common.collect.Maps; -import com.google.common.collect.Sets; import java.io.IOException; import java.util.Collection; import java.util.Collections; @@ -29,7 +28,6 @@ import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Set; -import java.util.function.Supplier; import javax.annotation.Nullable; import org.apache.pinot.segment.local.realtime.impl.forward.FixedByteMVMutableForwardIndex; import org.apache.pinot.segment.local.realtime.impl.forward.FixedByteSVMutableForwardIndex; @@ -59,6 +57,7 @@ import org.apache.pinot.segment.spi.index.reader.ForwardIndexReader; import org.apache.pinot.segment.spi.memory.PinotDataBuffer; import org.apache.pinot.segment.spi.store.SegmentDirectory; import org.apache.pinot.spi.config.table.FieldConfig; +import org.apache.pinot.spi.config.table.FieldConfig.CompressionCodec; import org.apache.pinot.spi.config.table.TableConfig; import org.apache.pinot.spi.data.FieldSpec; import org.apache.pinot.spi.data.Schema; @@ -91,54 +90,29 @@ public class ForwardIndexType extends AbstractIndexType<ForwardIndexConfig, Forw @Override public Map<String, ForwardIndexConfig> fromIndexLoadingConfig(IndexLoadingConfig indexLoadingConfig) { - Set<String> disabledCols = indexLoadingConfig.getForwardIndexDisabledColumns(); + Set<String> disabledColumns = indexLoadingConfig.getForwardIndexDisabledColumns(); + Map<String, CompressionCodec> compressionCodecMap = indexLoadingConfig.getCompressionConfigs(); Map<String, ForwardIndexConfig> result = new HashMap<>(); - Set<String> allColumns = Sets.union(disabledCols, indexLoadingConfig.getAllKnownColumns()); - for (String column : allColumns) { - ChunkCompressionType compressionType = - indexLoadingConfig.getCompressionConfigs() != null - ? indexLoadingConfig.getCompressionConfigs().get(column) - : null; - Supplier<ForwardIndexConfig> defaultConfig = () -> { - if (compressionType == null) { - return ForwardIndexConfig.DEFAULT; - } else { - return new ForwardIndexConfig.Builder().withCompressionType(compressionType).build(); - } - }; - if (!disabledCols.contains(column)) { - TableConfig tableConfig = indexLoadingConfig.getTableConfig(); - if (tableConfig == null) { - result.put(column, defaultConfig.get()); - } else { - List<FieldConfig> fieldConfigList = tableConfig.getFieldConfigList(); - if (fieldConfigList == null) { - result.put(column, defaultConfig.get()); - continue; - } - FieldConfig fieldConfig = fieldConfigList.stream() - .filter(fc -> fc.getName().equals(column)) - .findAny() - .orElse(null); - if (fieldConfig == null) { - result.put(column, defaultConfig.get()); - continue; - } - ForwardIndexConfig.Builder builder = new ForwardIndexConfig.Builder(); - if (compressionType != null) { - builder.withCompressionType(compressionType); - } else { - FieldConfig.CompressionCodec compressionCodec = fieldConfig.getCompressionCodec(); - if (compressionCodec != null) { - builder.withCompressionType(ChunkCompressionType.valueOf(compressionCodec.name())); + for (String column : indexLoadingConfig.getAllKnownColumns()) { + ForwardIndexConfig forwardIndexConfig; + if (!disabledColumns.contains(column)) { + CompressionCodec compressionCodec = compressionCodecMap.get(column); + if (compressionCodec == null) { + TableConfig tableConfig = indexLoadingConfig.getTableConfig(); + if (tableConfig != null && tableConfig.getFieldConfigList() != null) { + FieldConfig fieldConfig = + tableConfig.getFieldConfigList().stream().filter(fc -> fc.getName().equals(column)).findAny() + .orElse(null); + if (fieldConfig != null) { + compressionCodec = fieldConfig.getCompressionCodec(); } } - - result.put(column, builder.build()); } + forwardIndexConfig = new ForwardIndexConfig.Builder().withCompressionCodec(compressionCodec).build(); } else { - result.put(column, ForwardIndexConfig.DISABLED); + forwardIndexConfig = ForwardIndexConfig.DISABLED; } + result.put(column, forwardIndexConfig); } return result; } @@ -169,13 +143,10 @@ public class ForwardIndexType extends AbstractIndexType<ForwardIndexConfig, Forw if (properties != null && isDisabled(properties)) { fwdConfig.put(fieldConfig.getName(), ForwardIndexConfig.DISABLED); } else { - DictionaryIndexConfig dictConfig = dictConfigs.get(fieldConfig.getName()); - if (dictConfig != null && dictConfig.isDisabled()) { - fwdConfig.put(fieldConfig.getName(), createConfigFromFieldConfig(fieldConfig)); + ForwardIndexConfig config = createConfigFromFieldConfig(fieldConfig); + if (!config.equals(ForwardIndexConfig.DEFAULT)) { + fwdConfig.put(fieldConfig.getName(), config); } - // On other case encoding is DICTIONARY. We create the default forward index by default. That means that if - // field config indicates for example a compression while using encoding dictionary, the compression is - // ignored. // It is important to do not explicitly add the default value here in order to avoid exclusive problems with // the default `fromIndexes` deserializer. } @@ -193,17 +164,12 @@ public class ForwardIndexType extends AbstractIndexType<ForwardIndexConfig, Forw } private ForwardIndexConfig createConfigFromFieldConfig(FieldConfig fieldConfig) { - FieldConfig.CompressionCodec compressionCodec = fieldConfig.getCompressionCodec(); ForwardIndexConfig.Builder builder = new ForwardIndexConfig.Builder(); - if (compressionCodec != null) { - builder.withCompressionType(ChunkCompressionType.valueOf(compressionCodec.name())); - } - + builder.withCompressionCodec(fieldConfig.getCompressionCodec()); Map<String, String> properties = fieldConfig.getProperties(); if (properties != null) { builder.withLegacyProperties(properties); } - return builder.build(); } diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/ForwardIndexHandler.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/ForwardIndexHandler.java index 5b5ad75de6..ac32a0a354 100644 --- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/ForwardIndexHandler.java +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/ForwardIndexHandler.java @@ -25,7 +25,6 @@ import java.math.BigDecimal; import java.util.Arrays; import java.util.Collections; import java.util.HashMap; -import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Set; @@ -47,6 +46,7 @@ import org.apache.pinot.segment.local.segment.readers.PinotSegmentColumnReader; import org.apache.pinot.segment.spi.ColumnMetadata; import org.apache.pinot.segment.spi.V1Constants; import org.apache.pinot.segment.spi.compression.ChunkCompressionType; +import org.apache.pinot.segment.spi.compression.DictIdCompressionType; import org.apache.pinot.segment.spi.creator.IndexCreationContext; import org.apache.pinot.segment.spi.creator.SegmentVersion; import org.apache.pinot.segment.spi.creator.StatsCollectorConfig; @@ -98,11 +98,7 @@ public class ForwardIndexHandler extends BaseIndexHandler { private final Schema _schema; protected enum Operation { - DISABLE_FORWARD_INDEX, - ENABLE_FORWARD_INDEX, - DISABLE_DICTIONARY, - ENABLE_DICTIONARY, - CHANGE_RAW_INDEX_COMPRESSION_TYPE, + DISABLE_FORWARD_INDEX, ENABLE_FORWARD_INDEX, DISABLE_DICTIONARY, ENABLE_DICTIONARY, CHANGE_INDEX_COMPRESSION_TYPE } @VisibleForTesting @@ -147,8 +143,8 @@ public class ForwardIndexHandler extends BaseIndexHandler { ColumnMetadata columnMetadata = createForwardIndexIfNeeded(segmentWriter, column, false); if (columnMetadata.hasDictionary()) { if (!segmentWriter.hasIndexFor(column, StandardIndexes.dictionary())) { - throw new IllegalStateException(String.format("Dictionary should still exist after rebuilding " - + "forward index for dictionary column: %s", column)); + throw new IllegalStateException(String.format( + "Dictionary should still exist after rebuilding forward index for dictionary column: %s", column)); } } else { if (segmentWriter.hasIndexFor(column, StandardIndexes.dictionary())) { @@ -159,8 +155,9 @@ public class ForwardIndexHandler extends BaseIndexHandler { } break; case DISABLE_DICTIONARY: - Set<String> newForwardIndexDisabledColumns = FieldIndexConfigsUtil.columnsWithIndexDisabled( - _fieldIndexConfigs.keySet(), StandardIndexes.forward(), _fieldIndexConfigs); + Set<String> newForwardIndexDisabledColumns = + FieldIndexConfigsUtil.columnsWithIndexDisabled(_fieldIndexConfigs.keySet(), StandardIndexes.forward(), + _fieldIndexConfigs); if (newForwardIndexDisabledColumns.contains(column)) { removeDictionaryFromForwardIndexDisabledColumn(column, segmentWriter); if (segmentWriter.hasIndexFor(column, StandardIndexes.dictionary())) { @@ -177,8 +174,8 @@ public class ForwardIndexHandler extends BaseIndexHandler { throw new IllegalStateException(String.format("Forward index was not created for column: %s", column)); } break; - case CHANGE_RAW_INDEX_COMPRESSION_TYPE: - rewriteRawForwardIndexForCompressionChange(column, segmentWriter); + case CHANGE_INDEX_COMPRESSION_TYPE: + rewriteForwardIndexForCompressionChange(column, segmentWriter); break; default: throw new IllegalStateException("Unsupported operation for column " + column); @@ -197,26 +194,9 @@ public class ForwardIndexHandler extends BaseIndexHandler { return columnOperationsMap; } - // From existing column config. Set<String> existingAllColumns = _segmentDirectory.getSegmentMetadata().getAllColumns(); - Set<String> existingDictColumns = - segmentReader.toSegmentDirectory().getColumnsWithIndex(StandardIndexes.dictionary()); - Set<String> existingNoDictColumns = new HashSet<>(); - for (String column : existingAllColumns) { - if (!existingDictColumns.contains(column)) { - existingNoDictColumns.add(column); - } - } - - // Get list of columns with forward index and those without forward index - Set<String> existingForwardIndexColumns = - segmentReader.toSegmentDirectory().getColumnsWithIndex(StandardIndexes.forward()); - Set<String> existingForwardIndexDisabledColumns = new HashSet<>(); - for (String column : existingAllColumns) { - if (!existingForwardIndexColumns.contains(column)) { - existingForwardIndexDisabledColumns.add(column); - } - } + Set<String> existingDictColumns = _segmentDirectory.getColumnsWithIndex(StandardIndexes.dictionary()); + Set<String> existingForwardIndexColumns = _segmentDirectory.getColumnsWithIndex(StandardIndexes.forward()); for (String column : existingAllColumns) { if (_schema != null && !_schema.hasColumn(column)) { @@ -224,12 +204,14 @@ public class ForwardIndexHandler extends BaseIndexHandler { LOGGER.info("Column {} is not in schema, skipping updating forward index", column); continue; } + boolean existingHasDict = existingDictColumns.contains(column); + boolean existingHasFwd = existingForwardIndexColumns.contains(column); FieldIndexConfigs newConf = _fieldIndexConfigs.get(column); boolean newIsFwd = newConf.getConfig(StandardIndexes.forward()).isEnabled(); boolean newIsDict = newConf.getConfig(StandardIndexes.dictionary()).isEnabled(); boolean newIsRange = newConf.getConfig(StandardIndexes.range()).isEnabled(); - if (existingForwardIndexColumns.contains(column) && !newIsFwd) { + if (existingHasFwd && !newIsFwd) { // Existing column has a forward index. New column config disables the forward index ColumnMetadata columnMetadata = _segmentDirectory.getSegmentMetadata().getColumnMetadataFor(column); @@ -240,13 +222,13 @@ public class ForwardIndexHandler extends BaseIndexHandler { continue; } - if (existingDictColumns.contains(column)) { + if (existingHasDict) { if (!newIsDict) { // Dictionary was also disabled. Just disable the dictionary and remove it along with the forward index // If range index exists, don't try to regenerate it on toggling the dictionary, throw an error instead - Preconditions.checkState(!newIsRange, - String.format("Must disable range (enabled) index to disable the dictionary and forward index for " - + "column: %s or refresh / back-fill the forward index", column)); + Preconditions.checkState(!newIsRange, String.format( + "Must disable range (enabled) index to disable the dictionary and forward index for column: %s or " + + "refresh / back-fill the forward index", column)); columnOperationsMap.put(column, Arrays.asList(Operation.DISABLE_FORWARD_INDEX, Operation.DISABLE_DICTIONARY)); } else { @@ -263,7 +245,7 @@ public class ForwardIndexHandler extends BaseIndexHandler { Arrays.asList(Operation.DISABLE_FORWARD_INDEX, Operation.ENABLE_DICTIONARY)); } } - } else if (existingForwardIndexDisabledColumns.contains(column) && newIsFwd) { + } else if (!existingHasFwd && newIsFwd) { // Existing column does not have a forward index. New column config enables the forward index ColumnMetadata columnMetadata = _segmentDirectory.getSegmentMetadata().getColumnMetadataFor(column); @@ -277,37 +259,37 @@ public class ForwardIndexHandler extends BaseIndexHandler { // Get list of columns with inverted index Set<String> existingInvertedIndexColumns = segmentReader.toSegmentDirectory().getColumnsWithIndex(StandardIndexes.inverted()); - if (!existingDictColumns.contains(column) || !existingInvertedIndexColumns.contains(column)) { + if (!existingHasDict || !existingInvertedIndexColumns.contains(column)) { // If either dictionary or inverted index is missing on the column there is no way to re-generate the forward // index. Treat this as a no-op and log a warning. LOGGER.warn("Trying to enable the forward index for a column {} missing either the dictionary ({}) and / or " + "the inverted index ({}) is not possible. Either a refresh or back-fill is required to get the " - + "forward index, ignoring", column, existingDictColumns.contains(column) ? "enabled" : "disabled", + + "forward index, ignoring", column, existingHasDict ? "enabled" : "disabled", existingInvertedIndexColumns.contains(column) ? "enabled" : "disabled"); continue; } columnOperationsMap.put(column, Collections.singletonList(Operation.ENABLE_FORWARD_INDEX)); - } else if (existingForwardIndexDisabledColumns.contains(column) && !newIsFwd) { + } else if (!existingHasFwd) { // Forward index is disabled for the existing column and should remain disabled based on the latest config // Need some checks to see whether the dictionary is being enabled or disabled here and take appropriate actions // If the dictionary is not enabled on the existing column it must be on the new noDictionary column list. // Cannot enable the dictionary for a column with forward index disabled. - Preconditions.checkState(existingDictColumns.contains(column) || !newIsDict, + Preconditions.checkState(existingHasDict || !newIsDict, String.format("Cannot regenerate the dictionary for column %s with forward index disabled. Please " + "refresh or back-fill the data to add back the forward index", column)); - if (existingDictColumns.contains(column) && !newIsDict) { + if (existingHasDict && !newIsDict) { // Dictionary is currently enabled on this column but is supposed to be disabled. Remove the dictionary // and update the segment metadata If the range index exists then throw an error since we are not // regenerating the range index on toggling the dictionary - Preconditions.checkState(!newIsRange, - String.format("Must disable range (enabled) index to disable the dictionary for a forwardIndexDisabled " - + "column: %s or refresh / back-fill the forward index", column)); + Preconditions.checkState(!newIsRange, String.format( + "Must disable range (enabled) index to disable the dictionary for a forwardIndexDisabled column: %s or " + + "refresh / back-fill the forward index", column)); columnOperationsMap.put(column, Collections.singletonList(Operation.DISABLE_DICTIONARY)); } - } else if (existingNoDictColumns.contains(column) && newIsDict) { + } else if (!existingHasDict && newIsDict) { // Existing column is RAW. New column is dictionary enabled. if (_schema == null || _tableConfig == null) { // This can only happen in tests. @@ -315,25 +297,28 @@ public class ForwardIndexHandler extends BaseIndexHandler { continue; } ColumnMetadata existingColumnMetadata = _segmentDirectory.getSegmentMetadata().getColumnMetadataFor(column); - if (DictionaryIndexType.ignoreDictionaryOverride( - _tableConfig.getIndexingConfig().isOptimizeDictionary(), + if (DictionaryIndexType.ignoreDictionaryOverride(_tableConfig.getIndexingConfig().isOptimizeDictionary(), _tableConfig.getIndexingConfig().isOptimizeDictionaryForMetrics(), - _tableConfig.getIndexingConfig().getNoDictionarySizeRatioThreshold(), - existingColumnMetadata.getFieldSpec(), _fieldIndexConfigs.get(column), - existingColumnMetadata.getCardinality(), + _tableConfig.getIndexingConfig().getNoDictionarySizeRatioThreshold(), existingColumnMetadata.getFieldSpec(), + _fieldIndexConfigs.get(column), existingColumnMetadata.getCardinality(), existingColumnMetadata.getTotalNumberOfEntries())) { columnOperationsMap.put(column, Collections.singletonList(Operation.ENABLE_DICTIONARY)); } - } else if (existingDictColumns.contains(column) && !newIsDict) { + } else if (existingHasDict && !newIsDict) { // Existing column has dictionary. New config for the column is RAW. if (shouldDisableDictionary(column, _segmentDirectory.getSegmentMetadata().getColumnMetadataFor(column))) { columnOperationsMap.put(column, Collections.singletonList(Operation.DISABLE_DICTIONARY)); } - } else if (existingNoDictColumns.contains(column) && !newIsDict) { + } else if (!existingHasDict) { // Both existing and new column is RAW forward index encoded. Check if compression needs to be changed. // TODO: Also check if raw index version needs to be changed - if (shouldChangeCompressionType(column, segmentReader)) { - columnOperationsMap.put(column, Collections.singletonList(Operation.CHANGE_RAW_INDEX_COMPRESSION_TYPE)); + if (shouldChangeRawCompressionType(column, segmentReader)) { + columnOperationsMap.put(column, Collections.singletonList(Operation.CHANGE_INDEX_COMPRESSION_TYPE)); + } + } else { + // Both existing and new column is dictionary encoded. Check if compression needs to be changed. + if (shouldChangeDictIdCompressionType(column, segmentReader)) { + columnOperationsMap.put(column, Collections.singletonList(Operation.CHANGE_INDEX_COMPRESSION_TYPE)); } } } @@ -346,8 +331,9 @@ public class ForwardIndexHandler extends BaseIndexHandler { // Remove the dictionary and update the metadata to indicate that the dictionary is no longer present segmentWriter.removeIndex(column, StandardIndexes.dictionary()); String segmentName = _segmentDirectory.getSegmentMetadata().getName(); - LOGGER.info("Removed dictionary for noForwardIndex column. Updating metadata properties for segment={} and " - + "column={}", segmentName, column); + LOGGER.info( + "Removed dictionary for noForwardIndex column. Updating metadata properties for segment={} and column={}", + segmentName, column); Map<String, String> metadataProperties = new HashMap<>(); metadataProperties.put(getKeyFor(column, HAS_DICTIONARY), String.valueOf(false)); metadataProperties.put(getKeyFor(column, DICTIONARY_ELEMENT_SIZE), String.valueOf(0)); @@ -381,37 +367,63 @@ public class ForwardIndexHandler extends BaseIndexHandler { return true; } - private boolean shouldChangeCompressionType(String column, SegmentDirectory.Reader segmentReader) + private boolean shouldChangeRawCompressionType(String column, SegmentDirectory.Reader segmentReader) throws Exception { - ColumnMetadata existingColMetadata = _segmentDirectory.getSegmentMetadata().getColumnMetadataFor(column); - // The compression type for an existing segment can only be determined by reading the forward index header. + ColumnMetadata existingColMetadata = _segmentDirectory.getSegmentMetadata().getColumnMetadataFor(column); + ChunkCompressionType existingCompressionType; try (ForwardIndexReader<?> fwdIndexReader = ForwardIndexType.read(segmentReader, existingColMetadata)) { - ChunkCompressionType existingCompressionType = fwdIndexReader.getCompressionType(); + existingCompressionType = fwdIndexReader.getCompressionType(); Preconditions.checkState(existingCompressionType != null, "Existing compressionType cannot be null for raw forward index column=" + column); + } - // Get the new compression type. - ChunkCompressionType newCompressionType = _fieldIndexConfigs.get(column).getConfig(StandardIndexes.forward()) - .getChunkCompressionType(); + // Get the new compression type. + ChunkCompressionType newCompressionType = + _fieldIndexConfigs.get(column).getConfig(StandardIndexes.forward()).getChunkCompressionType(); - // Note that default compression type (PASS_THROUGH for metric and LZ4 for dimension) is not considered if the - // compressionType is not explicitly provided in tableConfig. This is to avoid incorrectly rewriting all the - // forward indexes during segmentReload when the default compressionType changes. - return newCompressionType != null && existingCompressionType != newCompressionType; + // Note that default compression type (PASS_THROUGH for metric and LZ4 for dimension) is not considered if the + // compressionType is not explicitly provided in tableConfig. This is to avoid incorrectly rewriting all the + // forward indexes during segmentReload when the default compressionType changes. + return newCompressionType != null && existingCompressionType != newCompressionType; + } + + private boolean shouldChangeDictIdCompressionType(String column, SegmentDirectory.Reader segmentReader) + throws Exception { + // The compression type for an existing segment can only be determined by reading the forward index header. + ColumnMetadata existingColMetadata = _segmentDirectory.getSegmentMetadata().getColumnMetadataFor(column); + DictIdCompressionType existingCompressionType; + try (ForwardIndexReader<?> fwdIndexReader = ForwardIndexType.read(segmentReader, existingColMetadata)) { + existingCompressionType = fwdIndexReader.getDictIdCompressionType(); } + + // Get the new compression type. + DictIdCompressionType newCompressionType = + _fieldIndexConfigs.get(column).getConfig(StandardIndexes.forward()).getDictIdCompressionType(); + if (newCompressionType != null && !newCompressionType.isApplicable(existingColMetadata.isSingleValue())) { + newCompressionType = null; + } + + return existingCompressionType != newCompressionType; } - private void rewriteRawForwardIndexForCompressionChange(String column, SegmentDirectory.Writer segmentWriter) + private void rewriteForwardIndexForCompressionChange(String column, SegmentDirectory.Writer segmentWriter) throws Exception { ColumnMetadata existingColMetadata = _segmentDirectory.getSegmentMetadata().getColumnMetadataFor(column); boolean isSingleValue = existingColMetadata.isSingleValue(); + boolean hasDictionary = existingColMetadata.hasDictionary(); File indexDir = _segmentDirectory.getSegmentMetadata().getIndexDir(); String segmentName = _segmentDirectory.getSegmentMetadata().getName(); File inProgress = new File(indexDir, column + ".fwd.inprogress"); - String fileExtension = isSingleValue ? V1Constants.Indexes.RAW_SV_FORWARD_INDEX_FILE_EXTENSION - : V1Constants.Indexes.RAW_MV_FORWARD_INDEX_FILE_EXTENSION; + String fileExtension; + if (isSingleValue) { + fileExtension = hasDictionary ? V1Constants.Indexes.UNSORTED_SV_FORWARD_INDEX_FILE_EXTENSION + : V1Constants.Indexes.RAW_SV_FORWARD_INDEX_FILE_EXTENSION; + } else { + fileExtension = hasDictionary ? V1Constants.Indexes.UNSORTED_MV_FORWARD_INDEX_FILE_EXTENSION + : V1Constants.Indexes.RAW_MV_FORWARD_INDEX_FILE_EXTENSION; + } File fwdIndexFile = new File(indexDir, column + fileExtension); if (!inProgress.exists()) { @@ -425,20 +437,7 @@ public class ForwardIndexHandler extends BaseIndexHandler { } LOGGER.info("Creating new forward index for segment={} and column={}", segmentName, column); - - // At this point, compressionConfigs is guaranteed to contain the column. If there's no entry in the map, we - // wouldn't have computed the CHANGE_RAW_COMPRESSION_TYPE operation for this column as compressionType changes - // are processed only if a valid compressionType is specified in fieldConfig. - ChunkCompressionType newCompressionType = _fieldIndexConfigs.get(column) - .getConfig(StandardIndexes.forward()).getChunkCompressionType(); - - if (isSingleValue) { - rewriteRawSVForwardIndexForCompressionChange(column, existingColMetadata, indexDir, segmentWriter, - newCompressionType); - } else { - rewriteRawMVForwardIndexForCompressionChange(column, existingColMetadata, indexDir, segmentWriter, - newCompressionType); - } + rewriteForwardIndexForCompressionChange(column, existingColMetadata, indexDir, segmentWriter); // We used the existing forward index to generate a new forward index. The existing forward index will be in V3 // format and the new forward index will be in V1 format. Remove the existing forward index as it is not needed @@ -454,61 +453,32 @@ public class ForwardIndexHandler extends BaseIndexHandler { LOGGER.info("Created forward index for segment: {}, column: {}", segmentName, column); } - private void rewriteRawMVForwardIndexForCompressionChange(String column, ColumnMetadata existingColMetadata, - File indexDir, SegmentDirectory.Writer segmentWriter, ChunkCompressionType newCompressionType) + private void rewriteForwardIndexForCompressionChange(String column, ColumnMetadata columnMetadata, File indexDir, + SegmentDirectory.Writer segmentWriter) throws Exception { - try (ForwardIndexReader<?> reader = ForwardIndexType.read(segmentWriter, existingColMetadata)) { - // For VarByte MV columns like String and Bytes, the storage representation of each row contains the following - // components: - // 1. bytes required to store the actual elements of the MV row (A) - // 2. bytes required to store the number of elements in the MV row (B) - // 3. bytes required to store the length of each MV element (C) - // - // lengthOfLongestEntry = A + B + C - // maxRowLengthInBytes = A + try (ForwardIndexReader<?> reader = ForwardIndexType.read(segmentWriter, columnMetadata)) { int lengthOfLongestEntry = reader.getLengthOfLongestEntry(); - int maxNumberOfMVEntries = existingColMetadata.getMaxNumberOfMultiValues(); - int maxRowLengthInBytes = - MultiValueVarByteRawIndexCreator.getMaxRowDataLengthInBytes(lengthOfLongestEntry, maxNumberOfMVEntries); - - IndexCreationContext context = IndexCreationContext.builder() - .withMaxRowLengthInBytes(maxRowLengthInBytes) - .withIndexDir(indexDir) - .withColumnMetadata(existingColMetadata) - .withLengthOfLongestEntry(lengthOfLongestEntry) - .build(); - - ForwardIndexConfig config = _fieldIndexConfigs.get(column).getConfig(StandardIndexes.forward()); - - try (ForwardIndexCreator creator = StandardIndexes.forward().createIndexCreator(context, config)) { - if (!reader.getStoredType().equals(creator.getValueType())) { - // Creator stored type should match reader stored type for raw columns. We do not support changing datatypes. - String failureMsg = - "Unsupported operation to change datatype for column=" + column + " from " + reader.getStoredType() - .toString() + " to " + creator.getValueType().toString(); - throw new UnsupportedOperationException(failureMsg); - } - - int numDocs = existingColMetadata.getTotalDocs(); - forwardIndexRewriteHelper(column, existingColMetadata, reader, creator, numDocs, null, null); + IndexCreationContext context; + if (columnMetadata.isSingleValue()) { + context = IndexCreationContext.builder().withIndexDir(indexDir).withColumnMetadata(columnMetadata) + .withLengthOfLongestEntry(lengthOfLongestEntry).build(); + } else { + // For VarByte MV columns like String and Bytes, the storage representation of each row contains the following + // components: + // 1. bytes required to store the actual elements of the MV row (A) + // 2. bytes required to store the number of elements in the MV row (B) + // 3. bytes required to store the length of each MV element (C) + // + // lengthOfLongestEntry = A + B + C + // maxRowLengthInBytes = A + int maxNumValuesPerEntry = columnMetadata.getMaxNumberOfMultiValues(); + int maxRowLengthInBytes = + MultiValueVarByteRawIndexCreator.getMaxRowDataLengthInBytes(lengthOfLongestEntry, maxNumValuesPerEntry); + context = IndexCreationContext.builder().withIndexDir(indexDir).withColumnMetadata(columnMetadata) + .withLengthOfLongestEntry(lengthOfLongestEntry).withMaxRowLengthInBytes(maxRowLengthInBytes).build(); } - } - } - - private void rewriteRawSVForwardIndexForCompressionChange(String column, ColumnMetadata existingColMetadata, - File indexDir, SegmentDirectory.Writer segmentWriter, ChunkCompressionType newCompressionType) - throws Exception { - try (ForwardIndexReader<?> reader = ForwardIndexType.read(segmentWriter, existingColMetadata)) { - int lengthOfLongestEntry = reader.getLengthOfLongestEntry(); - - IndexCreationContext context = IndexCreationContext.builder() - .withIndexDir(indexDir) - .withColumnMetadata(existingColMetadata) - .withLengthOfLongestEntry(lengthOfLongestEntry) - .build(); ForwardIndexConfig config = _fieldIndexConfigs.get(column).getConfig(StandardIndexes.forward()); - try (ForwardIndexCreator creator = StandardIndexes.forward().createIndexCreator(context, config)) { if (!reader.getStoredType().equals(creator.getValueType())) { // Creator stored type should match reader stored type for raw columns. We do not support changing datatypes. @@ -518,8 +488,8 @@ public class ForwardIndexHandler extends BaseIndexHandler { throw new UnsupportedOperationException(failureMsg); } - int numDocs = existingColMetadata.getTotalDocs(); - forwardIndexRewriteHelper(column, existingColMetadata, reader, creator, numDocs, null, null); + int numDocs = columnMetadata.getTotalDocs(); + forwardIndexRewriteHelper(column, columnMetadata, reader, creator, numDocs, null, null); } } } @@ -528,16 +498,39 @@ public class ForwardIndexHandler extends BaseIndexHandler { ForwardIndexReader<?> reader, ForwardIndexCreator creator, int numDocs, @Nullable SegmentDictionaryCreator dictionaryCreator, @Nullable Dictionary dictionaryReader) { if (dictionaryReader == null && dictionaryCreator == null) { - // Read raw forward index and write raw forward index. - forwardIndexReadRawWriteRawHelper(column, existingColumnMetadata, reader, creator, numDocs); - } else if (dictionaryReader != null && dictionaryCreator == null) { + if (reader.isDictionaryEncoded()) { + Preconditions.checkState(creator.isDictionaryEncoded(), "Cannot change dictionary based forward index to raw " + + "forward index without providing dictionary reader for column: %s", column); + // Read dictionary based forward index and write dictionary based forward index. + forwardIndexReadDictWriteDictHelper(reader, creator, numDocs); + } else { + Preconditions.checkState(!creator.isDictionaryEncoded(), "Cannot change raw forward index to dictionary based " + + "forward index without providing dictionary creator for column: %s", column); + // Read raw forward index and write raw forward index. + forwardIndexReadRawWriteRawHelper(column, existingColumnMetadata, reader, creator, numDocs); + } + } else if (dictionaryCreator == null) { // Read dictionary based forward index and write raw forward index. forwardIndexReadDictWriteRawHelper(column, existingColumnMetadata, reader, creator, numDocs, dictionaryReader); } else if (dictionaryReader == null) { // Read raw forward index and write dictionary based forward index. forwardIndexReadRawWriteDictHelper(column, existingColumnMetadata, reader, creator, numDocs, dictionaryCreator); } else { - Preconditions.checkState(false, "Invalid dict-based read/write for column=" + column); + throw new IllegalStateException("One of dictionary reader or creator should be null for column: %s" + column); + } + } + + private <C extends ForwardIndexReaderContext> void forwardIndexReadDictWriteDictHelper(ForwardIndexReader<C> reader, + ForwardIndexCreator creator, int numDocs) { + C readerContext = reader.createContext(); + if (reader.isSingleValue()) { + for (int i = 0; i < numDocs; i++) { + creator.putDictId(reader.getDictId(i, readerContext)); + } + } else { + for (int i = 0; i < numDocs; i++) { + creator.putDictIdMV(reader.getDictIdMV(i, readerContext)); + } } } @@ -856,11 +849,10 @@ public class ForwardIndexHandler extends BaseIndexHandler { DictionaryIndexConfig dictConf = _fieldIndexConfigs.get(column).getConfig(StandardIndexes.dictionary()); - boolean useVarLength = dictConf.getUseVarLengthDictionary() - || DictionaryIndexType.shouldUseVarLengthDictionary(reader.getStoredType(), statsCollector); - SegmentDictionaryCreator dictionaryCreator = - new SegmentDictionaryCreator(existingColMetadata.getFieldSpec(), - _segmentDirectory.getSegmentMetadata().getIndexDir(), useVarLength); + boolean useVarLength = dictConf.getUseVarLengthDictionary() || DictionaryIndexType.shouldUseVarLengthDictionary( + reader.getStoredType(), statsCollector); + SegmentDictionaryCreator dictionaryCreator = new SegmentDictionaryCreator(existingColMetadata.getFieldSpec(), + _segmentDirectory.getSegmentMetadata().getIndexDir(), useVarLength); dictionaryCreator.build(statsCollector.getUniqueValuesSet()); return dictionaryCreator; diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/IndexLoadingConfig.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/IndexLoadingConfig.java index bdd54224e0..f7f185cc65 100644 --- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/IndexLoadingConfig.java +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/IndexLoadingConfig.java @@ -35,7 +35,6 @@ import org.apache.commons.lang3.StringUtils; import org.apache.pinot.common.utils.config.TableConfigUtils; import org.apache.pinot.segment.local.segment.index.column.PhysicalColumnIndexContainer; import org.apache.pinot.segment.local.segment.index.loader.columnminmaxvalue.ColumnMinMaxValueGeneratorMode; -import org.apache.pinot.segment.spi.compression.ChunkCompressionType; import org.apache.pinot.segment.spi.creator.SegmentVersion; import org.apache.pinot.segment.spi.index.ColumnConfigDeserializer; import org.apache.pinot.segment.spi.index.FieldIndexConfigs; @@ -49,6 +48,7 @@ import org.apache.pinot.spi.config.instance.InstanceDataManagerConfig; import org.apache.pinot.spi.config.table.BloomFilterConfig; import org.apache.pinot.spi.config.table.FSTType; import org.apache.pinot.spi.config.table.FieldConfig; +import org.apache.pinot.spi.config.table.FieldConfig.CompressionCodec; import org.apache.pinot.spi.config.table.IndexConfig; import org.apache.pinot.spi.config.table.IndexingConfig; import org.apache.pinot.spi.config.table.JsonIndexConfig; @@ -93,7 +93,7 @@ public class IndexLoadingConfig { private boolean _enableDynamicStarTreeCreation; private List<StarTreeIndexConfig> _starTreeIndexConfigs; private boolean _enableDefaultStarTree; - private Map<String, ChunkCompressionType> _compressionConfigs = new HashMap<>(); + private Map<String, CompressionCodec> _compressionConfigs = new HashMap<>(); private Map<String, FieldIndexConfigs> _indexConfigsByColName = new HashMap<>(); private SegmentVersion _segmentVersion; @@ -299,8 +299,8 @@ public class IndexLoadingConfig { + "indexLoadingConfig for indexType: {}", _schema == null, _tableConfig == null, indexType); deserializer = IndexConfigDeserializer.fromMap(table -> fromIndexLoadingConfig); } else if (_segmentTier == null) { - deserializer = IndexConfigDeserializer.fromMap(table -> fromIndexLoadingConfig) - .withFallbackAlternative(stdDeserializer); + deserializer = + IndexConfigDeserializer.fromMap(table -> fromIndexLoadingConfig).withFallbackAlternative(stdDeserializer); } else { // No need to fall back to fromIndexLoadingConfig which contains index configs for default tier, when looking // for tier specific index configs. @@ -352,8 +352,7 @@ public class IndexLoadingConfig { for (FieldConfig fieldConfig : fieldConfigList) { String column = fieldConfig.getName(); if (fieldConfig.getCompressionCodec() != null) { - ChunkCompressionType compressionType = ChunkCompressionType.valueOf(fieldConfig.getCompressionCodec().name()); - _compressionConfigs.put(column, compressionType); + _compressionConfigs.put(column, fieldConfig.getCompressionCodec()); } } } @@ -613,7 +612,7 @@ public class IndexLoadingConfig { * Used by segmentPreProcessorTest to set compression configs. */ @VisibleForTesting - public void setCompressionConfigs(Map<String, ChunkCompressionType> compressionConfigs) { + public void setCompressionConfigs(Map<String, CompressionCodec> compressionConfigs) { _compressionConfigs = new HashMap<>(compressionConfigs); _dirty = true; } @@ -750,7 +749,7 @@ public class IndexLoadingConfig { * * @return a map containing column name as key and compressionType as value. */ - public Map<String, ChunkCompressionType> getCompressionConfigs() { + public Map<String, CompressionCodec> getCompressionConfigs() { return unmodifiable(_compressionConfigs); } diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/defaultcolumn/BaseDefaultColumnHandler.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/defaultcolumn/BaseDefaultColumnHandler.java index acfd4827c6..09643966c2 100644 --- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/defaultcolumn/BaseDefaultColumnHandler.java +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/defaultcolumn/BaseDefaultColumnHandler.java @@ -36,6 +36,7 @@ import org.apache.pinot.segment.local.function.FunctionEvaluator; import org.apache.pinot.segment.local.function.FunctionEvaluatorFactory; import org.apache.pinot.segment.local.segment.creator.impl.SegmentColumnarIndexCreator; import org.apache.pinot.segment.local.segment.creator.impl.SegmentDictionaryCreator; +import org.apache.pinot.segment.local.segment.creator.impl.fwd.MultiValueEntryDictForwardIndexCreator; import org.apache.pinot.segment.local.segment.creator.impl.fwd.MultiValueUnsortedForwardIndexCreator; import org.apache.pinot.segment.local.segment.creator.impl.fwd.SingleValueSortedForwardIndexCreator; import org.apache.pinot.segment.local.segment.creator.impl.fwd.SingleValueUnsortedForwardIndexCreator; @@ -48,6 +49,7 @@ import org.apache.pinot.segment.local.segment.creator.impl.stats.IntColumnPreInd import org.apache.pinot.segment.local.segment.creator.impl.stats.LongColumnPreIndexStatsCollector; import org.apache.pinot.segment.local.segment.creator.impl.stats.StringColumnPreIndexStatsCollector; import org.apache.pinot.segment.local.segment.index.dictionary.DictionaryIndexType; +import org.apache.pinot.segment.local.segment.index.forward.ForwardIndexPlugin; import org.apache.pinot.segment.local.segment.index.forward.ForwardIndexType; import org.apache.pinot.segment.local.segment.index.loader.IndexLoadingConfig; import org.apache.pinot.segment.local.segment.index.loader.LoaderUtils; @@ -55,8 +57,11 @@ import org.apache.pinot.segment.local.segment.readers.PinotSegmentColumnReader; import org.apache.pinot.segment.spi.ColumnMetadata; import org.apache.pinot.segment.spi.SegmentMetadata; import org.apache.pinot.segment.spi.V1Constants; +import org.apache.pinot.segment.spi.compression.DictIdCompressionType; import org.apache.pinot.segment.spi.creator.ColumnIndexCreationInfo; import org.apache.pinot.segment.spi.creator.StatsCollectorConfig; +import org.apache.pinot.segment.spi.index.FieldIndexConfigs; +import org.apache.pinot.segment.spi.index.ForwardIndexConfig; import org.apache.pinot.segment.spi.index.StandardIndexes; import org.apache.pinot.segment.spi.index.creator.DictionaryBasedInvertedIndexCreator; import org.apache.pinot.segment.spi.index.creator.ForwardIndexCreator; @@ -780,8 +785,18 @@ public abstract class BaseDefaultColumnHandler implements DefaultColumnHandler { } } } else { - try (ForwardIndexCreator forwardIndexCreator = new MultiValueUnsortedForwardIndexCreator(_indexDir, column, - cardinality, numDocs, indexCreationInfo.getTotalNumberOfEntries())) { + DictIdCompressionType dictIdCompressionType = null; + FieldIndexConfigs fieldIndexConfig = _indexLoadingConfig.getFieldIndexConfig(column); + if (fieldIndexConfig != null) { + ForwardIndexConfig forwardIndexConfig = fieldIndexConfig.getConfig(new ForwardIndexPlugin().getIndexType()); + if (forwardIndexConfig != null) { + dictIdCompressionType = forwardIndexConfig.getDictIdCompressionType(); + } + } + try (ForwardIndexCreator forwardIndexCreator = dictIdCompressionType == DictIdCompressionType.MV_ENTRY_DICT + ? new MultiValueEntryDictForwardIndexCreator(_indexDir, column, cardinality, numDocs) + : new MultiValueUnsortedForwardIndexCreator(_indexDir, column, cardinality, numDocs, + indexCreationInfo.getTotalNumberOfEntries())) { for (int i = 0; i < numDocs; i++) { forwardIndexCreator.putDictIdMV(dictionaryCreator.indexOfMV(outputValues[i])); } diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/forward/FixedBitMVEntryDictForwardIndexReader.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/forward/FixedBitMVEntryDictForwardIndexReader.java new file mode 100644 index 0000000000..05c4000903 --- /dev/null +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/forward/FixedBitMVEntryDictForwardIndexReader.java @@ -0,0 +1,155 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.segment.local.segment.index.readers.forward; + +import com.google.common.base.Preconditions; +import java.util.List; +import org.apache.pinot.segment.local.io.util.FixedBitIntReaderWriter; +import org.apache.pinot.segment.local.io.util.PinotDataBitSet; +import org.apache.pinot.segment.local.io.writer.impl.FixedBitMVEntryDictForwardIndexWriter; +import org.apache.pinot.segment.spi.compression.DictIdCompressionType; +import org.apache.pinot.segment.spi.index.reader.ForwardIndexReader; +import org.apache.pinot.segment.spi.index.reader.ForwardIndexReaderContext; +import org.apache.pinot.segment.spi.memory.PinotDataBuffer; +import org.apache.pinot.spi.data.FieldSpec.DataType; + + +/** + * Bit-compressed dictionary-encoded forward index reader for multi-value columns, where a second level dictionary + * encoding for multi-value entries (instead of individual values within the entry) are maintained within the forward + * index. + * See {@link FixedBitMVEntryDictForwardIndexWriter} for index layout. + */ +public final class FixedBitMVEntryDictForwardIndexReader implements ForwardIndexReader<ForwardIndexReaderContext> { + public static final int MAGIC_MARKER = FixedBitMVEntryDictForwardIndexWriter.MAGIC_MARKER; + public static final short VERSION = FixedBitMVEntryDictForwardIndexWriter.VERSION; + private static final int HEADER_SIZE = FixedBitMVEntryDictForwardIndexWriter.HEADER_SIZE; + + private final FixedBitIntReaderWriter _idReader; + private final int _offsetBufferOffset; + private final FixedBitIntReaderWriter _offsetReader; + private final int _valueBufferOffset; + private final FixedBitIntReaderWriter _valueReader; + + public FixedBitMVEntryDictForwardIndexReader(PinotDataBuffer dataBuffer, int numDocs, int numBitsPerValue) { + int magicMarker = dataBuffer.getInt(0); + Preconditions.checkState(magicMarker == MAGIC_MARKER, "Invalid magic marker: %s (expected: %s)", magicMarker, + MAGIC_MARKER); + short version = dataBuffer.getShort(4); + Preconditions.checkState(version == VERSION, "Invalid version: %s (expected: %s)", version, VERSION); + int numBitsPerValueInHeader = dataBuffer.getByte(6); + Preconditions.checkState(numBitsPerValueInHeader == numBitsPerValue, "Invalid numBitsPerValue: %s (expected: %s)", + numBitsPerValueInHeader, numBitsPerValue); + int numBitsPerId = dataBuffer.getByte(7); + int numUniqueEntries = dataBuffer.getInt(8); + int numTotalValues = dataBuffer.getInt(12); + _offsetBufferOffset = dataBuffer.getInt(16); + _valueBufferOffset = dataBuffer.getInt(20); + _idReader = new FixedBitIntReaderWriter(dataBuffer.view(HEADER_SIZE, _offsetBufferOffset), numDocs, numBitsPerId); + _offsetReader = + new FixedBitIntReaderWriter(dataBuffer.view(_offsetBufferOffset, _valueBufferOffset), numUniqueEntries + 1, + PinotDataBitSet.getNumBitsPerValue(numTotalValues)); + _valueReader = new FixedBitIntReaderWriter(dataBuffer.view(_valueBufferOffset, dataBuffer.size()), numTotalValues, + numBitsPerValue); + } + + @Override + public boolean isDictionaryEncoded() { + return true; + } + + @Override + public boolean isSingleValue() { + return false; + } + + @Override + public DataType getStoredType() { + return DataType.INT; + } + + @Override + public DictIdCompressionType getDictIdCompressionType() { + return DictIdCompressionType.MV_ENTRY_DICT; + } + + @Override + public int getDictIdMV(int docId, int[] dictIdBuffer, ForwardIndexReaderContext context) { + int id = _idReader.readInt(docId); + int startIndex = _offsetReader.readInt(id); + int numValues = _offsetReader.readInt(id + 1) - startIndex; + _valueReader.readInt(startIndex, numValues, dictIdBuffer); + return numValues; + } + + @Override + public int[] getDictIdMV(int docId, ForwardIndexReaderContext context) { + int id = _idReader.readInt(docId); + int startIndex = _offsetReader.readInt(id); + int numValues = _offsetReader.readInt(id + 1) - startIndex; + int[] dictIdBuffer = new int[numValues]; + _valueReader.readInt(startIndex, numValues, dictIdBuffer); + return dictIdBuffer; + } + + @Override + public int getNumValuesMV(int docId, ForwardIndexReaderContext context) { + int id = _idReader.readInt(docId); + return _offsetReader.readInt(id + 1) - _offsetReader.readInt(id); + } + + @Override + public boolean isBufferByteRangeInfoSupported() { + return true; + } + + @Override + public void recordDocIdByteRanges(int docId, ForwardIndexReaderContext context, List<ByteRange> ranges) { + int id = _idReader.readInt(docId); + int idReaderStartByteOffset = _idReader.getStartByteOffset(docId); + int idReaderEndByteOffset = _idReader.getEndByteOffset(docId); + ranges.add(new ByteRange(HEADER_SIZE + idReaderStartByteOffset, idReaderEndByteOffset - idReaderStartByteOffset)); + + int startIndex = _offsetReader.readInt(id); + int numValues = _offsetReader.readInt(id + 1) - startIndex; + int offsetReaderStartByteOffset = _offsetReader.getStartByteOffset(id); + int offsetReaderEndByteOffset = _offsetReader.getEndByteOffset(id + 1); + ranges.add(new ByteRange(_offsetBufferOffset + offsetReaderStartByteOffset, + offsetReaderEndByteOffset - offsetReaderStartByteOffset)); + + int valueReaderStartByteOffset = _valueReader.getStartByteOffset(startIndex); + int valueReaderEndByteOffset = _valueReader.getEndByteOffset(startIndex + numValues - 1); + ranges.add(new ByteRange(_valueBufferOffset + valueReaderStartByteOffset, + valueReaderEndByteOffset - valueReaderStartByteOffset)); + } + + @Override + public boolean isFixedOffsetMappingType() { + return false; + } + + @Override + public void close() { + // NOTE: DO NOT close the PinotDataBuffer here because it is tracked by the caller and might be reused later. The + // caller is responsible of closing the PinotDataBuffer. + _idReader.close(); + _offsetReader.close(); + _valueReader.close(); + } +} diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java index ebea84c455..8900b6f36f 100644 --- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java @@ -51,6 +51,8 @@ import org.apache.pinot.segment.spi.index.IndexService; import org.apache.pinot.segment.spi.index.IndexType; import org.apache.pinot.segment.spi.index.startree.AggregationFunctionColumnPair; import org.apache.pinot.spi.config.table.FieldConfig; +import org.apache.pinot.spi.config.table.FieldConfig.CompressionCodec; +import org.apache.pinot.spi.config.table.FieldConfig.EncodingType; import org.apache.pinot.spi.config.table.IndexingConfig; import org.apache.pinot.spi.config.table.QuotaConfig; import org.apache.pinot.spi.config.table.RoutingConfig; @@ -943,8 +945,8 @@ public final class TableConfigUtils { * Also ensures proper dependency between index types (eg: Inverted Index columns * cannot be present in no-dictionary columns). */ - private static void validateIndexingConfig(@Nullable IndexingConfig indexingConfig, @Nullable Schema schema) { - if (indexingConfig == null || schema == null) { + private static void validateIndexingConfig(IndexingConfig indexingConfig, @Nullable Schema schema) { + if (schema == null) { return; } ArrayListMultimap<String, String> columnNameToConfigMap = ArrayListMultimap.create(); @@ -1115,59 +1117,69 @@ public final class TableConfigUtils { * Validates index compatibility for forward index disabled columns */ private static void validateFieldConfigList(@Nullable List<FieldConfig> fieldConfigList, - @Nullable IndexingConfig indexingConfigs, @Nullable Schema schema) { - if (fieldConfigList == null || schema == null) { + IndexingConfig indexingConfig, @Nullable Schema schema) { + if (fieldConfigList == null) { return; } + assert indexingConfig != null; for (FieldConfig fieldConfig : fieldConfigList) { String columnName = fieldConfig.getName(); - FieldSpec fieldConfigColSpec = schema.getFieldSpecFor(columnName); - Preconditions.checkState(fieldConfigColSpec != null, - "Column Name " + columnName + " defined in field config list must be a valid column defined in the schema"); - - if (indexingConfigs != null) { - List<String> noDictionaryColumns = indexingConfigs.getNoDictionaryColumns(); - switch (fieldConfig.getEncodingType()) { - case DICTIONARY: - if (noDictionaryColumns != null) { - Preconditions.checkArgument(!noDictionaryColumns.contains(columnName), - "FieldConfig encoding type is different from indexingConfig for column: " + columnName); - } - Preconditions.checkArgument(fieldConfig.getCompressionCodec() == null, - "Set compression codec to null for dictionary encoding type"); - break; - default: - break; - } + EncodingType encodingType = fieldConfig.getEncodingType(); + Preconditions.checkArgument(encodingType != null, "Encoding type must be specified for column: %s", columnName); + CompressionCodec compressionCodec = fieldConfig.getCompressionCodec(); + switch (encodingType) { + case RAW: + Preconditions.checkArgument(compressionCodec == null || compressionCodec.isApplicableToRawIndex(), + "Compression codec: %s is not applicable to raw index", compressionCodec); + break; + case DICTIONARY: + Preconditions.checkArgument(compressionCodec == null || compressionCodec.isApplicableToDictEncodedIndex(), + "Compression codec: %s is not applicable to dictionary encoded index", compressionCodec); + List<String> noDictionaryColumns = indexingConfig.getNoDictionaryColumns(); + Preconditions.checkArgument(noDictionaryColumns == null || !noDictionaryColumns.contains(columnName), + "FieldConfig encoding type is different from indexingConfig for column: %s", columnName); + Map<String, String> noDictionaryConfig = indexingConfig.getNoDictionaryConfig(); + Preconditions.checkArgument(noDictionaryConfig == null || !noDictionaryConfig.containsKey(columnName), + "FieldConfig encoding type is different from indexingConfig for column: %s", columnName); + break; + default: + break; + } - // Validate the forward index disabled compatibility with other indexes if enabled for this column - validateForwardIndexDisabledIndexCompatibility(columnName, fieldConfig, indexingConfigs, noDictionaryColumns, - schema); + if (schema == null) { + return; } + FieldSpec fieldSpec = schema.getFieldSpecFor(columnName); + Preconditions.checkState(fieldSpec != null, + "Column: %s defined in field config list must be a valid column defined in the schema", columnName); + + // Validate the forward index disabled compatibility with other indexes if enabled for this column + validateForwardIndexDisabledIndexCompatibility(columnName, fieldConfig, indexingConfig, schema); if (CollectionUtils.isNotEmpty(fieldConfig.getIndexTypes())) { for (FieldConfig.IndexType indexType : fieldConfig.getIndexTypes()) { switch (indexType) { - case FST: - Preconditions.checkArgument(fieldConfig.getEncodingType() == FieldConfig.EncodingType.DICTIONARY, - "FST Index is only enabled on dictionary encoded columns"); - Preconditions.checkState(fieldConfigColSpec.isSingleValueField() - && fieldConfigColSpec.getDataType().getStoredType() == DataType.STRING, - "FST Index is only supported for single value string columns"); - break; case INVERTED: - Preconditions.checkArgument(fieldConfig.getEncodingType() == FieldConfig.EncodingType.DICTIONARY, - "Cannot create an Inverted Index on column: " + fieldConfig.getName() + ", specified as " - + "a non dictionary column"); + Preconditions.checkState(fieldConfig.getEncodingType() == EncodingType.DICTIONARY, + "Cannot create inverted index on column: %s, it can only be applied to dictionary encoded columns", + columnName); break; case TEXT: - Preconditions.checkState(fieldConfigColSpec.getDataType().getStoredType() == DataType.STRING, - "TEXT Index is only supported for string columns"); + Preconditions.checkState(fieldSpec.getDataType().getStoredType() == DataType.STRING, + "Cannot create text index on column: %s, it can only be applied to string columns", columnName); + break; + case FST: + Preconditions.checkState( + fieldConfig.getEncodingType() == EncodingType.DICTIONARY && fieldSpec.isSingleValueField() + && fieldSpec.getDataType().getStoredType() == DataType.STRING, + "Cannot create FST index on column: %s, it can only be applied to dictionary encoded single value " + + "string columns", columnName); break; case TIMESTAMP: - Preconditions.checkState(fieldConfigColSpec.getDataType() == DataType.TIMESTAMP, - "TIMESTAMP Index is only supported for timestamp columns"); + Preconditions.checkState(fieldSpec.getDataType() == DataType.TIMESTAMP, + "Cannot create timestamp index on column: %s, it can only be applied to timestamp columns", + columnName); break; default: break; @@ -1189,7 +1201,7 @@ public final class TableConfigUtils { * back or generate a new index for existing segments is to either refresh or back-fill the segments. */ private static void validateForwardIndexDisabledIndexCompatibility(String columnName, FieldConfig fieldConfig, - IndexingConfig indexingConfigs, List<String> noDictionaryColumns, Schema schema) { + IndexingConfig indexingConfig, Schema schema) { Map<String, String> fieldConfigProperties = fieldConfig.getProperties(); if (fieldConfigProperties == null) { return; @@ -1204,26 +1216,24 @@ public final class TableConfigUtils { FieldSpec fieldSpec = schema.getFieldSpecFor(columnName); // Check for the range index since the index itself relies on the existence of the forward index to work. - if (indexingConfigs.getRangeIndexColumns() != null && indexingConfigs.getRangeIndexColumns().contains(columnName)) { + if (indexingConfig.getRangeIndexColumns() != null && indexingConfig.getRangeIndexColumns().contains(columnName)) { Preconditions.checkState(fieldSpec.isSingleValueField(), String.format("Feature not supported for multi-value " + "columns with range index. Cannot disable forward index for column %s. Disable range index on this " + "column to use this feature", columnName)); - Preconditions.checkState(indexingConfigs.getRangeIndexVersion() == BitSlicedRangeIndexCreator.VERSION, + Preconditions.checkState(indexingConfig.getRangeIndexVersion() == BitSlicedRangeIndexCreator.VERSION, String.format("Feature not supported for single-value columns with range index version < 2. Cannot disable " + "forward index for column %s. Either disable range index or create range index with" + " version >= 2 to use this feature", columnName)); } - Preconditions.checkState( - !indexingConfigs.isOptimizeDictionaryForMetrics() && !indexingConfigs.isOptimizeDictionary(), String.format( + Preconditions.checkState(!indexingConfig.isOptimizeDictionaryForMetrics() && !indexingConfig.isOptimizeDictionary(), + String.format( "Dictionary override optimization options (OptimizeDictionary, optimizeDictionaryForMetrics)" + " not supported with forward index for column: %s, disabled", columnName)); - boolean hasDictionary = - fieldConfig.getEncodingType() == FieldConfig.EncodingType.DICTIONARY || noDictionaryColumns == null - || !noDictionaryColumns.contains(columnName); + boolean hasDictionary = fieldConfig.getEncodingType() == EncodingType.DICTIONARY; boolean hasInvertedIndex = - indexingConfigs.getInvertedIndexColumns() != null && indexingConfigs.getInvertedIndexColumns() + indexingConfig.getInvertedIndexColumns() != null && indexingConfig.getInvertedIndexColumns() .contains(columnName); if (!hasDictionary || !hasInvertedIndex) { diff --git a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/forward/FixedBitMVEntryDictForwardIndexTest.java b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/forward/FixedBitMVEntryDictForwardIndexTest.java new file mode 100644 index 0000000000..12d3b2c352 --- /dev/null +++ b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/forward/FixedBitMVEntryDictForwardIndexTest.java @@ -0,0 +1,123 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.segment.local.segment.index.forward; + +import java.io.File; +import java.io.IOException; +import java.util.Random; +import org.apache.commons.io.FileUtils; +import org.apache.pinot.segment.local.io.writer.impl.FixedBitMVEntryDictForwardIndexWriter; +import org.apache.pinot.segment.local.segment.index.readers.forward.FixedBitMVEntryDictForwardIndexReader; +import org.apache.pinot.segment.spi.V1Constants; +import org.apache.pinot.segment.spi.memory.PinotDataBuffer; +import org.testng.annotations.AfterClass; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.Test; + +import static org.testng.Assert.assertEquals; + + +public class FixedBitMVEntryDictForwardIndexTest { + private static final File TEMP_DIR = new File(FileUtils.getTempDirectory(), "FixedBitMVEntryDictForwardIndexTest"); + private static final File INDEX_FILE = + new File(TEMP_DIR, "testColumn" + V1Constants.Indexes.UNSORTED_MV_FORWARD_INDEX_FILE_EXTENSION); + private static final int NUM_DOCS = 1000; + private static final int MAX_NUM_VALUES_PER_MV_ENTRY = 3; + private static final Random RANDOM = new Random(); + + @BeforeClass + public void setUp() + throws IOException { + FileUtils.forceMkdir(TEMP_DIR); + } + + @Test + public void testRandomGeneratedValues() + throws Exception { + for (int numBitsPerValue = 1; numBitsPerValue <= 31; numBitsPerValue++) { + // Generate random values + int[][] valuesArray = new int[NUM_DOCS][]; + int maxValue = numBitsPerValue != 31 ? 1 << numBitsPerValue : Integer.MAX_VALUE; + for (int i = 0; i < NUM_DOCS; i++) { + int numValues = RANDOM.nextInt(MAX_NUM_VALUES_PER_MV_ENTRY + 1); + int[] values = new int[numValues]; + for (int j = 0; j < numValues; j++) { + values[j] = RANDOM.nextInt(maxValue); + } + valuesArray[i] = values; + } + + // Create the forward index + try ( + FixedBitMVEntryDictForwardIndexWriter writer = new FixedBitMVEntryDictForwardIndexWriter(INDEX_FILE, NUM_DOCS, + numBitsPerValue)) { + for (int[] values : valuesArray) { + writer.putDictIds(values); + } + } + + // Read the forward index + try (PinotDataBuffer dataBuffer = PinotDataBuffer.mapReadOnlyBigEndianFile(INDEX_FILE); + FixedBitMVEntryDictForwardIndexReader reader = new FixedBitMVEntryDictForwardIndexReader(dataBuffer, NUM_DOCS, + numBitsPerValue)) { + int[] valueBuffer = new int[MAX_NUM_VALUES_PER_MV_ENTRY]; + for (int i = 0; i < NUM_DOCS; i++) { + int numValues = reader.getDictIdMV(i, valueBuffer, null); + assertEquals(numValues, valuesArray[i].length); + for (int j = 0; j < numValues; j++) { + assertEquals(valueBuffer[j], valuesArray[i][j]); + } + } + } + + FileUtils.forceDelete(INDEX_FILE); + } + } + + @Test + public void testAllEmptyValues() + throws Exception { + // Create the forward index + try (FixedBitMVEntryDictForwardIndexWriter writer = new FixedBitMVEntryDictForwardIndexWriter(INDEX_FILE, NUM_DOCS, + 1)) { + int[] value = new int[0]; + for (int i = 0; i < NUM_DOCS; i++) { + writer.putDictIds(value); + } + } + + // Read the forward index + try (PinotDataBuffer dataBuffer = PinotDataBuffer.mapReadOnlyBigEndianFile(INDEX_FILE); + FixedBitMVEntryDictForwardIndexReader reader = new FixedBitMVEntryDictForwardIndexReader(dataBuffer, NUM_DOCS, + 1)) { + int[] valueBuffer = new int[0]; + for (int i = 0; i < NUM_DOCS; i++) { + assertEquals(reader.getDictIdMV(i, valueBuffer, null), 0); + } + } + + FileUtils.forceDelete(INDEX_FILE); + } + + @AfterClass + public void tearDown() + throws IOException { + FileUtils.deleteDirectory(TEMP_DIR); + } +} diff --git a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/forward/ForwardIndexTypeTest.java b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/forward/ForwardIndexTypeTest.java index 2f438a5b6a..c72a95a4e9 100644 --- a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/forward/ForwardIndexTypeTest.java +++ b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/forward/ForwardIndexTypeTest.java @@ -26,6 +26,7 @@ import java.util.Map; import java.util.stream.Collectors; import org.apache.pinot.segment.local.segment.index.AbstractSerdeIndexContract; import org.apache.pinot.segment.spi.compression.ChunkCompressionType; +import org.apache.pinot.segment.spi.compression.DictIdCompressionType; import org.apache.pinot.segment.spi.index.ForwardIndexConfig; import org.apache.pinot.segment.spi.index.StandardIndexes; import org.apache.pinot.spi.config.table.FieldConfig; @@ -162,29 +163,17 @@ public class ForwardIndexTypeTest { } @Test - public void oldConfEnableDictWithSnappyCompression() + public void oldConfEnableDictWithMVEntryDictFormat() throws IOException { addFieldIndexConfig("" - + " {\n" - + " \"name\": \"dimInt\"," - + " \"encodingType\": \"DICTIONARY\",\n" - + " \"compressionCodec\": \"SNAPPY\"\n" - + " }" + + "{" + + " \"name\": \"dimInt\"," + + " \"encodingType\": \"DICTIONARY\"," + + " \"compressionCodec\": \"MV_ENTRY_DICT\"" + + "}" ); - assertEquals(ForwardIndexConfig.DEFAULT); - } - - @Test - public void oldConfEnableDictWithLZ4Compression() - throws IOException { - addFieldIndexConfig("" - + " {\n" - + " \"name\": \"dimInt\"," - + " \"encodingType\": \"DICTIONARY\",\n" - + " \"compressionCodec\": \"LZ4\"\n" - + " }" - ); - assertEquals(ForwardIndexConfig.DEFAULT); + assertEquals( + new ForwardIndexConfig.Builder().withDictIdCompressionType(DictIdCompressionType.MV_ENTRY_DICT).build()); } @Test @@ -197,13 +186,7 @@ public class ForwardIndexTypeTest { + " }" ); - assertEquals( - new ForwardIndexConfig.Builder() - .withCompressionType(null) - .withDeriveNumDocsPerChunk(false) - .withRawIndexWriterVersion(ForwardIndexConfig.DEFAULT_RAW_WRITER_VERSION) - .build() - ); + assertEquals(ForwardIndexConfig.DEFAULT); } @Test(dataProvider = "allChunkCompressionType", dataProviderClass = ForwardIndexTypeTest.class) @@ -269,7 +252,7 @@ public class ForwardIndexTypeTest { } @Test - public void newConfigDisabled2() + public void newConfigDisabled() throws IOException { addFieldIndexConfig("{\n" + " \"name\": \"dimInt\",\n" @@ -296,6 +279,23 @@ public class ForwardIndexTypeTest { assertEquals(ForwardIndexConfig.DEFAULT); } + @Test + public void newConfigMVEntryDictFormat() + throws IOException { + addFieldIndexConfig("" + + "{" + + " \"name\": \"dimInt\"," + + " \"indexes\" : {" + + " \"forward\": {" + + " \"dictIdCompressionType\": \"MV_ENTRY_DICT\"" + + " }" + + " }" + + "}" + ); + assertEquals( + new ForwardIndexConfig.Builder().withDictIdCompressionType(DictIdCompressionType.MV_ENTRY_DICT).build()); + } + @Test(dataProvider = "allChunkCompressionType", dataProviderClass = ForwardIndexTypeTest.class) public void newConfigEnabled(String compression) throws IOException { diff --git a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/loader/ForwardIndexHandlerTest.java b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/loader/ForwardIndexHandlerTest.java index 2800cf9053..6077ca1625 100644 --- a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/loader/ForwardIndexHandlerTest.java +++ b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/loader/ForwardIndexHandlerTest.java @@ -31,6 +31,7 @@ import java.util.List; import java.util.Map; import java.util.Random; import java.util.Set; +import java.util.stream.Collectors; import javax.annotation.Nullable; import org.apache.commons.io.FileUtils; import org.apache.commons.lang.StringUtils; @@ -44,6 +45,7 @@ import org.apache.pinot.segment.local.segment.store.SegmentLocalFSDirectory; import org.apache.pinot.segment.spi.ColumnMetadata; import org.apache.pinot.segment.spi.V1Constants; import org.apache.pinot.segment.spi.compression.ChunkCompressionType; +import org.apache.pinot.segment.spi.compression.DictIdCompressionType; import org.apache.pinot.segment.spi.creator.SegmentGeneratorConfig; import org.apache.pinot.segment.spi.index.FieldIndexConfigs; import org.apache.pinot.segment.spi.index.IndexType; @@ -53,6 +55,7 @@ import org.apache.pinot.segment.spi.index.reader.Dictionary; import org.apache.pinot.segment.spi.index.reader.ForwardIndexReader; import org.apache.pinot.segment.spi.store.SegmentDirectory; import org.apache.pinot.spi.config.table.FieldConfig; +import org.apache.pinot.spi.config.table.FieldConfig.CompressionCodec; import org.apache.pinot.spi.config.table.IndexConfig; import org.apache.pinot.spi.config.table.TableConfig; import org.apache.pinot.spi.config.table.TableType; @@ -70,10 +73,7 @@ import org.testng.annotations.AfterMethod; import org.testng.annotations.BeforeMethod; import org.testng.annotations.Test; -import static org.testng.Assert.assertEquals; -import static org.testng.Assert.assertFalse; -import static org.testng.Assert.assertNotEquals; -import static org.testng.Assert.assertTrue; +import static org.testng.Assert.*; public class ForwardIndexHandlerTest { @@ -104,7 +104,6 @@ public class ForwardIndexHandlerTest { // Sorted columns private static final String DIM_RAW_SORTED_INTEGER = "DIM_RAW_SORTED_INTEGER"; - // Metric columns private static final String METRIC_PASS_THROUGH_INTEGER = "METRIC_PASS_THROUGH_INTEGER"; private static final String METRIC_SNAPPY_INTEGER = "METRIC_SNAPPY_INTEGER"; @@ -191,6 +190,9 @@ public class ForwardIndexHandlerTest { DIM_DICT_LONG, DIM_DICT_STRING, DIM_DICT_BYES, DIM_DICT_MV_BYTES, DIM_DICT_MV_STRING, DIM_DICT_MV_INTEGER, DIM_DICT_MV_LONG); + private static final List<String> DICT_ENABLED_MV_COLUMNS_WITH_FORWARD_INDEX = + Arrays.asList(DIM_DICT_MV_INTEGER, DIM_DICT_MV_LONG, DIM_DICT_MV_STRING, DIM_DICT_MV_BYTES); + private static final List<String> SV_FORWARD_INDEX_DISABLED_COLUMNS = Arrays.asList( DIM_SV_FORWARD_INDEX_DISABLED_INTEGER, DIM_SV_FORWARD_INDEX_DISABLED_LONG, DIM_SV_FORWARD_INDEX_DISABLED_STRING, DIM_SV_FORWARD_INDEX_DISABLED_BYTES); @@ -206,14 +208,16 @@ public class ForwardIndexHandlerTest { private static final List<String> FORWARD_INDEX_DISABLED_RAW_COLUMNS = Arrays.asList(DIM_RAW_SV_FORWARD_INDEX_DISABLED_INTEGER, DIM_RAW_MV_FORWARD_INDEX_DISABLED_INTEGER); + private static final List<CompressionCodec> RAW_COMPRESSION_TYPES = + Arrays.stream(CompressionCodec.values()).filter(CompressionCodec::isApplicableToRawIndex) + .collect(Collectors.toList()); + private final List<String> _noDictionaryColumns = new ArrayList<>(); private final List<String> _forwardIndexDisabledColumns = new ArrayList<>(); private final List<String> _invertedIndexColumns = new ArrayList<>(); TableConfig _tableConfig; Schema _schema; File _segmentDirectory; - private List<FieldConfig.CompressionCodec> _allCompressionTypes = - Arrays.asList(FieldConfig.CompressionCodec.values()); @BeforeMethod public void setUp() @@ -242,27 +246,27 @@ public class ForwardIndexHandlerTest { for (String indexColumn : RAW_SNAPPY_INDEX_COLUMNS) { fieldConfigs.add(new FieldConfig(indexColumn, FieldConfig.EncodingType.RAW, Collections.emptyList(), - FieldConfig.CompressionCodec.SNAPPY, null)); + CompressionCodec.SNAPPY, null)); } for (String indexColumn : RAW_SORTED_INDEX_COLUMNS) { fieldConfigs.add(new FieldConfig(indexColumn, FieldConfig.EncodingType.RAW, - Collections.singletonList(FieldConfig.IndexType.SORTED), FieldConfig.CompressionCodec.SNAPPY, null)); + Collections.singletonList(FieldConfig.IndexType.SORTED), CompressionCodec.SNAPPY, null)); } for (String indexColumn : RAW_ZSTANDARD_INDEX_COLUMNS) { fieldConfigs.add(new FieldConfig(indexColumn, FieldConfig.EncodingType.RAW, Collections.emptyList(), - FieldConfig.CompressionCodec.ZSTANDARD, null)); + CompressionCodec.ZSTANDARD, null)); } for (String indexColumn : RAW_PASS_THROUGH_INDEX_COLUMNS) { fieldConfigs.add(new FieldConfig(indexColumn, FieldConfig.EncodingType.RAW, Collections.emptyList(), - FieldConfig.CompressionCodec.PASS_THROUGH, null)); + CompressionCodec.PASS_THROUGH, null)); } for (String indexColumn : RAW_LZ4_INDEX_COLUMNS) { fieldConfigs.add(new FieldConfig(indexColumn, FieldConfig.EncodingType.RAW, Collections.emptyList(), - FieldConfig.CompressionCodec.LZ4, null)); + CompressionCodec.LZ4, null)); } for (String indexColumn : SV_FORWARD_INDEX_DISABLED_COLUMNS) { @@ -284,8 +288,8 @@ public class ForwardIndexHandlerTest { } for (String indexColumn : FORWARD_INDEX_DISABLED_RAW_COLUMNS) { - fieldConfigs.add(new FieldConfig(indexColumn, FieldConfig.EncodingType.RAW, - Collections.emptyList(), FieldConfig.CompressionCodec.LZ4, + fieldConfigs.add( + new FieldConfig(indexColumn, FieldConfig.EncodingType.RAW, Collections.emptyList(), CompressionCodec.LZ4, Collections.singletonMap(FieldConfig.FORWARD_INDEX_DISABLED, Boolean.TRUE.toString()))); } @@ -696,8 +700,9 @@ public class ForwardIndexHandlerTest { || DIM_SV_FORWARD_INDEX_DISABLED_INTEGER_WITHOUT_INV_IDX.equals(name) || DIM_SV_FORWARD_INDEX_DISABLED_INTEGER_WITH_RANGE_INDEX.equals(name)); FieldConfig config = fieldConfigs.remove(randIdx); - FieldConfig.CompressionCodec newCompressionType = null; - for (FieldConfig.CompressionCodec type : _allCompressionTypes) { + CompressionCodec newCompressionType = null; + for (CompressionCodec type : CompressionCodec.values()) { + if (config.getCompressionCodec() != type) { newCompressionType = type; break; @@ -717,7 +722,7 @@ public class ForwardIndexHandlerTest { Map<String, List<ForwardIndexHandler.Operation>> operationMap = fwdIndexHandler.computeOperations(writer); assertEquals(operationMap.size(), 1); assertEquals(operationMap.get(config.getName()), - Collections.singletonList(ForwardIndexHandler.Operation.CHANGE_RAW_INDEX_COMPRESSION_TYPE)); + Collections.singletonList(ForwardIndexHandler.Operation.CHANGE_INDEX_COMPRESSION_TYPE)); // TEST2: Change compression and add index. Change compressionType for more than 1 column. fieldConfigs = new ArrayList<>(_tableConfig.getFieldConfigList()); @@ -725,10 +730,10 @@ public class ForwardIndexHandlerTest { FieldConfig config2 = fieldConfigs.remove(1); FieldConfig newConfig1 = new FieldConfig(config1.getName(), FieldConfig.EncodingType.RAW, Collections.emptyList(), - FieldConfig.CompressionCodec.ZSTANDARD, null); + CompressionCodec.ZSTANDARD, null); fieldConfigs.add(newConfig1); FieldConfig newConfig2 = new FieldConfig(config2.getName(), FieldConfig.EncodingType.RAW, Collections.emptyList(), - FieldConfig.CompressionCodec.ZSTANDARD, null); + CompressionCodec.ZSTANDARD, null); fieldConfigs.add(newConfig2); tableConfig = @@ -743,9 +748,9 @@ public class ForwardIndexHandlerTest { operationMap = fwdIndexHandler.computeOperations(writer); assertEquals(operationMap.size(), 2); assertEquals(operationMap.get(config1.getName()), - Collections.singletonList(ForwardIndexHandler.Operation.CHANGE_RAW_INDEX_COMPRESSION_TYPE)); + Collections.singletonList(ForwardIndexHandler.Operation.CHANGE_INDEX_COMPRESSION_TYPE)); assertEquals(operationMap.get(config2.getName()), - Collections.singletonList(ForwardIndexHandler.Operation.CHANGE_RAW_INDEX_COMPRESSION_TYPE)); + Collections.singletonList(ForwardIndexHandler.Operation.CHANGE_INDEX_COMPRESSION_TYPE)); // Tear down segmentLocalFSDirectory.close(); @@ -963,8 +968,8 @@ public class ForwardIndexHandlerTest { name = fieldConfigs.get(randIdx).getName(); } while (!SV_FORWARD_INDEX_DISABLED_COLUMNS.contains(name) && !MV_FORWARD_INDEX_DISABLED_COLUMNS.contains(name)); FieldConfig config = fieldConfigs.remove(randIdx); - FieldConfig.CompressionCodec newCompressionType = null; - for (FieldConfig.CompressionCodec type : _allCompressionTypes) { + CompressionCodec newCompressionType = null; + for (CompressionCodec type : RAW_COMPRESSION_TYPES) { if (config.getCompressionCodec() != type) { newCompressionType = type; break; @@ -1083,7 +1088,7 @@ public class ForwardIndexHandlerTest { continue; } // For every noDictionaryColumn, change the compressionType to all available types, one by one. - for (FieldConfig.CompressionCodec compressionType : _allCompressionTypes) { + for (CompressionCodec compressionType : RAW_COMPRESSION_TYPES) { // Setup SegmentMetadataImpl existingSegmentMetadata = new SegmentMetadataImpl(_segmentDirectory); SegmentDirectory segmentLocalFSDirectory = @@ -1100,10 +1105,9 @@ public class ForwardIndexHandlerTest { } FieldConfig config = fieldConfigs.remove(index); String columnName = config.getName(); - FieldConfig.CompressionCodec newCompressionType = compressionType; FieldConfig newConfig = - new FieldConfig(columnName, FieldConfig.EncodingType.RAW, Collections.emptyList(), newCompressionType, + new FieldConfig(columnName, FieldConfig.EncodingType.RAW, Collections.emptyList(), compressionType, null); fieldConfigs.add(newConfig); @@ -1125,7 +1129,7 @@ public class ForwardIndexHandlerTest { ColumnMetadata metadata = existingSegmentMetadata.getColumnMetadataFor(columnName); testIndexExists(columnName, StandardIndexes.forward()); validateIndexMap(columnName, false, false); - validateForwardIndex(columnName, newCompressionType, metadata.isSorted()); + validateForwardIndex(columnName, compressionType, metadata.isSorted()); // Validate metadata properties. Nothing should change when a forwardIndex is rewritten for compressionType // change. @@ -1138,6 +1142,79 @@ public class ForwardIndexHandlerTest { } } + @Test + public void testChangeDictCompression() + throws Exception { + List<FieldConfig> fieldConfigs = new ArrayList<>(_tableConfig.getFieldConfigList()); + + // Change to MV_ENTRY_DICT compression + for (String column : DICT_ENABLED_MV_COLUMNS_WITH_FORWARD_INDEX) { + FieldConfig newFieldConfig = new FieldConfig(column, FieldConfig.EncodingType.DICTIONARY, Collections.emptyList(), + CompressionCodec.MV_ENTRY_DICT, null); + fieldConfigs.add(newFieldConfig); + TableConfig tableConfig = new TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME) + .setNoDictionaryColumns(_noDictionaryColumns).setFieldConfigList(fieldConfigs).build(); + + SegmentMetadataImpl segmentMetadata = new SegmentMetadataImpl(_segmentDirectory); + try (SegmentDirectory segmentLocalFSDirectory = new SegmentLocalFSDirectory(_segmentDirectory, segmentMetadata, + ReadMode.mmap); SegmentDirectory.Writer writer = segmentLocalFSDirectory.createWriter()) { + ForwardIndexHandler forwardIndexHandler = + new ForwardIndexHandler(segmentLocalFSDirectory, new IndexLoadingConfig(tableConfig, null), null); + + Map<String, List<ForwardIndexHandler.Operation>> operations = forwardIndexHandler.computeOperations(writer); + assertEquals(operations, Collections.singletonMap(column, + Collections.singletonList(ForwardIndexHandler.Operation.CHANGE_INDEX_COMPRESSION_TYPE))); + assertTrue(forwardIndexHandler.needUpdateIndices(writer)); + + forwardIndexHandler.updateIndices(writer); + forwardIndexHandler.postUpdateIndicesCleanup(writer); + } + + segmentMetadata = new SegmentMetadataImpl(_segmentDirectory); + try (SegmentDirectory segmentLocalFSDirectory = new SegmentLocalFSDirectory(_segmentDirectory, segmentMetadata, + ReadMode.mmap); SegmentDirectory.Reader reader = segmentLocalFSDirectory.createReader()) { + ForwardIndexReader<?> forwardIndexReader = + ForwardIndexType.read(reader, segmentMetadata.getColumnMetadataFor(column)); + assertTrue(forwardIndexReader.isDictionaryEncoded()); + assertFalse(forwardIndexReader.isSingleValue()); + assertEquals(forwardIndexReader.getDictIdCompressionType(), DictIdCompressionType.MV_ENTRY_DICT); + } + } + + // Change back to regular forward index + for (int i = 0; i < DICT_ENABLED_MV_COLUMNS_WITH_FORWARD_INDEX.size(); i++) { + FieldConfig fieldConfig = fieldConfigs.remove(fieldConfigs.size() - 1); + String column = fieldConfig.getName(); + TableConfig tableConfig = new TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME) + .setNoDictionaryColumns(_noDictionaryColumns).setFieldConfigList(fieldConfigs).build(); + + SegmentMetadataImpl segmentMetadata = new SegmentMetadataImpl(_segmentDirectory); + try (SegmentDirectory segmentLocalFSDirectory = new SegmentLocalFSDirectory(_segmentDirectory, segmentMetadata, + ReadMode.mmap); SegmentDirectory.Writer writer = segmentLocalFSDirectory.createWriter()) { + ForwardIndexHandler forwardIndexHandler = + new ForwardIndexHandler(segmentLocalFSDirectory, new IndexLoadingConfig(tableConfig, null), null); + + Map<String, List<ForwardIndexHandler.Operation>> operations = forwardIndexHandler.computeOperations(writer); + assertEquals(operations, Collections.singletonMap(column, + Collections.singletonList(ForwardIndexHandler.Operation.CHANGE_INDEX_COMPRESSION_TYPE))); + assertTrue(forwardIndexHandler.needUpdateIndices(writer)); + + forwardIndexHandler.updateIndices(writer); + forwardIndexHandler.postUpdateIndicesCleanup(writer); + } + + segmentMetadata = new SegmentMetadataImpl(_segmentDirectory); + try (SegmentDirectory segmentLocalFSDirectory = new SegmentLocalFSDirectory(_segmentDirectory, segmentMetadata, + ReadMode.mmap); SegmentDirectory.Reader reader = segmentLocalFSDirectory.createReader()) { + ForwardIndexReader<?> forwardIndexReader = + ForwardIndexType.read(reader, segmentMetadata.getColumnMetadataFor(column)); + assertTrue(forwardIndexReader.isDictionaryEncoded()); + assertFalse(forwardIndexReader.isSingleValue()); + assertNull(forwardIndexReader.getDictIdCompressionType()); + } + } + } + @Test public void testChangeCompressionForMultipleColumns() throws Exception { @@ -1149,8 +1226,8 @@ public class ForwardIndexHandlerTest { List<FieldConfig> fieldConfigs = new ArrayList<>(_tableConfig.getFieldConfigList()); Random rand = new Random(); - int randomIdx = rand.nextInt(_allCompressionTypes.size()); - FieldConfig.CompressionCodec newCompressionType = _allCompressionTypes.get(randomIdx); + int randomIdx = rand.nextInt(RAW_COMPRESSION_TYPES.size()); + CompressionCodec newCompressionType = RAW_COMPRESSION_TYPES.get(randomIdx); // Column 1 String name; @@ -1497,7 +1574,7 @@ public class ForwardIndexHandlerTest { testIndexExists(column, StandardIndexes.forward()); validateIndexMap(column, false, false); // All the columns are dimensions. So default compression type is LZ4. - validateForwardIndex(column, FieldConfig.CompressionCodec.LZ4, metadata.isSorted()); + validateForwardIndex(column, CompressionCodec.LZ4, metadata.isSorted()); // In column metadata, nothing other than hasDictionary and dictionaryElementSize should change. validateMetadataProperties(column, false, 0, metadata.getCardinality(), metadata.getTotalDocs(), @@ -1538,7 +1615,7 @@ public class ForwardIndexHandlerTest { testIndexExists(column1, StandardIndexes.forward()); validateIndexMap(column1, false, false); // All the columns are dimensions. So default compression type is LZ4. - validateForwardIndex(column1, FieldConfig.CompressionCodec.LZ4, metadata.isSorted()); + validateForwardIndex(column1, CompressionCodec.LZ4, metadata.isSorted()); // In column metadata, nothing other than hasDictionary and dictionaryElementSize should change. validateMetadataProperties(column1, false, 0, metadata.getCardinality(), metadata.getTotalDocs(), @@ -1551,7 +1628,7 @@ public class ForwardIndexHandlerTest { testIndexExists(column2, StandardIndexes.forward()); validateIndexMap(column2, false, false); // All the columns are dimensions. So default compression type is LZ4. - validateForwardIndex(column2, FieldConfig.CompressionCodec.LZ4, metadata.isSorted()); + validateForwardIndex(column2, CompressionCodec.LZ4, metadata.isSorted()); // In column metadata, nothing other than hasDictionary and dictionaryElementSize should change. validateMetadataProperties(column2, false, 0, metadata.getCardinality(), metadata.getTotalDocs(), @@ -1940,7 +2017,7 @@ public class ForwardIndexHandlerTest { // Col1 validation. ColumnMetadata metadata = existingSegmentMetadata.getColumnMetadataFor(col1); validateIndexMap(col1, false, false); - validateForwardIndex(col1, FieldConfig.CompressionCodec.LZ4, metadata.isSorted()); + validateForwardIndex(col1, CompressionCodec.LZ4, metadata.isSorted()); // In column metadata, nothing should change. validateMetadataProperties(col1, false, 0, metadata.getCardinality(), metadata.getTotalDocs(), metadata.getDataType(), metadata.getFieldType(), metadata.isSorted(), @@ -1950,7 +2027,7 @@ public class ForwardIndexHandlerTest { // Col2 validation. metadata = existingSegmentMetadata.getColumnMetadataFor(col2); validateIndexMap(col2, false, false); - validateForwardIndex(col2, FieldConfig.CompressionCodec.LZ4, metadata.isSorted()); + validateForwardIndex(col2, CompressionCodec.LZ4, metadata.isSorted()); // In column metadata, nothing should change. validateMetadataProperties(col2, false, 0, metadata.getCardinality(), metadata.getTotalDocs(), metadata.getDataType(), metadata.getFieldType(), metadata.isSorted(), @@ -1985,7 +2062,7 @@ public class ForwardIndexHandlerTest { // Column validation. ColumnMetadata metadata = existingSegmentMetadata.getColumnMetadataFor(column); validateIndexMap(column, false, false); - validateForwardIndex(column, FieldConfig.CompressionCodec.LZ4, metadata.isSorted()); + validateForwardIndex(column, CompressionCodec.LZ4, metadata.isSorted()); // In column metadata, some values can change since MV columns with duplicates lose the duplicates on forward index // regeneration. validateMetadataProperties(column, false, 0, metadata.getCardinality(), @@ -2030,7 +2107,7 @@ public class ForwardIndexHandlerTest { ColumnMetadata metadata = existingSegmentMetadata.getColumnMetadataFor(column); validateIndexMap(column, false, false); - validateForwardIndex(column, FieldConfig.CompressionCodec.LZ4, metadata.isSorted()); + validateForwardIndex(column, CompressionCodec.LZ4, metadata.isSorted()); // In column metadata, nothing should change. validateMetadataProperties(column, false, 0, @@ -2308,7 +2385,7 @@ public class ForwardIndexHandlerTest { } } - private void validateForwardIndex(String columnName, @Nullable FieldConfig.CompressionCodec expectedCompressionType, + private void validateForwardIndex(String columnName, @Nullable CompressionCodec expectedCompressionType, boolean isSorted) throws IOException { // Setup diff --git a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/loader/SegmentPreProcessorTest.java b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/loader/SegmentPreProcessorTest.java index 8e549d2ba3..3cc14682bc 100644 --- a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/loader/SegmentPreProcessorTest.java +++ b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/loader/SegmentPreProcessorTest.java @@ -62,6 +62,7 @@ import org.apache.pinot.segment.spi.store.SegmentDirectory; import org.apache.pinot.segment.spi.store.SegmentDirectoryPaths; import org.apache.pinot.segment.spi.utils.SegmentMetadataUtils; import org.apache.pinot.spi.config.table.BloomFilterConfig; +import org.apache.pinot.spi.config.table.FieldConfig.CompressionCodec; import org.apache.pinot.spi.config.table.IndexConfig; import org.apache.pinot.spi.config.table.IndexingConfig; import org.apache.pinot.spi.config.table.StarTreeIndexConfig; @@ -656,9 +657,8 @@ public class SegmentPreProcessorTest { @Test public void testForwardIndexHandlerChangeCompression() throws Exception { - Map<String, ChunkCompressionType> compressionConfigs = new HashMap<>(); - ChunkCompressionType newCompressionType = ChunkCompressionType.ZSTANDARD; - compressionConfigs.put(EXISTING_STRING_COL_RAW, newCompressionType); + Map<String, CompressionCodec> compressionConfigs = new HashMap<>(); + compressionConfigs.put(EXISTING_STRING_COL_RAW, CompressionCodec.ZSTANDARD); _indexLoadingConfig.setCompressionConfigs(compressionConfigs); _indexLoadingConfig.addNoDictionaryColumns(EXISTING_STRING_COL_RAW); @@ -672,12 +672,11 @@ public class SegmentPreProcessorTest { new SegmentV1V2ToV3FormatConverter().convert(_indexDir); // Test2: Now forward index will be rewritten with ZSTANDARD compressionType. - checkForwardIndexCreation(EXISTING_STRING_COL_RAW, 5, 3, _schema, false, false, false, 0, newCompressionType, true, - 0, DataType.STRING, 100000); + checkForwardIndexCreation(EXISTING_STRING_COL_RAW, 5, 3, _schema, false, false, false, 0, + ChunkCompressionType.ZSTANDARD, true, 0, DataType.STRING, 100000); // Test3: Change compression on existing raw index column. Also add text index on same column. Check correctness. - newCompressionType = ChunkCompressionType.SNAPPY; - compressionConfigs.put(EXISTING_STRING_COL_RAW, newCompressionType); + compressionConfigs.put(EXISTING_STRING_COL_RAW, CompressionCodec.SNAPPY); _indexLoadingConfig.setCompressionConfigs(compressionConfigs); Set<String> textIndexColumns = new HashSet<>(); textIndexColumns.add(EXISTING_STRING_COL_RAW); @@ -688,12 +687,11 @@ public class SegmentPreProcessorTest { ColumnMetadata columnMetadata = segmentMetadata.getColumnMetadataFor(EXISTING_STRING_COL_RAW); assertNotNull(columnMetadata); checkTextIndexCreation(EXISTING_STRING_COL_RAW, 5, 3, _schema, false, false, false, 0); - validateIndex(StandardIndexes.forward(), EXISTING_STRING_COL_RAW, 5, 3, _schema, false, false, false, 0, true, - 0, newCompressionType, false, DataType.STRING, 100000); + validateIndex(StandardIndexes.forward(), EXISTING_STRING_COL_RAW, 5, 3, _schema, false, false, false, 0, true, 0, + ChunkCompressionType.SNAPPY, false, DataType.STRING, 100000); // Test4: Change compression on RAW index column. Change another index on another column. Check correctness. - newCompressionType = ChunkCompressionType.ZSTANDARD; - compressionConfigs.put(EXISTING_STRING_COL_RAW, newCompressionType); + compressionConfigs.put(EXISTING_STRING_COL_RAW, CompressionCodec.ZSTANDARD); _indexLoadingConfig.setCompressionConfigs(compressionConfigs); Set<String> fstColumns = new HashSet<>(); fstColumns.add(EXISTING_STRING_COL_DICT); @@ -706,12 +704,11 @@ public class SegmentPreProcessorTest { // Check FST index checkFSTIndexCreation(EXISTING_STRING_COL_DICT, 9, 4, _newColumnsSchemaWithFST, false, false, 26); // Check forward index. - validateIndex(StandardIndexes.forward(), EXISTING_STRING_COL_RAW, 5, 3, _schema, false, false, false, 0, true, - 0, newCompressionType, false, DataType.STRING, 100000); + validateIndex(StandardIndexes.forward(), EXISTING_STRING_COL_RAW, 5, 3, _schema, false, false, false, 0, true, 0, + ChunkCompressionType.ZSTANDARD, false, DataType.STRING, 100000); // Test5: Change compressionType for an MV column - newCompressionType = ChunkCompressionType.ZSTANDARD; - compressionConfigs.put(EXISTING_INT_COL_RAW_MV, newCompressionType); + compressionConfigs.put(EXISTING_INT_COL_RAW_MV, CompressionCodec.ZSTANDARD); _indexLoadingConfig.setCompressionConfigs(compressionConfigs); _indexLoadingConfig.addNoDictionaryColumns(EXISTING_INT_COL_RAW_MV); diff --git a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/utils/TableConfigUtilsTest.java b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/utils/TableConfigUtilsTest.java index 7653ff7636..9c9b2cdf43 100644 --- a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/utils/TableConfigUtilsTest.java +++ b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/utils/TableConfigUtilsTest.java @@ -31,6 +31,7 @@ import org.apache.pinot.segment.spi.index.startree.AggregationFunctionColumnPair import org.apache.pinot.spi.config.table.ColumnPartitionConfig; import org.apache.pinot.spi.config.table.DedupConfig; import org.apache.pinot.spi.config.table.FieldConfig; +import org.apache.pinot.spi.config.table.FieldConfig.CompressionCodec; import org.apache.pinot.spi.config.table.HashFunction; import org.apache.pinot.spi.config.table.ReplicaGroupStrategyConfig; import org.apache.pinot.spi.config.table.RoutingConfig; @@ -706,9 +707,8 @@ public class TableConfigUtilsTest { streamConfigs.put(StreamConfigProperties.SEGMENT_FLUSH_THRESHOLD_SEGMENT_SIZE, "100m"); streamConfigs.remove(StreamConfigProperties.SEGMENT_FLUSH_THRESHOLD_ROWS); ingestionConfig.setStreamIngestionConfig(new StreamIngestionConfig(List.of(streamConfigs))); - tableConfig = - new TableConfigBuilder(TableType.REALTIME).setTableName(TABLE_NAME).setTimeColumnName(TIME_COLUMN) - .setIngestionConfig(ingestionConfig).build(); + tableConfig = new TableConfigBuilder(TableType.REALTIME).setTableName(TABLE_NAME).setTimeColumnName(TIME_COLUMN) + .setIngestionConfig(ingestionConfig).build(); try { TableConfigUtils.validate(tableConfig, schema); @@ -720,9 +720,8 @@ public class TableConfigUtilsTest { // When size based threshold is specified, rows has to be set to 0. streamConfigs.put(StreamConfigProperties.SEGMENT_FLUSH_THRESHOLD_ROWS, "1000"); ingestionConfig.setStreamIngestionConfig(new StreamIngestionConfig(List.of(streamConfigs))); - tableConfig = - new TableConfigBuilder(TableType.REALTIME).setTableName(TABLE_NAME).setTimeColumnName("timeColumn") - .setIngestionConfig(ingestionConfig).build(); + tableConfig = new TableConfigBuilder(TableType.REALTIME).setTableName(TABLE_NAME).setTimeColumnName("timeColumn") + .setIngestionConfig(ingestionConfig).build(); try { TableConfigUtils.validate(tableConfig, schema); @@ -734,9 +733,8 @@ public class TableConfigUtilsTest { // When size based threshold is specified, rows has to be set to 0. streamConfigs.put(StreamConfigProperties.SEGMENT_FLUSH_THRESHOLD_ROWS, "0"); ingestionConfig.setStreamIngestionConfig(new StreamIngestionConfig(List.of(streamConfigs))); - tableConfig = - new TableConfigBuilder(TableType.REALTIME).setTableName(TABLE_NAME).setTimeColumnName("timeColumn") - .setIngestionConfig(ingestionConfig).build(); + tableConfig = new TableConfigBuilder(TableType.REALTIME).setTableName(TABLE_NAME).setTimeColumnName("timeColumn") + .setIngestionConfig(ingestionConfig).build(); try { TableConfigUtils.validate(tableConfig, schema); @@ -1030,7 +1028,9 @@ public class TableConfigUtilsTest { TableConfigUtils.validate(tableConfig, schema); Assert.fail("Should fail since FST index is enabled on RAW encoding type"); } catch (Exception e) { - Assert.assertEquals(e.getMessage(), "FST Index is only enabled on dictionary encoded columns"); + Assert.assertEquals(e.getMessage(), + "Cannot create FST index on column: myCol1, it can only be applied to dictionary encoded single value " + + "string columns"); } tableConfig = new TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME).build(); @@ -1041,7 +1041,9 @@ public class TableConfigUtilsTest { TableConfigUtils.validate(tableConfig, schema); Assert.fail("Should fail since FST index is enabled on multi value column"); } catch (Exception e) { - Assert.assertEquals(e.getMessage(), "FST Index is only supported for single value string columns"); + Assert.assertEquals(e.getMessage(), + "Cannot create FST index on column: myCol2, it can only be applied to dictionary encoded single value " + + "string columns"); } tableConfig = new TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME).build(); @@ -1052,7 +1054,9 @@ public class TableConfigUtilsTest { TableConfigUtils.validate(tableConfig, schema); Assert.fail("Should fail since FST index is enabled on non String column"); } catch (Exception e) { - Assert.assertEquals(e.getMessage(), "FST Index is only supported for single value string columns"); + Assert.assertEquals(e.getMessage(), + "Cannot create FST index on column: intCol, it can only be applied to dictionary encoded single value " + + "string columns"); } tableConfig = new TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME) @@ -1064,7 +1068,8 @@ public class TableConfigUtilsTest { TableConfigUtils.validate(tableConfig, schema); Assert.fail("Should fail since TEXT index is enabled on non String column"); } catch (Exception e) { - Assert.assertEquals(e.getMessage(), "TEXT Index is only supported for string columns"); + Assert.assertEquals(e.getMessage(), + "Cannot create text index on column: intCol, it can only be applied to string columns"); } tableConfig = new TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME) @@ -1077,29 +1082,29 @@ public class TableConfigUtilsTest { Assert.fail("Should fail since field name is not present in schema"); } catch (Exception e) { Assert.assertEquals(e.getMessage(), - "Column Name myCol21 defined in field config list must be a valid column defined in the schema"); + "Column: myCol21 defined in field config list must be a valid column defined in the schema"); } tableConfig = new TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME).build(); try { FieldConfig fieldConfig = new FieldConfig("intCol", FieldConfig.EncodingType.DICTIONARY, Collections.emptyList(), - FieldConfig.CompressionCodec.SNAPPY, null); + CompressionCodec.SNAPPY, null); tableConfig.setFieldConfigList(Arrays.asList(fieldConfig)); TableConfigUtils.validate(tableConfig, schema); - Assert.fail("Should fail since dictionary encoding does not support compression codec snappy"); + Assert.fail("Should fail since dictionary encoding does not support compression codec SNAPPY"); } catch (Exception e) { - Assert.assertEquals(e.getMessage(), "Set compression codec to null for dictionary encoding type"); + Assert.assertEquals(e.getMessage(), "Compression codec: SNAPPY is not applicable to dictionary encoded index"); } tableConfig = new TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME).build(); try { - FieldConfig fieldConfig = new FieldConfig("intCol", FieldConfig.EncodingType.DICTIONARY, Collections.emptyList(), - FieldConfig.CompressionCodec.ZSTANDARD, null); + FieldConfig fieldConfig = new FieldConfig("intCol", FieldConfig.EncodingType.RAW, Collections.emptyList(), + CompressionCodec.MV_ENTRY_DICT, null); tableConfig.setFieldConfigList(Arrays.asList(fieldConfig)); TableConfigUtils.validate(tableConfig, schema); - Assert.fail("Should fail since dictionary encoding does not support compression codec zstandard"); + Assert.fail("Should fail since raw encoding does not support compression codec MV_ENTRY_DICT"); } catch (Exception e) { - Assert.assertEquals(e.getMessage(), "Set compression codec to null for dictionary encoding type"); + Assert.assertEquals(e.getMessage(), "Compression codec: MV_ENTRY_DICT is not applicable to raw index"); } tableConfig = new TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME) @@ -1227,7 +1232,7 @@ public class TableConfigUtilsTest { Assert.fail("Should not be able to disable dictionary but keep inverted index"); } catch (Exception e) { Assert.assertEquals(e.getMessage(), - "Cannot create an Inverted index on column myCol2 specified in the " + "noDictionaryColumns config"); + "Cannot create an Inverted index on column myCol2 specified in the noDictionaryColumns config"); } // Tests the case when the field-config list marks a column as raw (non-dictionary) and enables @@ -1242,7 +1247,7 @@ public class TableConfigUtilsTest { Assert.fail("Should not be able to disable dictionary but keep inverted index"); } catch (Exception e) { Assert.assertEquals(e.getMessage(), - "Cannot create an Inverted Index on column: myCol2, specified as a non dictionary column"); + "Cannot create inverted index on column: myCol2, it can only be applied to dictionary encoded columns"); } tableConfig = new TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME) @@ -1258,7 +1263,9 @@ public class TableConfigUtilsTest { TableConfigUtils.validate(tableConfig, schema); Assert.fail("Should not be able to disable dictionary but keep inverted index"); } catch (Exception e) { - Assert.assertEquals(e.getMessage(), "FST Index is only enabled on dictionary encoded columns"); + Assert.assertEquals(e.getMessage(), + "Cannot create FST index on column: myCol2, it can only be applied to dictionary encoded single value " + + "string columns"); } tableConfig = new TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME) @@ -1433,7 +1440,7 @@ public class TableConfigUtilsTest { } starTreeIndexConfig = new StarTreeIndexConfig(Arrays.asList("myCol"), null, null, - Arrays.asList(new StarTreeAggregationConfig("myCol2", "SUM", FieldConfig.CompressionCodec.LZ4)), 1); + Arrays.asList(new StarTreeAggregationConfig("myCol2", "SUM", CompressionCodec.LZ4)), 1); tableConfig = new TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME) .setStarTreeIndexConfigs(Arrays.asList(starTreeIndexConfig)).build(); try { @@ -1798,8 +1805,7 @@ public class TableConfigUtilsTest { try { TableConfigUtils.validateUpsertAndDedupConfig(tableConfig, schema); } catch (IllegalStateException e) { - Assert.assertEquals(e.getMessage(), - "The outOfOrderRecordColumn must be a single-valued BOOLEAN column"); + Assert.assertEquals(e.getMessage(), "The outOfOrderRecordColumn must be a single-valued BOOLEAN column"); } } diff --git a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/compression/DictIdCompressionType.java b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/compression/DictIdCompressionType.java new file mode 100644 index 0000000000..82a60ed869 --- /dev/null +++ b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/compression/DictIdCompressionType.java @@ -0,0 +1,47 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.segment.spi.compression; + +/** + * Compression type for dictionary-encoded forward index, where the values stored are dictionary ids. + */ +public enum DictIdCompressionType { + // Add a second level dictionary encoding for the multi-value entries + MV_ENTRY_DICT(false, true); + + private final boolean _applicableToSV; + private final boolean _applicableToMV; + + DictIdCompressionType(boolean applicableToSV, boolean applicableToMV) { + _applicableToSV = applicableToSV; + _applicableToMV = applicableToMV; + } + + public boolean isApplicableToSV() { + return _applicableToSV; + } + + public boolean isApplicableToMV() { + return _applicableToMV; + } + + public boolean isApplicable(boolean isSingleValue) { + return isSingleValue ? isApplicableToSV() : isApplicableToMV(); + } +} diff --git a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/index/ForwardIndexConfig.java b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/index/ForwardIndexConfig.java index 445dcc0b71..fcdbbe4fe0 100644 --- a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/index/ForwardIndexConfig.java +++ b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/index/ForwardIndexConfig.java @@ -21,19 +21,19 @@ package org.apache.pinot.segment.spi.index; import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; -import java.io.IOException; import java.util.Map; import java.util.Objects; import javax.annotation.Nullable; import org.apache.pinot.segment.spi.compression.ChunkCompressionType; +import org.apache.pinot.segment.spi.compression.DictIdCompressionType; import org.apache.pinot.spi.config.table.FieldConfig; +import org.apache.pinot.spi.config.table.FieldConfig.CompressionCodec; import org.apache.pinot.spi.config.table.IndexConfig; -import org.apache.pinot.spi.utils.JsonUtils; public class ForwardIndexConfig extends IndexConfig { public static final int DEFAULT_RAW_WRITER_VERSION = 2; - public static final ForwardIndexConfig DISABLED = new ForwardIndexConfig(true, null, null, null); + public static final ForwardIndexConfig DISABLED = new ForwardIndexConfig(true, null, null, null, null); public static final ForwardIndexConfig DEFAULT = new Builder().build(); @Nullable @@ -41,15 +41,20 @@ public class ForwardIndexConfig extends IndexConfig { private final boolean _deriveNumDocsPerChunk; private final int _rawIndexWriterVersion; + @Nullable + private final DictIdCompressionType _dictIdCompressionType; + @JsonCreator - public ForwardIndexConfig(@Nullable @JsonProperty("disabled") Boolean disabled, - @Nullable @JsonProperty("chunkCompressionType") ChunkCompressionType chunkCompressionType, + public ForwardIndexConfig(@JsonProperty("disabled") @Nullable Boolean disabled, + @JsonProperty("chunkCompressionType") @Nullable ChunkCompressionType chunkCompressionType, @JsonProperty("deriveNumDocsPerChunk") Boolean deriveNumDocsPerChunk, - @JsonProperty("rawIndexWriterVersion") Integer rawIndexWriterVersion) { + @JsonProperty("rawIndexWriterVersion") Integer rawIndexWriterVersion, + @JsonProperty("dictIdCompressionType") @Nullable DictIdCompressionType dictIdCompressionType) { super(disabled); _chunkCompressionType = chunkCompressionType; _deriveNumDocsPerChunk = deriveNumDocsPerChunk != null && deriveNumDocsPerChunk; _rawIndexWriterVersion = rawIndexWriterVersion == null ? DEFAULT_RAW_WRITER_VERSION : rawIndexWriterVersion; + _dictIdCompressionType = dictIdCompressionType; } @Nullable @@ -65,12 +70,17 @@ public class ForwardIndexConfig extends IndexConfig { return _rawIndexWriterVersion; } + @Nullable + public DictIdCompressionType getDictIdCompressionType() { + return _dictIdCompressionType; + } + @Override public boolean equals(Object o) { if (this == o) { return true; } - if (o == null || getClass() != o.getClass()) { + if (!(o instanceof ForwardIndexConfig)) { return false; } if (!super.equals(o)) { @@ -78,12 +88,14 @@ public class ForwardIndexConfig extends IndexConfig { } ForwardIndexConfig that = (ForwardIndexConfig) o; return _deriveNumDocsPerChunk == that._deriveNumDocsPerChunk - && _rawIndexWriterVersion == that._rawIndexWriterVersion && _chunkCompressionType == that._chunkCompressionType; + && _rawIndexWriterVersion == that._rawIndexWriterVersion && _chunkCompressionType == that._chunkCompressionType + && Objects.equals(_dictIdCompressionType, that._dictIdCompressionType); } @Override public int hashCode() { - return Objects.hash(super.hashCode(), _chunkCompressionType, _deriveNumDocsPerChunk, _rawIndexWriterVersion); + return Objects.hash(super.hashCode(), _chunkCompressionType, _deriveNumDocsPerChunk, _rawIndexWriterVersion, + _dictIdCompressionType); } public static class Builder { @@ -92,6 +104,9 @@ public class ForwardIndexConfig extends IndexConfig { private boolean _deriveNumDocsPerChunk = false; private int _rawIndexWriterVersion = DEFAULT_RAW_WRITER_VERSION; + @Nullable + private DictIdCompressionType _dictIdCompressionType; + public Builder() { } @@ -99,6 +114,7 @@ public class ForwardIndexConfig extends IndexConfig { _chunkCompressionType = other.getChunkCompressionType(); _deriveNumDocsPerChunk = other._deriveNumDocsPerChunk; _rawIndexWriterVersion = other._rawIndexWriterVersion; + _dictIdCompressionType = other._dictIdCompressionType; } public Builder withCompressionType(ChunkCompressionType chunkCompressionType) { @@ -116,6 +132,39 @@ public class ForwardIndexConfig extends IndexConfig { return this; } + public Builder withDictIdCompressionType(DictIdCompressionType dictIdCompressionType) { + _dictIdCompressionType = dictIdCompressionType; + return this; + } + + public Builder withCompressionCodec(CompressionCodec compressionCodec) { + if (compressionCodec == null) { + _chunkCompressionType = null; + _dictIdCompressionType = null; + return this; + } + switch (compressionCodec) { + case PASS_THROUGH: + _chunkCompressionType = ChunkCompressionType.PASS_THROUGH; + break; + case SNAPPY: + _chunkCompressionType = ChunkCompressionType.SNAPPY; + break; + case ZSTANDARD: + _chunkCompressionType = ChunkCompressionType.ZSTANDARD; + break; + case LZ4: + _chunkCompressionType = ChunkCompressionType.LZ4; + break; + case MV_ENTRY_DICT: + _dictIdCompressionType = DictIdCompressionType.MV_ENTRY_DICT; + break; + default: + throw new IllegalStateException("Unsupported compression codec: " + compressionCodec); + } + return this; + } + public Builder withLegacyProperties(Map<String, Map<String, String>> propertiesByCol, String colName) { if (propertiesByCol != null) { Map<String, String> colProps = propertiesByCol.get(colName); @@ -139,18 +188,8 @@ public class ForwardIndexConfig extends IndexConfig { } public ForwardIndexConfig build() { - return new ForwardIndexConfig(false, _chunkCompressionType, _deriveNumDocsPerChunk, _rawIndexWriterVersion); - } - } - - @Override - public String toString() { - try { - return JsonUtils.objectToString(this); - } catch (IOException ex) { - return "{" + "\"chunkCompressionType\":" + _chunkCompressionType - + ", \"deriveNumDocsPerChunk\":" + _deriveNumDocsPerChunk - + ", \"rawIndexWriterVersion\":" + _rawIndexWriterVersion + '}'; + return new ForwardIndexConfig(false, _chunkCompressionType, _deriveNumDocsPerChunk, _rawIndexWriterVersion, + _dictIdCompressionType); } } } diff --git a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/index/reader/ForwardIndexReader.java b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/index/reader/ForwardIndexReader.java index 5882986edd..e067376472 100644 --- a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/index/reader/ForwardIndexReader.java +++ b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/index/reader/ForwardIndexReader.java @@ -25,6 +25,7 @@ import lombok.AllArgsConstructor; import lombok.EqualsAndHashCode; import lombok.Getter; import org.apache.pinot.segment.spi.compression.ChunkCompressionType; +import org.apache.pinot.segment.spi.compression.DictIdCompressionType; import org.apache.pinot.segment.spi.index.IndexReader; import org.apache.pinot.spi.data.FieldSpec.DataType; import org.apache.pinot.spi.utils.BigDecimalUtils; @@ -57,12 +58,20 @@ public interface ForwardIndexReader<T extends ForwardIndexReaderContext> extends /** * Returns the compression type (if valid). Only valid for RAW forward index columns implemented in * BaseChunkForwardIndexReader. - * @return */ + @Nullable default ChunkCompressionType getCompressionType() { return null; } + /** + * Returns the compression type for dictionary encoded forward index. + */ + @Nullable + default DictIdCompressionType getDictIdCompressionType() { + return null; + } + /** * Returns the length of the longest entry. Only valid for RAW forward index columns implemented in * BaseChunkForwardIndexReader. Returns -1 otherwise. diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/FieldConfig.java b/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/FieldConfig.java index 30a42189a4..47a846d878 100644 --- a/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/FieldConfig.java +++ b/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/FieldConfig.java @@ -119,7 +119,29 @@ public class FieldConfig extends BaseJsonConfig { } public enum CompressionCodec { - PASS_THROUGH, SNAPPY, ZSTANDARD, LZ4 + PASS_THROUGH(true, false), + SNAPPY(true, false), + ZSTANDARD(true, false), + LZ4(true, false), + + // For MV dictionary encoded forward index, add a second level dictionary encoding for the multi-value entries + MV_ENTRY_DICT(false, true); + + private final boolean _applicableToRawIndex; + private final boolean _applicableToDictEncodedIndex; + + CompressionCodec(boolean applicableToRawIndex, boolean applicableToDictEncodedIndex) { + _applicableToRawIndex = applicableToRawIndex; + _applicableToDictEncodedIndex = applicableToDictEncodedIndex; + } + + public boolean isApplicableToRawIndex() { + return _applicableToRawIndex; + } + + public boolean isApplicableToDictEncodedIndex() { + return _applicableToDictEncodedIndex; + } } public String getName() { --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org