This is an automated email from the ASF dual-hosted git repository. jackie pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push: new f9ab252 make index creator provision pluggable (#7885) f9ab252 is described below commit f9ab252980e4f973d60b9db2a0f5e7d5764bdaf2 Author: Richard Startin <rich...@startree.ai> AuthorDate: Thu Dec 16 20:21:48 2021 +0000 make index creator provision pluggable (#7885) This allows index creation to be intercepted, so that the current static logic in SegmentIndexCreator can be extended or overridden. This is achieved by introducing a new interface IndexCreatorProvider which provides various new index creators from an IndexCreationContext which bundles all information about index creation. External users can register a decorator which can enhance or entirely replace the default index creator provision logic. Typically, a registered decorator should pa [...] --- .../pinot/core/minion/RawIndexConverter.java | 19 +- .../org/apache/pinot/perf/BenchmarkRangeIndex.java | 11 +- .../ConvertToRawIndexTaskExecutor.java | 4 +- pinot-segment-local/pom.xml | 3 +- .../creator/impl/DefaultIndexCreatorProvider.java | 274 ++++++++++++ .../creator/impl/SegmentColumnarIndexCreator.java | 290 +++---------- .../impl/inv/BitSlicedRangeIndexCreator.java | 55 ++- .../segment/index/loader/IndexHandlerFactory.java | 20 +- .../loader/bloomfilter/BloomFilterHandler.java | 12 +- .../loader/invertedindex/FSTIndexHandler.java | 17 +- .../index/loader/invertedindex/H3IndexHandler.java | 12 +- .../loader/invertedindex/InvertedIndexHandler.java | 14 +- .../loader/invertedindex/JsonIndexHandler.java | 16 +- .../loader/invertedindex/RangeIndexHandler.java | 25 +- .../loader/invertedindex/TextIndexHandler.java | 10 +- .../creator/impl/IndexCreatorOverrideTest.java | 88 ++++ .../index/creator/BitSlicedIndexCreatorTest.java | 20 +- pinot-segment-spi/pom.xml | 6 + .../spi/creator/BloomFilterCreatorProvider.java | 36 ++ .../spi/creator/ForwardIndexCreatorProvider.java | 35 ++ .../creator/GeoSpatialIndexCreatorProvider.java | 37 ++ 
.../segment/spi/creator/IndexCreationContext.java | 467 +++++++++++++++++++++ .../segment/spi/creator/IndexCreatorProvider.java | 28 ++ .../segment/spi/creator/IndexCreatorProviders.java | 159 +++++++ .../spi/creator/InvertedIndexCreatorProvider.java | 36 ++ .../spi/creator/JsonIndexCreatorProvider.java | 36 ++ .../spi/creator/RangeIndexCreatorProvider.java | 36 ++ .../spi/creator/TextIndexCreatorProvider.java | 37 ++ .../spi/index/metadata/ColumnMetadataImpl.java | 4 + .../spi/creator/IndexCreatorProvidersTest.java | 53 +++ .../converter/DictionaryToRawIndexConverter.java | 4 +- 31 files changed, 1538 insertions(+), 326 deletions(-) diff --git a/pinot-core/src/main/java/org/apache/pinot/core/minion/RawIndexConverter.java b/pinot-core/src/main/java/org/apache/pinot/core/minion/RawIndexConverter.java index 78552dd..f6d2a7d 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/minion/RawIndexConverter.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/minion/RawIndexConverter.java @@ -26,8 +26,6 @@ import org.apache.commons.configuration.PropertiesConfiguration; import org.apache.commons.io.FileUtils; import org.apache.commons.lang.StringUtils; import org.apache.pinot.segment.local.indexsegment.immutable.ImmutableSegmentLoader; -import org.apache.pinot.segment.local.io.writer.impl.BaseChunkSVForwardIndexWriter; -import org.apache.pinot.segment.local.segment.creator.impl.SegmentColumnarIndexCreator; import org.apache.pinot.segment.local.segment.creator.impl.SegmentIndexCreationDriverImpl; import org.apache.pinot.segment.local.segment.index.loader.IndexLoadingConfig; import org.apache.pinot.segment.local.utils.CrcUtils; @@ -36,12 +34,15 @@ import org.apache.pinot.segment.spi.ImmutableSegment; import org.apache.pinot.segment.spi.SegmentMetadata; import org.apache.pinot.segment.spi.V1Constants; import org.apache.pinot.segment.spi.compression.ChunkCompressionType; +import org.apache.pinot.segment.spi.creator.ForwardIndexCreatorProvider; +import 
org.apache.pinot.segment.spi.creator.IndexCreationContext; import org.apache.pinot.segment.spi.creator.SegmentVersion; import org.apache.pinot.segment.spi.datasource.DataSource; import org.apache.pinot.segment.spi.index.creator.ForwardIndexCreator; import org.apache.pinot.segment.spi.index.reader.Dictionary; import org.apache.pinot.segment.spi.index.reader.ForwardIndexReader; import org.apache.pinot.segment.spi.index.reader.ForwardIndexReaderContext; +import org.apache.pinot.spi.data.DimensionFieldSpec; import org.apache.pinot.spi.data.FieldSpec; import org.apache.pinot.spi.data.FieldSpec.DataType; import org.apache.pinot.spi.data.MetricFieldSpec; @@ -82,13 +83,14 @@ public class RawIndexConverter { private final File _convertedIndexDir; private final PropertiesConfiguration _convertedProperties; private final String _columnsToConvert; + private final ForwardIndexCreatorProvider _indexCreatorProvider; /** * NOTE: original segment should be in V1 format. * TODO: support V3 format */ public RawIndexConverter(String rawTableName, File originalIndexDir, File convertedIndexDir, - @Nullable String columnsToConvert) + @Nullable String columnsToConvert, ForwardIndexCreatorProvider indexCreatorProvider) throws Exception { FileUtils.copyDirectory(originalIndexDir, convertedIndexDir); IndexLoadingConfig indexLoadingConfig = new IndexLoadingConfig(); @@ -101,6 +103,7 @@ public class RawIndexConverter { _convertedProperties = new PropertiesConfiguration(new File(_convertedIndexDir, V1Constants.MetadataKeys.METADATA_FILE_NAME)); _columnsToConvert = columnsToConvert; + _indexCreatorProvider = indexCreatorProvider; } public boolean convert() @@ -205,11 +208,11 @@ public class RawIndexConverter { assert dictionary != null; DataType storedType = dictionary.getValueType(); int numDocs = _originalSegmentMetadata.getTotalDocs(); - int lengthOfLongestEntry = _originalSegmentMetadata.getColumnMetadataFor(columnName).getColumnMaxLength(); - try (ForwardIndexCreator rawIndexCreator = 
SegmentColumnarIndexCreator - .getRawIndexCreatorForSVColumn(_convertedIndexDir, ChunkCompressionType.LZ4, columnName, - storedType, numDocs, lengthOfLongestEntry, false, - BaseChunkSVForwardIndexWriter.DEFAULT_VERSION); + ColumnMetadata columnMetadata = _originalSegmentMetadata.getColumnMetadataFor(columnName); + try (ForwardIndexCreator rawIndexCreator = _indexCreatorProvider.newForwardIndexCreator( + IndexCreationContext.builder().withIndexDir(_convertedIndexDir).withColumnMetadata(columnMetadata) + .withFieldSpec(new DimensionFieldSpec(columnName, storedType, columnMetadata.isSingleValue())) + .withDictionary(false).build().forForwardIndex(ChunkCompressionType.LZ4, null)); ForwardIndexReaderContext readerContext = reader.createContext()) { switch (storedType) { case INT: diff --git a/pinot-perf/src/main/java/org/apache/pinot/perf/BenchmarkRangeIndex.java b/pinot-perf/src/main/java/org/apache/pinot/perf/BenchmarkRangeIndex.java index 60db2a3..e250eb6 100644 --- a/pinot-perf/src/main/java/org/apache/pinot/perf/BenchmarkRangeIndex.java +++ b/pinot-perf/src/main/java/org/apache/pinot/perf/BenchmarkRangeIndex.java @@ -191,14 +191,7 @@ public class BenchmarkRangeIndex { public void setup() throws IOException { super.setup(); - ColumnMetadata metadata = new ColumnMetadataImpl.Builder() - .setFieldSpec(_fieldSpec) - .setTotalDocs(_numDocs) - .setHasDictionary(false) - .setMaxValue(max()) - .setMinValue(min()) - .build(); - _creator = new BitSlicedRangeIndexCreator(_indexDir, metadata); + _creator = new BitSlicedRangeIndexCreator(_indexDir, _fieldSpec, min(), max()); } } @@ -328,7 +321,7 @@ public class BenchmarkRangeIndex { @Override protected RawValueBasedInvertedIndexCreator newCreator() { - return new BitSlicedRangeIndexCreator(_indexDir, metadata()); + return new BitSlicedRangeIndexCreator(_indexDir, _fieldSpec, min(), max()); } } diff --git 
a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/converttorawindex/ConvertToRawIndexTaskExecutor.java b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/converttorawindex/ConvertToRawIndexTaskExecutor.java index 7a5a33f..ba3e610 100644 --- a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/converttorawindex/ConvertToRawIndexTaskExecutor.java +++ b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/converttorawindex/ConvertToRawIndexTaskExecutor.java @@ -27,6 +27,7 @@ import org.apache.pinot.core.minion.PinotTaskConfig; import org.apache.pinot.core.minion.RawIndexConverter; import org.apache.pinot.plugin.minion.tasks.BaseSingleSegmentConversionExecutor; import org.apache.pinot.plugin.minion.tasks.SegmentConversionResult; +import org.apache.pinot.segment.spi.creator.IndexCreatorProviders; import org.apache.pinot.spi.utils.builder.TableNameBuilder; @@ -39,7 +40,8 @@ public class ConvertToRawIndexTaskExecutor extends BaseSingleSegmentConversionEx String tableNameWithType = configs.get(MinionConstants.TABLE_NAME_KEY); String rawTableName = TableNameBuilder.extractRawTableName(tableNameWithType); new RawIndexConverter(rawTableName, indexDir, workingDir, - configs.get(MinionConstants.ConvertToRawIndexTask.COLUMNS_TO_CONVERT_KEY)).convert(); + configs.get(MinionConstants.ConvertToRawIndexTask.COLUMNS_TO_CONVERT_KEY), + IndexCreatorProviders.getIndexCreatorProvider()).convert(); return new SegmentConversionResult.Builder().setFile(workingDir) .setTableNameWithType(configs.get(MinionConstants.TABLE_NAME_KEY)) .setSegmentName(configs.get(MinionConstants.SEGMENT_NAME_KEY)).build(); diff --git a/pinot-segment-local/pom.xml b/pinot-segment-local/pom.xml index c60da9c..e1bab72 100644 --- a/pinot-segment-local/pom.xml +++ 
b/pinot-segment-local/pom.xml @@ -122,9 +122,10 @@ <artifactId>testng</artifactId> <scope>test</scope> </dependency> + <!-- required for static mock in IndexCreatorOverrideTest --> <dependency> <groupId>org.mockito</groupId> - <artifactId>mockito-core</artifactId> + <artifactId>mockito-inline</artifactId> <scope>test</scope> </dependency> <dependency> diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/DefaultIndexCreatorProvider.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/DefaultIndexCreatorProvider.java new file mode 100644 index 0000000..2753459 --- /dev/null +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/DefaultIndexCreatorProvider.java @@ -0,0 +1,274 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.pinot.segment.local.segment.creator.impl; + +import com.google.common.base.Preconditions; +import java.io.File; +import java.io.IOException; +import java.util.Map; +import java.util.Objects; +import org.apache.pinot.segment.local.io.writer.impl.BaseChunkSVForwardIndexWriter; +import org.apache.pinot.segment.local.segment.creator.impl.bloom.OnHeapGuavaBloomFilterCreator; +import org.apache.pinot.segment.local.segment.creator.impl.fwd.MultiValueFixedByteRawIndexCreator; +import org.apache.pinot.segment.local.segment.creator.impl.fwd.MultiValueUnsortedForwardIndexCreator; +import org.apache.pinot.segment.local.segment.creator.impl.fwd.MultiValueVarByteRawIndexCreator; +import org.apache.pinot.segment.local.segment.creator.impl.fwd.SingleValueFixedByteRawIndexCreator; +import org.apache.pinot.segment.local.segment.creator.impl.fwd.SingleValueSortedForwardIndexCreator; +import org.apache.pinot.segment.local.segment.creator.impl.fwd.SingleValueUnsortedForwardIndexCreator; +import org.apache.pinot.segment.local.segment.creator.impl.fwd.SingleValueVarByteRawIndexCreator; +import org.apache.pinot.segment.local.segment.creator.impl.inv.BitSlicedRangeIndexCreator; +import org.apache.pinot.segment.local.segment.creator.impl.inv.OffHeapBitmapInvertedIndexCreator; +import org.apache.pinot.segment.local.segment.creator.impl.inv.OnHeapBitmapInvertedIndexCreator; +import org.apache.pinot.segment.local.segment.creator.impl.inv.RangeIndexCreator; +import org.apache.pinot.segment.local.segment.creator.impl.inv.geospatial.OffHeapH3IndexCreator; +import org.apache.pinot.segment.local.segment.creator.impl.inv.geospatial.OnHeapH3IndexCreator; +import org.apache.pinot.segment.local.segment.creator.impl.inv.json.OffHeapJsonIndexCreator; +import org.apache.pinot.segment.local.segment.creator.impl.inv.json.OnHeapJsonIndexCreator; +import org.apache.pinot.segment.local.segment.creator.impl.inv.text.LuceneFSTIndexCreator; +import 
org.apache.pinot.segment.local.segment.creator.impl.text.LuceneTextIndexCreator; +import org.apache.pinot.segment.local.utils.nativefst.NativeFSTIndexCreator; +import org.apache.pinot.segment.spi.compression.ChunkCompressionType; +import org.apache.pinot.segment.spi.creator.IndexCreationContext; +import org.apache.pinot.segment.spi.creator.IndexCreatorProvider; +import org.apache.pinot.segment.spi.index.creator.BloomFilterCreator; +import org.apache.pinot.segment.spi.index.creator.CombinedInvertedIndexCreator; +import org.apache.pinot.segment.spi.index.creator.DictionaryBasedInvertedIndexCreator; +import org.apache.pinot.segment.spi.index.creator.ForwardIndexCreator; +import org.apache.pinot.segment.spi.index.creator.GeoSpatialIndexCreator; +import org.apache.pinot.segment.spi.index.creator.JsonIndexCreator; +import org.apache.pinot.segment.spi.index.creator.TextIndexCreator; +import org.apache.pinot.segment.spi.index.reader.H3IndexResolution; +import org.apache.pinot.spi.config.table.FSTType; +import org.apache.pinot.spi.config.table.FieldConfig; +import org.apache.pinot.spi.data.FieldSpec; + + +/** + * This class centralizes logic for how to create indexes. It can be overridden + * by SPI {@link IndexCreatorProviders} and should not be constructed directly, but + * accessed only via {@link IndexCreatorProviders#getIndexCreatorProvider}. Unless + * a user provides an override, this is the logic which will be used to create + * each index type. 
+ */ +public final class DefaultIndexCreatorProvider implements IndexCreatorProvider { + + @Override + public ForwardIndexCreator newForwardIndexCreator(IndexCreationContext.Forward context) + throws Exception { + if (!context.hasDictionary()) { + boolean deriveNumDocsPerChunk = + shouldDeriveNumDocsPerChunk(context.getFieldSpec().getName(), context.getColumnProperties()); + int writerVersion = getRawIndexWriterVersion(context.getFieldSpec().getName(), context.getColumnProperties()); + if (context.getFieldSpec().isSingleValueField()) { + return getRawIndexCreatorForSVColumn(context.getIndexDir(), context.getChunkCompressionType(), + context.getFieldSpec().getName(), context.getFieldSpec().getDataType().getStoredType(), + context.getTotalDocs(), context.getLengthOfLongestEntry(), deriveNumDocsPerChunk, writerVersion); + } else { + return getRawIndexCreatorForMVColumn(context.getIndexDir(), context.getChunkCompressionType(), + context.getFieldSpec().getName(), context.getFieldSpec().getDataType().getStoredType(), + context.getTotalDocs(), context.getMaxNumberOfMultiValueElements(), deriveNumDocsPerChunk, writerVersion, + context.getMaxRowLengthInBytes()); + } + } else { + if (context.getFieldSpec().isSingleValueField()) { + if (context.isSorted()) { + return new SingleValueSortedForwardIndexCreator(context.getIndexDir(), context.getFieldSpec().getName(), + context.getCardinality()); + } else { + return new SingleValueUnsortedForwardIndexCreator(context.getIndexDir(), context.getFieldSpec().getName(), + context.getCardinality(), context.getTotalDocs()); + } + } else { + return new MultiValueUnsortedForwardIndexCreator(context.getIndexDir(), context.getFieldSpec().getName(), + context.getCardinality(), context.getTotalDocs(), context.getTotalNumberOfEntries()); + } + } + } + + @Override + public DictionaryBasedInvertedIndexCreator newInvertedIndexCreator(IndexCreationContext.Inverted context) + throws IOException { + if (context.isOnHeap()) { + return new 
OnHeapBitmapInvertedIndexCreator(context.getIndexDir(), context.getFieldSpec().getName(), + context.getCardinality()); + } else { + return new OffHeapBitmapInvertedIndexCreator(context.getIndexDir(), context.getFieldSpec(), + context.getCardinality(), context.getTotalDocs(), context.getTotalNumberOfEntries()); + } + } + + @Override + public JsonIndexCreator newJsonIndexCreator(IndexCreationContext.Json context) + throws IOException { + Preconditions.checkState(context.getFieldSpec().isSingleValueField(), + "Json index is currently only supported on single-value columns"); + Preconditions.checkState(context.getFieldSpec().getDataType().getStoredType() == FieldSpec.DataType.STRING, + "Json index is currently only supported on STRING columns"); + return context.isOnHeap() ? new OnHeapJsonIndexCreator(context.getIndexDir(), context.getFieldSpec().getName()) + : new OffHeapJsonIndexCreator(context.getIndexDir(), context.getFieldSpec().getName()); + } + + @Override + public TextIndexCreator newTextIndexCreator(IndexCreationContext.Text context) + throws IOException { + if (context.isFst()) { + Preconditions.checkState(context.getFieldSpec().isSingleValueField(), + "FST index is currently only supported on single-value columns"); + Preconditions.checkState(context.getFieldSpec().getDataType().getStoredType() == FieldSpec.DataType.STRING, + "FST index is currently only supported on STRING type columns"); + Preconditions.checkState(context.hasDictionary(), + "FST index is currently only supported on dictionary-encoded columns"); + String[] sortedValues = context.getSortedUniqueElementsArray(); + if (context.getFstType() == FSTType.NATIVE) { + return new NativeFSTIndexCreator(context.getIndexDir(), context.getFieldSpec().getName(), sortedValues); + } else { + return new LuceneFSTIndexCreator(context.getIndexDir(), context.getFieldSpec().getName(), sortedValues); + } + } else { + Preconditions.checkState(context.getFieldSpec().getDataType().getStoredType() == 
FieldSpec.DataType.STRING, + "Text index is currently only supported on STRING type columns"); + return new LuceneTextIndexCreator(context.getFieldSpec().getName(), context.getIndexDir(), + context.isCommitOnClose()); + } + } + + @Override + public GeoSpatialIndexCreator newGeoSpatialIndexCreator(IndexCreationContext.Geospatial context) + throws IOException { + Preconditions.checkState(context.getFieldSpec().isSingleValueField(), + "H3 index is currently only supported on single-value columns"); + Preconditions.checkState(context.getFieldSpec().getDataType().getStoredType() == FieldSpec.DataType.BYTES, + "H3 index is currently only supported on BYTES columns"); + H3IndexResolution resolution = Objects.requireNonNull(context.getH3IndexConfig()).getResolution(); + return context.isOnHeap() ? new OnHeapH3IndexCreator(context.getIndexDir(), context.getFieldSpec().getName(), + resolution) : new OffHeapH3IndexCreator(context.getIndexDir(), context.getFieldSpec().getName(), resolution); + } + + public static boolean shouldDeriveNumDocsPerChunk(String columnName, + Map<String, Map<String, String>> columnProperties) { + if (columnProperties != null) { + Map<String, String> properties = columnProperties.get(columnName); + return properties != null && Boolean.parseBoolean( + properties.get(FieldConfig.DERIVE_NUM_DOCS_PER_CHUNK_RAW_INDEX_KEY)); + } + return false; + } + + public static int getRawIndexWriterVersion(String columnName, Map<String, Map<String, String>> columnProperties) { + if (columnProperties != null && columnProperties.get(columnName) != null) { + Map<String, String> properties = columnProperties.get(columnName); + String version = properties.get(FieldConfig.RAW_INDEX_WRITER_VERSION); + if (version == null) { + return BaseChunkSVForwardIndexWriter.DEFAULT_VERSION; + } + return Integer.parseInt(version); + } + return BaseChunkSVForwardIndexWriter.DEFAULT_VERSION; + } + + /** + * Helper method to build the raw index creator for the column. 
+ * Assumes that column to be indexed is single valued. + * + * @param file Output index file + * @param column Column name + * @param totalDocs Total number of documents to index + * @param lengthOfLongestEntry Length of longest entry + * @param deriveNumDocsPerChunk true if varbyte writer should auto-derive the number of rows per chunk + * @param writerVersion version to use for the raw index writer + * @return raw index creator + */ + public static ForwardIndexCreator getRawIndexCreatorForSVColumn(File file, ChunkCompressionType compressionType, + String column, FieldSpec.DataType dataType, int totalDocs, int lengthOfLongestEntry, + boolean deriveNumDocsPerChunk, + int writerVersion) + throws IOException { + switch (dataType.getStoredType()) { + case INT: + case LONG: + case FLOAT: + case DOUBLE: + return new SingleValueFixedByteRawIndexCreator(file, compressionType, column, totalDocs, dataType, + writerVersion); + case STRING: + case BYTES: + return new SingleValueVarByteRawIndexCreator(file, compressionType, column, totalDocs, dataType, + lengthOfLongestEntry, deriveNumDocsPerChunk, writerVersion); + default: + throw new UnsupportedOperationException("Data type not supported for raw indexing: " + dataType); + } + } + + /** + * Helper method to build the raw index creator for the column. + * Assumes that column to be indexed is multi valued. 
+ * + * @param file Output index file + * @param column Column name + * @param totalDocs Total number of documents to index + * @param deriveNumDocsPerChunk true if varbyte writer should auto-derive the number of rows + * per chunk + * @param writerVersion version to use for the raw index writer + * @param maxRowLengthInBytes the length of the longest row in bytes + * @return raw index creator + */ + public static ForwardIndexCreator getRawIndexCreatorForMVColumn(File file, ChunkCompressionType compressionType, + String column, FieldSpec.DataType dataType, final int totalDocs, int maxNumberOfMultiValueElements, + boolean deriveNumDocsPerChunk, int writerVersion, int maxRowLengthInBytes) + throws IOException { + switch (dataType.getStoredType()) { + case INT: + case LONG: + case FLOAT: + case DOUBLE: + return new MultiValueFixedByteRawIndexCreator(file, compressionType, column, totalDocs, dataType, + maxNumberOfMultiValueElements, deriveNumDocsPerChunk, writerVersion); + case STRING: + case BYTES: + return new MultiValueVarByteRawIndexCreator(file, compressionType, column, totalDocs, dataType, writerVersion, + maxRowLengthInBytes, maxNumberOfMultiValueElements); + default: + throw new UnsupportedOperationException("Data type not supported for raw indexing: " + dataType); + } + } + + @Override + public BloomFilterCreator newBloomFilterCreator(IndexCreationContext.BloomFilter context) + throws IOException { + return new OnHeapGuavaBloomFilterCreator(context.getIndexDir(), context.getFieldSpec().getName(), + context.getCardinality(), Objects.requireNonNull(context.getBloomFilterConfig())); + } + + @Override + public CombinedInvertedIndexCreator newRangeIndexCreator(IndexCreationContext.Range context) + throws IOException { + if (context.getRangeIndexVersion() == BitSlicedRangeIndexCreator.VERSION && context.getFieldSpec() + .isSingleValueField()) { + if (context.hasDictionary()) { + return new BitSlicedRangeIndexCreator(context.getIndexDir(), context.getFieldSpec(), 
context.getCardinality()); + } + return new BitSlicedRangeIndexCreator(context.getIndexDir(), context.getFieldSpec(), context.getMin(), + context.getMax()); + } + // default to RangeIndexCreator for the time being + return new RangeIndexCreator(context.getIndexDir(), context.getFieldSpec(), + context.hasDictionary() ? FieldSpec.DataType.INT : context.getFieldSpec().getDataType(), -1, + -1, context.getTotalDocs(), context.getTotalNumberOfEntries()); + } +} diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/SegmentColumnarIndexCreator.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/SegmentColumnarIndexCreator.java index 764d36f..b01dd3f 100644 --- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/SegmentColumnarIndexCreator.java +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/SegmentColumnarIndexCreator.java @@ -33,28 +33,14 @@ import org.apache.commons.configuration.ConfigurationException; import org.apache.commons.configuration.PropertiesConfiguration; import org.apache.pinot.common.utils.FileUtils; import org.apache.pinot.segment.local.io.util.PinotDataBitSet; -import org.apache.pinot.segment.local.io.writer.impl.BaseChunkSVForwardIndexWriter; -import org.apache.pinot.segment.local.segment.creator.impl.fwd.MultiValueFixedByteRawIndexCreator; -import org.apache.pinot.segment.local.segment.creator.impl.fwd.MultiValueUnsortedForwardIndexCreator; -import org.apache.pinot.segment.local.segment.creator.impl.fwd.MultiValueVarByteRawIndexCreator; -import org.apache.pinot.segment.local.segment.creator.impl.fwd.SingleValueFixedByteRawIndexCreator; -import org.apache.pinot.segment.local.segment.creator.impl.fwd.SingleValueSortedForwardIndexCreator; -import org.apache.pinot.segment.local.segment.creator.impl.fwd.SingleValueUnsortedForwardIndexCreator; -import 
org.apache.pinot.segment.local.segment.creator.impl.fwd.SingleValueVarByteRawIndexCreator; -import org.apache.pinot.segment.local.segment.creator.impl.inv.OffHeapBitmapInvertedIndexCreator; -import org.apache.pinot.segment.local.segment.creator.impl.inv.OnHeapBitmapInvertedIndexCreator; -import org.apache.pinot.segment.local.segment.creator.impl.inv.geospatial.OffHeapH3IndexCreator; -import org.apache.pinot.segment.local.segment.creator.impl.inv.geospatial.OnHeapH3IndexCreator; -import org.apache.pinot.segment.local.segment.creator.impl.inv.json.OffHeapJsonIndexCreator; -import org.apache.pinot.segment.local.segment.creator.impl.inv.json.OnHeapJsonIndexCreator; -import org.apache.pinot.segment.local.segment.creator.impl.inv.text.LuceneFSTIndexCreator; import org.apache.pinot.segment.local.segment.creator.impl.nullvalue.NullValueVectorCreator; -import org.apache.pinot.segment.local.segment.creator.impl.text.LuceneTextIndexCreator; import org.apache.pinot.segment.local.utils.GeometrySerializer; -import org.apache.pinot.segment.local.utils.nativefst.NativeFSTIndexCreator; import org.apache.pinot.segment.spi.V1Constants; import org.apache.pinot.segment.spi.compression.ChunkCompressionType; import org.apache.pinot.segment.spi.creator.ColumnIndexCreationInfo; +import org.apache.pinot.segment.spi.creator.IndexCreationContext; +import org.apache.pinot.segment.spi.creator.IndexCreatorProvider; +import org.apache.pinot.segment.spi.creator.IndexCreatorProviders; import org.apache.pinot.segment.spi.creator.SegmentCreator; import org.apache.pinot.segment.spi.creator.SegmentGeneratorConfig; import org.apache.pinot.segment.spi.index.creator.DictionaryBasedInvertedIndexCreator; @@ -64,9 +50,7 @@ import org.apache.pinot.segment.spi.index.creator.H3IndexConfig; import org.apache.pinot.segment.spi.index.creator.JsonIndexCreator; import org.apache.pinot.segment.spi.index.creator.SegmentIndexCreationInfo; import org.apache.pinot.segment.spi.index.creator.TextIndexCreator; -import 
org.apache.pinot.segment.spi.index.reader.H3IndexResolution; import org.apache.pinot.segment.spi.partition.PartitionFunction; -import org.apache.pinot.spi.config.table.FSTType; import org.apache.pinot.spi.config.table.FieldConfig; import org.apache.pinot.spi.data.DateTimeFieldSpec; import org.apache.pinot.spi.data.DateTimeFormatSpec; @@ -99,6 +83,7 @@ public class SegmentColumnarIndexCreator implements SegmentCreator { private SegmentGeneratorConfig _config; private Map<String, ColumnIndexCreationInfo> _indexCreationInfoMap; + private final IndexCreatorProvider _indexCreatorProvider = IndexCreatorProviders.getIndexCreatorProvider(); private final Map<String, SegmentDictionaryCreator> _dictionaryCreatorMap = new HashMap<>(); private final Map<String, ForwardIndexCreator> _forwardIndexCreatorMap = new HashMap<>(); private final Map<String, DictionaryBasedInvertedIndexCreator> _invertedIndexCreatorMap = new HashMap<>(); @@ -179,126 +164,70 @@ public class SegmentColumnarIndexCreator implements SegmentCreator { } String columnName = fieldSpec.getName(); - DataType storedType = fieldSpec.getDataType().getStoredType(); - ColumnIndexCreationInfo indexCreationInfo = indexCreationInfoMap.get(columnName); - Preconditions.checkNotNull(indexCreationInfo, "Missing index creation info for column: %s", columnName); - boolean dictEnabledColumn = createDictionaryForColumn(indexCreationInfo, segmentCreationSpec, fieldSpec); - + ColumnIndexCreationInfo columnIndexCreationInfo = indexCreationInfoMap.get(columnName); + Preconditions.checkNotNull(columnIndexCreationInfo, "Missing index creation info for column: %s", columnName); + boolean dictEnabledColumn = createDictionaryForColumn(columnIndexCreationInfo, segmentCreationSpec, fieldSpec); + Preconditions.checkState(dictEnabledColumn || !invertedIndexColumns.contains(columnName), + "Cannot create inverted index for raw index column: %s", columnName); + + IndexCreationContext.Common context = IndexCreationContext.builder() + 
.withIndexDir(_indexDir) + .withCardinality(columnIndexCreationInfo.getDistinctValueCount()) + .withDictionary(dictEnabledColumn) + .withFieldSpec(fieldSpec) + .withTotalDocs(segmentIndexCreationInfo.getTotalDocs()) + .withTotalNumberOfEntries(columnIndexCreationInfo.getTotalNumberOfEntries()) + .withColumnIndexCreationInfo(columnIndexCreationInfo) + .sorted(columnIndexCreationInfo.isSorted()) + .onHeap(segmentCreationSpec.isOnHeap()) + .build(); + // Initialize forward index creator + ChunkCompressionType chunkCompressionType = + dictEnabledColumn ? null : getColumnCompressionType(segmentCreationSpec, fieldSpec); + _forwardIndexCreatorMap.put(columnName, _indexCreatorProvider.newForwardIndexCreator( + context.forForwardIndex(chunkCompressionType, segmentCreationSpec.getColumnProperties()))); + + // Initialize inverted index creator; skip creating inverted index if sorted + if (invertedIndexColumns.contains(columnName) && !columnIndexCreationInfo.isSorted()) { + _invertedIndexCreatorMap.put(columnName, + _indexCreatorProvider.newInvertedIndexCreator(context.forInvertedIndex())); + } if (dictEnabledColumn) { // Create dictionary-encoded index - // Initialize dictionary creator SegmentDictionaryCreator dictionaryCreator = - new SegmentDictionaryCreator(indexCreationInfo.getSortedUniqueElementsArray(), fieldSpec, _indexDir, - indexCreationInfo.isUseVarLengthDictionary()); + new SegmentDictionaryCreator(columnIndexCreationInfo.getSortedUniqueElementsArray(), fieldSpec, _indexDir, + columnIndexCreationInfo.isUseVarLengthDictionary()); _dictionaryCreatorMap.put(columnName, dictionaryCreator); - // Create dictionary try { dictionaryCreator.build(); } catch (Exception e) { LOGGER.error("Error building dictionary for field: {}, cardinality: {}, number of bytes per entry: {}", - fieldSpec.getName(), indexCreationInfo.getDistinctValueCount(), dictionaryCreator.getNumBytesPerEntry()); + fieldSpec.getName(), columnIndexCreationInfo.getDistinctValueCount(), + 
dictionaryCreator.getNumBytesPerEntry()); throw e; } - - // Initialize forward index creator - int cardinality = indexCreationInfo.getDistinctValueCount(); - if (fieldSpec.isSingleValueField()) { - if (indexCreationInfo.isSorted()) { - _forwardIndexCreatorMap.put(columnName, - new SingleValueSortedForwardIndexCreator(_indexDir, columnName, cardinality)); - } else { - _forwardIndexCreatorMap.put(columnName, - new SingleValueUnsortedForwardIndexCreator(_indexDir, columnName, cardinality, _totalDocs)); - } - } else { - _forwardIndexCreatorMap.put(columnName, - new MultiValueUnsortedForwardIndexCreator(_indexDir, columnName, cardinality, _totalDocs, - indexCreationInfo.getTotalNumberOfEntries())); - } - - // Initialize inverted index creator; skip creating inverted index if sorted - if (invertedIndexColumns.contains(columnName) && !indexCreationInfo.isSorted()) { - if (segmentCreationSpec.isOnHeap()) { - _invertedIndexCreatorMap.put(columnName, - new OnHeapBitmapInvertedIndexCreator(_indexDir, columnName, cardinality)); - } else { - _invertedIndexCreatorMap.put(columnName, - new OffHeapBitmapInvertedIndexCreator(_indexDir, fieldSpec, cardinality, _totalDocs, - indexCreationInfo.getTotalNumberOfEntries())); - } - } - } else { - // Create raw index - Preconditions.checkState(!invertedIndexColumns.contains(columnName), - "Cannot create inverted index for raw index column: %s", columnName); - - ChunkCompressionType compressionType = getColumnCompressionType(segmentCreationSpec, fieldSpec); - - // Initialize forward index creator - boolean deriveNumDocsPerChunk = - shouldDeriveNumDocsPerChunk(columnName, segmentCreationSpec.getColumnProperties()); - int writerVersion = rawIndexWriterVersion(columnName, segmentCreationSpec.getColumnProperties()); - if (fieldSpec.isSingleValueField()) { - _forwardIndexCreatorMap.put(columnName, - getRawIndexCreatorForSVColumn(_indexDir, compressionType, columnName, storedType, _totalDocs, - indexCreationInfo.getLengthOfLongestEntry(), 
deriveNumDocsPerChunk, writerVersion)); - } else { - _forwardIndexCreatorMap.put(columnName, - getRawIndexCreatorForMVColumn(_indexDir, compressionType, columnName, storedType, _totalDocs, - indexCreationInfo.getMaxNumberOfMultiValueElements(), deriveNumDocsPerChunk, writerVersion, - indexCreationInfo.getMaxRowLengthInBytes())); - } } if (textIndexColumns.contains(columnName)) { - // Initialize text index creator - Preconditions.checkState(storedType == DataType.STRING, - "Text index is currently only supported on STRING type columns"); - _textIndexCreatorMap.put(columnName, - new LuceneTextIndexCreator(columnName, _indexDir, true /* commitOnClose */)); + _textIndexCreatorMap.put(columnName, _indexCreatorProvider.newTextIndexCreator(context.forTextIndex(true))); } if (fstIndexColumns.contains(columnName)) { - Preconditions.checkState(fieldSpec.isSingleValueField(), - "FST index is currently only supported on single-value columns"); - Preconditions.checkState(storedType == DataType.STRING, - "FST index is currently only supported on STRING type columns"); - Preconditions.checkState(dictEnabledColumn, - "FST index is currently only supported on dictionary-encoded columns"); - String[] sortedValues = (String[]) indexCreationInfo.getSortedUniqueElementsArray(); - TextIndexCreator textIndexCreator; - if (_config.getFSTIndexType() == FSTType.NATIVE) { - textIndexCreator = new NativeFSTIndexCreator(_indexDir, columnName, sortedValues); - } else { - textIndexCreator = new LuceneFSTIndexCreator(_indexDir, columnName, sortedValues); - } - - _fstIndexCreatorMap.put(columnName, textIndexCreator); + _fstIndexCreatorMap.put(columnName, _indexCreatorProvider.newTextIndexCreator( + context.forFSTIndex(_config.getFSTIndexType(), + (String[]) columnIndexCreationInfo.getSortedUniqueElementsArray()))); } if (jsonIndexColumns.contains(columnName)) { - Preconditions.checkState(fieldSpec.isSingleValueField(), - "Json index is currently only supported on single-value columns"); - 
Preconditions.checkState(storedType == DataType.STRING, - "Json index is currently only supported on STRING columns"); - JsonIndexCreator jsonIndexCreator = - segmentCreationSpec.isOnHeap() ? new OnHeapJsonIndexCreator(_indexDir, columnName) - : new OffHeapJsonIndexCreator(_indexDir, columnName); - _jsonIndexCreatorMap.put(columnName, jsonIndexCreator); + _jsonIndexCreatorMap.put(columnName, _indexCreatorProvider.newJsonIndexCreator(context.forJsonIndex())); } H3IndexConfig h3IndexConfig = h3IndexConfigs.get(columnName); if (h3IndexConfig != null) { - Preconditions.checkState(fieldSpec.isSingleValueField(), - "H3 index is currently only supported on single-value columns"); - Preconditions.checkState(storedType == DataType.BYTES, "H3 index is currently only supported on BYTES columns"); - H3IndexResolution resolution = h3IndexConfig.getResolution(); - GeoSpatialIndexCreator h3IndexCreator = - segmentCreationSpec.isOnHeap() ? new OnHeapH3IndexCreator(_indexDir, columnName, resolution) - : new OffHeapH3IndexCreator(_indexDir, columnName, resolution); - _h3IndexCreatorMap.put(columnName, h3IndexCreator); + _h3IndexCreatorMap.put(columnName, + _indexCreatorProvider.newGeoSpatialIndexCreator(context.forGeospatialIndex(h3IndexConfig))); } _nullHandlingEnabled = _config.isNullHandlingEnabled(); @@ -309,26 +238,29 @@ public class SegmentColumnarIndexCreator implements SegmentCreator { } } - public static boolean shouldDeriveNumDocsPerChunk(String columnName, - Map<String, Map<String, String>> columnProperties) { - if (columnProperties != null) { - Map<String, String> properties = columnProperties.get(columnName); - return properties != null && Boolean.parseBoolean( - properties.get(FieldConfig.DERIVE_NUM_DOCS_PER_CHUNK_RAW_INDEX_KEY)); - } - return false; - } - - public static int rawIndexWriterVersion(String columnName, Map<String, Map<String, String>> columnProperties) { - if (columnProperties != null && columnProperties.get(columnName) != null) { - Map<String, String> 
properties = columnProperties.get(columnName); - String version = properties.get(FieldConfig.RAW_INDEX_WRITER_VERSION); - if (version == null) { - return BaseChunkSVForwardIndexWriter.DEFAULT_VERSION; - } - return Integer.parseInt(version); + /** + * Returns true if dictionary should be created for a column, false otherwise. + * Currently there are two sources for this config: + * <ul> + * <li> ColumnIndexCreationInfo (this is currently hard-coded to always return dictionary). </li> + * <li> SegmentGeneratorConfig</li> + * </ul> + * + * This method gives preference to the SegmentGeneratorConfig first. + * + * @param info Column index creation info + * @param config Segment generation config + * @param spec Field spec for the column + * @return True if dictionary should be created for the column, false otherwise + */ + private boolean createDictionaryForColumn(ColumnIndexCreationInfo info, SegmentGeneratorConfig config, + FieldSpec spec) { + String column = spec.getName(); + if (config.getRawIndexCreationColumns().contains(column) || config.getRawIndexCompressionType() + .containsKey(column)) { + return false; } - return BaseChunkSVForwardIndexWriter.DEFAULT_VERSION; + return info.isCreateDictionary(); } /** @@ -347,7 +279,7 @@ public class SegmentColumnarIndexCreator implements SegmentCreator { FieldSpec fieldSpec) { ChunkCompressionType compressionType = segmentCreationSpec.getRawIndexCompressionType().get(fieldSpec.getName()); if (compressionType == null) { - if (fieldSpec.getFieldType() == FieldType.METRIC) { + if (fieldSpec.getFieldType() == FieldSpec.FieldType.METRIC) { return ChunkCompressionType.PASS_THROUGH; } else { return ChunkCompressionType.LZ4; @@ -357,31 +289,6 @@ public class SegmentColumnarIndexCreator implements SegmentCreator { } } - /** - * Returns true if dictionary should be created for a column, false otherwise. 
- * Currently there are two sources for this config: - * <ul> - * <li> ColumnIndexCreationInfo (this is currently hard-coded to always return dictionary). </li> - * <li> SegmentGeneratorConfig</li> - * </ul> - * - * This method gives preference to the SegmentGeneratorConfig first. - * - * @param info Column index creation info - * @param config Segment generation config - * @param spec Field spec for the column - * @return True if dictionary should be created for the column, false otherwise - */ - private boolean createDictionaryForColumn(ColumnIndexCreationInfo info, SegmentGeneratorConfig config, - FieldSpec spec) { - String column = spec.getName(); - if (config.getRawIndexCreationColumns().contains(column) || config.getRawIndexCompressionType() - .containsKey(column)) { - return false; - } - return info.isCreateDictionary(); - } - @Override public void indexRow(GenericRow row) throws IOException { @@ -795,71 +702,6 @@ public class SegmentColumnarIndexCreator implements SegmentCreator { properties.subset(COLUMN_PROPS_KEY_PREFIX + column).clear(); } - /** - * Helper method to build the raw index creator for the column. - * Assumes that column to be indexed is single valued. 
- * - * @param file Output index file - * @param column Column name - * @param totalDocs Total number of documents to index - * @param lengthOfLongestEntry Length of longest entry - * @param deriveNumDocsPerChunk true if varbyte writer should auto-derive the number of rows per chunk - * @param writerVersion version to use for the raw index writer - * @return raw index creator - */ - public static ForwardIndexCreator getRawIndexCreatorForSVColumn(File file, ChunkCompressionType compressionType, - String column, DataType dataType, int totalDocs, int lengthOfLongestEntry, boolean deriveNumDocsPerChunk, - int writerVersion) - throws IOException { - switch (dataType.getStoredType()) { - case INT: - case LONG: - case FLOAT: - case DOUBLE: - return new SingleValueFixedByteRawIndexCreator(file, compressionType, column, totalDocs, dataType, - writerVersion); - case STRING: - case BYTES: - return new SingleValueVarByteRawIndexCreator(file, compressionType, column, totalDocs, dataType, - lengthOfLongestEntry, deriveNumDocsPerChunk, writerVersion); - default: - throw new UnsupportedOperationException("Data type not supported for raw indexing: " + dataType); - } - } - - /** - * Helper method to build the raw index creator for the column. - * Assumes that column to be indexed is single valued. 
- * - * @param file Output index file - * @param column Column name - * @param totalDocs Total number of documents to index - * @param deriveNumDocsPerChunk true if varbyte writer should auto-derive the number of rows - * per chunk - * @param writerVersion version to use for the raw index writer - * @param maxRowLengthInBytes the length of the longest row in bytes - * @return raw index creator - */ - public static ForwardIndexCreator getRawIndexCreatorForMVColumn(File file, ChunkCompressionType compressionType, - String column, DataType dataType, final int totalDocs, int maxNumberOfMultiValueElements, - boolean deriveNumDocsPerChunk, int writerVersion, int maxRowLengthInBytes) - throws IOException { - switch (dataType.getStoredType()) { - case INT: - case LONG: - case FLOAT: - case DOUBLE: - return new MultiValueFixedByteRawIndexCreator(file, compressionType, column, totalDocs, dataType, - maxNumberOfMultiValueElements, deriveNumDocsPerChunk, writerVersion); - case STRING: - case BYTES: - return new MultiValueVarByteRawIndexCreator(file, compressionType, column, totalDocs, dataType, writerVersion, - maxRowLengthInBytes, maxNumberOfMultiValueElements); - default: - throw new UnsupportedOperationException("Data type not supported for raw indexing: " + dataType); - } - } - @Override public void close() throws IOException { diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/inv/BitSlicedRangeIndexCreator.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/inv/BitSlicedRangeIndexCreator.java index 4c0b98b..0f96bf4 100644 --- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/inv/BitSlicedRangeIndexCreator.java +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/inv/BitSlicedRangeIndexCreator.java @@ -18,10 +18,10 @@ */ package org.apache.pinot.segment.local.segment.creator.impl.inv; +import 
com.google.common.base.Preconditions; import java.io.File; import java.io.IOException; import org.apache.pinot.segment.local.utils.FPOrdering; -import org.apache.pinot.segment.spi.ColumnMetadata; import org.apache.pinot.segment.spi.index.creator.CombinedInvertedIndexCreator; import org.apache.pinot.spi.data.FieldSpec; import org.roaringbitmap.RangeBitmap; @@ -41,13 +41,33 @@ public class BitSlicedRangeIndexCreator implements CombinedInvertedIndexCreator private final File _rangeIndexFile; private final long _minValue; - public BitSlicedRangeIndexCreator(File indexDir, ColumnMetadata metadata) { - if (!metadata.isSingleValue()) { - throw new IllegalArgumentException("MV columns not supported"); - } - _appender = RangeBitmap.appender(maxValue(metadata)); - _rangeIndexFile = new File(indexDir, metadata.getColumnName() + BITMAP_RANGE_INDEX_FILE_EXTENSION); - _minValue = minValue(metadata); + private BitSlicedRangeIndexCreator(File indexDir, FieldSpec fieldSpec, long minValue, long maxValue) { + Preconditions.checkArgument(fieldSpec.isSingleValueField(), "MV columns not supported"); + _rangeIndexFile = new File(indexDir, fieldSpec.getName() + BITMAP_RANGE_INDEX_FILE_EXTENSION); + _appender = RangeBitmap.appender(maxValue); + _minValue = minValue; + } + + /** + * For dictionarized columns + * @param indexDir the directory for the index + * @param fieldSpec the specification of the field + * @param cardinality the cardinality of the dictionary + */ + public BitSlicedRangeIndexCreator(File indexDir, FieldSpec fieldSpec, int cardinality) { + this(indexDir, fieldSpec, 0, cardinality - 1); + } + + /** + * For raw columns + * @param indexDir the directory for the index + * @param fieldSpec the specification of the field + * @param minValue the minimum value + * @param maxValue the maximum value + */ + public BitSlicedRangeIndexCreator(File indexDir, FieldSpec fieldSpec, Comparable<?> minValue, + Comparable<?> maxValue) { + this(indexDir, fieldSpec, minValue(fieldSpec, 
minValue), maxValue(fieldSpec, minValue, maxValue)); } @Override @@ -110,13 +130,8 @@ public class BitSlicedRangeIndexCreator implements CombinedInvertedIndexCreator throws IOException { } - private static long maxValue(ColumnMetadata metadata) { - if (metadata.hasDictionary()) { - return metadata.getCardinality() - 1; - } - FieldSpec.DataType storedType = metadata.getDataType().getStoredType(); - Comparable<?> minValue = metadata.getMinValue(); - Comparable<?> maxValue = metadata.getMaxValue(); + private static long maxValue(FieldSpec fieldSpec, Comparable<?> minValue, Comparable<?> maxValue) { + FieldSpec.DataType storedType = fieldSpec.getDataType().getStoredType(); if (storedType == INT || storedType == LONG) { return ((Number) maxValue).longValue() - ((Number) minValue).longValue(); } @@ -126,15 +141,11 @@ public class BitSlicedRangeIndexCreator implements CombinedInvertedIndexCreator if (storedType == DOUBLE) { return 0xFFFFFFFFFFFFFFFFL; } - throw new IllegalArgumentException("Unsupported data type: " + metadata.getDataType()); + throw new IllegalArgumentException("Unsupported data type: " + fieldSpec.getDataType()); } - private static long minValue(ColumnMetadata metadata) { - if (metadata.hasDictionary()) { - return 0; - } - FieldSpec.DataType storedType = metadata.getDataType().getStoredType(); - Comparable<?> minValue = metadata.getMinValue(); + private static long minValue(FieldSpec fieldSpec, Comparable<?> minValue) { + FieldSpec.DataType storedType = fieldSpec.getDataType().getStoredType(); if (storedType == INT || storedType == LONG) { return ((Number) minValue).longValue(); } diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/IndexHandlerFactory.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/IndexHandlerFactory.java index 9dd9ecb..a6fe925 100644 --- 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/IndexHandlerFactory.java +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/IndexHandlerFactory.java @@ -26,6 +26,8 @@ import org.apache.pinot.segment.local.segment.index.loader.invertedindex.Inverte import org.apache.pinot.segment.local.segment.index.loader.invertedindex.JsonIndexHandler; import org.apache.pinot.segment.local.segment.index.loader.invertedindex.RangeIndexHandler; import org.apache.pinot.segment.local.segment.index.loader.invertedindex.TextIndexHandler; +import org.apache.pinot.segment.spi.creator.IndexCreatorProvider; +import org.apache.pinot.segment.spi.creator.IndexCreatorProviders; import org.apache.pinot.segment.spi.index.metadata.SegmentMetadataImpl; import org.apache.pinot.segment.spi.store.ColumnIndexType; import org.apache.pinot.segment.spi.store.SegmentDirectory; @@ -40,22 +42,26 @@ public class IndexHandlerFactory { public static IndexHandler getIndexHandler(ColumnIndexType type, File indexDir, SegmentMetadataImpl segmentMetadata, IndexLoadingConfig indexLoadingConfig, SegmentDirectory.Writer segmentWriter) { + IndexCreatorProvider indexCreatorProvider = IndexCreatorProviders.getIndexCreatorProvider(); switch (type) { case INVERTED_INDEX: - return new InvertedIndexHandler(indexDir, segmentMetadata, indexLoadingConfig, segmentWriter); + return new InvertedIndexHandler(indexDir, segmentMetadata, indexLoadingConfig, segmentWriter, + indexCreatorProvider); case RANGE_INDEX: - return new RangeIndexHandler(indexDir, segmentMetadata, indexLoadingConfig, segmentWriter); + return new RangeIndexHandler(indexDir, segmentMetadata, indexLoadingConfig, segmentWriter, + indexCreatorProvider); case TEXT_INDEX: - return new TextIndexHandler(indexDir, segmentMetadata, indexLoadingConfig, segmentWriter); + return new TextIndexHandler(indexDir, segmentMetadata, indexLoadingConfig, segmentWriter, indexCreatorProvider); case 
FST_INDEX: return new FSTIndexHandler(indexDir, segmentMetadata, indexLoadingConfig, segmentWriter, - indexLoadingConfig.getFSTIndexType()); + indexLoadingConfig.getFSTIndexType(), indexCreatorProvider); case JSON_INDEX: - return new JsonIndexHandler(indexDir, segmentMetadata, indexLoadingConfig, segmentWriter); + return new JsonIndexHandler(indexDir, segmentMetadata, indexLoadingConfig, segmentWriter, indexCreatorProvider); case H3_INDEX: - return new H3IndexHandler(indexDir, segmentMetadata, indexLoadingConfig, segmentWriter); + return new H3IndexHandler(indexDir, segmentMetadata, indexLoadingConfig, segmentWriter, indexCreatorProvider); case BLOOM_FILTER: - return new BloomFilterHandler(indexDir, segmentMetadata, indexLoadingConfig, segmentWriter); + return new BloomFilterHandler(indexDir, segmentMetadata, indexLoadingConfig, segmentWriter, + indexCreatorProvider); default: return NO_OP_HANDLER; } diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/bloomfilter/BloomFilterHandler.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/bloomfilter/BloomFilterHandler.java index b8a3264..f992344 100644 --- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/bloomfilter/BloomFilterHandler.java +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/bloomfilter/BloomFilterHandler.java @@ -24,7 +24,6 @@ import java.util.HashSet; import java.util.Map; import java.util.Set; import org.apache.commons.io.FileUtils; -import org.apache.pinot.segment.local.segment.creator.impl.bloom.OnHeapGuavaBloomFilterCreator; import org.apache.pinot.segment.local.segment.index.loader.IndexHandler; import org.apache.pinot.segment.local.segment.index.loader.IndexLoadingConfig; import org.apache.pinot.segment.local.segment.index.loader.LoaderUtils; @@ -37,6 +36,8 @@ import 
org.apache.pinot.segment.local.segment.index.readers.LongDictionary; import org.apache.pinot.segment.local.segment.index.readers.StringDictionary; import org.apache.pinot.segment.spi.ColumnMetadata; import org.apache.pinot.segment.spi.V1Constants; +import org.apache.pinot.segment.spi.creator.BloomFilterCreatorProvider; +import org.apache.pinot.segment.spi.creator.IndexCreationContext; import org.apache.pinot.segment.spi.creator.SegmentVersion; import org.apache.pinot.segment.spi.index.creator.BloomFilterCreator; import org.apache.pinot.segment.spi.index.metadata.SegmentMetadataImpl; @@ -57,13 +58,15 @@ public class BloomFilterHandler implements IndexHandler { private final SegmentMetadataImpl _segmentMetadata; private final SegmentDirectory.Writer _segmentWriter; private final Map<String, BloomFilterConfig> _bloomFilterConfigs; + private final BloomFilterCreatorProvider _indexCreatorProvider; public BloomFilterHandler(File indexDir, SegmentMetadataImpl segmentMetadata, IndexLoadingConfig indexLoadingConfig, - SegmentDirectory.Writer segmentWriter) { + SegmentDirectory.Writer segmentWriter, BloomFilterCreatorProvider indexCreatorProvider) { _indexDir = indexDir; _segmentWriter = segmentWriter; _segmentMetadata = segmentMetadata; _bloomFilterConfigs = indexLoadingConfig.getBloomFilterConfigs(); + _indexCreatorProvider = indexCreatorProvider; } @Override @@ -112,8 +115,9 @@ public class BloomFilterHandler implements IndexHandler { BloomFilterConfig bloomFilterConfig = _bloomFilterConfigs.get(columnName); LOGGER.info("Creating new bloom filter for segment: {}, column: {} with config: {}", segmentName, columnName, bloomFilterConfig); - try (BloomFilterCreator bloomFilterCreator = new OnHeapGuavaBloomFilterCreator(_indexDir, columnName, - columnMetadata.getCardinality(), bloomFilterConfig); + try (BloomFilterCreator bloomFilterCreator = _indexCreatorProvider.newBloomFilterCreator( + IndexCreationContext.builder().withIndexDir(_indexDir).withColumnMetadata(columnMetadata) 
+ .build().forBloomFilter(bloomFilterConfig)); Dictionary dictionary = getDictionaryReader(columnMetadata, _segmentWriter)) { int length = dictionary.length(); for (int i = 0; i < length; i++) { diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/invertedindex/FSTIndexHandler.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/invertedindex/FSTIndexHandler.java index 50f2b67..292bf0c 100644 --- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/invertedindex/FSTIndexHandler.java +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/invertedindex/FSTIndexHandler.java @@ -24,15 +24,15 @@ import java.io.IOException; import java.util.HashSet; import java.util.Set; import org.apache.commons.io.FileUtils; -import org.apache.pinot.segment.local.segment.creator.impl.inv.text.LuceneFSTIndexCreator; import org.apache.pinot.segment.local.segment.index.loader.IndexHandler; import org.apache.pinot.segment.local.segment.index.loader.IndexLoadingConfig; import org.apache.pinot.segment.local.segment.index.loader.LoaderUtils; import org.apache.pinot.segment.local.segment.index.loader.SegmentPreProcessor; -import org.apache.pinot.segment.local.utils.nativefst.NativeFSTIndexCreator; import org.apache.pinot.segment.spi.ColumnMetadata; import org.apache.pinot.segment.spi.SegmentMetadata; +import org.apache.pinot.segment.spi.creator.IndexCreationContext; import org.apache.pinot.segment.spi.creator.SegmentVersion; +import org.apache.pinot.segment.spi.creator.TextIndexCreatorProvider; import org.apache.pinot.segment.spi.index.creator.TextIndexCreator; import org.apache.pinot.segment.spi.index.reader.Dictionary; import org.apache.pinot.segment.spi.store.ColumnIndexType; @@ -71,14 +71,16 @@ public class FSTIndexHandler implements IndexHandler { private final SegmentDirectory.Writer _segmentWriter; private final Set<String> 
_columnsToAddIdx; private final FSTType _fstType; + private final TextIndexCreatorProvider _indexCreatorProvider; public FSTIndexHandler(File indexDir, SegmentMetadata segmentMetadata, IndexLoadingConfig indexLoadingConfig, - SegmentDirectory.Writer segmentWriter, FSTType fstType) { + SegmentDirectory.Writer segmentWriter, FSTType fstType, TextIndexCreatorProvider indexCreatorProvider) { _indexDir = indexDir; _segmentMetadata = segmentMetadata; _segmentWriter = segmentWriter; _columnsToAddIdx = new HashSet<>(indexLoadingConfig.getFSTIndexColumns()); _fstType = fstType; + _indexCreatorProvider = indexCreatorProvider; } @Override @@ -136,12 +138,9 @@ public class FSTIndexHandler implements IndexHandler { LOGGER.info("Creating new FST index for column: {} in segment: {}, cardinality: {}", column, segmentName, columnMetadata.getCardinality()); - TextIndexCreator fstIndexCreator; - if (_fstType == FSTType.LUCENE) { - fstIndexCreator = new LuceneFSTIndexCreator(_indexDir, column, null); - } else { - fstIndexCreator = new NativeFSTIndexCreator(_indexDir, column, null); - } + TextIndexCreator fstIndexCreator = _indexCreatorProvider.newTextIndexCreator( + IndexCreationContext.builder().withIndexDir(_indexDir).withColumnMetadata(columnMetadata) + .build().forFSTIndex(_fstType, null)); try (Dictionary dictionary = LoaderUtils.getDictionary(_segmentWriter, columnMetadata)) { for (int dictId = 0; dictId < dictionary.length(); dictId++) { diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/invertedindex/H3IndexHandler.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/invertedindex/H3IndexHandler.java index ef96114..d2fceaa 100644 --- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/invertedindex/H3IndexHandler.java +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/invertedindex/H3IndexHandler.java @@ -32,7 
+32,10 @@ import org.apache.pinot.segment.local.segment.index.loader.LoaderUtils; import org.apache.pinot.segment.local.utils.GeometrySerializer; import org.apache.pinot.segment.spi.ColumnMetadata; import org.apache.pinot.segment.spi.V1Constants; +import org.apache.pinot.segment.spi.creator.IndexCreationContext; +import org.apache.pinot.segment.spi.creator.IndexCreatorProvider; import org.apache.pinot.segment.spi.creator.SegmentVersion; +import org.apache.pinot.segment.spi.index.creator.GeoSpatialIndexCreator; import org.apache.pinot.segment.spi.index.creator.H3IndexConfig; import org.apache.pinot.segment.spi.index.metadata.SegmentMetadataImpl; import org.apache.pinot.segment.spi.index.reader.Dictionary; @@ -53,13 +56,15 @@ public class H3IndexHandler implements IndexHandler { private final SegmentMetadataImpl _segmentMetadata; private final SegmentDirectory.Writer _segmentWriter; private final Map<String, H3IndexConfig> _h3Configs; + private final IndexCreatorProvider _indexCreatorProvider; public H3IndexHandler(File indexDir, SegmentMetadataImpl segmentMetadata, IndexLoadingConfig indexLoadingConfig, - SegmentDirectory.Writer segmentWriter) { + SegmentDirectory.Writer segmentWriter, IndexCreatorProvider indexCreatorProvider) { _indexDir = indexDir; _segmentMetadata = segmentMetadata; _segmentWriter = segmentWriter; _h3Configs = indexLoadingConfig.getH3IndexConfigs(); + _indexCreatorProvider = indexCreatorProvider; } @Override @@ -129,8 +134,9 @@ public class H3IndexHandler implements IndexHandler { try (ForwardIndexReader forwardIndexReader = LoaderUtils.getForwardIndexReader(_segmentWriter, columnMetadata); ForwardIndexReaderContext readerContext = forwardIndexReader.createContext(); Dictionary dictionary = LoaderUtils.getDictionary(_segmentWriter, columnMetadata); - OffHeapH3IndexCreator h3IndexCreator = new OffHeapH3IndexCreator(_indexDir, columnName, - _h3Configs.get(columnName).getResolution())) { + GeoSpatialIndexCreator h3IndexCreator = 
_indexCreatorProvider.newGeoSpatialIndexCreator( + IndexCreationContext.builder().withIndexDir(_indexDir).withColumnMetadata(columnMetadata) + .build().forGeospatialIndex(_h3Configs.get(columnName)))) { int numDocs = columnMetadata.getTotalDocs(); for (int i = 0; i < numDocs; i++) { int dictId = forwardIndexReader.getDictId(i, readerContext); diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/invertedindex/InvertedIndexHandler.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/invertedindex/InvertedIndexHandler.java index ee9d78f..693ca32 100644 --- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/invertedindex/InvertedIndexHandler.java +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/invertedindex/InvertedIndexHandler.java @@ -23,14 +23,16 @@ import java.io.IOException; import java.util.HashSet; import java.util.Set; import org.apache.commons.io.FileUtils; -import org.apache.pinot.segment.local.segment.creator.impl.inv.OffHeapBitmapInvertedIndexCreator; import org.apache.pinot.segment.local.segment.index.loader.IndexHandler; import org.apache.pinot.segment.local.segment.index.loader.IndexLoadingConfig; import org.apache.pinot.segment.local.segment.index.loader.LoaderUtils; import org.apache.pinot.segment.spi.ColumnMetadata; import org.apache.pinot.segment.spi.SegmentMetadata; import org.apache.pinot.segment.spi.V1Constants; +import org.apache.pinot.segment.spi.creator.IndexCreationContext; +import org.apache.pinot.segment.spi.creator.InvertedIndexCreatorProvider; import org.apache.pinot.segment.spi.creator.SegmentVersion; +import org.apache.pinot.segment.spi.index.creator.DictionaryBasedInvertedIndexCreator; import org.apache.pinot.segment.spi.index.reader.ForwardIndexReader; import org.apache.pinot.segment.spi.index.reader.ForwardIndexReaderContext; import 
org.apache.pinot.segment.spi.store.ColumnIndexType; @@ -47,13 +49,15 @@ public class InvertedIndexHandler implements IndexHandler { private final SegmentMetadata _segmentMetadata; private final SegmentDirectory.Writer _segmentWriter; private final HashSet<String> _columnsToAddIdx; + private final InvertedIndexCreatorProvider _indexCreatorProvider; public InvertedIndexHandler(File indexDir, SegmentMetadata segmentMetadata, IndexLoadingConfig indexLoadingConfig, - SegmentDirectory.Writer segmentWriter) { + SegmentDirectory.Writer segmentWriter, InvertedIndexCreatorProvider indexCreatorProvider) { _indexDir = indexDir; _segmentMetadata = segmentMetadata; _segmentWriter = segmentWriter; _columnsToAddIdx = new HashSet<>(indexLoadingConfig.getInvertedIndexColumns()); + _indexCreatorProvider = indexCreatorProvider; } @Override @@ -100,9 +104,9 @@ public class InvertedIndexHandler implements IndexHandler { // Create new inverted index for the column. LOGGER.info("Creating new inverted index for segment: {}, column: {}", segmentName, column); int numDocs = columnMetadata.getTotalDocs(); - try (OffHeapBitmapInvertedIndexCreator creator = new OffHeapBitmapInvertedIndexCreator(_indexDir, - columnMetadata.getFieldSpec(), columnMetadata.getCardinality(), numDocs, - columnMetadata.getTotalNumberOfEntries())) { + try (DictionaryBasedInvertedIndexCreator creator = _indexCreatorProvider.newInvertedIndexCreator( + IndexCreationContext.builder().withIndexDir(_indexDir).withColumnMetadata(columnMetadata).build() + .forInvertedIndex())) { try (ForwardIndexReader forwardIndexReader = LoaderUtils.getForwardIndexReader(_segmentWriter, columnMetadata); ForwardIndexReaderContext readerContext = forwardIndexReader.createContext()) { if (columnMetadata.isSingleValue()) { diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/invertedindex/JsonIndexHandler.java 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/invertedindex/JsonIndexHandler.java index 93d52e8..27b8ade 100644 --- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/invertedindex/JsonIndexHandler.java +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/invertedindex/JsonIndexHandler.java @@ -24,14 +24,16 @@ import java.io.IOException; import java.util.HashSet; import java.util.Set; import org.apache.commons.io.FileUtils; -import org.apache.pinot.segment.local.segment.creator.impl.inv.json.OffHeapJsonIndexCreator; import org.apache.pinot.segment.local.segment.index.loader.IndexHandler; import org.apache.pinot.segment.local.segment.index.loader.IndexLoadingConfig; import org.apache.pinot.segment.local.segment.index.loader.LoaderUtils; import org.apache.pinot.segment.spi.ColumnMetadata; import org.apache.pinot.segment.spi.SegmentMetadata; import org.apache.pinot.segment.spi.V1Constants; +import org.apache.pinot.segment.spi.creator.IndexCreationContext; +import org.apache.pinot.segment.spi.creator.JsonIndexCreatorProvider; import org.apache.pinot.segment.spi.creator.SegmentVersion; +import org.apache.pinot.segment.spi.index.creator.JsonIndexCreator; import org.apache.pinot.segment.spi.index.reader.Dictionary; import org.apache.pinot.segment.spi.index.reader.ForwardIndexReader; import org.apache.pinot.segment.spi.index.reader.ForwardIndexReaderContext; @@ -50,13 +52,15 @@ public class JsonIndexHandler implements IndexHandler { private final SegmentMetadata _segmentMetadata; private final SegmentDirectory.Writer _segmentWriter; private final HashSet<String> _columnsToAddIdx; + private final JsonIndexCreatorProvider _indexCreatorProvider; public JsonIndexHandler(File indexDir, SegmentMetadata segmentMetadata, IndexLoadingConfig indexLoadingConfig, - SegmentDirectory.Writer segmentWriter) { + SegmentDirectory.Writer segmentWriter, 
JsonIndexCreatorProvider indexCreatorProvider) { _indexDir = indexDir; _segmentMetadata = segmentMetadata; _segmentWriter = segmentWriter; _columnsToAddIdx = new HashSet<>(indexLoadingConfig.getJsonIndexColumns()); + _indexCreatorProvider = indexCreatorProvider; } @Override @@ -122,11 +126,11 @@ public class JsonIndexHandler implements IndexHandler { private void handleDictionaryBasedColumn(ColumnMetadata columnMetadata) throws IOException { - String columnName = columnMetadata.getColumnName(); try (ForwardIndexReader forwardIndexReader = LoaderUtils.getForwardIndexReader(_segmentWriter, columnMetadata); ForwardIndexReaderContext readerContext = forwardIndexReader.createContext(); Dictionary dictionary = LoaderUtils.getDictionary(_segmentWriter, columnMetadata); - OffHeapJsonIndexCreator jsonIndexCreator = new OffHeapJsonIndexCreator(_indexDir, columnName)) { + JsonIndexCreator jsonIndexCreator = _indexCreatorProvider.newJsonIndexCreator(IndexCreationContext.builder() + .withIndexDir(_indexDir).withColumnMetadata(columnMetadata).build().forJsonIndex())) { int numDocs = columnMetadata.getTotalDocs(); for (int i = 0; i < numDocs; i++) { int dictId = forwardIndexReader.getDictId(i, readerContext); @@ -138,10 +142,10 @@ public class JsonIndexHandler implements IndexHandler { private void handleNonDictionaryBasedColumn(ColumnMetadata columnMetadata) throws IOException { - String columnName = columnMetadata.getColumnName(); try (ForwardIndexReader forwardIndexReader = LoaderUtils.getForwardIndexReader(_segmentWriter, columnMetadata); ForwardIndexReaderContext readerContext = forwardIndexReader.createContext(); - OffHeapJsonIndexCreator jsonIndexCreator = new OffHeapJsonIndexCreator(_indexDir, columnName)) { + JsonIndexCreator jsonIndexCreator = _indexCreatorProvider.newJsonIndexCreator(IndexCreationContext.builder() + .withIndexDir(_indexDir).withColumnMetadata(columnMetadata).build().forJsonIndex())) { int numDocs = columnMetadata.getTotalDocs(); for (int i = 0; i < 
numDocs; i++) { jsonIndexCreator.add(forwardIndexReader.getString(i, readerContext)); diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/invertedindex/RangeIndexHandler.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/invertedindex/RangeIndexHandler.java index 55c964b..3d5551d 100644 --- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/invertedindex/RangeIndexHandler.java +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/invertedindex/RangeIndexHandler.java @@ -23,21 +23,20 @@ import java.io.IOException; import java.util.HashSet; import java.util.Set; import org.apache.commons.io.FileUtils; -import org.apache.pinot.segment.local.segment.creator.impl.inv.BitSlicedRangeIndexCreator; -import org.apache.pinot.segment.local.segment.creator.impl.inv.RangeIndexCreator; import org.apache.pinot.segment.local.segment.index.loader.IndexHandler; import org.apache.pinot.segment.local.segment.index.loader.IndexLoadingConfig; import org.apache.pinot.segment.local.segment.index.loader.LoaderUtils; import org.apache.pinot.segment.spi.ColumnMetadata; import org.apache.pinot.segment.spi.SegmentMetadata; import org.apache.pinot.segment.spi.V1Constants; +import org.apache.pinot.segment.spi.creator.IndexCreationContext; +import org.apache.pinot.segment.spi.creator.RangeIndexCreatorProvider; import org.apache.pinot.segment.spi.creator.SegmentVersion; import org.apache.pinot.segment.spi.index.creator.CombinedInvertedIndexCreator; import org.apache.pinot.segment.spi.index.reader.ForwardIndexReader; import org.apache.pinot.segment.spi.index.reader.ForwardIndexReaderContext; import org.apache.pinot.segment.spi.store.ColumnIndexType; import org.apache.pinot.segment.spi.store.SegmentDirectory; -import org.apache.pinot.spi.data.FieldSpec; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -51,14 +50,16 @@ public 
class RangeIndexHandler implements IndexHandler { private final SegmentDirectory.Writer _segmentWriter; private final Set<String> _columnsToAddIdx; private final int _rangeIndexVersion; + private final RangeIndexCreatorProvider _indexCreatorProvider; public RangeIndexHandler(File indexDir, SegmentMetadata segmentMetadata, IndexLoadingConfig indexLoadingConfig, - SegmentDirectory.Writer segmentWriter) { + SegmentDirectory.Writer segmentWriter, RangeIndexCreatorProvider indexCreatorProvider) { _indexDir = indexDir; _segmentMetadata = segmentMetadata; _segmentWriter = segmentWriter; _columnsToAddIdx = new HashSet<>(indexLoadingConfig.getRangeIndexColumns()); _rangeIndexVersion = indexLoadingConfig.getRangeIndexVersion(); + _indexCreatorProvider = indexCreatorProvider; } @Override @@ -125,7 +126,7 @@ public class RangeIndexHandler implements IndexHandler { int numDocs = columnMetadata.getTotalDocs(); try (ForwardIndexReader forwardIndexReader = LoaderUtils.getForwardIndexReader(_segmentWriter, columnMetadata); ForwardIndexReaderContext readerContext = forwardIndexReader.createContext(); - CombinedInvertedIndexCreator rangeIndexCreator = newRangeIndexCreator(columnMetadata, FieldSpec.DataType.INT)) { + CombinedInvertedIndexCreator rangeIndexCreator = newRangeIndexCreator(columnMetadata)) { if (columnMetadata.isSingleValue()) { // Single-value column for (int i = 0; i < numDocs; i++) { @@ -148,8 +149,7 @@ public class RangeIndexHandler implements IndexHandler { int numDocs = columnMetadata.getTotalDocs(); try (ForwardIndexReader forwardIndexReader = LoaderUtils.getForwardIndexReader(_segmentWriter, columnMetadata); ForwardIndexReaderContext readerContext = forwardIndexReader.createContext(); - CombinedInvertedIndexCreator rangeIndexCreator = newRangeIndexCreator(columnMetadata, - columnMetadata.getDataType())) { + CombinedInvertedIndexCreator rangeIndexCreator = newRangeIndexCreator(columnMetadata)) { if (columnMetadata.isSingleValue()) { // Single-value column. 
switch (columnMetadata.getDataType()) { @@ -216,13 +216,10 @@ public class RangeIndexHandler implements IndexHandler { } } - private CombinedInvertedIndexCreator newRangeIndexCreator(ColumnMetadata columnMetadata, FieldSpec.DataType dataType) + private CombinedInvertedIndexCreator newRangeIndexCreator(ColumnMetadata columnMetadata) throws IOException { - if (_rangeIndexVersion == BitSlicedRangeIndexCreator.VERSION && columnMetadata.isSingleValue()) { - return new BitSlicedRangeIndexCreator(_indexDir, columnMetadata); - } - // default to RangeIndexCreator for the time being - return new RangeIndexCreator(_indexDir, columnMetadata.getFieldSpec(), dataType, -1, -1, - columnMetadata.getTotalDocs(), columnMetadata.getTotalNumberOfEntries()); + return _indexCreatorProvider.newRangeIndexCreator( + IndexCreationContext.builder().withIndexDir(_indexDir).withColumnMetadata(columnMetadata).build() + .forRangeIndex(_rangeIndexVersion, columnMetadata.getMinValue(), columnMetadata.getMaxValue())); } } diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/invertedindex/TextIndexHandler.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/invertedindex/TextIndexHandler.java index 19e3a91..6e7c64e 100644 --- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/invertedindex/TextIndexHandler.java +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/invertedindex/TextIndexHandler.java @@ -40,13 +40,14 @@ import java.io.File; import java.io.IOException; import java.util.HashSet; import java.util.Set; -import org.apache.pinot.segment.local.segment.creator.impl.text.LuceneTextIndexCreator; import org.apache.pinot.segment.local.segment.index.loader.IndexHandler; import org.apache.pinot.segment.local.segment.index.loader.IndexLoadingConfig; import org.apache.pinot.segment.local.segment.index.loader.LoaderUtils; import 
org.apache.pinot.segment.local.segment.index.loader.SegmentPreProcessor; import org.apache.pinot.segment.spi.ColumnMetadata; import org.apache.pinot.segment.spi.SegmentMetadata; +import org.apache.pinot.segment.spi.creator.IndexCreationContext; +import org.apache.pinot.segment.spi.creator.TextIndexCreatorProvider; import org.apache.pinot.segment.spi.index.creator.TextIndexCreator; import org.apache.pinot.segment.spi.index.reader.Dictionary; import org.apache.pinot.segment.spi.index.reader.ForwardIndexReader; @@ -85,13 +86,15 @@ public class TextIndexHandler implements IndexHandler { private final SegmentMetadata _segmentMetadata; private final SegmentDirectory.Writer _segmentWriter; private final Set<String> _columnsToAddIdx; + private final TextIndexCreatorProvider _textIndexCreatorProvider; public TextIndexHandler(File indexDir, SegmentMetadata segmentMetadata, IndexLoadingConfig indexLoadingConfig, - SegmentDirectory.Writer segmentWriter) { + SegmentDirectory.Writer segmentWriter, TextIndexCreatorProvider textIndexCreatorProvider) { _indexDir = indexDir; _segmentMetadata = segmentMetadata; _segmentWriter = segmentWriter; _columnsToAddIdx = new HashSet<>(indexLoadingConfig.getTextIndexColumns()); + _textIndexCreatorProvider = textIndexCreatorProvider; } @Override @@ -144,7 +147,8 @@ public class TextIndexHandler implements IndexHandler { // based on segmentVersion. 
try (ForwardIndexReader forwardIndexReader = LoaderUtils.getForwardIndexReader(_segmentWriter, columnMetadata); ForwardIndexReaderContext readerContext = forwardIndexReader.createContext(); - LuceneTextIndexCreator textIndexCreator = new LuceneTextIndexCreator(column, segmentDirectory, true)) { + TextIndexCreator textIndexCreator = _textIndexCreatorProvider.newTextIndexCreator(IndexCreationContext.builder() + .withColumnMetadata(columnMetadata).withIndexDir(segmentDirectory).build().forTextIndex(true))) { if (columnMetadata.isSingleValue()) { processSVField(hasDictionary, forwardIndexReader, readerContext, textIndexCreator, numDocs, columnMetadata); } else { diff --git a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/creator/impl/IndexCreatorOverrideTest.java b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/creator/impl/IndexCreatorOverrideTest.java new file mode 100644 index 0000000..6e32884 --- /dev/null +++ b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/creator/impl/IndexCreatorOverrideTest.java @@ -0,0 +1,88 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.pinot.segment.local.segment.creator.impl; + +import java.io.File; +import java.io.IOException; +import java.nio.file.Files; +import java.util.UUID; +import org.apache.pinot.segment.local.segment.creator.impl.inv.OffHeapBitmapInvertedIndexCreator; +import org.apache.pinot.segment.spi.creator.IndexCreationContext; +import org.apache.pinot.segment.spi.creator.IndexCreatorProvider; +import org.apache.pinot.segment.spi.creator.IndexCreatorProviders; +import org.apache.pinot.segment.spi.index.creator.DictionaryBasedInvertedIndexCreator; +import org.apache.pinot.segment.spi.index.metadata.ColumnMetadataImpl; +import org.apache.pinot.spi.data.DimensionFieldSpec; +import org.apache.pinot.spi.data.FieldSpec; +import org.testng.annotations.AfterTest; +import org.testng.annotations.BeforeTest; +import org.testng.annotations.Test; + +import static org.apache.commons.io.FileUtils.deleteQuietly; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.mockStatic; +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertTrue; + + +public class IndexCreatorOverrideTest { + + private File _file; + + @BeforeTest + public void before() + throws IOException { + _file = Files.createTempFile("IndexCreatorOverrideTest", UUID.randomUUID().toString()).toFile(); + } + + @AfterTest + public void cleanup() { + deleteQuietly(_file); + } + + @Test + public void testOverrideInvertedIndexCreation() + throws IOException { + DictionaryBasedInvertedIndexCreator highCardinalityInvertedIndex = mock(DictionaryBasedInvertedIndexCreator.class); + IndexCreatorProvider provider = new IndexCreatorProviders.Default() { + @Override + public DictionaryBasedInvertedIndexCreator newInvertedIndexCreator(IndexCreationContext.Inverted context) + throws IOException { + if (context.getCardinality() >= 10000) { + return highCardinalityInvertedIndex; + } + return super.newInvertedIndexCreator(context); + } + }; + 
mockStatic(IndexCreatorProviders.class).when(IndexCreatorProviders::getIndexCreatorProvider).thenReturn(provider); + IndexCreationContext.Inverted highCardinalityContext = newContext(Integer.MAX_VALUE); + assertEquals(IndexCreatorProviders.getIndexCreatorProvider().newInvertedIndexCreator(highCardinalityContext), + highCardinalityInvertedIndex); + IndexCreationContext.Inverted lowCardinalityContext = newContext(1); + assertTrue(IndexCreatorProviders.getIndexCreatorProvider() + .newInvertedIndexCreator(lowCardinalityContext) instanceof OffHeapBitmapInvertedIndexCreator); + } + + private IndexCreationContext.Inverted newContext(int cardinality) { + FieldSpec fieldSpec = new DimensionFieldSpec("test", FieldSpec.DataType.INT, true); + return IndexCreationContext.builder().withIndexDir(_file) + .withColumnMetadata(ColumnMetadataImpl.builder().setFieldSpec(fieldSpec).setCardinality(cardinality).build()) + .build().forInvertedIndex(); + } +} diff --git a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/creator/BitSlicedIndexCreatorTest.java b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/creator/BitSlicedIndexCreatorTest.java index c06ebee..0ad213c 100644 --- a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/creator/BitSlicedIndexCreatorTest.java +++ b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/creator/BitSlicedIndexCreatorTest.java @@ -65,14 +65,12 @@ public class BitSlicedIndexCreatorTest { @Test(expectedExceptions = IllegalArgumentException.class) public void testFailToCreateRawString() { - new BitSlicedRangeIndexCreator(INDEX_DIR, new ColumnMetadataImpl.Builder() - .setFieldSpec(new DimensionFieldSpec("foo", STRING, true)).build()); + new BitSlicedRangeIndexCreator(INDEX_DIR, new DimensionFieldSpec("foo", STRING, true), null, null); } @Test(expectedExceptions = IllegalArgumentException.class) public void testFailToCreateMV() { - new 
BitSlicedRangeIndexCreator(INDEX_DIR, new ColumnMetadataImpl.Builder() - .setFieldSpec(new DimensionFieldSpec("foo", INT, false)).build()); + new BitSlicedRangeIndexCreator(INDEX_DIR, new DimensionFieldSpec("foo", INT, false), 0, 10); } @Test @@ -153,7 +151,7 @@ public class BitSlicedIndexCreatorTest { private void testInt(Dataset<int[]> dataset) throws IOException { ColumnMetadata metadata = dataset.toColumnMetadata(); - try (BitSlicedRangeIndexCreator creator = new BitSlicedRangeIndexCreator(INDEX_DIR, metadata)) { + try (BitSlicedRangeIndexCreator creator = newBitslicedIndexCreator(metadata)) { for (int value : dataset.values()) { creator.add(value); } @@ -181,7 +179,7 @@ public class BitSlicedIndexCreatorTest { private void testLong(Dataset<long[]> dataset) throws IOException { ColumnMetadata metadata = dataset.toColumnMetadata(); - try (BitSlicedRangeIndexCreator creator = new BitSlicedRangeIndexCreator(INDEX_DIR, metadata)) { + try (BitSlicedRangeIndexCreator creator = newBitslicedIndexCreator(metadata)) { for (long value : dataset.values()) { creator.add(value); } @@ -209,7 +207,7 @@ public class BitSlicedIndexCreatorTest { private void testFloat(Dataset<float[]> dataset) throws IOException { ColumnMetadata metadata = dataset.toColumnMetadata(); - try (BitSlicedRangeIndexCreator creator = new BitSlicedRangeIndexCreator(INDEX_DIR, metadata)) { + try (BitSlicedRangeIndexCreator creator = newBitslicedIndexCreator(metadata)) { for (float value : dataset.values()) { creator.add(value); } @@ -237,7 +235,7 @@ public class BitSlicedIndexCreatorTest { private void testDouble(Dataset<double[]> dataset) throws IOException { ColumnMetadata metadata = dataset.toColumnMetadata(); - try (BitSlicedRangeIndexCreator creator = new BitSlicedRangeIndexCreator(INDEX_DIR, metadata)) { + try (BitSlicedRangeIndexCreator creator = newBitslicedIndexCreator(metadata)) { for (double value : dataset.values()) { creator.add(value); } @@ -262,6 +260,12 @@ public class 
BitSlicedIndexCreatorTest { } } + private static BitSlicedRangeIndexCreator newBitslicedIndexCreator(ColumnMetadata metadata) { + return metadata.hasDictionary() ? new BitSlicedRangeIndexCreator(INDEX_DIR, + metadata.getFieldSpec(), metadata.getCardinality()) : new BitSlicedRangeIndexCreator(INDEX_DIR, + metadata.getFieldSpec(), metadata.getMinValue(), metadata.getMaxValue()); + } + enum Distribution { NORMAL { @Override diff --git a/pinot-segment-spi/pom.xml b/pinot-segment-spi/pom.xml index 1af8184..89a56d9 100644 --- a/pinot-segment-spi/pom.xml +++ b/pinot-segment-spi/pom.xml @@ -93,5 +93,11 @@ <scope>test</scope> </dependency> + <dependency> + <groupId>org.mockito</groupId> + <artifactId>mockito-core</artifactId> + <scope>test</scope> + </dependency> + </dependencies> </project> diff --git a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/creator/BloomFilterCreatorProvider.java b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/creator/BloomFilterCreatorProvider.java new file mode 100644 index 0000000..ba23359 --- /dev/null +++ b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/creator/BloomFilterCreatorProvider.java @@ -0,0 +1,36 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.segment.spi.creator; + +import java.io.IOException; +import org.apache.pinot.segment.spi.index.creator.BloomFilterCreator; + + +public interface BloomFilterCreatorProvider { + /** + * Creates a {@link BloomFilterCreator} from information about index creation. + * This allows a plugin to pattern match index creation information to select + * an appropriate implementation. + * @param context context about the index creation. + * @return a {@link BloomFilterCreator} + * @throws IOException whenever something goes wrong matching or constructing the creator + */ + BloomFilterCreator newBloomFilterCreator(IndexCreationContext.BloomFilter context) + throws IOException; +} diff --git a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/creator/ForwardIndexCreatorProvider.java b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/creator/ForwardIndexCreatorProvider.java new file mode 100644 index 0000000..8c15dbb --- /dev/null +++ b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/creator/ForwardIndexCreatorProvider.java @@ -0,0 +1,35 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.segment.spi.creator; + +import org.apache.pinot.segment.spi.index.creator.ForwardIndexCreator; + + +public interface ForwardIndexCreatorProvider { + /** + * Creates a {@link ForwardIndexCreator} from information about index creation. + * This allows a plugin to pattern match index creation information to select + * an appropriate implementation. + * @param context context about the index creation. + * @return a {@link ForwardIndexCreator} + * @throws Exception whenever something goes wrong matching or constructing the creator + */ + ForwardIndexCreator newForwardIndexCreator(IndexCreationContext.Forward context) + throws Exception; +} diff --git a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/creator/GeoSpatialIndexCreatorProvider.java b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/creator/GeoSpatialIndexCreatorProvider.java new file mode 100644 index 0000000..34341a1 --- /dev/null +++ b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/creator/GeoSpatialIndexCreatorProvider.java @@ -0,0 +1,37 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.segment.spi.creator; + +import java.io.IOException; +import org.apache.pinot.segment.spi.index.creator.GeoSpatialIndexCreator; + + +public interface GeoSpatialIndexCreatorProvider { + + /** + * Creates a {@link GeoSpatialIndexCreator} from information about index creation. + * This allows a plugin to pattern match index creation information to select + * an appropriate implementation. + * @param context context about the index creation. + * @return a {@link GeoSpatialIndexCreator} + * @throws IOException whenever something goes wrong matching or constructing the creator + */ + GeoSpatialIndexCreator newGeoSpatialIndexCreator(IndexCreationContext.Geospatial context) + throws IOException; +} diff --git a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/creator/IndexCreationContext.java b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/creator/IndexCreationContext.java new file mode 100644 index 0000000..c1a20ba --- /dev/null +++ b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/creator/IndexCreationContext.java @@ -0,0 +1,467 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.segment.spi.creator; + +import java.io.File; +import java.util.Map; +import java.util.Objects; +import javax.annotation.Nullable; +import org.apache.pinot.segment.spi.ColumnMetadata; +import org.apache.pinot.segment.spi.compression.ChunkCompressionType; +import org.apache.pinot.segment.spi.index.creator.H3IndexConfig; +import org.apache.pinot.spi.config.table.BloomFilterConfig; +import org.apache.pinot.spi.config.table.FSTType; +import org.apache.pinot.spi.data.FieldSpec; + + +/** + * Provides parameters for constructing indexes via {@link IndexCreatorProvider}. + * The responsibility for ensuring that the correct parameters for a particular + * index type are supplied lies with the caller. + */ +public interface IndexCreationContext { + + FieldSpec getFieldSpec(); + + File getIndexDir(); + + boolean isOnHeap(); + + int getLengthOfLongestEntry(); + + int getMaxNumberOfMultiValueElements(); + + int getMaxRowLengthInBytes(); + + boolean isSorted(); + + int getCardinality(); + + int getTotalNumberOfEntries(); + + int getTotalDocs(); + + boolean hasDictionary(); + + final class Builder { + private File _indexDir; + private int _lengthOfLongestEntry; + private int _maxNumberOfMultiValueElements; + private int _maxRowLengthInBytes; + private boolean _onHeap = false; + private FieldSpec _fieldSpec; + private boolean _sorted; + private int _cardinality; + private int _totalNumberOfEntries; + private int _totalDocs; + private boolean _hasDictionary = true; + + public Builder withColumnIndexCreationInfo(ColumnIndexCreationInfo columnIndexCreationInfo) { + return withLengthOfLongestEntry(columnIndexCreationInfo.getLengthOfLongestEntry()) + .withMaxNumberOfMultiValueElements(columnIndexCreationInfo.getMaxNumberOfMultiValueElements()) + .withMaxRowLengthInBytes(columnIndexCreationInfo.getMaxRowLengthInBytes()); + } + + public Builder withIndexDir(File 
indexDir) { + _indexDir = indexDir; + return this; + } + + public Builder onHeap(boolean onHeap) { + _onHeap = onHeap; + return this; + } + + public Builder withColumnMetadata(ColumnMetadata columnMetadata) { + return withFieldSpec(columnMetadata.getFieldSpec()) + .sorted(columnMetadata.isSorted()) + .withCardinality(columnMetadata.getCardinality()) + .withTotalNumberOfEntries(columnMetadata.getTotalNumberOfEntries()) + .withTotalDocs(columnMetadata.getTotalDocs()) + .withDictionary(columnMetadata.hasDictionary()); + } + + public Builder withLengthOfLongestEntry(int lengthOfLongestEntry) { + _lengthOfLongestEntry = lengthOfLongestEntry; + return this; + } + + public Builder withMaxNumberOfMultiValueElements(int maxNumberOfMultiValueElements) { + _maxNumberOfMultiValueElements = maxNumberOfMultiValueElements; + return this; + } + + public Builder withMaxRowLengthInBytes(int maxRowLengthInBytes) { + _maxRowLengthInBytes = maxRowLengthInBytes; + return this; + } + + public Builder withFieldSpec(FieldSpec fieldSpec) { + _fieldSpec = fieldSpec; + return this; + } + + public Builder sorted(boolean sorted) { + _sorted = sorted; + return this; + } + + public Builder withCardinality(int cardinality) { + _cardinality = cardinality; + return this; + } + + public Builder withTotalNumberOfEntries(int totalNumberOfEntries) { + _totalNumberOfEntries = totalNumberOfEntries; + return this; + } + + public Builder withTotalDocs(int totalDocs) { + _totalDocs = totalDocs; + return this; + } + + public Builder withDictionary(boolean hasDictionary) { + _hasDictionary = hasDictionary; + return this; + } + + public Common build() { + return new Common(Objects.requireNonNull(_indexDir), + _lengthOfLongestEntry, _maxNumberOfMultiValueElements, _maxRowLengthInBytes, + _onHeap, Objects.requireNonNull(_fieldSpec), + _sorted, _cardinality, _totalNumberOfEntries, _totalDocs, _hasDictionary); + } + } + + static Builder builder() { + return new Builder(); + } + + final class Common implements 
IndexCreationContext { + + private final File _indexDir; + private final int _lengthOfLongestEntry; + private final int _maxNumberOfMultiValueElements; + private final int _maxRowLengthInBytes; + private final boolean _onHeap; + private final FieldSpec _fieldSpec; + private final boolean _sorted; + private final int _cardinality; + private final int _totalNumberOfEntries; + private final int _totalDocs; + private final boolean _hasDictionary; + + public Common(File indexDir, int lengthOfLongestEntry, + int maxNumberOfMultiValueElements, int maxRowLengthInBytes, boolean onHeap, + FieldSpec fieldSpec, boolean sorted, int cardinality, int totalNumberOfEntries, + int totalDocs, boolean hasDictionary) { + _indexDir = indexDir; + _lengthOfLongestEntry = lengthOfLongestEntry; + _maxNumberOfMultiValueElements = maxNumberOfMultiValueElements; + _maxRowLengthInBytes = maxRowLengthInBytes; + _onHeap = onHeap; + _fieldSpec = fieldSpec; + _sorted = sorted; + _cardinality = cardinality; + _totalNumberOfEntries = totalNumberOfEntries; + _totalDocs = totalDocs; + _hasDictionary = hasDictionary; + } + + public FieldSpec getFieldSpec() { + return _fieldSpec; + } + + public File getIndexDir() { + return _indexDir; + } + + public boolean isOnHeap() { + return _onHeap; + } + + public int getLengthOfLongestEntry() { + return _lengthOfLongestEntry; + } + + public int getMaxNumberOfMultiValueElements() { + return _maxNumberOfMultiValueElements; + } + + public int getMaxRowLengthInBytes() { + return _maxRowLengthInBytes; + } + + public boolean isSorted() { + return _sorted; + } + + public int getCardinality() { + return _cardinality; + } + + public int getTotalNumberOfEntries() { + return _totalNumberOfEntries; + } + + public int getTotalDocs() { + return _totalDocs; + } + + public boolean hasDictionary() { + return _hasDictionary; + } + + public BloomFilter forBloomFilter(BloomFilterConfig bloomFilterConfig) { + return new BloomFilter(this, bloomFilterConfig); + } + + public Forward 
forForwardIndex(ChunkCompressionType chunkCompressionType, + @Nullable Map<String, Map<String, String>> columnProperties) { + return new Forward(this, chunkCompressionType, columnProperties); + } + + public Text forFSTIndex(FSTType fstType, String[] sortedUniqueElementsArray) { + return new Text(this, fstType, sortedUniqueElementsArray); + } + + public Geospatial forGeospatialIndex(H3IndexConfig h3IndexConfig) { + return new Geospatial(this, h3IndexConfig); + } + + public Inverted forInvertedIndex() { + return new Inverted(this); + } + + public Json forJsonIndex() { + return new Json(this); + } + + public Range forRangeIndex(int rangeIndexVersion, Comparable<?> min, Comparable<?> max) { + return new Range(this, rangeIndexVersion, min, max); + } + + public Text forTextIndex(boolean commitOnClose) { + return new Text(this, commitOnClose); + } + } + + class Wrapper implements IndexCreationContext { + + private final IndexCreationContext _delegate; + + Wrapper(IndexCreationContext delegate) { + _delegate = delegate; + } + + @Override + public FieldSpec getFieldSpec() { + return _delegate.getFieldSpec(); + } + + @Override + public File getIndexDir() { + return _delegate.getIndexDir(); + } + + @Override + public boolean isOnHeap() { + return _delegate.isOnHeap(); + } + + @Override + public int getLengthOfLongestEntry() { + return _delegate.getLengthOfLongestEntry(); + } + + @Override + public int getMaxNumberOfMultiValueElements() { + return _delegate.getMaxNumberOfMultiValueElements(); + } + + @Override + public int getMaxRowLengthInBytes() { + return _delegate.getMaxRowLengthInBytes(); + } + + @Override + public boolean isSorted() { + return _delegate.isSorted(); + } + + @Override + public int getCardinality() { + return _delegate.getCardinality(); + } + + @Override + public int getTotalNumberOfEntries() { + return _delegate.getTotalNumberOfEntries(); + } + + @Override + public int getTotalDocs() { + return _delegate.getTotalDocs(); + } + + @Override + public boolean 
hasDictionary() { + return _delegate.hasDictionary(); + } + } + + class BloomFilter extends Wrapper { + + private final BloomFilterConfig _bloomFilterConfig; + + public BloomFilter(IndexCreationContext wrapped, BloomFilterConfig bloomFilterConfig) { + super(wrapped); + _bloomFilterConfig = bloomFilterConfig; + } + + @Nullable + public BloomFilterConfig getBloomFilterConfig() { + return _bloomFilterConfig; + } + } + + class Forward extends Wrapper { + + private final ChunkCompressionType _chunkCompressionType; + private final Map<String, Map<String, String>> _columnProperties; + + Forward(IndexCreationContext delegate, + ChunkCompressionType chunkCompressionType, + @Nullable Map<String, Map<String, String>> columnProperties) { + super(delegate); + _chunkCompressionType = chunkCompressionType; + _columnProperties = columnProperties; + } + + public ChunkCompressionType getChunkCompressionType() { + return _chunkCompressionType; + } + + @Nullable + public Map<String, Map<String, String>> getColumnProperties() { + return _columnProperties; + } + } + + class Geospatial extends Wrapper { + + private final H3IndexConfig _h3IndexConfig; + + Geospatial(IndexCreationContext delegate, H3IndexConfig h3IndexConfig) { + super(delegate); + _h3IndexConfig = h3IndexConfig; + } + + public H3IndexConfig getH3IndexConfig() { + return _h3IndexConfig; + } + } + + class Inverted extends Wrapper { + + Inverted(IndexCreationContext delegate) { + super(delegate); + } + } + + class Json extends Wrapper { + + Json(IndexCreationContext delegate) { + super(delegate); + } + } + + class Range extends Wrapper { + + private final Comparable<?> _min; + private final Comparable<?> _max; + private final int _rangeIndexVersion; + + + Range(IndexCreationContext delegate, int rangeIndexVersion, Comparable<?> min, Comparable<?> max) { + super(delegate); + _rangeIndexVersion = rangeIndexVersion; + _min = min; + _max = max; + } + + public Comparable<?> getMin() { + return _min; + } + + public Comparable<?> 
getMax() { + return _max; + } + + public int getRangeIndexVersion() { + return _rangeIndexVersion; + } + } + + class Text extends Wrapper { + private final boolean _commitOnClose; + private final boolean _isFst; + private final FSTType _fstType; + private final String[] _sortedUniqueElementsArray; + + /** + * For text indexes + */ + public Text(IndexCreationContext wrapped, boolean commitOnClose) { + super(wrapped); + _commitOnClose = commitOnClose; + _fstType = null; + _sortedUniqueElementsArray = null; + _isFst = false; + } + + /** + * For FST indexes + */ + public Text(IndexCreationContext wrapped, FSTType fstType, String[] sortedUniqueElementsArray) { + super(wrapped); + _commitOnClose = true; + _fstType = fstType; + _sortedUniqueElementsArray = sortedUniqueElementsArray; + _isFst = true; + } + + public boolean isCommitOnClose() { + return _commitOnClose; + } + + public FSTType getFstType() { + return _fstType; + } + + public boolean isFst() { + return _isFst; + } + + public String[] getSortedUniqueElementsArray() { + return _sortedUniqueElementsArray; + } + } +} diff --git a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/creator/IndexCreatorProvider.java b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/creator/IndexCreatorProvider.java new file mode 100644 index 0000000..4dd1a02 --- /dev/null +++ b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/creator/IndexCreatorProvider.java @@ -0,0 +1,28 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.segment.spi.creator; + +/** + * Plugin interface to abstract index creation. + */ +public interface IndexCreatorProvider + extends ForwardIndexCreatorProvider, InvertedIndexCreatorProvider, JsonIndexCreatorProvider, + TextIndexCreatorProvider, GeoSpatialIndexCreatorProvider, RangeIndexCreatorProvider, + BloomFilterCreatorProvider { +} diff --git a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/creator/IndexCreatorProviders.java b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/creator/IndexCreatorProviders.java new file mode 100644 index 0000000..40466f1 --- /dev/null +++ b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/creator/IndexCreatorProviders.java @@ -0,0 +1,159 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.pinot.segment.spi.creator; + +import java.io.IOException; +import java.lang.invoke.MethodHandles; +import java.lang.invoke.MethodType; +import java.util.concurrent.atomic.AtomicReference; +import org.apache.pinot.segment.spi.index.creator.BloomFilterCreator; +import org.apache.pinot.segment.spi.index.creator.CombinedInvertedIndexCreator; +import org.apache.pinot.segment.spi.index.creator.DictionaryBasedInvertedIndexCreator; +import org.apache.pinot.segment.spi.index.creator.ForwardIndexCreator; +import org.apache.pinot.segment.spi.index.creator.GeoSpatialIndexCreator; +import org.apache.pinot.segment.spi.index.creator.JsonIndexCreator; +import org.apache.pinot.segment.spi.index.creator.TextIndexCreator; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +/** + * Plugin registration point to allow index creation logic to be swapped out. + * Plugins should not reimplement Pinot's default index creation logic. + * Users provide an override to Pinot's index creation logic. This is simplified + * by extending {@link IndexCreatorProviders.Default} + */ +public final class IndexCreatorProviders { + + private static final Logger LOGGER = LoggerFactory.getLogger(IndexCreatorProviders.class); + + private static final IndexCreatorProvider DEFAULT = defaultProvider(); + private static final AtomicReference<IndexCreatorProvider> REGISTRATION = new AtomicReference<>(DEFAULT); + + private IndexCreatorProviders() { + } + + /** + * The caller provides a decorator to wrap the default provider, which allows plugins to create + * a delegation chain. + * @param provider index creation override + * @return true if the provider was registered, i.e. no other provider had been registered before. + */ + public static boolean registerProvider(IndexCreatorProvider provider) { + return REGISTRATION.compareAndSet(DEFAULT, provider); + } + + /** + * Obtain the registered index creator provider. If the user has provided an override, then it will be used instead. 
+ * If the user has not provided an override yet, then this action will prevent them from doing so. + * @return the global index provision logic. + */ + public static IndexCreatorProvider getIndexCreatorProvider() { + return Holder.PROVIDER; + } + + private static final class Holder { + public static final IndexCreatorProvider PROVIDER = REGISTRATION.get(); + } + + private static IndexCreatorProvider defaultProvider() { + // use MethodHandle to break circular dependency and keep implementation details encapsulated within + // pinot-segment-local + String className = "org.apache.pinot.segment.local.segment.creator.impl.DefaultIndexCreatorProvider"; + try { + Class<?> clazz = Class.forName(className, false, IndexCreatorProviders.class.getClassLoader()); + return (IndexCreatorProvider) MethodHandles.publicLookup() + .findConstructor(clazz, MethodType.methodType(void.class)).invoke(); + } catch (Throwable missing) { + LOGGER.error("could not construct MethodHandle for {}", className, missing); + // this means pinot-segment-local isn't on the classpath, but this means + // no indexes will be created, so it's ok to return null + return null; + } + } + + /** + * Extend this class to override index creation + */ + public static class Default implements IndexCreatorProvider { + + @Override + public BloomFilterCreator newBloomFilterCreator(IndexCreationContext.BloomFilter context) + throws IOException { + if (DEFAULT == null) { + throw new UnsupportedOperationException("default implementation not present on classpath"); + } + return DEFAULT.newBloomFilterCreator(context); + } + + @Override + public ForwardIndexCreator newForwardIndexCreator(IndexCreationContext.Forward context) + throws Exception { + if (DEFAULT == null) { + throw new UnsupportedOperationException("default implementation not present on classpath"); + } + return DEFAULT.newForwardIndexCreator(context); + } + + @Override + public GeoSpatialIndexCreator newGeoSpatialIndexCreator(IndexCreationContext.Geospatial 
context) + throws IOException { + if (DEFAULT == null) { + throw new UnsupportedOperationException("default implementation not present on classpath"); + } + return DEFAULT.newGeoSpatialIndexCreator(context); + } + + @Override + public DictionaryBasedInvertedIndexCreator newInvertedIndexCreator(IndexCreationContext.Inverted context) + throws IOException { + if (DEFAULT == null) { + throw new UnsupportedOperationException("default implementation not present on classpath"); + } + return DEFAULT.newInvertedIndexCreator(context); + } + + @Override + public JsonIndexCreator newJsonIndexCreator(IndexCreationContext.Json context) + throws IOException { + if (DEFAULT == null) { + throw new UnsupportedOperationException("default implementation not present on classpath"); + } + return DEFAULT.newJsonIndexCreator(context); + } + + @Override + public CombinedInvertedIndexCreator newRangeIndexCreator(IndexCreationContext.Range context) + throws IOException { + if (DEFAULT == null) { + throw new UnsupportedOperationException("default implementation not present on classpath"); + } + return DEFAULT.newRangeIndexCreator(context); + } + + @Override + public TextIndexCreator newTextIndexCreator(IndexCreationContext.Text context) + throws IOException { + if (DEFAULT == null) { + throw new UnsupportedOperationException("default implementation not present on classpath"); + } + return DEFAULT.newTextIndexCreator(context); + } + } +} diff --git a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/creator/InvertedIndexCreatorProvider.java b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/creator/InvertedIndexCreatorProvider.java new file mode 100644 index 0000000..2bf7f1a --- /dev/null +++ b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/creator/InvertedIndexCreatorProvider.java @@ -0,0 +1,36 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.segment.spi.creator; + +import java.io.IOException; +import org.apache.pinot.segment.spi.index.creator.DictionaryBasedInvertedIndexCreator; + + +public interface InvertedIndexCreatorProvider { + /** + * Creates a {@link DictionaryBasedInvertedIndexCreator} from information about index creation. + * This allows a plugin to pattern match index creation information to select + * an appropriate implementation. + * @param context context about the index creation. + * @return a {@link DictionaryBasedInvertedIndexCreator} + * @throws IOException whenever something goes wrong matching or constructing the creator + */ + DictionaryBasedInvertedIndexCreator newInvertedIndexCreator(IndexCreationContext.Inverted context) + throws IOException; +} diff --git a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/creator/JsonIndexCreatorProvider.java b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/creator/JsonIndexCreatorProvider.java new file mode 100644 index 0000000..22e7f54 --- /dev/null +++ b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/creator/JsonIndexCreatorProvider.java @@ -0,0 +1,36 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.segment.spi.creator; + +import java.io.IOException; +import org.apache.pinot.segment.spi.index.creator.JsonIndexCreator; + + +public interface JsonIndexCreatorProvider { + /** + * Creates a {@link JsonIndexCreator} from information about index creation. + * This allows a plugin to pattern match index creation information to select + * an appropriate implementation. + * @param context context about the index creation. + * @return a {@link JsonIndexCreator} + * @throws IOException whenever something goes wrong matching or constructing the creator + */ + JsonIndexCreator newJsonIndexCreator(IndexCreationContext.Json context) + throws IOException; +} diff --git a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/creator/RangeIndexCreatorProvider.java b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/creator/RangeIndexCreatorProvider.java new file mode 100644 index 0000000..a3abfc6 --- /dev/null +++ b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/creator/RangeIndexCreatorProvider.java @@ -0,0 +1,36 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.segment.spi.creator; + +import java.io.IOException; +import org.apache.pinot.segment.spi.index.creator.CombinedInvertedIndexCreator; + + +public interface RangeIndexCreatorProvider { + /** + * Creates a {@link CombinedInvertedIndexCreator} from information about index creation. + * This allows a plugin to pattern match index creation information to select + * an appropriate implementation. + * @param context context about the index creation. + * @return a {@link CombinedInvertedIndexCreator} + * @throws IOException whenever something goes wrong matching or constructing the creator + */ + CombinedInvertedIndexCreator newRangeIndexCreator(IndexCreationContext.Range context) + throws IOException; +} diff --git a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/creator/TextIndexCreatorProvider.java b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/creator/TextIndexCreatorProvider.java new file mode 100644 index 0000000..05f53ea --- /dev/null +++ b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/creator/TextIndexCreatorProvider.java @@ -0,0 +1,37 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.segment.spi.creator; + +import java.io.IOException; +import org.apache.pinot.segment.spi.index.creator.TextIndexCreator; + + +public interface TextIndexCreatorProvider { + + /** + * Creates a {@link TextIndexCreator} from information about index creation. + * This allows a plugin to pattern match index creation information to select + * an appropriate implementation. + * @param context context about the index creation. 
+ * @return a {@link TextIndexCreator} + * @throws IOException whenever something goes wrong matching or constructing the creator + */ + TextIndexCreator newTextIndexCreator(IndexCreationContext.Text context) + throws IOException; +} diff --git a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/index/metadata/ColumnMetadataImpl.java b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/index/metadata/ColumnMetadataImpl.java index 19dbb2a..519f7c1 100644 --- a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/index/metadata/ColumnMetadataImpl.java +++ b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/index/metadata/ColumnMetadataImpl.java @@ -289,6 +289,10 @@ public class ColumnMetadataImpl implements ColumnMetadata { return builder.build(); } + public static Builder builder() { + return new Builder(); + } + public static class Builder { private FieldSpec _fieldSpec; private int _totalDocs; diff --git a/pinot-segment-spi/src/test/java/org/apache/pinot/segment/spi/creator/IndexCreatorProvidersTest.java b/pinot-segment-spi/src/test/java/org/apache/pinot/segment/spi/creator/IndexCreatorProvidersTest.java new file mode 100644 index 0000000..bfaebb3 --- /dev/null +++ b/pinot-segment-spi/src/test/java/org/apache/pinot/segment/spi/creator/IndexCreatorProvidersTest.java @@ -0,0 +1,53 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.segment.spi.creator; + +import java.io.IOException; +import org.apache.pinot.segment.spi.index.creator.BloomFilterCreator; +import org.testng.annotations.Test; + +import static org.mockito.Mockito.mock; +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertTrue; + + +public class IndexCreatorProvidersTest { + + @Test + public void indexCreatorProvidersLoadableWithoutDefaultImplementation() + throws IOException { + BloomFilterCreator mockBloomFilterCreator = mock(BloomFilterCreator.class); + assertTrue(IndexCreatorProviders.registerProvider(new IndexCreatorProviders.Default() { + @Override + public BloomFilterCreator newBloomFilterCreator(IndexCreationContext.BloomFilter context) { + return mockBloomFilterCreator; + } + })); + // it's ok to load external overrides without an internal implementation present, e.g. for testing + assertEquals(mockBloomFilterCreator, IndexCreatorProviders.getIndexCreatorProvider() + .newBloomFilterCreator(mock(IndexCreationContext.BloomFilter.class))); + } + + @Test(expectedExceptions = UnsupportedOperationException.class) + public void whenDefaultImplementationMissingThrowUnsupportedOperationException() + throws IOException { + // the implementation is missing so no indexes will be created anyway... 
+ new IndexCreatorProviders.Default().newBloomFilterCreator(mock(IndexCreationContext.BloomFilter.class)); + } +} diff --git a/pinot-tools/src/main/java/org/apache/pinot/tools/segment/converter/DictionaryToRawIndexConverter.java b/pinot-tools/src/main/java/org/apache/pinot/tools/segment/converter/DictionaryToRawIndexConverter.java index d349da8..0367203 100644 --- a/pinot-tools/src/main/java/org/apache/pinot/tools/segment/converter/DictionaryToRawIndexConverter.java +++ b/pinot-tools/src/main/java/org/apache/pinot/tools/segment/converter/DictionaryToRawIndexConverter.java @@ -31,7 +31,7 @@ import org.apache.commons.io.FileUtils; import org.apache.pinot.common.utils.TarGzCompressionUtils; import org.apache.pinot.segment.local.indexsegment.immutable.ImmutableSegmentLoader; import org.apache.pinot.segment.local.io.writer.impl.BaseChunkSVForwardIndexWriter; -import org.apache.pinot.segment.local.segment.creator.impl.SegmentColumnarIndexCreator; +import org.apache.pinot.segment.local.segment.creator.impl.DefaultIndexCreatorProvider; import org.apache.pinot.segment.spi.IndexSegment; import org.apache.pinot.segment.spi.V1Constants; import org.apache.pinot.segment.spi.compression.ChunkCompressionType; @@ -314,7 +314,7 @@ public class DictionaryToRawIndexConverter { int numDocs = segment.getSegmentMetadata().getTotalDocs(); int lengthOfLongestEntry = (storedType == DataType.STRING) ? 
getLengthOfLongestEntry(dictionary) : -1; - try (ForwardIndexCreator rawIndexCreator = SegmentColumnarIndexCreator + try (ForwardIndexCreator rawIndexCreator = DefaultIndexCreatorProvider .getRawIndexCreatorForSVColumn(newSegment, compressionType, column, storedType, numDocs, lengthOfLongestEntry, false, BaseChunkSVForwardIndexWriter.DEFAULT_VERSION); ForwardIndexReaderContext readerContext = reader.createContext()) { --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org