klsince commented on code in PR #10184: URL: https://github.com/apache/pinot/pull/10184#discussion_r1153898848
########## pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/index/creator/CombinedInvertedIndexCreator.java: ########## @@ -18,6 +18,80 @@ */ package org.apache.pinot.segment.spi.index.creator; +import javax.annotation.Nonnull; +import javax.annotation.Nullable; +import org.apache.pinot.spi.data.FieldSpec; + + +/** + * This is the index used to create range indexes Review Comment: ic. The class name is a bit misleading, but you're right this class seems only for range index. ########## pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/SegmentColumnarIndexCreator.java: ########## @@ -133,183 +120,141 @@ public void init(SegmentGeneratorConfig segmentCreationSpec, SegmentIndexCreatio return; } - Collection<FieldSpec> fieldSpecs = schema.getAllFieldSpecs(); - Set<String> invertedIndexColumns = new HashSet<>(); - for (String columnName : _config.getInvertedIndexCreationColumns()) { - Preconditions.checkState(schema.hasColumn(columnName), - "Cannot create inverted index for column: %s because it is not in schema", columnName); - invertedIndexColumns.add(columnName); - } + Map<String, FieldIndexConfigs> indexConfigs = segmentCreationSpec.getIndexConfigsByColName(); - Set<String> bloomFilterColumns = new HashSet<>(); - for (String columnName : _config.getBloomFilterCreationColumns()) { - Preconditions.checkState(schema.hasColumn(columnName), - "Cannot create bloom filter for column: %s because it is not in schema", columnName); - bloomFilterColumns.add(columnName); - } - - Set<String> rangeIndexColumns = new HashSet<>(); - for (String columnName : _config.getRangeIndexCreationColumns()) { - Preconditions.checkState(schema.hasColumn(columnName), - "Cannot create range index for column: %s because it is not in schema", columnName); - rangeIndexColumns.add(columnName); - } - - Set<String> textIndexColumns = new HashSet<>(); - for (String columnName : _config.getTextIndexCreationColumns()) { - 
Preconditions.checkState(schema.hasColumn(columnName), - "Cannot create text index for column: %s because it is not in schema", columnName); - textIndexColumns.add(columnName); - } - - Set<String> fstIndexColumns = new HashSet<>(); - for (String columnName : _config.getFSTIndexCreationColumns()) { - Preconditions.checkState(schema.hasColumn(columnName), - "Cannot create FST index for column: %s because it is not in schema", columnName); - fstIndexColumns.add(columnName); - } - - Map<String, JsonIndexConfig> jsonIndexConfigs = _config.getJsonIndexConfigs(); - for (String columnName : jsonIndexConfigs.keySet()) { - Preconditions.checkState(schema.hasColumn(columnName), - "Cannot create json index for column: %s because it is not in schema", columnName); - } - - Set<String> forwardIndexDisabledColumns = new HashSet<>(); - for (String columnName : _config.getForwardIndexDisabledColumns()) { - Preconditions.checkState(schema.hasColumn(columnName), String.format("Invalid config. Can't disable " - + "forward index creation for a column: %s that does not exist in schema", columnName)); - forwardIndexDisabledColumns.add(columnName); - } - - Map<String, H3IndexConfig> h3IndexConfigs = _config.getH3IndexConfigs(); - for (String columnName : h3IndexConfigs.keySet()) { - Preconditions.checkState(schema.hasColumn(columnName), - "Cannot create H3 index for column: %s because it is not in schema", columnName); - } + _creatorsByColAndIndex = Maps.newHashMapWithExpectedSize(indexConfigs.keySet().size()); - // Initialize creators for dictionary, forward index and inverted index - IndexingConfig indexingConfig = _config.getTableConfig().getIndexingConfig(); - int rangeIndexVersion = indexingConfig.getRangeIndexVersion(); - for (FieldSpec fieldSpec : fieldSpecs) { - // Ignore virtual columns + for (String columnName : indexConfigs.keySet()) { + FieldSpec fieldSpec = schema.getFieldSpecFor(columnName); + if (fieldSpec == null) { + Preconditions.checkState(schema.hasColumn(columnName), + 
"Cannot create index for column: %s because it is not in schema", columnName); + } if (fieldSpec.isVirtualColumn()) { + LOGGER.warn("Ignoring index creation for virtual column " + columnName); continue; } - String columnName = fieldSpec.getName(); + FieldIndexConfigs originalConfig = indexConfigs.get(columnName); ColumnIndexCreationInfo columnIndexCreationInfo = indexCreationInfoMap.get(columnName); Preconditions.checkNotNull(columnIndexCreationInfo, "Missing index creation info for column: %s", columnName); boolean dictEnabledColumn = createDictionaryForColumn(columnIndexCreationInfo, segmentCreationSpec, fieldSpec); - Preconditions.checkState(dictEnabledColumn || !invertedIndexColumns.contains(columnName), + Preconditions.checkState(dictEnabledColumn || !originalConfig.getConfig(StandardIndexes.inverted()).isEnabled(), "Cannot create inverted index for raw index column: %s", columnName); - boolean forwardIndexDisabled = forwardIndexDisabledColumns.contains(columnName); + IndexType<ForwardIndexConfig, ?, ForwardIndexCreator> forwardIdx = StandardIndexes.forward(); + boolean forwardIndexDisabled = !originalConfig.getConfig(forwardIdx).isEnabled(); IndexCreationContext.Common context = IndexCreationContext.builder() .withIndexDir(_indexDir) - .withCardinality(columnIndexCreationInfo.getDistinctValueCount()) .withDictionary(dictEnabledColumn) .withFieldSpec(fieldSpec) .withTotalDocs(segmentIndexCreationInfo.getTotalDocs()) - .withMinValue((Comparable<?>) columnIndexCreationInfo.getMin()) - .withMaxValue((Comparable<?>) columnIndexCreationInfo.getMax()) - .withTotalNumberOfEntries(columnIndexCreationInfo.getTotalNumberOfEntries()) .withColumnIndexCreationInfo(columnIndexCreationInfo) - .sorted(columnIndexCreationInfo.isSorted()) + .withIsOptimizedDictionary(_config.isOptimizeDictionary() + || _config.isOptimizeDictionaryForMetrics() && fieldSpec.getFieldType() == FieldSpec.FieldType.METRIC) .onHeap(segmentCreationSpec.isOnHeap()) 
.withForwardIndexDisabled(forwardIndexDisabled) + .withTextCommitOnClose(true) .build(); - // Initialize forward index creator - ChunkCompressionType chunkCompressionType = - dictEnabledColumn ? null : getColumnCompressionType(segmentCreationSpec, fieldSpec); - // Sorted columns treat the 'forwardIndexDisabled' flag as a no-op - _forwardIndexCreatorMap.put(columnName, (forwardIndexDisabled && !columnIndexCreationInfo.isSorted()) - ? null : _indexCreatorProvider.newForwardIndexCreator( - context.forForwardIndex(chunkCompressionType, segmentCreationSpec.getColumnProperties()))); - - // Initialize inverted index creator; skip creating inverted index if sorted - if (invertedIndexColumns.contains(columnName) && !columnIndexCreationInfo.isSorted()) { - _invertedIndexCreatorMap.put(columnName, - _indexCreatorProvider.newInvertedIndexCreator(context.forInvertedIndex())); - } + + FieldIndexConfigs config = adaptConfig(columnName, originalConfig, columnIndexCreationInfo, segmentCreationSpec); + if (dictEnabledColumn) { // Create dictionary-encoded index // Initialize dictionary creator // TODO: Dictionary creator holds all unique values on heap. Consider keeping dictionary instead of creator // which uses off-heap memory. - SegmentDictionaryCreator dictionaryCreator = - new SegmentDictionaryCreator(fieldSpec, _indexDir, columnIndexCreationInfo.isUseVarLengthDictionary()); - _dictionaryCreatorMap.put(columnName, dictionaryCreator); - // Create dictionary + + // Index conf should be present if dictEnabledColumn is true. In case it doesn't, getConfig will throw an + // exception + DictionaryIndexConfig dictConfig = config.getConfig(StandardIndexes.dictionary()); + if (!dictConfig.isEnabled()) { Review Comment: hmm... did you mean in this example `location` would get dict-encoded even though config wanted to use `RAW`? I see H3IndexHandler can handle both dict-encoded and raw values though. 
``` "fieldConfigList": [ { "name": "location", "encodingType": "RAW", "indexType": "H3", "properties": { "resolutions": "5" } } ], ``` ########## pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/forward/ForwardIndexType.java: ########## @@ -19,60 +19,183 @@ package org.apache.pinot.segment.local.segment.index.forward; +import com.google.common.collect.Sets; +import java.io.IOException; +import java.util.HashMap; +import java.util.List; import java.util.Map; +import java.util.Set; +import java.util.function.Supplier; import javax.annotation.Nullable; +import org.apache.pinot.segment.local.segment.index.loader.ConfigurableFromIndexLoadingConfig; +import org.apache.pinot.segment.local.segment.index.loader.ForwardIndexHandler; +import org.apache.pinot.segment.local.segment.index.loader.IndexLoadingConfig; import org.apache.pinot.segment.spi.ColumnMetadata; import org.apache.pinot.segment.spi.V1Constants; +import org.apache.pinot.segment.spi.compression.ChunkCompressionType; import org.apache.pinot.segment.spi.creator.IndexCreationContext; +import org.apache.pinot.segment.spi.index.AbstractIndexType; +import org.apache.pinot.segment.spi.index.ColumnConfigDeserializer; import org.apache.pinot.segment.spi.index.FieldIndexConfigs; -import org.apache.pinot.segment.spi.index.IndexCreator; +import org.apache.pinot.segment.spi.index.ForwardIndexConfig; +import org.apache.pinot.segment.spi.index.IndexConfigDeserializer; import org.apache.pinot.segment.spi.index.IndexHandler; -import org.apache.pinot.segment.spi.index.IndexReader; +import org.apache.pinot.segment.spi.index.IndexReaderConstraintException; import org.apache.pinot.segment.spi.index.IndexReaderFactory; -import org.apache.pinot.segment.spi.index.IndexType; import org.apache.pinot.segment.spi.index.StandardIndexes; +import org.apache.pinot.segment.spi.index.creator.ForwardIndexCreator; +import org.apache.pinot.segment.spi.index.reader.ForwardIndexReader; +import 
org.apache.pinot.segment.spi.memory.PinotDataBuffer; import org.apache.pinot.segment.spi.store.SegmentDirectory; -import org.apache.pinot.spi.config.table.IndexConfig; +import org.apache.pinot.spi.config.table.FieldConfig; import org.apache.pinot.spi.config.table.TableConfig; +import org.apache.pinot.spi.data.FieldSpec; import org.apache.pinot.spi.data.Schema; -public class ForwardIndexType implements IndexType<IndexConfig, IndexReader, IndexCreator> { +public class ForwardIndexType + extends AbstractIndexType<ForwardIndexConfig, ForwardIndexReader, ForwardIndexCreator> + implements ConfigurableFromIndexLoadingConfig<ForwardIndexConfig> { - public static final ForwardIndexType INSTANCE = new ForwardIndexType(); - - private ForwardIndexType() { + protected ForwardIndexType() { + super(StandardIndexes.FORWARD_ID); } @Override - public String getId() { - return StandardIndexes.FORWARD_ID; + public Class<ForwardIndexConfig> getIndexConfigClass() { + return ForwardIndexConfig.class; } @Override - public Class<IndexConfig> getIndexConfigClass() { - return null; + public Map<String, ForwardIndexConfig> fromIndexLoadingConfig(IndexLoadingConfig indexLoadingConfig) { + Set<String> disabledCols = indexLoadingConfig.getForwardIndexDisabledColumns(); + Map<String, ForwardIndexConfig> result = new HashMap<>(); + Set<String> allColumns = Sets.union(disabledCols, indexLoadingConfig.getAllKnownColumns()); + for (String column : allColumns) { + ChunkCompressionType compressionType = + indexLoadingConfig.getCompressionConfigs() != null + ? 
indexLoadingConfig.getCompressionConfigs().get(column) + : null; + Supplier<ForwardIndexConfig> defaultConfig = () -> { + if (compressionType == null) { + return ForwardIndexConfig.DEFAULT; + } else { + return new ForwardIndexConfig.Builder().withCompressionType(compressionType).build(); + } + }; + if (!disabledCols.contains(column)) { + TableConfig tableConfig = indexLoadingConfig.getTableConfig(); + if (tableConfig == null) { + result.put(column, defaultConfig.get()); + } else { + List<FieldConfig> fieldConfigList = tableConfig.getFieldConfigList(); + if (fieldConfigList == null) { + result.put(column, defaultConfig.get()); + continue; + } + FieldConfig fieldConfig = fieldConfigList.stream() + .filter(fc -> fc.getName().equals(column)) + .findAny() + .orElse(null); + if (fieldConfig == null) { + result.put(column, defaultConfig.get()); + continue; + } + ForwardIndexConfig.Builder builder = new ForwardIndexConfig.Builder(); + if (compressionType != null) { + builder.withCompressionType(compressionType); + } else { + FieldConfig.CompressionCodec compressionCodec = fieldConfig.getCompressionCodec(); + if (compressionCodec != null) { + builder.withCompressionType(ChunkCompressionType.valueOf(compressionCodec.name())); + } + } + + result.put(column, builder.build()); + } + } else { + result.put(column, ForwardIndexConfig.DISABLED); + } + } + return result; } @Override - public IndexConfig getDefaultConfig() { - return IndexConfig.ENABLED; + public ForwardIndexConfig getDefaultConfig() { + return ForwardIndexConfig.DEFAULT; } @Override - public IndexConfig getConfig(TableConfig tableConfig, Schema schema) { - throw new UnsupportedOperationException(); + public ColumnConfigDeserializer<ForwardIndexConfig> getDeserializer() { + // reads tableConfig.fieldConfigList and decides what to create using the FieldConfig properties and encoding + ColumnConfigDeserializer<ForwardIndexConfig> fromFieldConfig = IndexConfigDeserializer.fromCollection( + 
TableConfig::getFieldConfigList, + (accum, fieldConfig) -> { + Map<String, String> properties = fieldConfig.getProperties(); + if (properties != null && isDisabled(properties)) { + accum.put(fieldConfig.getName(), ForwardIndexConfig.DISABLED); + } else if (fieldConfig.getEncodingType() == FieldConfig.EncodingType.RAW) { + accum.put(fieldConfig.getName(), createConfigFromFieldConfig(fieldConfig)); + } + } + ); + return IndexConfigDeserializer.fromIndexes("forward", getIndexConfigClass()) + .withExclusiveAlternative(fromFieldConfig); + } + + private boolean isDisabled(Map<String, String> props) { + return Boolean.parseBoolean( + props.getOrDefault(FieldConfig.FORWARD_INDEX_DISABLED, FieldConfig.DEFAULT_FORWARD_INDEX_DISABLED)); + } + + private ForwardIndexConfig createConfigFromFieldConfig(FieldConfig fieldConfig) { + if (fieldConfig.getEncodingType() != FieldConfig.EncodingType.RAW) { + throw new IllegalArgumentException("Cannot build a forward index on a field whose encoding is " Review Comment: Not a new feature, but we might be talking about different things. Lemme try again: A column can be dict-encoded and have its fwd index made of dict ids; or have its fwd index simply made of raw values (i.e. encodingType=RAW). ``` public enum EncodingType { RAW, DICTIONARY } ``` So when I saw this if-check, I was wondering why this method only deals with RAW. ########## pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/bloomfilter/BloomFilterHandler.java: ########## @@ -264,51 +265,42 @@ private void createBloomFilterForColumn(SegmentDirectory.Writer segmentWriter, C if (!columnMetadata.hasDictionary()) { // Create a temporary forward index if it is disabled and does not exist - columnMetadata = createForwardIndexIfNeeded(segmentWriter, columnName, indexCreatorProvider, true); + columnMetadata = createForwardIndexIfNeeded(segmentWriter, columnName, true); } // Create new bloom filter for the column. 
BloomFilterConfig bloomFilterConfig = _bloomFilterConfigs.get(columnName); LOGGER.info("Creating new bloom filter for segment: {}, column: {} with config: {}", segmentName, columnName, bloomFilterConfig); if (columnMetadata.hasDictionary()) { - createAndSealBloomFilterForDictionaryColumn(bloomFilterCreatorProvider, indexDir, columnMetadata, - bloomFilterConfig, segmentWriter); + createAndSealBloomFilterForDictionaryColumn(indexDir, columnMetadata, bloomFilterConfig, segmentWriter); } else { - createAndSealBloomFilterForNonDictionaryColumn(bloomFilterCreatorProvider, indexDir, columnMetadata, - bloomFilterConfig, segmentWriter); + createAndSealBloomFilterForNonDictionaryColumn(indexDir, columnMetadata, bloomFilterConfig, segmentWriter); } // For v3, write the generated bloom filter file into the single file and remove it. if (_segmentDirectory.getSegmentMetadata().getVersion() == SegmentVersion.v3) { - LoaderUtils.writeIndexToV3Format(segmentWriter, columnName, bloomFilterFile, ColumnIndexType.BLOOM_FILTER); + LoaderUtils.writeIndexToV3Format(segmentWriter, columnName, bloomFilterFile, StandardIndexes.bloomFilter()); } // Delete the marker file. 
FileUtils.deleteQuietly(bloomFilterFileInProgress); LOGGER.info("Created bloom filter for segment: {}, column: {}", segmentName, columnName); } - private BaseImmutableDictionary getDictionaryReader(ColumnMetadata columnMetadata, - SegmentDirectory.Writer segmentWriter) + private Dictionary getDictionaryReader(ColumnMetadata columnMetadata, SegmentDirectory.Writer segmentWriter) throws IOException { - PinotDataBuffer dictionaryBuffer = - segmentWriter.getIndexFor(columnMetadata.getColumnName(), ColumnIndexType.DICTIONARY); - int cardinality = columnMetadata.getCardinality(); DataType dataType = columnMetadata.getDataType(); + switch (dataType) { case INT: - return new IntDictionary(dictionaryBuffer, cardinality); case LONG: - return new LongDictionary(dictionaryBuffer, cardinality); case FLOAT: - return new FloatDictionary(dictionaryBuffer, cardinality); case DOUBLE: - return new DoubleDictionary(dictionaryBuffer, cardinality); case STRING: - return new StringDictionary(dictionaryBuffer, cardinality, columnMetadata.getColumnMaxLength()); case BYTES: - return new BytesDictionary(dictionaryBuffer, cardinality, columnMetadata.getColumnMaxLength()); + PinotDataBuffer buf = segmentWriter.getIndexFor(columnMetadata.getColumnName(), StandardIndexes.dictionary()); + return DictionaryIndexType.read(buf, columnMetadata, DictionaryIndexConfig.DEFAULT_OFFHEAP); Review Comment: I'm thinking of this way, to avoid referring to concrete XXXIndexType class: ``` StandardIndexes.dictionary().createReaderFactory().read(buf, columnMetadata, DictionaryIndexConfig.DEFAULT_OFFHEAP) ``` ########## pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/text/TextIndexConfigBuilder.java: ########## @@ -0,0 +1,75 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.pinot.segment.local.segment.index.text; + +import java.util.List; +import java.util.Map; +import javax.annotation.Nullable; +import org.apache.pinot.segment.local.segment.store.TextIndexUtils; +import org.apache.pinot.segment.spi.index.TextIndexConfig; +import org.apache.pinot.spi.config.table.FSTType; +import org.apache.pinot.spi.config.table.FieldConfig; + + +public class TextIndexConfigBuilder extends TextIndexConfig.AbstractBuilder { + public TextIndexConfigBuilder(@Nullable FSTType fstType) { + super(fstType); + } + + public TextIndexConfigBuilder(TextIndexConfig other) { + super(other); + } + + @Override + public TextIndexConfig.AbstractBuilder withProperties(@Nullable Map<String, String> textIndexProperties) { + if (textIndexProperties != null) { + if (Boolean.parseBoolean(textIndexProperties.get(FieldConfig.TEXT_INDEX_NO_RAW_DATA))) { + _rawValueForTextIndex = textIndexProperties.get(FieldConfig.TEXT_INDEX_RAW_VALUE); + if (_rawValueForTextIndex == null) { + _rawValueForTextIndex = FieldConfig.TEXT_INDEX_DEFAULT_RAW_VALUE; + } + } + _enableQueryCache = Boolean.parseBoolean(textIndexProperties.get(FieldConfig.TEXT_INDEX_ENABLE_QUERY_CACHE)); + _useANDForMultiTermQueries = Boolean.parseBoolean( + textIndexProperties.get(FieldConfig.TEXT_INDEX_USE_AND_FOR_MULTI_TERM_QUERIES)); + _stopWordsInclude = 
TextIndexUtils.extractStopWordsInclude(textIndexProperties); + _stopWordsExclude = TextIndexUtils.extractStopWordsExclude(textIndexProperties); + } + return this; + } + + @Override + public TextIndexConfigBuilder withStopWordsInclude(List<String> stopWordsInclude) { Review Comment: > This class should return TextIndexConfigBuilder instead of TextIndexConfig.AbstractBuilder as it superclass does. why so? there are no extra methods on this concrete class TextIndexConfigBuilder for the caller to access. Actually, `withProperties()` above returns the parent class. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org For queries about this service, please contact Infrastructure at: users@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org For additional commands, e-mail: commits-help@pinot.apache.org