klsince commented on code in PR #10184: URL: https://github.com/apache/pinot/pull/10184#discussion_r1151133210
########## pinot-segment-local/src/main/java/org/apache/pinot/segment/local/indexsegment/immutable/ImmutableSegmentImpl.java: ########## @@ -162,22 +165,27 @@ private File getValidDocIdsSnapshotFile() { } @Override - public Dictionary getDictionary(String column) { + public <I extends IndexReader> I getIndex(String column, IndexType<?, I, ?> type) { ColumnIndexContainer container = _indexContainerMap.get(column); if (container == null) { throw new NullPointerException("Invalid column: " + column); } - return container.getDictionary(); + return type.getIndexReader(container); Review Comment: directly call `container.getIndex(type)`? and remove this method from IndexType ``` @Nullable default IR getIndexReader(ColumnIndexContainer indexContainer) { return indexContainer.getIndex(this); } ``` ########## pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/SegmentColumnarIndexCreator.java: ########## @@ -133,183 +120,141 @@ public void init(SegmentGeneratorConfig segmentCreationSpec, SegmentIndexCreatio return; } - Collection<FieldSpec> fieldSpecs = schema.getAllFieldSpecs(); - Set<String> invertedIndexColumns = new HashSet<>(); - for (String columnName : _config.getInvertedIndexCreationColumns()) { - Preconditions.checkState(schema.hasColumn(columnName), - "Cannot create inverted index for column: %s because it is not in schema", columnName); - invertedIndexColumns.add(columnName); - } + Map<String, FieldIndexConfigs> indexConfigs = segmentCreationSpec.getIndexConfigsByColName(); - Set<String> bloomFilterColumns = new HashSet<>(); - for (String columnName : _config.getBloomFilterCreationColumns()) { - Preconditions.checkState(schema.hasColumn(columnName), - "Cannot create bloom filter for column: %s because it is not in schema", columnName); - bloomFilterColumns.add(columnName); - } - - Set<String> rangeIndexColumns = new HashSet<>(); - for (String columnName : _config.getRangeIndexCreationColumns()) { - 
Preconditions.checkState(schema.hasColumn(columnName), - "Cannot create range index for column: %s because it is not in schema", columnName); - rangeIndexColumns.add(columnName); - } - - Set<String> textIndexColumns = new HashSet<>(); - for (String columnName : _config.getTextIndexCreationColumns()) { - Preconditions.checkState(schema.hasColumn(columnName), - "Cannot create text index for column: %s because it is not in schema", columnName); - textIndexColumns.add(columnName); - } - - Set<String> fstIndexColumns = new HashSet<>(); - for (String columnName : _config.getFSTIndexCreationColumns()) { - Preconditions.checkState(schema.hasColumn(columnName), - "Cannot create FST index for column: %s because it is not in schema", columnName); - fstIndexColumns.add(columnName); - } - - Map<String, JsonIndexConfig> jsonIndexConfigs = _config.getJsonIndexConfigs(); - for (String columnName : jsonIndexConfigs.keySet()) { - Preconditions.checkState(schema.hasColumn(columnName), - "Cannot create json index for column: %s because it is not in schema", columnName); - } - - Set<String> forwardIndexDisabledColumns = new HashSet<>(); - for (String columnName : _config.getForwardIndexDisabledColumns()) { - Preconditions.checkState(schema.hasColumn(columnName), String.format("Invalid config. 
Can't disable " - + "forward index creation for a column: %s that does not exist in schema", columnName)); - forwardIndexDisabledColumns.add(columnName); - } - - Map<String, H3IndexConfig> h3IndexConfigs = _config.getH3IndexConfigs(); - for (String columnName : h3IndexConfigs.keySet()) { - Preconditions.checkState(schema.hasColumn(columnName), - "Cannot create H3 index for column: %s because it is not in schema", columnName); - } + _creatorsByColAndIndex = Maps.newHashMapWithExpectedSize(indexConfigs.keySet().size()); - // Initialize creators for dictionary, forward index and inverted index - IndexingConfig indexingConfig = _config.getTableConfig().getIndexingConfig(); - int rangeIndexVersion = indexingConfig.getRangeIndexVersion(); - for (FieldSpec fieldSpec : fieldSpecs) { - // Ignore virtual columns + for (String columnName : indexConfigs.keySet()) { + FieldSpec fieldSpec = schema.getFieldSpecFor(columnName); + if (fieldSpec == null) { + Preconditions.checkState(schema.hasColumn(columnName), + "Cannot create index for column: %s because it is not in schema", columnName); + } if (fieldSpec.isVirtualColumn()) { + LOGGER.warn("Ignoring index creation for virtual column " + columnName); continue; } - String columnName = fieldSpec.getName(); + FieldIndexConfigs originalConfig = indexConfigs.get(columnName); ColumnIndexCreationInfo columnIndexCreationInfo = indexCreationInfoMap.get(columnName); Preconditions.checkNotNull(columnIndexCreationInfo, "Missing index creation info for column: %s", columnName); boolean dictEnabledColumn = createDictionaryForColumn(columnIndexCreationInfo, segmentCreationSpec, fieldSpec); - Preconditions.checkState(dictEnabledColumn || !invertedIndexColumns.contains(columnName), + Preconditions.checkState(dictEnabledColumn || !originalConfig.getConfig(StandardIndexes.inverted()).isEnabled(), "Cannot create inverted index for raw index column: %s", columnName); - boolean forwardIndexDisabled = forwardIndexDisabledColumns.contains(columnName); + 
IndexType<ForwardIndexConfig, ?, ForwardIndexCreator> forwardIdx = StandardIndexes.forward(); + boolean forwardIndexDisabled = !originalConfig.getConfig(forwardIdx).isEnabled(); IndexCreationContext.Common context = IndexCreationContext.builder() .withIndexDir(_indexDir) - .withCardinality(columnIndexCreationInfo.getDistinctValueCount()) .withDictionary(dictEnabledColumn) .withFieldSpec(fieldSpec) .withTotalDocs(segmentIndexCreationInfo.getTotalDocs()) - .withMinValue((Comparable<?>) columnIndexCreationInfo.getMin()) - .withMaxValue((Comparable<?>) columnIndexCreationInfo.getMax()) - .withTotalNumberOfEntries(columnIndexCreationInfo.getTotalNumberOfEntries()) .withColumnIndexCreationInfo(columnIndexCreationInfo) - .sorted(columnIndexCreationInfo.isSorted()) + .withIsOptimizedDictionary(_config.isOptimizeDictionary() + || _config.isOptimizeDictionaryForMetrics() && fieldSpec.getFieldType() == FieldSpec.FieldType.METRIC) .onHeap(segmentCreationSpec.isOnHeap()) .withForwardIndexDisabled(forwardIndexDisabled) + .withTextCommitOnClose(true) Review Comment: It looks like `true` is passed in every call to this method, so maybe we can make `true` the default and drop all those calls.
########## pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/SegmentColumnarIndexCreator.java: ########## @@ -133,183 +120,141 @@ public void init(SegmentGeneratorConfig segmentCreationSpec, SegmentIndexCreatio return; } - Collection<FieldSpec> fieldSpecs = schema.getAllFieldSpecs(); - Set<String> invertedIndexColumns = new HashSet<>(); - for (String columnName : _config.getInvertedIndexCreationColumns()) { - Preconditions.checkState(schema.hasColumn(columnName), - "Cannot create inverted index for column: %s because it is not in schema", columnName); - invertedIndexColumns.add(columnName); - } + Map<String, FieldIndexConfigs> indexConfigs = segmentCreationSpec.getIndexConfigsByColName(); - Set<String> bloomFilterColumns = new HashSet<>(); - for (String columnName : _config.getBloomFilterCreationColumns()) { - Preconditions.checkState(schema.hasColumn(columnName), - "Cannot create bloom filter for column: %s because it is not in schema", columnName); - bloomFilterColumns.add(columnName); - } - - Set<String> rangeIndexColumns = new HashSet<>(); - for (String columnName : _config.getRangeIndexCreationColumns()) { - Preconditions.checkState(schema.hasColumn(columnName), - "Cannot create range index for column: %s because it is not in schema", columnName); - rangeIndexColumns.add(columnName); - } - - Set<String> textIndexColumns = new HashSet<>(); - for (String columnName : _config.getTextIndexCreationColumns()) { - Preconditions.checkState(schema.hasColumn(columnName), - "Cannot create text index for column: %s because it is not in schema", columnName); - textIndexColumns.add(columnName); - } - - Set<String> fstIndexColumns = new HashSet<>(); - for (String columnName : _config.getFSTIndexCreationColumns()) { - Preconditions.checkState(schema.hasColumn(columnName), - "Cannot create FST index for column: %s because it is not in schema", columnName); - fstIndexColumns.add(columnName); - } - - Map<String, JsonIndexConfig> 
jsonIndexConfigs = _config.getJsonIndexConfigs(); - for (String columnName : jsonIndexConfigs.keySet()) { - Preconditions.checkState(schema.hasColumn(columnName), - "Cannot create json index for column: %s because it is not in schema", columnName); - } - - Set<String> forwardIndexDisabledColumns = new HashSet<>(); - for (String columnName : _config.getForwardIndexDisabledColumns()) { - Preconditions.checkState(schema.hasColumn(columnName), String.format("Invalid config. Can't disable " - + "forward index creation for a column: %s that does not exist in schema", columnName)); - forwardIndexDisabledColumns.add(columnName); - } - - Map<String, H3IndexConfig> h3IndexConfigs = _config.getH3IndexConfigs(); - for (String columnName : h3IndexConfigs.keySet()) { - Preconditions.checkState(schema.hasColumn(columnName), - "Cannot create H3 index for column: %s because it is not in schema", columnName); - } + _creatorsByColAndIndex = Maps.newHashMapWithExpectedSize(indexConfigs.keySet().size()); - // Initialize creators for dictionary, forward index and inverted index - IndexingConfig indexingConfig = _config.getTableConfig().getIndexingConfig(); - int rangeIndexVersion = indexingConfig.getRangeIndexVersion(); - for (FieldSpec fieldSpec : fieldSpecs) { - // Ignore virtual columns + for (String columnName : indexConfigs.keySet()) { + FieldSpec fieldSpec = schema.getFieldSpecFor(columnName); + if (fieldSpec == null) { + Preconditions.checkState(schema.hasColumn(columnName), + "Cannot create index for column: %s because it is not in schema", columnName); + } if (fieldSpec.isVirtualColumn()) { + LOGGER.warn("Ignoring index creation for virtual column " + columnName); continue; } - String columnName = fieldSpec.getName(); + FieldIndexConfigs originalConfig = indexConfigs.get(columnName); ColumnIndexCreationInfo columnIndexCreationInfo = indexCreationInfoMap.get(columnName); Preconditions.checkNotNull(columnIndexCreationInfo, "Missing index creation info for column: %s", 
columnName); boolean dictEnabledColumn = createDictionaryForColumn(columnIndexCreationInfo, segmentCreationSpec, fieldSpec); - Preconditions.checkState(dictEnabledColumn || !invertedIndexColumns.contains(columnName), + Preconditions.checkState(dictEnabledColumn || !originalConfig.getConfig(StandardIndexes.inverted()).isEnabled(), "Cannot create inverted index for raw index column: %s", columnName); - boolean forwardIndexDisabled = forwardIndexDisabledColumns.contains(columnName); + IndexType<ForwardIndexConfig, ?, ForwardIndexCreator> forwardIdx = StandardIndexes.forward(); + boolean forwardIndexDisabled = !originalConfig.getConfig(forwardIdx).isEnabled(); IndexCreationContext.Common context = IndexCreationContext.builder() .withIndexDir(_indexDir) - .withCardinality(columnIndexCreationInfo.getDistinctValueCount()) .withDictionary(dictEnabledColumn) .withFieldSpec(fieldSpec) .withTotalDocs(segmentIndexCreationInfo.getTotalDocs()) - .withMinValue((Comparable<?>) columnIndexCreationInfo.getMin()) - .withMaxValue((Comparable<?>) columnIndexCreationInfo.getMax()) - .withTotalNumberOfEntries(columnIndexCreationInfo.getTotalNumberOfEntries()) .withColumnIndexCreationInfo(columnIndexCreationInfo) - .sorted(columnIndexCreationInfo.isSorted()) + .withIsOptimizedDictionary(_config.isOptimizeDictionary() + || _config.isOptimizeDictionaryForMetrics() && fieldSpec.getFieldType() == FieldSpec.FieldType.METRIC) .onHeap(segmentCreationSpec.isOnHeap()) .withForwardIndexDisabled(forwardIndexDisabled) + .withTextCommitOnClose(true) .build(); - // Initialize forward index creator - ChunkCompressionType chunkCompressionType = - dictEnabledColumn ? null : getColumnCompressionType(segmentCreationSpec, fieldSpec); - // Sorted columns treat the 'forwardIndexDisabled' flag as a no-op - _forwardIndexCreatorMap.put(columnName, (forwardIndexDisabled && !columnIndexCreationInfo.isSorted()) - ? 
null : _indexCreatorProvider.newForwardIndexCreator( - context.forForwardIndex(chunkCompressionType, segmentCreationSpec.getColumnProperties()))); - - // Initialize inverted index creator; skip creating inverted index if sorted - if (invertedIndexColumns.contains(columnName) && !columnIndexCreationInfo.isSorted()) { - _invertedIndexCreatorMap.put(columnName, - _indexCreatorProvider.newInvertedIndexCreator(context.forInvertedIndex())); - } + + FieldIndexConfigs config = adaptConfig(columnName, originalConfig, columnIndexCreationInfo, segmentCreationSpec); + if (dictEnabledColumn) { // Create dictionary-encoded index // Initialize dictionary creator // TODO: Dictionary creator holds all unique values on heap. Consider keeping dictionary instead of creator // which uses off-heap memory. - SegmentDictionaryCreator dictionaryCreator = - new SegmentDictionaryCreator(fieldSpec, _indexDir, columnIndexCreationInfo.isUseVarLengthDictionary()); - _dictionaryCreatorMap.put(columnName, dictionaryCreator); - // Create dictionary + + // Index conf should be present if dictEnabledColumn is true. 
In case it doesn't, getConfig will throw an + // exception + DictionaryIndexConfig dictConfig = config.getConfig(StandardIndexes.dictionary()); + if (!dictConfig.isEnabled()) { + throw new IllegalArgumentException("Dictionary index should be enabled"); + } + SegmentDictionaryCreator creator = new DictionaryIndexPlugin().getIndexType() + .createIndexCreator(context, dictConfig); + try { - dictionaryCreator.build(columnIndexCreationInfo.getSortedUniqueElementsArray()); + creator.build(context.getSortedUniqueElementsArray()); } catch (Exception e) { LOGGER.error("Error building dictionary for field: {}, cardinality: {}, number of bytes per entry: {}", - fieldSpec.getName(), columnIndexCreationInfo.getDistinctValueCount(), - dictionaryCreator.getNumBytesPerEntry()); + context.getFieldSpec().getName(), context.getCardinality(), creator.getNumBytesPerEntry()); throw e; } - } - if (bloomFilterColumns.contains(columnName)) { - if (indexingConfig.getBloomFilterConfigs() != null - && indexingConfig.getBloomFilterConfigs().containsKey(columnName)) { - _bloomFilterCreatorMap.put(columnName, _indexCreatorProvider.newBloomFilterCreator( - context.forBloomFilter(indexingConfig.getBloomFilterConfigs().get(columnName)))); - } else { - _bloomFilterCreatorMap.put(columnName, _indexCreatorProvider.newBloomFilterCreator( - context.forBloomFilter(new BloomFilterConfig(BloomFilterConfig.DEFAULT_FPP, 0, false)))); - } + _dictionaryCreatorMap.put(columnName, creator); } - if (!columnIndexCreationInfo.isSorted() && rangeIndexColumns.contains(columnName)) { - _rangeIndexFilterCreatorMap.put(columnName, - _indexCreatorProvider.newRangeIndexCreator(context.forRangeIndex(rangeIndexVersion))); + Map<IndexType<?, ?, ?>, IndexCreator> creatorsByIndex = + Maps.newHashMapWithExpectedSize(IndexService.getInstance().getAllIndexes().size()); + for (IndexType<?, ?, ?> index : IndexService.getInstance().getAllIndexes()) { + if (hasSpecialLifecycle(index)) { + continue; + } + 
tryCreateIndexCreator(creatorsByIndex, index, context, config); } - - if (textIndexColumns.contains(columnName)) { - FSTType fstType = FSTType.LUCENE; - List<FieldConfig> fieldConfigList = _config.getTableConfig().getFieldConfigList(); - if (fieldConfigList != null) { - for (FieldConfig fieldConfig : fieldConfigList) { - if (fieldConfig.getName().equals(columnName)) { - Map<String, String> properties = fieldConfig.getProperties(); - if (TextIndexUtils.isFstTypeNative(properties)) { - fstType = FSTType.NATIVE; - } - } - } + // TODO: Remove this when values stored as ForwardIndex stop depending on TextIndex config + IndexCreator oldFwdCreator = creatorsByIndex.get(forwardIdx); + if (oldFwdCreator instanceof ForwardIndexCreator) { // this implies that oldFwdCreator != null Review Comment: Is it possible to get a different kind of index creator from `get(forwardIdx)`? If not, perhaps we just need a simple null check here. ########## pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/SegmentColumnarIndexCreator.java: ########## @@ -133,183 +120,141 @@ public void init(SegmentGeneratorConfig segmentCreationSpec, SegmentIndexCreatio return; } - Collection<FieldSpec> fieldSpecs = schema.getAllFieldSpecs(); - Set<String> invertedIndexColumns = new HashSet<>(); - for (String columnName : _config.getInvertedIndexCreationColumns()) { - Preconditions.checkState(schema.hasColumn(columnName), - "Cannot create inverted index for column: %s because it is not in schema", columnName); - invertedIndexColumns.add(columnName); - } + Map<String, FieldIndexConfigs> indexConfigs = segmentCreationSpec.getIndexConfigsByColName(); - Set<String> bloomFilterColumns = new HashSet<>(); - for (String columnName : _config.getBloomFilterCreationColumns()) { - Preconditions.checkState(schema.hasColumn(columnName), - "Cannot create bloom filter for column: %s because it is not in schema", columnName); - bloomFilterColumns.add(columnName); - } - - Set<String> rangeIndexColumns = 
new HashSet<>(); - for (String columnName : _config.getRangeIndexCreationColumns()) { - Preconditions.checkState(schema.hasColumn(columnName), - "Cannot create range index for column: %s because it is not in schema", columnName); - rangeIndexColumns.add(columnName); - } - - Set<String> textIndexColumns = new HashSet<>(); - for (String columnName : _config.getTextIndexCreationColumns()) { - Preconditions.checkState(schema.hasColumn(columnName), - "Cannot create text index for column: %s because it is not in schema", columnName); - textIndexColumns.add(columnName); - } - - Set<String> fstIndexColumns = new HashSet<>(); - for (String columnName : _config.getFSTIndexCreationColumns()) { - Preconditions.checkState(schema.hasColumn(columnName), - "Cannot create FST index for column: %s because it is not in schema", columnName); - fstIndexColumns.add(columnName); - } - - Map<String, JsonIndexConfig> jsonIndexConfigs = _config.getJsonIndexConfigs(); - for (String columnName : jsonIndexConfigs.keySet()) { - Preconditions.checkState(schema.hasColumn(columnName), - "Cannot create json index for column: %s because it is not in schema", columnName); - } - - Set<String> forwardIndexDisabledColumns = new HashSet<>(); - for (String columnName : _config.getForwardIndexDisabledColumns()) { - Preconditions.checkState(schema.hasColumn(columnName), String.format("Invalid config. 
Can't disable " - + "forward index creation for a column: %s that does not exist in schema", columnName)); - forwardIndexDisabledColumns.add(columnName); - } - - Map<String, H3IndexConfig> h3IndexConfigs = _config.getH3IndexConfigs(); - for (String columnName : h3IndexConfigs.keySet()) { - Preconditions.checkState(schema.hasColumn(columnName), - "Cannot create H3 index for column: %s because it is not in schema", columnName); - } + _creatorsByColAndIndex = Maps.newHashMapWithExpectedSize(indexConfigs.keySet().size()); - // Initialize creators for dictionary, forward index and inverted index - IndexingConfig indexingConfig = _config.getTableConfig().getIndexingConfig(); - int rangeIndexVersion = indexingConfig.getRangeIndexVersion(); - for (FieldSpec fieldSpec : fieldSpecs) { - // Ignore virtual columns + for (String columnName : indexConfigs.keySet()) { + FieldSpec fieldSpec = schema.getFieldSpecFor(columnName); + if (fieldSpec == null) { + Preconditions.checkState(schema.hasColumn(columnName), + "Cannot create index for column: %s because it is not in schema", columnName); + } if (fieldSpec.isVirtualColumn()) { + LOGGER.warn("Ignoring index creation for virtual column " + columnName); continue; } - String columnName = fieldSpec.getName(); + FieldIndexConfigs originalConfig = indexConfigs.get(columnName); ColumnIndexCreationInfo columnIndexCreationInfo = indexCreationInfoMap.get(columnName); Preconditions.checkNotNull(columnIndexCreationInfo, "Missing index creation info for column: %s", columnName); boolean dictEnabledColumn = createDictionaryForColumn(columnIndexCreationInfo, segmentCreationSpec, fieldSpec); - Preconditions.checkState(dictEnabledColumn || !invertedIndexColumns.contains(columnName), + Preconditions.checkState(dictEnabledColumn || !originalConfig.getConfig(StandardIndexes.inverted()).isEnabled(), "Cannot create inverted index for raw index column: %s", columnName); - boolean forwardIndexDisabled = forwardIndexDisabledColumns.contains(columnName); + 
IndexType<ForwardIndexConfig, ?, ForwardIndexCreator> forwardIdx = StandardIndexes.forward(); + boolean forwardIndexDisabled = !originalConfig.getConfig(forwardIdx).isEnabled(); IndexCreationContext.Common context = IndexCreationContext.builder() .withIndexDir(_indexDir) - .withCardinality(columnIndexCreationInfo.getDistinctValueCount()) .withDictionary(dictEnabledColumn) .withFieldSpec(fieldSpec) .withTotalDocs(segmentIndexCreationInfo.getTotalDocs()) - .withMinValue((Comparable<?>) columnIndexCreationInfo.getMin()) - .withMaxValue((Comparable<?>) columnIndexCreationInfo.getMax()) - .withTotalNumberOfEntries(columnIndexCreationInfo.getTotalNumberOfEntries()) .withColumnIndexCreationInfo(columnIndexCreationInfo) - .sorted(columnIndexCreationInfo.isSorted()) + .withIsOptimizedDictionary(_config.isOptimizeDictionary() + || _config.isOptimizeDictionaryForMetrics() && fieldSpec.getFieldType() == FieldSpec.FieldType.METRIC) .onHeap(segmentCreationSpec.isOnHeap()) .withForwardIndexDisabled(forwardIndexDisabled) + .withTextCommitOnClose(true) .build(); - // Initialize forward index creator - ChunkCompressionType chunkCompressionType = - dictEnabledColumn ? null : getColumnCompressionType(segmentCreationSpec, fieldSpec); - // Sorted columns treat the 'forwardIndexDisabled' flag as a no-op - _forwardIndexCreatorMap.put(columnName, (forwardIndexDisabled && !columnIndexCreationInfo.isSorted()) - ? 
null : _indexCreatorProvider.newForwardIndexCreator( - context.forForwardIndex(chunkCompressionType, segmentCreationSpec.getColumnProperties()))); - - // Initialize inverted index creator; skip creating inverted index if sorted - if (invertedIndexColumns.contains(columnName) && !columnIndexCreationInfo.isSorted()) { - _invertedIndexCreatorMap.put(columnName, - _indexCreatorProvider.newInvertedIndexCreator(context.forInvertedIndex())); - } + + FieldIndexConfigs config = adaptConfig(columnName, originalConfig, columnIndexCreationInfo, segmentCreationSpec); + if (dictEnabledColumn) { // Create dictionary-encoded index // Initialize dictionary creator // TODO: Dictionary creator holds all unique values on heap. Consider keeping dictionary instead of creator // which uses off-heap memory. - SegmentDictionaryCreator dictionaryCreator = - new SegmentDictionaryCreator(fieldSpec, _indexDir, columnIndexCreationInfo.isUseVarLengthDictionary()); - _dictionaryCreatorMap.put(columnName, dictionaryCreator); - // Create dictionary + + // Index conf should be present if dictEnabledColumn is true. In case it doesn't, getConfig will throw an + // exception + DictionaryIndexConfig dictConfig = config.getConfig(StandardIndexes.dictionary()); + if (!dictConfig.isEnabled()) { Review Comment: can this `isEnabled` check be done when calculating `dictEnabledColumn` boolean? ########## pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/forward/ForwardIndexCreatorFactory.java: ########## @@ -0,0 +1,148 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.pinot.segment.local.segment.index.forward; + +import java.io.File; +import java.io.IOException; +import org.apache.pinot.segment.local.segment.creator.impl.fwd.MultiValueFixedByteRawIndexCreator; +import org.apache.pinot.segment.local.segment.creator.impl.fwd.MultiValueUnsortedForwardIndexCreator; +import org.apache.pinot.segment.local.segment.creator.impl.fwd.MultiValueVarByteRawIndexCreator; +import org.apache.pinot.segment.local.segment.creator.impl.fwd.SingleValueFixedByteRawIndexCreator; +import org.apache.pinot.segment.local.segment.creator.impl.fwd.SingleValueSortedForwardIndexCreator; +import org.apache.pinot.segment.local.segment.creator.impl.fwd.SingleValueUnsortedForwardIndexCreator; +import org.apache.pinot.segment.local.segment.creator.impl.fwd.SingleValueVarByteRawIndexCreator; +import org.apache.pinot.segment.spi.compression.ChunkCompressionType; +import org.apache.pinot.segment.spi.creator.IndexCreationContext; +import org.apache.pinot.segment.spi.index.ForwardIndexConfig; +import org.apache.pinot.segment.spi.index.creator.ForwardIndexCreator; +import org.apache.pinot.spi.data.FieldSpec; + + +public class ForwardIndexCreatorFactory { + private ForwardIndexCreatorFactory() { + } + + public static ForwardIndexCreator createIndexCreator(IndexCreationContext context, ForwardIndexConfig indexConfig) + throws Exception { + String colName = context.getFieldSpec().getName(); + + if (!context.hasDictionary()) { + ChunkCompressionType chunkCompressionType = indexConfig.getChunkCompressionType(); + if 
(chunkCompressionType == null) { + chunkCompressionType = ForwardIndexType.getDefaultCompressionType(context.getFieldSpec().getFieldType()); + } + + // Dictionary disabled columns + boolean deriveNumDocsPerChunk = indexConfig.isDeriveNumDocsPerChunk(); + int writerVersion = indexConfig.getRawIndexWriterVersion(); + if (context.getFieldSpec().isSingleValueField()) { + return getRawIndexCreatorForSVColumn(context.getIndexDir(), chunkCompressionType, colName, + context.getFieldSpec().getDataType().getStoredType(), + context.getTotalDocs(), context.getLengthOfLongestEntry(), deriveNumDocsPerChunk, writerVersion); + } else { + return getRawIndexCreatorForMVColumn(context.getIndexDir(), chunkCompressionType, colName, + context.getFieldSpec().getDataType().getStoredType(), + context.getTotalDocs(), context.getMaxNumberOfMultiValueElements(), deriveNumDocsPerChunk, writerVersion, + context.getMaxRowLengthInBytes()); + } + } else { + // Dictionary enabled columns + if (context.getFieldSpec().isSingleValueField()) { + if (context.isSorted()) { + return new SingleValueSortedForwardIndexCreator(context.getIndexDir(), colName, + context.getCardinality()); + } else { + return new SingleValueUnsortedForwardIndexCreator(context.getIndexDir(), colName, + context.getCardinality(), context.getTotalDocs()); + } + } else { + return new MultiValueUnsortedForwardIndexCreator(context.getIndexDir(), colName, + context.getCardinality(), context.getTotalDocs(), context.getTotalNumberOfEntries()); + } + } + } + + /** + * Helper method to build the raw index creator for the column. + * Assumes that column to be indexed is single valued. 
+ * + * @param file Output index file + * @param column Column name + * @param totalDocs Total number of documents to index + * @param lengthOfLongestEntry Length of longest entry + * @param deriveNumDocsPerChunk true if varbyte writer should auto-derive the number of rows per chunk + * @param writerVersion version to use for the raw index writer + * @return raw index creator + */ + public static ForwardIndexCreator getRawIndexCreatorForSVColumn(File file, ChunkCompressionType compressionType, + String column, FieldSpec.DataType dataType, int totalDocs, int lengthOfLongestEntry, + boolean deriveNumDocsPerChunk, + int writerVersion) Review Comment: Please fix the formatting of these parameters (interesting that this was not caught by the linter). ########## pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/SegmentColumnarIndexCreator.java: ########## @@ -379,340 +325,95 @@ private boolean shouldCreateDictionaryWithinThreshold(ColumnIndexCreationInfo in } /** - * Helper method that returns compression type to use based on segment creation spec and field type. - * <ul> - * <li> Returns compression type from segment creation spec, if specified there.</li> - * <li> Else, returns PASS_THROUGH for metrics, and SNAPPY for dimensions. This is because metrics are likely - * to be spread in different chunks after applying predicates. Same could be true for dimensions, but in that - * case, clients are expected to explicitly specify the appropriate compression type in the spec.
</li> - * </ul> - * @param segmentCreationSpec Segment creation spec - * @param fieldSpec Field spec for the column - * @return Compression type to use + * @deprecated use {@link ForwardIndexType#getDefaultCompressionType(FieldType)} instead */ - private ChunkCompressionType getColumnCompressionType(SegmentGeneratorConfig segmentCreationSpec, - FieldSpec fieldSpec) { - ChunkCompressionType compressionType = segmentCreationSpec.getRawIndexCompressionType().get(fieldSpec.getName()); - if (compressionType == null) { - compressionType = getDefaultCompressionType(fieldSpec.getFieldType()); - } - - return compressionType; - } - + @Deprecated public static ChunkCompressionType getDefaultCompressionType(FieldType fieldType) { - if (fieldType == FieldSpec.FieldType.METRIC) { - return ChunkCompressionType.PASS_THROUGH; - } else { - return ChunkCompressionType.LZ4; - } + return ForwardIndexType.getDefaultCompressionType(fieldType); } @Override public void indexRow(GenericRow row) throws IOException { - for (Map.Entry<String, ForwardIndexCreator> entry : _forwardIndexCreatorMap.entrySet()) { - String columnName = entry.getKey(); - ForwardIndexCreator forwardIndexCreator = entry.getValue(); + for (Map.Entry<String, Map<IndexType<?, ?, ?>, IndexCreator>> byColEntry : _creatorsByColAndIndex.entrySet()) { + String columnName = byColEntry.getKey(); Object columnValueToIndex = row.getValue(columnName); if (columnValueToIndex == null) { throw new RuntimeException("Null value for column:" + columnName); } - FieldSpec fieldSpec = _schema.getFieldSpecFor(columnName); + Map<IndexType<?, ?, ?>, IndexCreator> creatorsByIndex = byColEntry.getValue(); - //get dictionaryCreator, will be null if column is not dictionaryEncoded + FieldSpec fieldSpec = _schema.getFieldSpecFor(columnName); SegmentDictionaryCreator dictionaryCreator = _dictionaryCreatorMap.get(columnName); - // bloom filter - BloomFilterCreator bloomFilterCreator = _bloomFilterCreatorMap.get(columnName); - if (bloomFilterCreator 
!= null) { - if (fieldSpec.isSingleValueField()) { - if (fieldSpec.getDataType() == DataType.BYTES) { - bloomFilterCreator.add(BytesUtils.toHexString((byte[]) columnValueToIndex)); - } else { - bloomFilterCreator.add(columnValueToIndex.toString()); - } - } else { - Object[] values = (Object[]) columnValueToIndex; - if (fieldSpec.getDataType() == DataType.BYTES) { - for (Object value : values) { - bloomFilterCreator.add(BytesUtils.toHexString((byte[]) value)); - } - } else { - for (Object value : values) { - bloomFilterCreator.add(value.toString()); - } - } - } - } - - // range index - CombinedInvertedIndexCreator combinedInvertedIndexCreator = _rangeIndexFilterCreatorMap.get(columnName); - if (combinedInvertedIndexCreator != null) { - if (dictionaryCreator != null) { - if (fieldSpec.isSingleValueField()) { - combinedInvertedIndexCreator.add(dictionaryCreator.indexOfSV(columnValueToIndex)); - } else { - int[] dictIds = dictionaryCreator.indexOfMV(columnValueToIndex); - combinedInvertedIndexCreator.add(dictIds, dictIds.length); - } - } else { - if (fieldSpec.isSingleValueField()) { - switch (fieldSpec.getDataType()) { - case INT: - combinedInvertedIndexCreator.add((Integer) columnValueToIndex); - break; - case LONG: - combinedInvertedIndexCreator.add((Long) columnValueToIndex); - break; - case FLOAT: - combinedInvertedIndexCreator.add((Float) columnValueToIndex); - break; - case DOUBLE: - combinedInvertedIndexCreator.add((Double) columnValueToIndex); - break; - default: - throw new RuntimeException("Unsupported data type " + fieldSpec.getDataType() + " for range index"); - } - } else { - Object[] values = (Object[]) columnValueToIndex; - switch (fieldSpec.getDataType()) { - case INT: - int[] intValues = new int[values.length]; - for (int i = 0; i < values.length; i++) { - intValues[i] = (Integer) values[i]; - } - combinedInvertedIndexCreator.add(intValues, values.length); - break; - case LONG: - long[] longValues = new long[values.length]; - for (int i = 0; i < 
values.length; i++) { - longValues[i] = (Long) values[i]; - } - combinedInvertedIndexCreator.add(longValues, values.length); - break; - case FLOAT: - float[] floatValues = new float[values.length]; - for (int i = 0; i < values.length; i++) { - floatValues[i] = (Float) values[i]; - } - combinedInvertedIndexCreator.add(floatValues, values.length); - break; - case DOUBLE: - double[] doubleValues = new double[values.length]; - for (int i = 0; i < values.length; i++) { - doubleValues[i] = (Double) values[i]; - } - combinedInvertedIndexCreator.add(doubleValues, values.length); - break; - default: - throw new RuntimeException("Unsupported data type " + fieldSpec.getDataType() + " for range index"); - } - } - } - } - - // text-index - TextIndexCreator textIndexCreator = _textIndexCreatorMap.get(columnName); - if (textIndexCreator != null) { - if (fieldSpec.isSingleValueField()) { - textIndexCreator.add((String) columnValueToIndex); - } else { - Object[] values = (Object[]) columnValueToIndex; - int length = values.length; - if (values instanceof String[]) { - textIndexCreator.add((String[]) values, length); - } else { - String[] strings = new String[length]; - for (int i = 0; i < length; i++) { - strings[i] = (String) values[i]; - } - textIndexCreator.add(strings, length); - columnValueToIndex = strings; - } - } - } - if (fieldSpec.isSingleValueField()) { - // Single Value column - JsonIndexCreator jsonIndexCreator = _jsonIndexCreatorMap.get(columnName); - if (jsonIndexCreator != null) { - jsonIndexCreator.add((String) columnValueToIndex); - } - GeoSpatialIndexCreator h3IndexCreator = _h3IndexCreatorMap.get(columnName); - if (h3IndexCreator != null) { - h3IndexCreator.add(GeometrySerializer.deserialize((byte[]) columnValueToIndex)); - } - if (dictionaryCreator != null) { - // dictionary encoded SV column - // get dictID from dictionary - int dictId = dictionaryCreator.indexOfSV(columnValueToIndex); - // store the docID -> dictID mapping in forward index - if 
(forwardIndexCreator != null) { - forwardIndexCreator.putDictId(dictId); - } - DictionaryBasedInvertedIndexCreator invertedIndexCreator = _invertedIndexCreatorMap.get(columnName); - if (invertedIndexCreator != null) { - // if inverted index enabled during segment creation, - // then store dictID -> docID mapping in inverted index - invertedIndexCreator.add(dictId); - } - } else { - // non-dictionary encoded SV column - // store the docId -> raw value mapping in forward index - if (textIndexCreator != null && !shouldStoreRawValueForTextIndex(columnName)) { - // for text index on raw columns, check the config to determine if actual raw value should - // be stored or not - columnValueToIndex = _columnProperties.get(columnName).get(FieldConfig.TEXT_INDEX_RAW_VALUE); - if (columnValueToIndex == null) { - columnValueToIndex = FieldConfig.TEXT_INDEX_DEFAULT_RAW_VALUE; - } - } - if (forwardIndexCreator != null) { - switch (forwardIndexCreator.getValueType()) { - case INT: - forwardIndexCreator.putInt((int) columnValueToIndex); - break; - case LONG: - forwardIndexCreator.putLong((long) columnValueToIndex); - break; - case FLOAT: - forwardIndexCreator.putFloat((float) columnValueToIndex); - break; - case DOUBLE: - forwardIndexCreator.putDouble((double) columnValueToIndex); - break; - case BIG_DECIMAL: - forwardIndexCreator.putBigDecimal((BigDecimal) columnValueToIndex); - break; - case STRING: - forwardIndexCreator.putString((String) columnValueToIndex); - break; - case BYTES: - forwardIndexCreator.putBytes((byte[]) columnValueToIndex); - break; - case JSON: - if (columnValueToIndex instanceof String) { - forwardIndexCreator.putString((String) columnValueToIndex); - } else if (columnValueToIndex instanceof byte[]) { - forwardIndexCreator.putBytes((byte[]) columnValueToIndex); - } - break; - default: - throw new IllegalStateException(); - } - } - } + indexSingleValueRow(dictionaryCreator, columnValueToIndex, creatorsByIndex); } else { - if (dictionaryCreator != null) { - 
//dictionary encoded - int[] dictIds = dictionaryCreator.indexOfMV(columnValueToIndex); - if (forwardIndexCreator != null) { - forwardIndexCreator.putDictIdMV(dictIds); - } - DictionaryBasedInvertedIndexCreator invertedIndexCreator = _invertedIndexCreatorMap.get(columnName); - if (invertedIndexCreator != null) { - invertedIndexCreator.add(dictIds, dictIds.length); - } - } else { - // for text index on raw columns, check the config to determine if actual raw value should - // be stored or not - if (forwardIndexCreator != null) { - if (textIndexCreator != null && !shouldStoreRawValueForTextIndex(columnName)) { - Object value = _columnProperties.get(columnName).get(FieldConfig.TEXT_INDEX_RAW_VALUE); - if (value == null) { - value = FieldConfig.TEXT_INDEX_DEFAULT_RAW_VALUE; - } - if (forwardIndexCreator.getValueType().getStoredType() == DataType.STRING) { - columnValueToIndex = new String[]{String.valueOf(value)}; - } else if (forwardIndexCreator.getValueType().getStoredType() == DataType.BYTES) { - columnValueToIndex = new byte[][]{String.valueOf(value).getBytes(UTF_8)}; - } else { - throw new RuntimeException("Text Index is only supported for STRING and BYTES stored type"); - } - } - Object[] values = (Object[]) columnValueToIndex; - int length = values.length; - switch (forwardIndexCreator.getValueType()) { - case INT: - int[] ints = new int[length]; - for (int i = 0; i < length; i++) { - ints[i] = (Integer) values[i]; - } - forwardIndexCreator.putIntMV(ints); - break; - case LONG: - long[] longs = new long[length]; - for (int i = 0; i < length; i++) { - longs[i] = (Long) values[i]; - } - forwardIndexCreator.putLongMV(longs); - break; - case FLOAT: - float[] floats = new float[length]; - for (int i = 0; i < length; i++) { - floats[i] = (Float) values[i]; - } - forwardIndexCreator.putFloatMV(floats); - break; - case DOUBLE: - double[] doubles = new double[length]; - for (int i = 0; i < length; i++) { - doubles[i] = (Double) values[i]; - } - 
forwardIndexCreator.putDoubleMV(doubles); - break; - case STRING: - if (values instanceof String[]) { - forwardIndexCreator.putStringMV((String[]) values); - } else { - String[] strings = new String[length]; - for (int i = 0; i < length; i++) { - strings[i] = (String) values[i]; - } - forwardIndexCreator.putStringMV(strings); - } - break; - case BYTES: - if (values instanceof byte[][]) { - forwardIndexCreator.putBytesMV((byte[][]) values); - } else { - byte[][] bytesArray = new byte[length][]; - for (int i = 0; i < length; i++) { - bytesArray[i] = (byte[]) values[i]; - } - forwardIndexCreator.putBytesMV(bytesArray); - } - break; - default: - throw new IllegalStateException(); - } - } - } + indexMultiValueRow(dictionaryCreator, (Object[]) columnValueToIndex, creatorsByIndex); } + } - if (_nullHandlingEnabled) { + if (_nullHandlingEnabled) { + for (Map.Entry<String, NullValueVectorCreator> entry : _nullValueVectorCreatorMap.entrySet()) { + String columnName = entry.getKey(); // If row has null value for given column name, add to null value vector if (row.isNullValue(columnName)) { _nullValueVectorCreatorMap.get(columnName).setNull(_docIdCounter); } } } + _docIdCounter++; } - private boolean shouldStoreRawValueForTextIndex(String column) { - if (_columnProperties != null) { - Map<String, String> props = _columnProperties.get(column); - // by default always store the raw value - // if the config is set to true, don't store the actual raw value - // there will be a dummy value - return props == null || !Boolean.parseBoolean(props.get(FieldConfig.TEXT_INDEX_NO_RAW_DATA)); + private void indexSingleValueRow(SegmentDictionaryCreator dictionaryCreator, Object value, + Map<IndexType<?, ?, ?>, IndexCreator> creatorsByIndex) + throws IOException { + int dictId = dictionaryCreator != null ? 
dictionaryCreator.indexOfSV(value) : -1; + for (IndexCreator creator : creatorsByIndex.values()) { + creator.add(value, dictId); + } + } + + private void indexMultiValueRow(SegmentDictionaryCreator dictionaryCreator, Object[] values, + Map<IndexType<?, ?, ?>, IndexCreator> creatorsByIndex) + throws IOException { + int[] dictId = dictionaryCreator != null ? dictionaryCreator.indexOfMV(values) : null; + for (IndexCreator creator : creatorsByIndex.values()) { + creator.add(values, dictId); + } + } + + @Nullable + private Object calculateAlternativeValue(boolean dictEnabledColumn, FieldIndexConfigs configs, FieldSpec fieldSpec) { + if (dictEnabledColumn) { + return null; } + TextIndexConfig textConfig = configs.getConfig(StandardIndexes.text()); + if (!textConfig.isEnabled()) { + return null; + } + + Object alternativeValue = textConfig.getRawValueForTextIndex(); - return true; + if (alternativeValue == null) { + return null; + } else if (!fieldSpec.isSingleValueField()) { Review Comment: nit: rename this method calculateRawValueForTextIndex? 
########## pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/bloom/BloomIndexType.java: ########## @@ -40,27 +46,43 @@ import org.apache.pinot.spi.config.table.TableConfig; import org.apache.pinot.spi.data.Schema; -public class BloomIndexType implements IndexType<BloomFilterConfig, BloomFilterReader, BloomFilterCreator> { - public static final BloomIndexType INSTANCE = new BloomIndexType(); - @Override - public String getId() { - return StandardIndexes.BLOOM_FILTER_ID; +public class BloomIndexType + extends AbstractIndexType<BloomFilterConfig, BloomFilterReader, BloomFilterCreator> + implements ConfigurableFromIndexLoadingConfig<BloomFilterConfig> { + + protected BloomIndexType() { + super(StandardIndexes.BLOOM_FILTER_ID); } @Override public Class<BloomFilterConfig> getIndexConfigClass() { return BloomFilterConfig.class; } + @Override + public Map<String, BloomFilterConfig> fromIndexLoadingConfig(IndexLoadingConfig indexLoadingConfig) { + return indexLoadingConfig.getBloomFilterConfigs(); + } + @Override public BloomFilterConfig getDefaultConfig() { return BloomFilterConfig.DISABLED; } @Override - public BloomFilterConfig getConfig(TableConfig tableConfig, Schema schema) { - throw new UnsupportedOperationException("To be implemented in a future PR"); + public ColumnConfigDeserializer<BloomFilterConfig> getDeserializer() { + return IndexConfigDeserializer.fromIndexes("bloom", getIndexConfigClass()) + .withExclusiveAlternative( + IndexConfigDeserializer.ifIndexingConfig( + // reads tableConfig.indexingConfig.bloomFilterConfigs Review Comment: nit: format the comments more consistently? 
########## pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/fst/FstIndexType.java: ########## @@ -19,74 +19,152 @@ package org.apache.pinot.segment.local.segment.index.fst; +import com.google.common.base.Preconditions; +import java.io.IOException; +import java.util.HashMap; import java.util.Map; +import java.util.Set; import javax.annotation.Nullable; +import org.apache.pinot.segment.local.segment.creator.impl.inv.text.LuceneFSTIndexCreator; +import org.apache.pinot.segment.local.segment.index.loader.ConfigurableFromIndexLoadingConfig; +import org.apache.pinot.segment.local.segment.index.loader.IndexLoadingConfig; +import org.apache.pinot.segment.local.segment.index.loader.invertedindex.FSTIndexHandler; +import org.apache.pinot.segment.local.segment.index.readers.LuceneFSTIndexReader; +import org.apache.pinot.segment.local.utils.nativefst.FSTHeader; +import org.apache.pinot.segment.local.utils.nativefst.NativeFSTIndexCreator; +import org.apache.pinot.segment.local.utils.nativefst.NativeFSTIndexReader; import org.apache.pinot.segment.spi.ColumnMetadata; import org.apache.pinot.segment.spi.V1Constants; import org.apache.pinot.segment.spi.creator.IndexCreationContext; +import org.apache.pinot.segment.spi.index.AbstractIndexType; +import org.apache.pinot.segment.spi.index.ColumnConfigDeserializer; import org.apache.pinot.segment.spi.index.FieldIndexConfigs; -import org.apache.pinot.segment.spi.index.IndexCreator; +import org.apache.pinot.segment.spi.index.FstIndexConfig; +import org.apache.pinot.segment.spi.index.IndexConfigDeserializer; import org.apache.pinot.segment.spi.index.IndexHandler; -import org.apache.pinot.segment.spi.index.IndexReader; +import org.apache.pinot.segment.spi.index.IndexReaderConstraintException; import org.apache.pinot.segment.spi.index.IndexReaderFactory; import org.apache.pinot.segment.spi.index.IndexType; import org.apache.pinot.segment.spi.index.StandardIndexes; +import 
org.apache.pinot.segment.spi.index.creator.FSTIndexCreator; +import org.apache.pinot.segment.spi.index.reader.TextIndexReader; +import org.apache.pinot.segment.spi.memory.PinotDataBuffer; import org.apache.pinot.segment.spi.store.SegmentDirectory; -import org.apache.pinot.spi.config.table.IndexConfig; +import org.apache.pinot.spi.config.table.FSTType; +import org.apache.pinot.spi.config.table.FieldConfig; +import org.apache.pinot.spi.config.table.IndexingConfig; import org.apache.pinot.spi.config.table.TableConfig; +import org.apache.pinot.spi.data.FieldSpec; import org.apache.pinot.spi.data.Schema; -public class FstIndexType implements IndexType<IndexConfig, IndexReader, IndexCreator> { - public static final FstIndexType INSTANCE = new FstIndexType(); +public class FstIndexType extends AbstractIndexType<FstIndexConfig, TextIndexReader, FSTIndexCreator> + implements ConfigurableFromIndexLoadingConfig<FstIndexConfig> { - private FstIndexType() { + protected FstIndexType() { + super(StandardIndexes.FST_ID); } @Override - public String getId() { - return StandardIndexes.FST_ID; + public Class<FstIndexConfig> getIndexConfigClass() { + return FstIndexConfig.class; } @Override - public Class<IndexConfig> getIndexConfigClass() { - return IndexConfig.class; + public Map<String, FstIndexConfig> fromIndexLoadingConfig(IndexLoadingConfig indexLoadingConfig) { + Map<String, FstIndexConfig> result = new HashMap<>(); + Set<String> fstIndexColumns = indexLoadingConfig.getFSTIndexColumns(); + for (String column : indexLoadingConfig.getAllKnownColumns()) { + if (fstIndexColumns.contains(column)) { + FstIndexConfig conf = new FstIndexConfig(indexLoadingConfig.getFSTIndexType()); + result.put(column, conf); + } else { + result.put(column, FstIndexConfig.DISABLED); + } + } + return result; } @Override - public IndexConfig getDefaultConfig() { - return IndexConfig.DISABLED; + public FstIndexConfig getDefaultConfig() { + return FstIndexConfig.DISABLED; } @Override - public IndexConfig 
getConfig(TableConfig tableConfig, Schema schema) { - throw new UnsupportedOperationException(); + public ColumnConfigDeserializer<FstIndexConfig> getDeserializer() { + return IndexConfigDeserializer.fromIndexes("fst", getIndexConfigClass()) + .withExclusiveAlternative(IndexConfigDeserializer.fromIndexTypes( + FieldConfig.IndexType.FST, + (tableConfig, fieldConfig) -> { + IndexingConfig indexingConfig = tableConfig.getIndexingConfig(); + FSTType fstIndexType = indexingConfig != null ? indexingConfig.getFSTIndexType() : null; + return new FstIndexConfig(fstIndexType); + })); } @Override - public IndexCreator createIndexCreator(IndexCreationContext context, IndexConfig indexConfig) - throws Exception { - throw new UnsupportedOperationException(); + public FSTIndexCreator createIndexCreator(IndexCreationContext context, FstIndexConfig indexConfig) + throws IOException { + Preconditions.checkState(context.getFieldSpec().isSingleValueField(), + "FST index is currently only supported on single-value columns"); + Preconditions.checkState(context.getFieldSpec().getDataType().getStoredType() == FieldSpec.DataType.STRING, + "FST index is currently only supported on STRING type columns"); + Preconditions.checkState(context.hasDictionary(), + "FST index is currently only supported on dictionary-encoded columns"); + if (indexConfig.getFstType() == FSTType.NATIVE) { + return new NativeFSTIndexCreator(context); + } else { + return new LuceneFSTIndexCreator(context); + } } @Override - public IndexReaderFactory<IndexReader> getReaderFactory() { - throw new UnsupportedOperationException(); + public ReaderFactory getReaderFactory() { + return ReaderFactory.INSTANCE; + } + + public static TextIndexReader read(PinotDataBuffer dataBuffer, ColumnMetadata metadata) + throws IndexReaderConstraintException, IOException { + return ReaderFactory.INSTANCE.createIndexReader(dataBuffer, metadata); } @Override public IndexHandler createIndexHandler(SegmentDirectory segmentDirectory, Map<String, 
FieldIndexConfigs> configsByCol, @Nullable Schema schema, @Nullable TableConfig tableConfig) { - throw new UnsupportedOperationException(); + return new FSTIndexHandler(segmentDirectory, configsByCol, tableConfig); } @Override public String getFileExtension(ColumnMetadata columnMetadata) { return V1Constants.Indexes.FST_INDEX_FILE_EXTENSION; } - @Override - public String toString() { - return getId(); + public static class ReaderFactory extends IndexReaderFactory.Default<FstIndexConfig, TextIndexReader> { + public static final ReaderFactory INSTANCE = new ReaderFactory(); + @Override + protected IndexType<FstIndexConfig, TextIndexReader, ?> getIndexType() { + return StandardIndexes.fst(); + } + + protected TextIndexReader createIndexReader(PinotDataBuffer dataBuffer, ColumnMetadata metadata) + throws IndexReaderConstraintException, IOException { + if (!metadata.hasDictionary()) { + throw new IndexReaderConstraintException(metadata.getColumnName(), StandardIndexes.fst(), + "This index requires a dictionary"); Review Comment: I didn't find this error message in the current logic. Could you point me to where this check would be needed in the current logic? 
########## pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/dictionary/DictionaryIndexType.java: ########## @@ -19,74 +19,265 @@ package org.apache.pinot.segment.local.segment.index.dictionary; +import com.google.common.collect.Sets; +import java.io.File; +import java.io.IOException; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; import java.util.Map; +import java.util.Set; +import java.util.function.Function; +import java.util.stream.Collectors; import javax.annotation.Nullable; +import org.apache.pinot.segment.local.segment.creator.impl.SegmentDictionaryCreator; +import org.apache.pinot.segment.local.segment.index.loader.ConfigurableFromIndexLoadingConfig; +import org.apache.pinot.segment.local.segment.index.loader.IndexLoadingConfig; +import org.apache.pinot.segment.local.segment.index.readers.BigDecimalDictionary; +import org.apache.pinot.segment.local.segment.index.readers.BytesDictionary; +import org.apache.pinot.segment.local.segment.index.readers.DoubleDictionary; +import org.apache.pinot.segment.local.segment.index.readers.FloatDictionary; +import org.apache.pinot.segment.local.segment.index.readers.IntDictionary; +import org.apache.pinot.segment.local.segment.index.readers.LongDictionary; +import org.apache.pinot.segment.local.segment.index.readers.OnHeapBigDecimalDictionary; +import org.apache.pinot.segment.local.segment.index.readers.OnHeapBytesDictionary; +import org.apache.pinot.segment.local.segment.index.readers.OnHeapDoubleDictionary; +import org.apache.pinot.segment.local.segment.index.readers.OnHeapFloatDictionary; +import org.apache.pinot.segment.local.segment.index.readers.OnHeapIntDictionary; +import org.apache.pinot.segment.local.segment.index.readers.OnHeapLongDictionary; +import org.apache.pinot.segment.local.segment.index.readers.OnHeapStringDictionary; +import org.apache.pinot.segment.local.segment.index.readers.StringDictionary; import 
org.apache.pinot.segment.spi.ColumnMetadata; import org.apache.pinot.segment.spi.V1Constants; +import org.apache.pinot.segment.spi.creator.ColumnStatistics; import org.apache.pinot.segment.spi.creator.IndexCreationContext; +import org.apache.pinot.segment.spi.index.AbstractIndexType; +import org.apache.pinot.segment.spi.index.ColumnConfigDeserializer; +import org.apache.pinot.segment.spi.index.DictionaryIndexConfig; import org.apache.pinot.segment.spi.index.FieldIndexConfigs; -import org.apache.pinot.segment.spi.index.IndexCreator; +import org.apache.pinot.segment.spi.index.IndexConfigDeserializer; import org.apache.pinot.segment.spi.index.IndexHandler; -import org.apache.pinot.segment.spi.index.IndexReader; +import org.apache.pinot.segment.spi.index.IndexReaderConstraintException; import org.apache.pinot.segment.spi.index.IndexReaderFactory; import org.apache.pinot.segment.spi.index.IndexType; import org.apache.pinot.segment.spi.index.StandardIndexes; +import org.apache.pinot.segment.spi.index.reader.Dictionary; +import org.apache.pinot.segment.spi.memory.PinotDataBuffer; import org.apache.pinot.segment.spi.store.SegmentDirectory; -import org.apache.pinot.spi.config.table.IndexConfig; +import org.apache.pinot.spi.config.table.FieldConfig; +import org.apache.pinot.spi.config.table.IndexingConfig; import org.apache.pinot.spi.config.table.TableConfig; +import org.apache.pinot.spi.data.FieldSpec; import org.apache.pinot.spi.data.Schema; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; -public class DictionaryIndexType implements IndexType<IndexConfig, IndexReader, IndexCreator> { - public static final DictionaryIndexType INSTANCE = new DictionaryIndexType(); +public class DictionaryIndexType + extends AbstractIndexType<DictionaryIndexConfig, Dictionary, SegmentDictionaryCreator> + implements ConfigurableFromIndexLoadingConfig<DictionaryIndexConfig> { + private static final Logger LOGGER = LoggerFactory.getLogger(DictionaryIndexType.class); - private 
DictionaryIndexType() { + protected DictionaryIndexType() { + super(StandardIndexes.DICTIONARY_ID); } @Override - public String getId() { - return StandardIndexes.DICTIONARY_ID; + public Class<DictionaryIndexConfig> getIndexConfigClass() { + return DictionaryIndexConfig.class; } @Override - public Class<IndexConfig> getIndexConfigClass() { - return IndexConfig.class; + public Map<String, DictionaryIndexConfig> fromIndexLoadingConfig( + IndexLoadingConfig indexLoadingConfig) { + Map<String, DictionaryIndexConfig> result = new HashMap<>(); + Set<String> noDictionaryColumns = indexLoadingConfig.getNoDictionaryColumns(); + Set<String> onHeapCols = indexLoadingConfig.getOnHeapDictionaryColumns(); + Set<String> varLengthCols = indexLoadingConfig.getVarLengthDictionaryColumns(); + for (String column : indexLoadingConfig.getAllKnownColumns()) { + if (noDictionaryColumns.contains(column)) { + result.put(column, DictionaryIndexConfig.disabled()); + } else { + result.put(column, new DictionaryIndexConfig(onHeapCols.contains(column), varLengthCols.contains(column))); + } + } + return result; } @Override - public IndexConfig getDefaultConfig() { - return IndexConfig.DISABLED; + public DictionaryIndexConfig getDefaultConfig() { + return DictionaryIndexConfig.DEFAULT; } @Override - public IndexConfig getConfig(TableConfig tableConfig, Schema schema) { - throw new UnsupportedOperationException(); + public ColumnConfigDeserializer<DictionaryIndexConfig> getDeserializer() { + // reads tableConfig.indexingConfig.noDictionaryConfig + ColumnConfigDeserializer<DictionaryIndexConfig> fromNoDictConf = IndexConfigDeserializer.fromMap( + tableConfig -> tableConfig.getIndexingConfig() == null ? 
Collections.emptyMap() + : tableConfig.getIndexingConfig().getNoDictionaryConfig(), + (accum, col, value) -> accum.put(col, DictionaryIndexConfig.disabled())); + + // reads tableConfig.indexingConfig.noDictionaryColumns + ColumnConfigDeserializer<DictionaryIndexConfig> fromNoDictCol = IndexConfigDeserializer.fromCollection( + tableConfig -> tableConfig.getIndexingConfig() == null ? Collections.emptyList() + : tableConfig.getIndexingConfig().getNoDictionaryColumns(), + (accum, noDictionaryCol) -> accum.put(noDictionaryCol, DictionaryIndexConfig.disabled())); + + // reads tableConfig.fieldConfigList.encodingType + ColumnConfigDeserializer<DictionaryIndexConfig> fromFieldConfigList = IndexConfigDeserializer.fromCollection( + TableConfig::getFieldConfigList, + (accum, fieldConfig) -> { + if (fieldConfig.getEncodingType() == FieldConfig.EncodingType.RAW) { + accum.put(fieldConfig.getName(), DictionaryIndexConfig.disabled()); + } + }); + + // reads tableConfig.indexingConfig.onHeapDictionaryColumns and + // tableConfig.indexingConfig.varLengthDictionaryColumns + ColumnConfigDeserializer<DictionaryIndexConfig> fromIndexingConfig = (tableConfig, schema) -> { + IndexingConfig ic = tableConfig.getIndexingConfig(); + if (ic == null) { + return Collections.emptyMap(); + } + Set<String> onHeap = new HashSet<>( + ic.getOnHeapDictionaryColumns() == null ? Collections.emptyList() : ic.getOnHeapDictionaryColumns()); + Set<String> varLength = new HashSet<>( + ic.getVarLengthDictionaryColumns() == null ? 
Collections.emptyList() : ic.getVarLengthDictionaryColumns() + ); + Function<String, DictionaryIndexConfig> valueCalculator = + column -> new DictionaryIndexConfig(onHeap.contains(column), varLength.contains(column)); + return Sets.union(onHeap, varLength).stream() + .collect(Collectors.toMap(Function.identity(), valueCalculator)); + }; + + return fromNoDictConf + .withFallbackAlternative(fromNoDictCol) + .withFallbackAlternative(fromFieldConfigList) + .withFallbackAlternative(fromIndexingConfig) + .withExclusiveAlternative(IndexConfigDeserializer.fromIndexes(getId(), getIndexConfigClass())); } @Override - public IndexCreator createIndexCreator(IndexCreationContext context, IndexConfig indexConfig) + public SegmentDictionaryCreator createIndexCreator(IndexCreationContext context, DictionaryIndexConfig indexConfig) { + boolean useVarLengthDictionary = shouldUseVarLengthDictionary(context, indexConfig); + return new SegmentDictionaryCreator(context.getFieldSpec(), context.getIndexDir(), useVarLengthDictionary); + } + + public boolean shouldUseVarLengthDictionary(IndexCreationContext context, DictionaryIndexConfig indexConfig) { + if (indexConfig.getUseVarLengthDictionary()) { + return true; + } + FieldSpec.DataType storedType = context.getFieldSpec().getDataType().getStoredType(); + if (storedType != FieldSpec.DataType.BYTES && storedType != FieldSpec.DataType.BIG_DECIMAL) { + return false; + } + return !context.isFixedLength(); + } + + public static boolean shouldUseVarLengthDictionary(String columnName, Set<String> varLengthDictColumns, + FieldSpec.DataType columnStoredType, ColumnStatistics columnProfile) { + if (varLengthDictColumns.contains(columnName)) { + return true; + } + + return shouldUseVarLengthDictionary(columnStoredType, columnProfile); + } + + public static boolean shouldUseVarLengthDictionary(FieldSpec.DataType columnStoredType, ColumnStatistics profile) { + if (columnStoredType == FieldSpec.DataType.BYTES || columnStoredType == 
FieldSpec.DataType.BIG_DECIMAL) { + return !profile.isFixedLength(); + } + + return false; + } + + public SegmentDictionaryCreator createIndexCreator(FieldSpec fieldSpec, File indexDir, boolean useVarLengthDictionary) throws Exception { - throw new UnsupportedOperationException(); + return new SegmentDictionaryCreator(fieldSpec, indexDir, useVarLengthDictionary); + } + + public static Dictionary read(SegmentDirectory.Reader segmentReader, ColumnMetadata columnMetadata) + throws IOException { + PinotDataBuffer dataBuffer = + segmentReader.getIndexFor(columnMetadata.getColumnName(), StandardIndexes.dictionary()); + return read(dataBuffer, columnMetadata, new DictionaryIndexConfig(false, true)); + } + + public static Dictionary read(PinotDataBuffer dataBuffer, ColumnMetadata metadata, DictionaryIndexConfig indexConfig) + throws IOException { + FieldSpec.DataType dataType = metadata.getDataType(); + boolean loadOnHeap = indexConfig.isOnHeap(); + if (loadOnHeap) { + String columnName = metadata.getColumnName(); + LOGGER.info("Loading on-heap dictionary for column: {}", columnName); + } + + int length = metadata.getCardinality(); + switch (dataType.getStoredType()) { + case INT: + return loadOnHeap ? new OnHeapIntDictionary(dataBuffer, length) + : new IntDictionary(dataBuffer, length); + case LONG: + return loadOnHeap ? new OnHeapLongDictionary(dataBuffer, length) + : new LongDictionary(dataBuffer, length); + case FLOAT: + return loadOnHeap ? new OnHeapFloatDictionary(dataBuffer, length) + : new FloatDictionary(dataBuffer, length); + case DOUBLE: + return loadOnHeap ? new OnHeapDoubleDictionary(dataBuffer, length) + : new DoubleDictionary(dataBuffer, length); + case BIG_DECIMAL: + int numBytesPerValue = metadata.getColumnMaxLength(); + return loadOnHeap ? 
new OnHeapBigDecimalDictionary(dataBuffer, length, numBytesPerValue) + : new BigDecimalDictionary(dataBuffer, length, numBytesPerValue); + case STRING: + numBytesPerValue = metadata.getColumnMaxLength(); + return loadOnHeap ? new OnHeapStringDictionary(dataBuffer, length, numBytesPerValue) + : new StringDictionary(dataBuffer, length, numBytesPerValue); + case BYTES: + numBytesPerValue = metadata.getColumnMaxLength(); + return loadOnHeap ? new OnHeapBytesDictionary(dataBuffer, length, numBytesPerValue) + : new BytesDictionary(dataBuffer, length, numBytesPerValue); + default: + throw new IllegalStateException("Unsupported data type for dictionary: " + dataType); + } } @Override - public IndexReaderFactory<IndexReader> getReaderFactory() { - throw new UnsupportedOperationException(); + public IndexReaderFactory<Dictionary> getReaderFactory() { + return ReaderFactory.INSTANCE; } @Override public IndexHandler createIndexHandler(SegmentDirectory segmentDirectory, Map<String, FieldIndexConfigs> configsByCol, @Nullable Schema schema, @Nullable TableConfig tableConfig) { - throw new UnsupportedOperationException(); + return IndexHandler.NoOp.INSTANCE; } - @Override - public String getFileExtension(ColumnMetadata columnMetadata) { + public static String getFileExtension() { return V1Constants.Dict.FILE_EXTENSION; } @Override - public String toString() { - return getId(); + public String getFileExtension(ColumnMetadata columnMetadata) { + return getFileExtension(); + } + + public static class ReaderFactory extends IndexReaderFactory.Default<DictionaryIndexConfig, Dictionary> { Review Comment: is there need to make this inner class `public`? from a quick search in this PR, I didn't see direct use of it outside the class. Some get reference of it via getReaderFactory() above. Same question for the other ReaderFactory classes for other index types. Looks like they don't need to be `public`. 
########## pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/inverted/InvertedIndexType.java: ########## @@ -86,7 +130,46 @@ public String getFileExtension(ColumnMetadata columnMetadata) { } @Override - public String toString() { - return getId(); + public IndexHandler createIndexHandler(SegmentDirectory segmentDirectory, Map<String, FieldIndexConfigs> configsByCol, + @Nullable Schema schema, @Nullable TableConfig tableConfig) { + return new InvertedIndexHandler(segmentDirectory, configsByCol, tableConfig); + } + + public static class ReaderFactory implements IndexReaderFactory<InvertedIndexReader> { + public static final ReaderFactory INSTANCE = new ReaderFactory(); + + private ReaderFactory() { + } + + @Override + public InvertedIndexReader createIndexReader(SegmentDirectory.Reader segmentReader, + FieldIndexConfigs fieldIndexConfigs, ColumnMetadata metadata) + throws IOException, IndexReaderConstraintException { + if (fieldIndexConfigs == null || !fieldIndexConfigs.getConfig(StandardIndexes.inverted()).isEnabled()) { + return null; + } + if (!metadata.hasDictionary()) { + throw new IllegalStateException("Column " + metadata.getColumnName() + " cannot be indexed by an inverted " + + "index if it has no dictionary"); + } + if (metadata.isSorted() && metadata.isSingleValue()) { + ForwardIndexReader fwdReader = StandardIndexes.forward().getReaderFactory() + .createIndexReader(segmentReader, fieldIndexConfigs, metadata); + Preconditions.checkState(fwdReader instanceof SortedIndexReader); + return (SortedIndexReader) fwdReader; + } else { + return createSkippingForward(segmentReader, metadata); + } + } + + public InvertedIndexReader createSkippingForward(SegmentDirectory.Reader segmentReader, ColumnMetadata metadata) Review Comment: Why is the method called `createSkippingForward`? It looks like we're creating the ordinary bitmap-based inverted index here. 
########## pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/BaseIndexHandler.java: ########## @@ -40,14 +46,31 @@ */ public abstract class BaseIndexHandler implements IndexHandler { private static final Logger LOGGER = LoggerFactory.getLogger(BaseIndexHandler.class); - - protected final IndexLoadingConfig _indexLoadingConfig; protected final Set<String> _tmpForwardIndexColumns; protected final SegmentDirectory _segmentDirectory; + protected final Map<String, FieldIndexConfigs> _fieldIndexConfigs; + @Nullable + protected final TableConfig _tableConfig; public BaseIndexHandler(SegmentDirectory segmentDirectory, IndexLoadingConfig indexLoadingConfig) { + this(segmentDirectory, indexLoadingConfig.getFieldIndexConfigByColName(), indexLoadingConfig.getTableConfig()); + } + + public BaseIndexHandler(SegmentDirectory segmentDirectory, Map<String, FieldIndexConfigs> fieldIndexConfigs, + @Nullable TableConfig tableConfig) { _segmentDirectory = segmentDirectory; - _indexLoadingConfig = indexLoadingConfig; + SegmentMetadataImpl segmentMetadata = segmentDirectory.getSegmentMetadata(); + if (fieldIndexConfigs.keySet().equals(segmentMetadata.getAllColumns())) { + _fieldIndexConfigs = fieldIndexConfigs; + } else { Review Comment: save this else branch, and do `_fieldIndexConfigs.getOrDefault(..., FieldIndexConfigs.EMPTY)`? 
########## pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/fst/FstIndexType.java: ########## @@ -19,74 +19,152 @@ package org.apache.pinot.segment.local.segment.index.fst; +import com.google.common.base.Preconditions; +import java.io.IOException; +import java.util.HashMap; import java.util.Map; +import java.util.Set; import javax.annotation.Nullable; +import org.apache.pinot.segment.local.segment.creator.impl.inv.text.LuceneFSTIndexCreator; +import org.apache.pinot.segment.local.segment.index.loader.ConfigurableFromIndexLoadingConfig; +import org.apache.pinot.segment.local.segment.index.loader.IndexLoadingConfig; +import org.apache.pinot.segment.local.segment.index.loader.invertedindex.FSTIndexHandler; +import org.apache.pinot.segment.local.segment.index.readers.LuceneFSTIndexReader; +import org.apache.pinot.segment.local.utils.nativefst.FSTHeader; +import org.apache.pinot.segment.local.utils.nativefst.NativeFSTIndexCreator; +import org.apache.pinot.segment.local.utils.nativefst.NativeFSTIndexReader; import org.apache.pinot.segment.spi.ColumnMetadata; import org.apache.pinot.segment.spi.V1Constants; import org.apache.pinot.segment.spi.creator.IndexCreationContext; +import org.apache.pinot.segment.spi.index.AbstractIndexType; +import org.apache.pinot.segment.spi.index.ColumnConfigDeserializer; import org.apache.pinot.segment.spi.index.FieldIndexConfigs; -import org.apache.pinot.segment.spi.index.IndexCreator; +import org.apache.pinot.segment.spi.index.FstIndexConfig; +import org.apache.pinot.segment.spi.index.IndexConfigDeserializer; import org.apache.pinot.segment.spi.index.IndexHandler; -import org.apache.pinot.segment.spi.index.IndexReader; +import org.apache.pinot.segment.spi.index.IndexReaderConstraintException; import org.apache.pinot.segment.spi.index.IndexReaderFactory; import org.apache.pinot.segment.spi.index.IndexType; import org.apache.pinot.segment.spi.index.StandardIndexes; +import 
org.apache.pinot.segment.spi.index.creator.FSTIndexCreator; +import org.apache.pinot.segment.spi.index.reader.TextIndexReader; +import org.apache.pinot.segment.spi.memory.PinotDataBuffer; import org.apache.pinot.segment.spi.store.SegmentDirectory; -import org.apache.pinot.spi.config.table.IndexConfig; +import org.apache.pinot.spi.config.table.FSTType; +import org.apache.pinot.spi.config.table.FieldConfig; +import org.apache.pinot.spi.config.table.IndexingConfig; import org.apache.pinot.spi.config.table.TableConfig; +import org.apache.pinot.spi.data.FieldSpec; import org.apache.pinot.spi.data.Schema; -public class FstIndexType implements IndexType<IndexConfig, IndexReader, IndexCreator> { - public static final FstIndexType INSTANCE = new FstIndexType(); +public class FstIndexType extends AbstractIndexType<FstIndexConfig, TextIndexReader, FSTIndexCreator> + implements ConfigurableFromIndexLoadingConfig<FstIndexConfig> { - private FstIndexType() { + protected FstIndexType() { + super(StandardIndexes.FST_ID); } @Override - public String getId() { - return StandardIndexes.FST_ID; + public Class<FstIndexConfig> getIndexConfigClass() { + return FstIndexConfig.class; } @Override - public Class<IndexConfig> getIndexConfigClass() { - return IndexConfig.class; + public Map<String, FstIndexConfig> fromIndexLoadingConfig(IndexLoadingConfig indexLoadingConfig) { + Map<String, FstIndexConfig> result = new HashMap<>(); + Set<String> fstIndexColumns = indexLoadingConfig.getFSTIndexColumns(); + for (String column : indexLoadingConfig.getAllKnownColumns()) { + if (fstIndexColumns.contains(column)) { + FstIndexConfig conf = new FstIndexConfig(indexLoadingConfig.getFSTIndexType()); Review Comment: inline `conf = ..` with result.put(), or probably just shorten this if-else with ternary operator ########## pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/ConfigurableFromIndexLoadingConfig.java: ########## @@ -0,0 +1,45 @@ +/** + * Licensed to the 
Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.pinot.segment.local.segment.index.loader; + +import java.util.Map; +import org.apache.pinot.segment.spi.index.ColumnConfigDeserializer; +import org.apache.pinot.spi.config.table.IndexConfig; + + +/** + * This interface can be optionally implemented by {@link org.apache.pinot.segment.spi.index.IndexType index types} to + * indicate that they can extract their configuration from an older {@link IndexLoadingConfig} object. + */ +public interface ConfigurableFromIndexLoadingConfig<C extends IndexConfig> { + + + /** + * Returns a {@link ColumnConfigDeserializer} that can be used to deserialize the index config. Review Comment: The method comment and the return type seem inconsistent with each other.
########## pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/InvertedIndexAndDictionaryBasedForwardIndexCreator.java: ########## @@ -112,15 +113,13 @@ public class InvertedIndexAndDictionaryBasedForwardIndexCreator implements AutoC private PinotDataBuffer _forwardIndexMaxSizeBuffer; public InvertedIndexAndDictionaryBasedForwardIndexCreator(String columnName, SegmentDirectory segmentDirectory, - IndexLoadingConfig indexLoadingConfig, SegmentDirectory.Writer segmentWriter, - IndexCreatorProvider indexCreatorProvider, boolean isTemporaryForwardIndex) + boolean dictionaryEnabled, ForwardIndexConfig forwConf, SegmentDirectory.Writer segmentWriter, Review Comment: nit: fwdConf ########## pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/IndexLoadingConfig.java: ########## @@ -102,6 +118,8 @@ public class IndexLoadingConfig { private String _instanceId; private Map<String, Map<String, String>> _instanceTierConfigs; + private boolean _dirty = true; Review Comment: could you comment a bit why the need of _dirty flag? ########## pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/ForwardIndexHandler.java: ########## @@ -455,12 +458,16 @@ private void rewriteRawMVForwardIndexForCompressionChange(String column, ColumnM int maxRowLengthInBytes = MultiValueVarByteRawIndexCreator.getMaxRowDataLengthInBytes(lengthOfLongestEntry, maxNumberOfMVEntries); - IndexCreationContext.Forward context = - IndexCreationContext.builder().withIndexDir(indexDir).withColumnMetadata(existingColMetadata) - .withLengthOfLongestEntry(lengthOfLongestEntry).withMaxRowLengthInBytes(maxRowLengthInBytes).build() - .forForwardIndex(newCompressionType, _indexLoadingConfig.getColumnProperties()); + IndexCreationContext context = IndexCreationContext.builder() Review Comment: guess your IDE is using a different code style? but I'd assume linter should catch those. 
It seems like skipping those 'newline' changes. ########## pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/bloomfilter/BloomFilterHandler.java: ########## @@ -264,51 +265,42 @@ private void createBloomFilterForColumn(SegmentDirectory.Writer segmentWriter, C if (!columnMetadata.hasDictionary()) { // Create a temporary forward index if it is disabled and does not exist - columnMetadata = createForwardIndexIfNeeded(segmentWriter, columnName, indexCreatorProvider, true); + columnMetadata = createForwardIndexIfNeeded(segmentWriter, columnName, true); } // Create new bloom filter for the column. BloomFilterConfig bloomFilterConfig = _bloomFilterConfigs.get(columnName); LOGGER.info("Creating new bloom filter for segment: {}, column: {} with config: {}", segmentName, columnName, bloomFilterConfig); if (columnMetadata.hasDictionary()) { - createAndSealBloomFilterForDictionaryColumn(bloomFilterCreatorProvider, indexDir, columnMetadata, - bloomFilterConfig, segmentWriter); + createAndSealBloomFilterForDictionaryColumn(indexDir, columnMetadata, bloomFilterConfig, segmentWriter); } else { - createAndSealBloomFilterForNonDictionaryColumn(bloomFilterCreatorProvider, indexDir, columnMetadata, - bloomFilterConfig, segmentWriter); + createAndSealBloomFilterForNonDictionaryColumn(indexDir, columnMetadata, bloomFilterConfig, segmentWriter); } // For v3, write the generated bloom filter file into the single file and remove it. if (_segmentDirectory.getSegmentMetadata().getVersion() == SegmentVersion.v3) { - LoaderUtils.writeIndexToV3Format(segmentWriter, columnName, bloomFilterFile, ColumnIndexType.BLOOM_FILTER); + LoaderUtils.writeIndexToV3Format(segmentWriter, columnName, bloomFilterFile, StandardIndexes.bloomFilter()); } // Delete the marker file. 
FileUtils.deleteQuietly(bloomFilterFileInProgress); LOGGER.info("Created bloom filter for segment: {}, column: {}", segmentName, columnName); } - private BaseImmutableDictionary getDictionaryReader(ColumnMetadata columnMetadata, - SegmentDirectory.Writer segmentWriter) + private Dictionary getDictionaryReader(ColumnMetadata columnMetadata, SegmentDirectory.Writer segmentWriter) throws IOException { - PinotDataBuffer dictionaryBuffer = - segmentWriter.getIndexFor(columnMetadata.getColumnName(), ColumnIndexType.DICTIONARY); - int cardinality = columnMetadata.getCardinality(); DataType dataType = columnMetadata.getDataType(); + switch (dataType) { case INT: - return new IntDictionary(dictionaryBuffer, cardinality); case LONG: - return new LongDictionary(dictionaryBuffer, cardinality); case FLOAT: - return new FloatDictionary(dictionaryBuffer, cardinality); case DOUBLE: - return new DoubleDictionary(dictionaryBuffer, cardinality); case STRING: - return new StringDictionary(dictionaryBuffer, cardinality, columnMetadata.getColumnMaxLength()); case BYTES: - return new BytesDictionary(dictionaryBuffer, cardinality, columnMetadata.getColumnMaxLength()); + PinotDataBuffer buf = segmentWriter.getIndexFor(columnMetadata.getColumnName(), StandardIndexes.dictionary()); + return DictionaryIndexType.read(buf, columnMetadata, new DictionaryIndexConfig(false, true)); Review Comment: define DictionaryIndexConfig.OFFHEAP constant for a bit more readability here ########## pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/SegmentPreProcessor.java: ########## @@ -104,20 +104,28 @@ public void process() } // Update single-column indices, like inverted index, json index etc. - IndexCreatorProvider indexCreatorProvider = IndexingOverrides.getIndexCreatorProvider(); + + // ForwardIndexHandler may modify the segment metadata while rewriting forward index to create a dictionary + // This new metadata is needed to construct other indexes like RangeIndex. 
Review Comment: broken comment? ########## pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/nullvalue/NullValueIndexType.java: ########## @@ -60,34 +66,56 @@ public IndexConfig getDefaultConfig() { } @Override - public IndexConfig getConfig(TableConfig tableConfig, Schema schema) { - throw new UnsupportedOperationException(); + public ColumnConfigDeserializer<IndexConfig> getDeserializer() { + return IndexConfigDeserializer.fromIndexes("null", getIndexConfigClass()) + .withFallbackAlternative( + IndexConfigDeserializer.ifIndexingConfig( + IndexConfigDeserializer.alwaysCall((TableConfig tableConfig, Schema schema) -> + tableConfig.getIndexingConfig().isNullHandlingEnabled() + ? IndexConfig.ENABLED + : IndexConfig.DISABLED)) Review Comment: unnecessary newline missed by linter ########## pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/SegmentDictionaryCreator.java: ########## @@ -62,9 +73,20 @@ public class SegmentDictionaryCreator implements Closeable { public SegmentDictionaryCreator(FieldSpec fieldSpec, File indexDir, boolean useVarLengthDictionary) { _columnName = fieldSpec.getName(); _storedType = fieldSpec.getDataType().getStoredType(); - _dictionaryFile = new File(indexDir, _columnName + V1Constants.Dict.FILE_EXTENSION); + _dictionaryFile = new File(indexDir, _columnName + DictionaryIndexType.getFileExtension()); _useVarLengthDictionary = useVarLengthDictionary; } + @Override + public void add(@Nonnull Object value, int dictId) + throws IOException { + throw new UnsupportedOperationException("Dictionaries should not be build as a normal index"); Review Comment: s/build/built found same typo in a few other classes, you can grep and replace. 
########## pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/inverted/InvertedIndexType.java: ########## @@ -59,25 +79,49 @@ public IndexConfig getDefaultConfig() { } @Override - public IndexConfig getConfig(TableConfig tableConfig, Schema schema) { - throw new UnsupportedOperationException(); + public ColumnConfigDeserializer<IndexConfig> getDeserializer() { + ColumnConfigDeserializer<IndexConfig> fromInvertedCols = IndexConfigDeserializer.fromCollection( + tableConfig -> tableConfig.getIndexingConfig().getInvertedIndexColumns(), + (acum, column) -> acum.put(column, IndexConfig.ENABLED)); + return IndexConfigDeserializer.fromIndexes("inverted", getIndexConfigClass()) + .withExclusiveAlternative(IndexConfigDeserializer.ifIndexingConfig(fromInvertedCols)); + } + + public DictionaryBasedInvertedIndexCreator createIndexCreator(IndexCreationContext context) + throws IOException { + if (context.isOnHeap()) { + return new OnHeapBitmapInvertedIndexCreator(context.getIndexDir(), context.getFieldSpec().getName(), + context.getCardinality()); + } else { + return new OffHeapBitmapInvertedIndexCreator(context.getIndexDir(), context.getFieldSpec(), + context.getCardinality(), context.getTotalDocs(), context.getTotalNumberOfEntries()); + } } @Override - public IndexCreator createIndexCreator(IndexCreationContext context, IndexConfig indexConfig) - throws Exception { - throw new UnsupportedOperationException(); + public DictionaryBasedInvertedIndexCreator createIndexCreator(IndexCreationContext context, + IndexConfig indexConfig) + throws IOException { + return createIndexCreator(context); } @Override - public IndexReaderFactory<IndexReader> getReaderFactory() { - throw new UnsupportedOperationException(); + public IndexReaderFactory<InvertedIndexReader> getReaderFactory() { + return ReaderFactory.INSTANCE; } @Override - public IndexHandler createIndexHandler(SegmentDirectory segmentDirectory, Map<String, FieldIndexConfigs> configsByCol, - @Nullable 
Schema schema, @Nullable TableConfig tableConfig) { - throw new UnsupportedOperationException(); + @Nullable + public InvertedIndexReader getIndexReader(ColumnIndexContainer indexContainer) { + InvertedIndexReader reader = super.getIndexReader(indexContainer); + if (reader != null) { + return reader; + } + ForwardIndexReader fwdReader = indexContainer.getIndex(StandardIndexes.forward()); + if (fwdReader != null && fwdReader instanceof SortedIndexReader) { Review Comment: Just do the `instanceof` check? It already evaluates to false for null, so the explicit null check is redundant. This could probably also be shortened with a ternary operator. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org