klsince commented on code in PR #10184: URL: https://github.com/apache/pinot/pull/10184#discussion_r1147922497
########## pinot-core/src/main/java/org/apache/pinot/core/operator/filter/BitmapBasedFilterOperator.java: ########## @@ -46,7 +46,7 @@ public class BitmapBasedFilterOperator extends BaseFilterOperator { private final int _numDocs; @SuppressWarnings("unchecked") - BitmapBasedFilterOperator(PredicateEvaluator predicateEvaluator, DataSource dataSource, int numDocs) { + public BitmapBasedFilterOperator(PredicateEvaluator predicateEvaluator, DataSource dataSource, int numDocs) { Review Comment: looks like this change is not necessary ########## pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/SegmentColumnarIndexCreator.java: ########## @@ -133,183 +122,144 @@ public void init(SegmentGeneratorConfig segmentCreationSpec, SegmentIndexCreatio return; } - Collection<FieldSpec> fieldSpecs = schema.getAllFieldSpecs(); - Set<String> invertedIndexColumns = new HashSet<>(); - for (String columnName : _config.getInvertedIndexCreationColumns()) { - Preconditions.checkState(schema.hasColumn(columnName), - "Cannot create inverted index for column: %s because it is not in schema", columnName); - invertedIndexColumns.add(columnName); - } + Map<String, FieldIndexConfigs> indexConfigs = segmentCreationSpec.getIndexConfigsByColName(); - Set<String> bloomFilterColumns = new HashSet<>(); - for (String columnName : _config.getBloomFilterCreationColumns()) { - Preconditions.checkState(schema.hasColumn(columnName), - "Cannot create bloom filter for column: %s because it is not in schema", columnName); - bloomFilterColumns.add(columnName); - } - - Set<String> rangeIndexColumns = new HashSet<>(); - for (String columnName : _config.getRangeIndexCreationColumns()) { - Preconditions.checkState(schema.hasColumn(columnName), - "Cannot create range index for column: %s because it is not in schema", columnName); - rangeIndexColumns.add(columnName); - } - - Set<String> textIndexColumns = new HashSet<>(); - for (String columnName : 
_config.getTextIndexCreationColumns()) { - Preconditions.checkState(schema.hasColumn(columnName), - "Cannot create text index for column: %s because it is not in schema", columnName); - textIndexColumns.add(columnName); - } - - Set<String> fstIndexColumns = new HashSet<>(); - for (String columnName : _config.getFSTIndexCreationColumns()) { - Preconditions.checkState(schema.hasColumn(columnName), - "Cannot create FST index for column: %s because it is not in schema", columnName); - fstIndexColumns.add(columnName); - } - - Map<String, JsonIndexConfig> jsonIndexConfigs = _config.getJsonIndexConfigs(); - for (String columnName : jsonIndexConfigs.keySet()) { - Preconditions.checkState(schema.hasColumn(columnName), - "Cannot create json index for column: %s because it is not in schema", columnName); - } - - Set<String> forwardIndexDisabledColumns = new HashSet<>(); - for (String columnName : _config.getForwardIndexDisabledColumns()) { - Preconditions.checkState(schema.hasColumn(columnName), String.format("Invalid config. 
Can't disable " - + "forward index creation for a column: %s that does not exist in schema", columnName)); - forwardIndexDisabledColumns.add(columnName); - } - - Map<String, H3IndexConfig> h3IndexConfigs = _config.getH3IndexConfigs(); - for (String columnName : h3IndexConfigs.keySet()) { - Preconditions.checkState(schema.hasColumn(columnName), - "Cannot create H3 index for column: %s because it is not in schema", columnName); - } + _creatorsByColAndIndex = Maps.newHashMapWithExpectedSize(indexConfigs.keySet().size()); - // Initialize creators for dictionary, forward index and inverted index - IndexingConfig indexingConfig = _config.getTableConfig().getIndexingConfig(); - int rangeIndexVersion = indexingConfig.getRangeIndexVersion(); - for (FieldSpec fieldSpec : fieldSpecs) { - // Ignore virtual columns + for (String columnName : indexConfigs.keySet()) { + FieldSpec fieldSpec = schema.getFieldSpecFor(columnName); + if (fieldSpec == null) { + Preconditions.checkState(schema.hasColumn(columnName), + "Cannot create inverted index for column: %s because it is not in schema", columnName); + } if (fieldSpec.isVirtualColumn()) { + LOGGER.warn("Ignoring index creation for virtual column " + columnName); continue; } - String columnName = fieldSpec.getName(); + FieldIndexConfigs originalConfig = indexConfigs.get(columnName); ColumnIndexCreationInfo columnIndexCreationInfo = indexCreationInfoMap.get(columnName); Preconditions.checkNotNull(columnIndexCreationInfo, "Missing index creation info for column: %s", columnName); boolean dictEnabledColumn = createDictionaryForColumn(columnIndexCreationInfo, segmentCreationSpec, fieldSpec); - Preconditions.checkState(dictEnabledColumn || !invertedIndexColumns.contains(columnName), + Preconditions.checkState(dictEnabledColumn || !originalConfig.getConfig(StandardIndexes.inverted()).isEnabled(), "Cannot create inverted index for raw index column: %s", columnName); - boolean forwardIndexDisabled = 
forwardIndexDisabledColumns.contains(columnName); + IndexType<ForwardIndexConfig, ?, ForwardIndexCreator> forwardIdx = StandardIndexes.forward(); + boolean forwardIndexDisabled = !originalConfig.getConfig(forwardIdx).isEnabled(); IndexCreationContext.Common context = IndexCreationContext.builder() .withIndexDir(_indexDir) - .withCardinality(columnIndexCreationInfo.getDistinctValueCount()) .withDictionary(dictEnabledColumn) .withFieldSpec(fieldSpec) .withTotalDocs(segmentIndexCreationInfo.getTotalDocs()) - .withMinValue((Comparable<?>) columnIndexCreationInfo.getMin()) - .withMaxValue((Comparable<?>) columnIndexCreationInfo.getMax()) - .withTotalNumberOfEntries(columnIndexCreationInfo.getTotalNumberOfEntries()) .withColumnIndexCreationInfo(columnIndexCreationInfo) - .sorted(columnIndexCreationInfo.isSorted()) + .withIsOptimizedDictionary(_config.isOptimizeDictionary() + || _config.isOptimizeDictionaryForMetrics() && fieldSpec.getFieldType() == FieldSpec.FieldType.METRIC) .onHeap(segmentCreationSpec.isOnHeap()) .withForwardIndexDisabled(forwardIndexDisabled) + .withFixedLength(columnIndexCreationInfo.isFixedLength()) Review Comment: move withFixedLength() inside build(), like the other withMinValue()... calls, since they are all derived from columnIndexCreationInfo.
########## pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/SegmentColumnarIndexCreator.java: ########## @@ -133,183 +122,144 @@ public void init(SegmentGeneratorConfig segmentCreationSpec, SegmentIndexCreatio return; } - Collection<FieldSpec> fieldSpecs = schema.getAllFieldSpecs(); - Set<String> invertedIndexColumns = new HashSet<>(); - for (String columnName : _config.getInvertedIndexCreationColumns()) { - Preconditions.checkState(schema.hasColumn(columnName), - "Cannot create inverted index for column: %s because it is not in schema", columnName); - invertedIndexColumns.add(columnName); - } + Map<String, FieldIndexConfigs> indexConfigs = segmentCreationSpec.getIndexConfigsByColName(); - Set<String> bloomFilterColumns = new HashSet<>(); - for (String columnName : _config.getBloomFilterCreationColumns()) { - Preconditions.checkState(schema.hasColumn(columnName), - "Cannot create bloom filter for column: %s because it is not in schema", columnName); - bloomFilterColumns.add(columnName); - } - - Set<String> rangeIndexColumns = new HashSet<>(); - for (String columnName : _config.getRangeIndexCreationColumns()) { - Preconditions.checkState(schema.hasColumn(columnName), - "Cannot create range index for column: %s because it is not in schema", columnName); - rangeIndexColumns.add(columnName); - } - - Set<String> textIndexColumns = new HashSet<>(); - for (String columnName : _config.getTextIndexCreationColumns()) { - Preconditions.checkState(schema.hasColumn(columnName), - "Cannot create text index for column: %s because it is not in schema", columnName); - textIndexColumns.add(columnName); - } - - Set<String> fstIndexColumns = new HashSet<>(); - for (String columnName : _config.getFSTIndexCreationColumns()) { - Preconditions.checkState(schema.hasColumn(columnName), - "Cannot create FST index for column: %s because it is not in schema", columnName); - fstIndexColumns.add(columnName); - } - - Map<String, JsonIndexConfig> 
jsonIndexConfigs = _config.getJsonIndexConfigs(); - for (String columnName : jsonIndexConfigs.keySet()) { - Preconditions.checkState(schema.hasColumn(columnName), - "Cannot create json index for column: %s because it is not in schema", columnName); - } - - Set<String> forwardIndexDisabledColumns = new HashSet<>(); - for (String columnName : _config.getForwardIndexDisabledColumns()) { - Preconditions.checkState(schema.hasColumn(columnName), String.format("Invalid config. Can't disable " - + "forward index creation for a column: %s that does not exist in schema", columnName)); - forwardIndexDisabledColumns.add(columnName); - } - - Map<String, H3IndexConfig> h3IndexConfigs = _config.getH3IndexConfigs(); - for (String columnName : h3IndexConfigs.keySet()) { - Preconditions.checkState(schema.hasColumn(columnName), - "Cannot create H3 index for column: %s because it is not in schema", columnName); - } + _creatorsByColAndIndex = Maps.newHashMapWithExpectedSize(indexConfigs.keySet().size()); - // Initialize creators for dictionary, forward index and inverted index - IndexingConfig indexingConfig = _config.getTableConfig().getIndexingConfig(); - int rangeIndexVersion = indexingConfig.getRangeIndexVersion(); - for (FieldSpec fieldSpec : fieldSpecs) { - // Ignore virtual columns + for (String columnName : indexConfigs.keySet()) { + FieldSpec fieldSpec = schema.getFieldSpecFor(columnName); + if (fieldSpec == null) { + Preconditions.checkState(schema.hasColumn(columnName), + "Cannot create inverted index for column: %s because it is not in schema", columnName); + } if (fieldSpec.isVirtualColumn()) { + LOGGER.warn("Ignoring index creation for virtual column " + columnName); continue; } - String columnName = fieldSpec.getName(); + FieldIndexConfigs originalConfig = indexConfigs.get(columnName); ColumnIndexCreationInfo columnIndexCreationInfo = indexCreationInfoMap.get(columnName); Preconditions.checkNotNull(columnIndexCreationInfo, "Missing index creation info for column: %s", 
columnName); boolean dictEnabledColumn = createDictionaryForColumn(columnIndexCreationInfo, segmentCreationSpec, fieldSpec); - Preconditions.checkState(dictEnabledColumn || !invertedIndexColumns.contains(columnName), + Preconditions.checkState(dictEnabledColumn || !originalConfig.getConfig(StandardIndexes.inverted()).isEnabled(), "Cannot create inverted index for raw index column: %s", columnName); - boolean forwardIndexDisabled = forwardIndexDisabledColumns.contains(columnName); + IndexType<ForwardIndexConfig, ?, ForwardIndexCreator> forwardIdx = StandardIndexes.forward(); + boolean forwardIndexDisabled = !originalConfig.getConfig(forwardIdx).isEnabled(); IndexCreationContext.Common context = IndexCreationContext.builder() .withIndexDir(_indexDir) - .withCardinality(columnIndexCreationInfo.getDistinctValueCount()) .withDictionary(dictEnabledColumn) .withFieldSpec(fieldSpec) .withTotalDocs(segmentIndexCreationInfo.getTotalDocs()) - .withMinValue((Comparable<?>) columnIndexCreationInfo.getMin()) - .withMaxValue((Comparable<?>) columnIndexCreationInfo.getMax()) - .withTotalNumberOfEntries(columnIndexCreationInfo.getTotalNumberOfEntries()) .withColumnIndexCreationInfo(columnIndexCreationInfo) - .sorted(columnIndexCreationInfo.isSorted()) + .withIsOptimizedDictionary(_config.isOptimizeDictionary() + || _config.isOptimizeDictionaryForMetrics() && fieldSpec.getFieldType() == FieldSpec.FieldType.METRIC) .onHeap(segmentCreationSpec.isOnHeap()) .withForwardIndexDisabled(forwardIndexDisabled) + .withFixedLength(columnIndexCreationInfo.isFixedLength()) + .withTextCommitOnClose(true) .build(); - // Initialize forward index creator - ChunkCompressionType chunkCompressionType = - dictEnabledColumn ? null : getColumnCompressionType(segmentCreationSpec, fieldSpec); - // Sorted columns treat the 'forwardIndexDisabled' flag as a no-op - _forwardIndexCreatorMap.put(columnName, (forwardIndexDisabled && !columnIndexCreationInfo.isSorted()) - ? 
null : _indexCreatorProvider.newForwardIndexCreator( - context.forForwardIndex(chunkCompressionType, segmentCreationSpec.getColumnProperties()))); - - // Initialize inverted index creator; skip creating inverted index if sorted - if (invertedIndexColumns.contains(columnName) && !columnIndexCreationInfo.isSorted()) { - _invertedIndexCreatorMap.put(columnName, - _indexCreatorProvider.newInvertedIndexCreator(context.forInvertedIndex())); - } + + FieldIndexConfigs config = adaptConfig(columnName, originalConfig, columnIndexCreationInfo, segmentCreationSpec); + if (dictEnabledColumn) { // Create dictionary-encoded index // Initialize dictionary creator // TODO: Dictionary creator holds all unique values on heap. Consider keeping dictionary instead of creator // which uses off-heap memory. - SegmentDictionaryCreator dictionaryCreator = - new SegmentDictionaryCreator(fieldSpec, _indexDir, columnIndexCreationInfo.isUseVarLengthDictionary()); - _dictionaryCreatorMap.put(columnName, dictionaryCreator); - // Create dictionary + + DictionaryIndexType dictIdx = DictionaryIndexType.INSTANCE; + // Index conf should be present if dictEnabledColumn is true. 
In case it doesn't, getConfig will throw an + // exception + DictionaryIndexConfig dictConfig = config.getConfig(dictIdx); + if (!dictConfig.isEnabled()) { + throw new IllegalArgumentException("Dictionary index should be enabled"); + } + SegmentDictionaryCreator creator = dictIdx.createIndexCreator(context, dictConfig); + try { - dictionaryCreator.build(columnIndexCreationInfo.getSortedUniqueElementsArray()); + creator.build(context.getSortedUniqueElementsArray()); } catch (Exception e) { LOGGER.error("Error building dictionary for field: {}, cardinality: {}, number of bytes per entry: {}", - fieldSpec.getName(), columnIndexCreationInfo.getDistinctValueCount(), - dictionaryCreator.getNumBytesPerEntry()); + context.getFieldSpec().getName(), context.getCardinality(), creator.getNumBytesPerEntry()); throw e; } - } - if (bloomFilterColumns.contains(columnName)) { - if (indexingConfig.getBloomFilterConfigs() != null - && indexingConfig.getBloomFilterConfigs().containsKey(columnName)) { - _bloomFilterCreatorMap.put(columnName, _indexCreatorProvider.newBloomFilterCreator( - context.forBloomFilter(indexingConfig.getBloomFilterConfigs().get(columnName)))); - } else { - _bloomFilterCreatorMap.put(columnName, _indexCreatorProvider.newBloomFilterCreator( - context.forBloomFilter(new BloomFilterConfig(BloomFilterConfig.DEFAULT_FPP, 0, false)))); - } + _dictionaryCreatorMap.put(columnName, creator); } - if (!columnIndexCreationInfo.isSorted() && rangeIndexColumns.contains(columnName)) { - _rangeIndexFilterCreatorMap.put(columnName, - _indexCreatorProvider.newRangeIndexCreator(context.forRangeIndex(rangeIndexVersion))); + Map<IndexType<?, ?, ?>, IndexCreator> creatorsByIndex = + Maps.newHashMapWithExpectedSize(IndexService.getInstance().getAllIndexes().size()); + for (IndexType<?, ?, ?> index : IndexService.getInstance().getAllIndexes()) { + if (skipIndexType(index)) { Review Comment: extend skipIndexType() to include checks: config.isEnabled and index.shouldBeCreated too? 
nit: call it tryCreateIndexCreator for a bit more clarity ########## pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/SegmentColumnarIndexCreator.java: ########## @@ -363,7 +314,7 @@ private boolean createDictionaryForColumn(ColumnIndexCreationInfo info, SegmentG private boolean shouldCreateDictionaryWithinThreshold(ColumnIndexCreationInfo info, SegmentGeneratorConfig config, FieldSpec spec) { - long dictionarySize = info.getDistinctValueCount() * spec.getDataType().size(); + long dictionarySize = info.getDistinctValueCount() * (long) spec.getDataType().size(); Review Comment: nit: open a small PR to land this one-liner fix quickly? ########## pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/SegmentColumnarIndexCreator.java: ########## @@ -133,183 +122,144 @@ public void init(SegmentGeneratorConfig segmentCreationSpec, SegmentIndexCreatio return; } - Collection<FieldSpec> fieldSpecs = schema.getAllFieldSpecs(); - Set<String> invertedIndexColumns = new HashSet<>(); - for (String columnName : _config.getInvertedIndexCreationColumns()) { - Preconditions.checkState(schema.hasColumn(columnName), - "Cannot create inverted index for column: %s because it is not in schema", columnName); - invertedIndexColumns.add(columnName); - } + Map<String, FieldIndexConfigs> indexConfigs = segmentCreationSpec.getIndexConfigsByColName(); - Set<String> bloomFilterColumns = new HashSet<>(); - for (String columnName : _config.getBloomFilterCreationColumns()) { - Preconditions.checkState(schema.hasColumn(columnName), - "Cannot create bloom filter for column: %s because it is not in schema", columnName); - bloomFilterColumns.add(columnName); - } - - Set<String> rangeIndexColumns = new HashSet<>(); - for (String columnName : _config.getRangeIndexCreationColumns()) { - Preconditions.checkState(schema.hasColumn(columnName), - "Cannot create range index for column: %s because it is not in schema", columnName); -
rangeIndexColumns.add(columnName); - } - - Set<String> textIndexColumns = new HashSet<>(); - for (String columnName : _config.getTextIndexCreationColumns()) { - Preconditions.checkState(schema.hasColumn(columnName), - "Cannot create text index for column: %s because it is not in schema", columnName); - textIndexColumns.add(columnName); - } - - Set<String> fstIndexColumns = new HashSet<>(); - for (String columnName : _config.getFSTIndexCreationColumns()) { - Preconditions.checkState(schema.hasColumn(columnName), - "Cannot create FST index for column: %s because it is not in schema", columnName); - fstIndexColumns.add(columnName); - } - - Map<String, JsonIndexConfig> jsonIndexConfigs = _config.getJsonIndexConfigs(); - for (String columnName : jsonIndexConfigs.keySet()) { - Preconditions.checkState(schema.hasColumn(columnName), - "Cannot create json index for column: %s because it is not in schema", columnName); - } - - Set<String> forwardIndexDisabledColumns = new HashSet<>(); - for (String columnName : _config.getForwardIndexDisabledColumns()) { - Preconditions.checkState(schema.hasColumn(columnName), String.format("Invalid config. 
Can't disable " - + "forward index creation for a column: %s that does not exist in schema", columnName)); - forwardIndexDisabledColumns.add(columnName); - } - - Map<String, H3IndexConfig> h3IndexConfigs = _config.getH3IndexConfigs(); - for (String columnName : h3IndexConfigs.keySet()) { - Preconditions.checkState(schema.hasColumn(columnName), - "Cannot create H3 index for column: %s because it is not in schema", columnName); - } + _creatorsByColAndIndex = Maps.newHashMapWithExpectedSize(indexConfigs.keySet().size()); - // Initialize creators for dictionary, forward index and inverted index - IndexingConfig indexingConfig = _config.getTableConfig().getIndexingConfig(); - int rangeIndexVersion = indexingConfig.getRangeIndexVersion(); - for (FieldSpec fieldSpec : fieldSpecs) { - // Ignore virtual columns + for (String columnName : indexConfigs.keySet()) { + FieldSpec fieldSpec = schema.getFieldSpecFor(columnName); + if (fieldSpec == null) { + Preconditions.checkState(schema.hasColumn(columnName), + "Cannot create inverted index for column: %s because it is not in schema", columnName); + } if (fieldSpec.isVirtualColumn()) { + LOGGER.warn("Ignoring index creation for virtual column " + columnName); continue; } - String columnName = fieldSpec.getName(); + FieldIndexConfigs originalConfig = indexConfigs.get(columnName); ColumnIndexCreationInfo columnIndexCreationInfo = indexCreationInfoMap.get(columnName); Preconditions.checkNotNull(columnIndexCreationInfo, "Missing index creation info for column: %s", columnName); boolean dictEnabledColumn = createDictionaryForColumn(columnIndexCreationInfo, segmentCreationSpec, fieldSpec); - Preconditions.checkState(dictEnabledColumn || !invertedIndexColumns.contains(columnName), + Preconditions.checkState(dictEnabledColumn || !originalConfig.getConfig(StandardIndexes.inverted()).isEnabled(), "Cannot create inverted index for raw index column: %s", columnName); - boolean forwardIndexDisabled = 
forwardIndexDisabledColumns.contains(columnName); + IndexType<ForwardIndexConfig, ?, ForwardIndexCreator> forwardIdx = StandardIndexes.forward(); + boolean forwardIndexDisabled = !originalConfig.getConfig(forwardIdx).isEnabled(); IndexCreationContext.Common context = IndexCreationContext.builder() .withIndexDir(_indexDir) - .withCardinality(columnIndexCreationInfo.getDistinctValueCount()) .withDictionary(dictEnabledColumn) .withFieldSpec(fieldSpec) .withTotalDocs(segmentIndexCreationInfo.getTotalDocs()) - .withMinValue((Comparable<?>) columnIndexCreationInfo.getMin()) - .withMaxValue((Comparable<?>) columnIndexCreationInfo.getMax()) - .withTotalNumberOfEntries(columnIndexCreationInfo.getTotalNumberOfEntries()) .withColumnIndexCreationInfo(columnIndexCreationInfo) - .sorted(columnIndexCreationInfo.isSorted()) + .withIsOptimizedDictionary(_config.isOptimizeDictionary() + || _config.isOptimizeDictionaryForMetrics() && fieldSpec.getFieldType() == FieldSpec.FieldType.METRIC) .onHeap(segmentCreationSpec.isOnHeap()) .withForwardIndexDisabled(forwardIndexDisabled) + .withFixedLength(columnIndexCreationInfo.isFixedLength()) + .withTextCommitOnClose(true) .build(); - // Initialize forward index creator - ChunkCompressionType chunkCompressionType = - dictEnabledColumn ? null : getColumnCompressionType(segmentCreationSpec, fieldSpec); - // Sorted columns treat the 'forwardIndexDisabled' flag as a no-op - _forwardIndexCreatorMap.put(columnName, (forwardIndexDisabled && !columnIndexCreationInfo.isSorted()) - ? 
null : _indexCreatorProvider.newForwardIndexCreator( - context.forForwardIndex(chunkCompressionType, segmentCreationSpec.getColumnProperties()))); - - // Initialize inverted index creator; skip creating inverted index if sorted - if (invertedIndexColumns.contains(columnName) && !columnIndexCreationInfo.isSorted()) { - _invertedIndexCreatorMap.put(columnName, - _indexCreatorProvider.newInvertedIndexCreator(context.forInvertedIndex())); - } + + FieldIndexConfigs config = adaptConfig(columnName, originalConfig, columnIndexCreationInfo, segmentCreationSpec); + if (dictEnabledColumn) { // Create dictionary-encoded index // Initialize dictionary creator // TODO: Dictionary creator holds all unique values on heap. Consider keeping dictionary instead of creator // which uses off-heap memory. - SegmentDictionaryCreator dictionaryCreator = - new SegmentDictionaryCreator(fieldSpec, _indexDir, columnIndexCreationInfo.isUseVarLengthDictionary()); - _dictionaryCreatorMap.put(columnName, dictionaryCreator); - // Create dictionary + + DictionaryIndexType dictIdx = DictionaryIndexType.INSTANCE; + // Index conf should be present if dictEnabledColumn is true. In case it doesn't, getConfig will throw an + // exception + DictionaryIndexConfig dictConfig = config.getConfig(dictIdx); + if (!dictConfig.isEnabled()) { + throw new IllegalArgumentException("Dictionary index should be enabled"); + } + SegmentDictionaryCreator creator = dictIdx.createIndexCreator(context, dictConfig); Review Comment: is there need to call dictIdx.shouldBeCreated()? I see only dictionary creator overrides this method but it's only called in tryCreateCreator in the for-loop below, which actually skips null/dict creator. 
########## pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/SegmentColumnarIndexCreator.java: ########## @@ -133,183 +122,144 @@ public void init(SegmentGeneratorConfig segmentCreationSpec, SegmentIndexCreatio return; } - Collection<FieldSpec> fieldSpecs = schema.getAllFieldSpecs(); - Set<String> invertedIndexColumns = new HashSet<>(); - for (String columnName : _config.getInvertedIndexCreationColumns()) { - Preconditions.checkState(schema.hasColumn(columnName), - "Cannot create inverted index for column: %s because it is not in schema", columnName); - invertedIndexColumns.add(columnName); - } + Map<String, FieldIndexConfigs> indexConfigs = segmentCreationSpec.getIndexConfigsByColName(); - Set<String> bloomFilterColumns = new HashSet<>(); - for (String columnName : _config.getBloomFilterCreationColumns()) { - Preconditions.checkState(schema.hasColumn(columnName), - "Cannot create bloom filter for column: %s because it is not in schema", columnName); - bloomFilterColumns.add(columnName); - } - - Set<String> rangeIndexColumns = new HashSet<>(); - for (String columnName : _config.getRangeIndexCreationColumns()) { - Preconditions.checkState(schema.hasColumn(columnName), - "Cannot create range index for column: %s because it is not in schema", columnName); - rangeIndexColumns.add(columnName); - } - - Set<String> textIndexColumns = new HashSet<>(); - for (String columnName : _config.getTextIndexCreationColumns()) { - Preconditions.checkState(schema.hasColumn(columnName), - "Cannot create text index for column: %s because it is not in schema", columnName); - textIndexColumns.add(columnName); - } - - Set<String> fstIndexColumns = new HashSet<>(); - for (String columnName : _config.getFSTIndexCreationColumns()) { - Preconditions.checkState(schema.hasColumn(columnName), - "Cannot create FST index for column: %s because it is not in schema", columnName); - fstIndexColumns.add(columnName); - } - - Map<String, JsonIndexConfig> 
jsonIndexConfigs = _config.getJsonIndexConfigs(); - for (String columnName : jsonIndexConfigs.keySet()) { - Preconditions.checkState(schema.hasColumn(columnName), - "Cannot create json index for column: %s because it is not in schema", columnName); - } - - Set<String> forwardIndexDisabledColumns = new HashSet<>(); - for (String columnName : _config.getForwardIndexDisabledColumns()) { - Preconditions.checkState(schema.hasColumn(columnName), String.format("Invalid config. Can't disable " - + "forward index creation for a column: %s that does not exist in schema", columnName)); - forwardIndexDisabledColumns.add(columnName); - } - - Map<String, H3IndexConfig> h3IndexConfigs = _config.getH3IndexConfigs(); - for (String columnName : h3IndexConfigs.keySet()) { - Preconditions.checkState(schema.hasColumn(columnName), - "Cannot create H3 index for column: %s because it is not in schema", columnName); - } + _creatorsByColAndIndex = Maps.newHashMapWithExpectedSize(indexConfigs.keySet().size()); - // Initialize creators for dictionary, forward index and inverted index - IndexingConfig indexingConfig = _config.getTableConfig().getIndexingConfig(); - int rangeIndexVersion = indexingConfig.getRangeIndexVersion(); - for (FieldSpec fieldSpec : fieldSpecs) { - // Ignore virtual columns + for (String columnName : indexConfigs.keySet()) { + FieldSpec fieldSpec = schema.getFieldSpecFor(columnName); + if (fieldSpec == null) { + Preconditions.checkState(schema.hasColumn(columnName), + "Cannot create inverted index for column: %s because it is not in schema", columnName); + } if (fieldSpec.isVirtualColumn()) { + LOGGER.warn("Ignoring index creation for virtual column " + columnName); continue; } - String columnName = fieldSpec.getName(); + FieldIndexConfigs originalConfig = indexConfigs.get(columnName); ColumnIndexCreationInfo columnIndexCreationInfo = indexCreationInfoMap.get(columnName); Preconditions.checkNotNull(columnIndexCreationInfo, "Missing index creation info for column: %s", 
columnName); boolean dictEnabledColumn = createDictionaryForColumn(columnIndexCreationInfo, segmentCreationSpec, fieldSpec); - Preconditions.checkState(dictEnabledColumn || !invertedIndexColumns.contains(columnName), + Preconditions.checkState(dictEnabledColumn || !originalConfig.getConfig(StandardIndexes.inverted()).isEnabled(), "Cannot create inverted index for raw index column: %s", columnName); - boolean forwardIndexDisabled = forwardIndexDisabledColumns.contains(columnName); + IndexType<ForwardIndexConfig, ?, ForwardIndexCreator> forwardIdx = StandardIndexes.forward(); + boolean forwardIndexDisabled = !originalConfig.getConfig(forwardIdx).isEnabled(); IndexCreationContext.Common context = IndexCreationContext.builder() .withIndexDir(_indexDir) - .withCardinality(columnIndexCreationInfo.getDistinctValueCount()) .withDictionary(dictEnabledColumn) .withFieldSpec(fieldSpec) .withTotalDocs(segmentIndexCreationInfo.getTotalDocs()) - .withMinValue((Comparable<?>) columnIndexCreationInfo.getMin()) - .withMaxValue((Comparable<?>) columnIndexCreationInfo.getMax()) - .withTotalNumberOfEntries(columnIndexCreationInfo.getTotalNumberOfEntries()) .withColumnIndexCreationInfo(columnIndexCreationInfo) - .sorted(columnIndexCreationInfo.isSorted()) + .withIsOptimizedDictionary(_config.isOptimizeDictionary() + || _config.isOptimizeDictionaryForMetrics() && fieldSpec.getFieldType() == FieldSpec.FieldType.METRIC) .onHeap(segmentCreationSpec.isOnHeap()) .withForwardIndexDisabled(forwardIndexDisabled) + .withFixedLength(columnIndexCreationInfo.isFixedLength()) + .withTextCommitOnClose(true) .build(); - // Initialize forward index creator - ChunkCompressionType chunkCompressionType = - dictEnabledColumn ? null : getColumnCompressionType(segmentCreationSpec, fieldSpec); - // Sorted columns treat the 'forwardIndexDisabled' flag as a no-op - _forwardIndexCreatorMap.put(columnName, (forwardIndexDisabled && !columnIndexCreationInfo.isSorted()) - ? 
null : _indexCreatorProvider.newForwardIndexCreator( - context.forForwardIndex(chunkCompressionType, segmentCreationSpec.getColumnProperties()))); - - // Initialize inverted index creator; skip creating inverted index if sorted - if (invertedIndexColumns.contains(columnName) && !columnIndexCreationInfo.isSorted()) { - _invertedIndexCreatorMap.put(columnName, - _indexCreatorProvider.newInvertedIndexCreator(context.forInvertedIndex())); - } + + FieldIndexConfigs config = adaptConfig(columnName, originalConfig, columnIndexCreationInfo, segmentCreationSpec); + if (dictEnabledColumn) { // Create dictionary-encoded index // Initialize dictionary creator // TODO: Dictionary creator holds all unique values on heap. Consider keeping dictionary instead of creator // which uses off-heap memory. - SegmentDictionaryCreator dictionaryCreator = - new SegmentDictionaryCreator(fieldSpec, _indexDir, columnIndexCreationInfo.isUseVarLengthDictionary()); - _dictionaryCreatorMap.put(columnName, dictionaryCreator); - // Create dictionary + + DictionaryIndexType dictIdx = DictionaryIndexType.INSTANCE; + // Index conf should be present if dictEnabledColumn is true. 
In case it doesn't, getConfig will throw an + // exception + DictionaryIndexConfig dictConfig = config.getConfig(dictIdx); + if (!dictConfig.isEnabled()) { + throw new IllegalArgumentException("Dictionary index should be enabled"); + } + SegmentDictionaryCreator creator = dictIdx.createIndexCreator(context, dictConfig); + try { - dictionaryCreator.build(columnIndexCreationInfo.getSortedUniqueElementsArray()); + creator.build(context.getSortedUniqueElementsArray()); } catch (Exception e) { LOGGER.error("Error building dictionary for field: {}, cardinality: {}, number of bytes per entry: {}", - fieldSpec.getName(), columnIndexCreationInfo.getDistinctValueCount(), - dictionaryCreator.getNumBytesPerEntry()); + context.getFieldSpec().getName(), context.getCardinality(), creator.getNumBytesPerEntry()); throw e; } - } - if (bloomFilterColumns.contains(columnName)) { - if (indexingConfig.getBloomFilterConfigs() != null - && indexingConfig.getBloomFilterConfigs().containsKey(columnName)) { - _bloomFilterCreatorMap.put(columnName, _indexCreatorProvider.newBloomFilterCreator( - context.forBloomFilter(indexingConfig.getBloomFilterConfigs().get(columnName)))); - } else { - _bloomFilterCreatorMap.put(columnName, _indexCreatorProvider.newBloomFilterCreator( - context.forBloomFilter(new BloomFilterConfig(BloomFilterConfig.DEFAULT_FPP, 0, false)))); - } + _dictionaryCreatorMap.put(columnName, creator); } - if (!columnIndexCreationInfo.isSorted() && rangeIndexColumns.contains(columnName)) { - _rangeIndexFilterCreatorMap.put(columnName, - _indexCreatorProvider.newRangeIndexCreator(context.forRangeIndex(rangeIndexVersion))); + Map<IndexType<?, ?, ?>, IndexCreator> creatorsByIndex = + Maps.newHashMapWithExpectedSize(IndexService.getInstance().getAllIndexes().size()); + for (IndexType<?, ?, ?> index : IndexService.getInstance().getAllIndexes()) { + if (skipIndexType(index)) { + continue; + } + if (config.getConfig(index).isEnabled()) { + tryCreateCreator(creatorsByIndex, index, context, 
config); + } } - - if (textIndexColumns.contains(columnName)) { - FSTType fstType = FSTType.LUCENE; - List<FieldConfig> fieldConfigList = _config.getTableConfig().getFieldConfigList(); - if (fieldConfigList != null) { - for (FieldConfig fieldConfig : fieldConfigList) { - if (fieldConfig.getName().equals(columnName)) { - Map<String, String> properties = fieldConfig.getProperties(); - if (TextIndexUtils.isFstTypeNative(properties)) { - fstType = FSTType.NATIVE; - } - } - } + // TODO: Remove this when values stored as ForwardIndex stop depending on TextIndex config + IndexCreator oldFwdCreator = creatorsByIndex.get(forwardIdx); + if (oldFwdCreator instanceof AbstractForwardIndexCreator) { // this implies that oldFwdCreator != null + Object fakeForwardValue = calculateAlternativeValue(dictEnabledColumn, config, fieldSpec); + if (fakeForwardValue != null) { + AbstractForwardIndexCreator castedOldFwdCreator = (AbstractForwardIndexCreator) oldFwdCreator; + SameValueForwardIndexCreator fakeValueFwdCreator = + new SameValueForwardIndexCreator(fakeForwardValue, castedOldFwdCreator); + creatorsByIndex.put(forwardIdx, fakeValueFwdCreator); } - _textIndexCreatorMap.put(columnName, - _indexCreatorProvider.newTextIndexCreator(context.forTextIndex(fstType, true, - TextIndexUtils.extractStopWordsInclude(columnName, _columnProperties), - TextIndexUtils.extractStopWordsExclude(columnName, _columnProperties)))); } + _creatorsByColAndIndex.put(columnName, creatorsByIndex); + } - if (fstIndexColumns.contains(columnName)) { - _fstIndexCreatorMap.put(columnName, _indexCreatorProvider.newTextIndexCreator( - context.forFSTIndex(_config.getFSTIndexType(), - (String[]) columnIndexCreationInfo.getSortedUniqueElementsArray()))); + // Although NullValueVector is implemented as an index, it needs to be treated in a different way than other indexes + _nullHandlingEnabled = _config.isNullHandlingEnabled(); + if (_nullHandlingEnabled) { + for (FieldSpec fieldSpec : schema.getAllFieldSpecs()) { + // 
Initialize Null value vector map + String columnName = fieldSpec.getName(); + _nullValueVectorCreatorMap.put(columnName, new NullValueVectorCreator(_indexDir, columnName)); } + } + } - JsonIndexConfig jsonIndexConfig = jsonIndexConfigs.get(columnName); - if (jsonIndexConfig != null) { - _jsonIndexCreatorMap.put(columnName, - _indexCreatorProvider.newJsonIndexCreator(context.forJsonIndex(jsonIndexConfig))); - } + private FieldIndexConfigs adaptConfig(String columnName, FieldIndexConfigs config, + ColumnIndexCreationInfo columnIndexCreationInfo, SegmentGeneratorConfig segmentCreationSpec) { + FieldIndexConfigs.Builder builder = new FieldIndexConfigs.Builder(config); + // Sorted columns treat the 'forwardIndexDisabled' flag as a no-op + if (config.getConfig(StandardIndexes.forward()).isEnabled() && columnIndexCreationInfo.isSorted()) { + builder.add(StandardIndexes.forward(), new ForwardIndexConfig.Builder() + .withLegacyProperties(segmentCreationSpec.getColumnProperties(), columnName) + .build()); + } + // Initialize inverted index creator; skip creating inverted index if sorted + if (config.getConfig(StandardIndexes.inverted()).isEnabled() && columnIndexCreationInfo.isSorted()) { + builder.undeclare(StandardIndexes.inverted()); + } + return builder.build(); + } - H3IndexConfig h3IndexConfig = h3IndexConfigs.get(columnName); - if (h3IndexConfig != null) { - _h3IndexCreatorMap.put(columnName, - _indexCreatorProvider.newGeoSpatialIndexCreator(context.forGeospatialIndex(h3IndexConfig))); - } + /** + * Returns true if the given index type has their own construction lifecycle and therefore should not be instantiated + * in the general index loop and shouldn't be notified of each new column. 
+ */ + private boolean skipIndexType(IndexType<?, ?, ?> indexType) { + return indexType == StandardIndexes.nullValueVector() || indexType == StandardIndexes.dictionary(); + } - _nullHandlingEnabled = _config.isNullHandlingEnabled(); - if (_nullHandlingEnabled) { - // Initialize Null value vector map - _nullValueVectorCreatorMap.put(columnName, new NullValueVectorCreator(_indexDir, columnName)); - } + /** + * Creates the {@link IndexCreator} in a type safe way. + * + * This code needs to be in a specific method instead of inlined in the main loop in order to be able to use the + * limited generic capabilities of Java. + */ + private <C extends IndexConfig> void tryCreateCreator(Map<IndexType<?, ?, ?>, IndexCreator> creatorsByIndex, + IndexType<C, ?, ?> index, IndexCreationContext.Common context, FieldIndexConfigs fieldIndexConfigs) + throws Exception { + C declaration = fieldIndexConfigs.getConfig(index); + if (declaration.isEnabled() && index.shouldBeCreated(context, fieldIndexConfigs)) { Review Comment: s/declaration/config? I see IndexDeclaration was mentioned as an alternative in the PEP but looks like this PR is not using that way? ########## pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/SegmentColumnarIndexCreator.java: ########## @@ -379,340 +330,94 @@ private boolean shouldCreateDictionaryWithinThreshold(ColumnIndexCreationInfo in } /** - * Helper method that returns compression type to use based on segment creation spec and field type. - * <ul> - * <li> Returns compression type from segment creation spec, if specified there.</li> - * <li> Else, returns PASS_THROUGH for metrics, and SNAPPY for dimensions. This is because metrics are likely - * to be spread in different chunks after applying predicates. Same could be true for dimensions, but in that - * case, clients are expected to explicitly specify the appropriate compression type in the spec. 
</li> - * </ul> - * @param segmentCreationSpec Segment creation spec - * @param fieldSpec Field spec for the column - * @return Compression type to use + * @deprecated use {@link ForwardIndexType#getDefaultCompressionType(FieldType)} instead */ - private ChunkCompressionType getColumnCompressionType(SegmentGeneratorConfig segmentCreationSpec, - FieldSpec fieldSpec) { - ChunkCompressionType compressionType = segmentCreationSpec.getRawIndexCompressionType().get(fieldSpec.getName()); - if (compressionType == null) { - compressionType = getDefaultCompressionType(fieldSpec.getFieldType()); - } - - return compressionType; - } - + @Deprecated public static ChunkCompressionType getDefaultCompressionType(FieldType fieldType) { - if (fieldType == FieldSpec.FieldType.METRIC) { - return ChunkCompressionType.PASS_THROUGH; - } else { - return ChunkCompressionType.LZ4; - } + return ForwardIndexType.getDefaultCompressionType(fieldType); } @Override public void indexRow(GenericRow row) throws IOException { - for (Map.Entry<String, ForwardIndexCreator> entry : _forwardIndexCreatorMap.entrySet()) { - String columnName = entry.getKey(); - ForwardIndexCreator forwardIndexCreator = entry.getValue(); - - Object columnValueToIndex = row.getValue(columnName); - if (columnValueToIndex == null) { - throw new RuntimeException("Null value for column:" + columnName); - } + for (Map.Entry<String, Map<IndexType<?, ?, ?>, IndexCreator>> byColEntry : _creatorsByColAndIndex.entrySet()) { + String columnName = byColEntry.getKey(); + Map<IndexType<?, ?, ?>, IndexCreator> creatorsByIndex = byColEntry.getValue(); FieldSpec fieldSpec = _schema.getFieldSpecFor(columnName); - - //get dictionaryCreator, will be null if column is not dictionaryEncoded SegmentDictionaryCreator dictionaryCreator = _dictionaryCreatorMap.get(columnName); - // bloom filter - BloomFilterCreator bloomFilterCreator = _bloomFilterCreatorMap.get(columnName); - if (bloomFilterCreator != null) { - if (fieldSpec.isSingleValueField()) { - 
if (fieldSpec.getDataType() == DataType.BYTES) { - bloomFilterCreator.add(BytesUtils.toHexString((byte[]) columnValueToIndex)); - } else { - bloomFilterCreator.add(columnValueToIndex.toString()); - } - } else { - Object[] values = (Object[]) columnValueToIndex; - if (fieldSpec.getDataType() == DataType.BYTES) { - for (Object value : values) { - bloomFilterCreator.add(BytesUtils.toHexString((byte[]) value)); - } - } else { - for (Object value : values) { - bloomFilterCreator.add(value.toString()); - } - } - } - } - - // range index - CombinedInvertedIndexCreator combinedInvertedIndexCreator = _rangeIndexFilterCreatorMap.get(columnName); - if (combinedInvertedIndexCreator != null) { - if (dictionaryCreator != null) { - if (fieldSpec.isSingleValueField()) { - combinedInvertedIndexCreator.add(dictionaryCreator.indexOfSV(columnValueToIndex)); - } else { - int[] dictIds = dictionaryCreator.indexOfMV(columnValueToIndex); - combinedInvertedIndexCreator.add(dictIds, dictIds.length); - } - } else { - if (fieldSpec.isSingleValueField()) { - switch (fieldSpec.getDataType()) { - case INT: - combinedInvertedIndexCreator.add((Integer) columnValueToIndex); - break; - case LONG: - combinedInvertedIndexCreator.add((Long) columnValueToIndex); - break; - case FLOAT: - combinedInvertedIndexCreator.add((Float) columnValueToIndex); - break; - case DOUBLE: - combinedInvertedIndexCreator.add((Double) columnValueToIndex); - break; - default: - throw new RuntimeException("Unsupported data type " + fieldSpec.getDataType() + " for range index"); - } - } else { - Object[] values = (Object[]) columnValueToIndex; - switch (fieldSpec.getDataType()) { - case INT: - int[] intValues = new int[values.length]; - for (int i = 0; i < values.length; i++) { - intValues[i] = (Integer) values[i]; - } - combinedInvertedIndexCreator.add(intValues, values.length); - break; - case LONG: - long[] longValues = new long[values.length]; - for (int i = 0; i < values.length; i++) { - longValues[i] = (Long) values[i]; 
- } - combinedInvertedIndexCreator.add(longValues, values.length); - break; - case FLOAT: - float[] floatValues = new float[values.length]; - for (int i = 0; i < values.length; i++) { - floatValues[i] = (Float) values[i]; - } - combinedInvertedIndexCreator.add(floatValues, values.length); - break; - case DOUBLE: - double[] doubleValues = new double[values.length]; - for (int i = 0; i < values.length; i++) { - doubleValues[i] = (Double) values[i]; - } - combinedInvertedIndexCreator.add(doubleValues, values.length); - break; - default: - throw new RuntimeException("Unsupported data type " + fieldSpec.getDataType() + " for range index"); - } - } - } - } - - // text-index - TextIndexCreator textIndexCreator = _textIndexCreatorMap.get(columnName); - if (textIndexCreator != null) { - if (fieldSpec.isSingleValueField()) { - textIndexCreator.add((String) columnValueToIndex); - } else { - Object[] values = (Object[]) columnValueToIndex; - int length = values.length; - if (values instanceof String[]) { - textIndexCreator.add((String[]) values, length); - } else { - String[] strings = new String[length]; - for (int i = 0; i < length; i++) { - strings[i] = (String) values[i]; - } - textIndexCreator.add(strings, length); - columnValueToIndex = strings; - } - } + Object columnValueToIndex = row.getValue(columnName); Review Comment: can do this null check earlier to be more consistent with old logic, (actually saving some map lookups) ########## pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/SegmentColumnarIndexCreator.java: ########## @@ -133,183 +122,144 @@ public void init(SegmentGeneratorConfig segmentCreationSpec, SegmentIndexCreatio return; } - Collection<FieldSpec> fieldSpecs = schema.getAllFieldSpecs(); - Set<String> invertedIndexColumns = new HashSet<>(); - for (String columnName : _config.getInvertedIndexCreationColumns()) { - Preconditions.checkState(schema.hasColumn(columnName), - "Cannot create inverted index for column: %s 
because it is not in schema", columnName); - invertedIndexColumns.add(columnName); - } + Map<String, FieldIndexConfigs> indexConfigs = segmentCreationSpec.getIndexConfigsByColName(); - Set<String> bloomFilterColumns = new HashSet<>(); - for (String columnName : _config.getBloomFilterCreationColumns()) { - Preconditions.checkState(schema.hasColumn(columnName), - "Cannot create bloom filter for column: %s because it is not in schema", columnName); - bloomFilterColumns.add(columnName); - } - - Set<String> rangeIndexColumns = new HashSet<>(); - for (String columnName : _config.getRangeIndexCreationColumns()) { - Preconditions.checkState(schema.hasColumn(columnName), - "Cannot create range index for column: %s because it is not in schema", columnName); - rangeIndexColumns.add(columnName); - } - - Set<String> textIndexColumns = new HashSet<>(); - for (String columnName : _config.getTextIndexCreationColumns()) { - Preconditions.checkState(schema.hasColumn(columnName), - "Cannot create text index for column: %s because it is not in schema", columnName); - textIndexColumns.add(columnName); - } - - Set<String> fstIndexColumns = new HashSet<>(); - for (String columnName : _config.getFSTIndexCreationColumns()) { - Preconditions.checkState(schema.hasColumn(columnName), - "Cannot create FST index for column: %s because it is not in schema", columnName); - fstIndexColumns.add(columnName); - } - - Map<String, JsonIndexConfig> jsonIndexConfigs = _config.getJsonIndexConfigs(); - for (String columnName : jsonIndexConfigs.keySet()) { - Preconditions.checkState(schema.hasColumn(columnName), - "Cannot create json index for column: %s because it is not in schema", columnName); - } - - Set<String> forwardIndexDisabledColumns = new HashSet<>(); - for (String columnName : _config.getForwardIndexDisabledColumns()) { - Preconditions.checkState(schema.hasColumn(columnName), String.format("Invalid config. 
Can't disable " - + "forward index creation for a column: %s that does not exist in schema", columnName)); - forwardIndexDisabledColumns.add(columnName); - } - - Map<String, H3IndexConfig> h3IndexConfigs = _config.getH3IndexConfigs(); - for (String columnName : h3IndexConfigs.keySet()) { - Preconditions.checkState(schema.hasColumn(columnName), - "Cannot create H3 index for column: %s because it is not in schema", columnName); - } + _creatorsByColAndIndex = Maps.newHashMapWithExpectedSize(indexConfigs.keySet().size()); - // Initialize creators for dictionary, forward index and inverted index - IndexingConfig indexingConfig = _config.getTableConfig().getIndexingConfig(); - int rangeIndexVersion = indexingConfig.getRangeIndexVersion(); - for (FieldSpec fieldSpec : fieldSpecs) { - // Ignore virtual columns + for (String columnName : indexConfigs.keySet()) { + FieldSpec fieldSpec = schema.getFieldSpecFor(columnName); + if (fieldSpec == null) { + Preconditions.checkState(schema.hasColumn(columnName), + "Cannot create inverted index for column: %s because it is not in schema", columnName); + } if (fieldSpec.isVirtualColumn()) { + LOGGER.warn("Ignoring index creation for virtual column " + columnName); continue; } - String columnName = fieldSpec.getName(); + FieldIndexConfigs originalConfig = indexConfigs.get(columnName); ColumnIndexCreationInfo columnIndexCreationInfo = indexCreationInfoMap.get(columnName); Preconditions.checkNotNull(columnIndexCreationInfo, "Missing index creation info for column: %s", columnName); boolean dictEnabledColumn = createDictionaryForColumn(columnIndexCreationInfo, segmentCreationSpec, fieldSpec); - Preconditions.checkState(dictEnabledColumn || !invertedIndexColumns.contains(columnName), + Preconditions.checkState(dictEnabledColumn || !originalConfig.getConfig(StandardIndexes.inverted()).isEnabled(), "Cannot create inverted index for raw index column: %s", columnName); - boolean forwardIndexDisabled = 
forwardIndexDisabledColumns.contains(columnName); + IndexType<ForwardIndexConfig, ?, ForwardIndexCreator> forwardIdx = StandardIndexes.forward(); + boolean forwardIndexDisabled = !originalConfig.getConfig(forwardIdx).isEnabled(); IndexCreationContext.Common context = IndexCreationContext.builder() .withIndexDir(_indexDir) - .withCardinality(columnIndexCreationInfo.getDistinctValueCount()) .withDictionary(dictEnabledColumn) .withFieldSpec(fieldSpec) .withTotalDocs(segmentIndexCreationInfo.getTotalDocs()) - .withMinValue((Comparable<?>) columnIndexCreationInfo.getMin()) - .withMaxValue((Comparable<?>) columnIndexCreationInfo.getMax()) - .withTotalNumberOfEntries(columnIndexCreationInfo.getTotalNumberOfEntries()) .withColumnIndexCreationInfo(columnIndexCreationInfo) - .sorted(columnIndexCreationInfo.isSorted()) + .withIsOptimizedDictionary(_config.isOptimizeDictionary() + || _config.isOptimizeDictionaryForMetrics() && fieldSpec.getFieldType() == FieldSpec.FieldType.METRIC) .onHeap(segmentCreationSpec.isOnHeap()) .withForwardIndexDisabled(forwardIndexDisabled) + .withFixedLength(columnIndexCreationInfo.isFixedLength()) + .withTextCommitOnClose(true) .build(); - // Initialize forward index creator - ChunkCompressionType chunkCompressionType = - dictEnabledColumn ? null : getColumnCompressionType(segmentCreationSpec, fieldSpec); - // Sorted columns treat the 'forwardIndexDisabled' flag as a no-op - _forwardIndexCreatorMap.put(columnName, (forwardIndexDisabled && !columnIndexCreationInfo.isSorted()) - ? 
null : _indexCreatorProvider.newForwardIndexCreator( - context.forForwardIndex(chunkCompressionType, segmentCreationSpec.getColumnProperties()))); - - // Initialize inverted index creator; skip creating inverted index if sorted - if (invertedIndexColumns.contains(columnName) && !columnIndexCreationInfo.isSorted()) { - _invertedIndexCreatorMap.put(columnName, - _indexCreatorProvider.newInvertedIndexCreator(context.forInvertedIndex())); - } + + FieldIndexConfigs config = adaptConfig(columnName, originalConfig, columnIndexCreationInfo, segmentCreationSpec); + if (dictEnabledColumn) { // Create dictionary-encoded index // Initialize dictionary creator // TODO: Dictionary creator holds all unique values on heap. Consider keeping dictionary instead of creator // which uses off-heap memory. - SegmentDictionaryCreator dictionaryCreator = - new SegmentDictionaryCreator(fieldSpec, _indexDir, columnIndexCreationInfo.isUseVarLengthDictionary()); - _dictionaryCreatorMap.put(columnName, dictionaryCreator); - // Create dictionary + + DictionaryIndexType dictIdx = DictionaryIndexType.INSTANCE; + // Index conf should be present if dictEnabledColumn is true. 
In case it doesn't, getConfig will throw an + // exception + DictionaryIndexConfig dictConfig = config.getConfig(dictIdx); + if (!dictConfig.isEnabled()) { + throw new IllegalArgumentException("Dictionary index should be enabled"); + } + SegmentDictionaryCreator creator = dictIdx.createIndexCreator(context, dictConfig); + try { - dictionaryCreator.build(columnIndexCreationInfo.getSortedUniqueElementsArray()); + creator.build(context.getSortedUniqueElementsArray()); } catch (Exception e) { LOGGER.error("Error building dictionary for field: {}, cardinality: {}, number of bytes per entry: {}", - fieldSpec.getName(), columnIndexCreationInfo.getDistinctValueCount(), - dictionaryCreator.getNumBytesPerEntry()); + context.getFieldSpec().getName(), context.getCardinality(), creator.getNumBytesPerEntry()); throw e; } - } - if (bloomFilterColumns.contains(columnName)) { - if (indexingConfig.getBloomFilterConfigs() != null - && indexingConfig.getBloomFilterConfigs().containsKey(columnName)) { - _bloomFilterCreatorMap.put(columnName, _indexCreatorProvider.newBloomFilterCreator( - context.forBloomFilter(indexingConfig.getBloomFilterConfigs().get(columnName)))); - } else { - _bloomFilterCreatorMap.put(columnName, _indexCreatorProvider.newBloomFilterCreator( - context.forBloomFilter(new BloomFilterConfig(BloomFilterConfig.DEFAULT_FPP, 0, false)))); - } + _dictionaryCreatorMap.put(columnName, creator); } - if (!columnIndexCreationInfo.isSorted() && rangeIndexColumns.contains(columnName)) { - _rangeIndexFilterCreatorMap.put(columnName, - _indexCreatorProvider.newRangeIndexCreator(context.forRangeIndex(rangeIndexVersion))); + Map<IndexType<?, ?, ?>, IndexCreator> creatorsByIndex = + Maps.newHashMapWithExpectedSize(IndexService.getInstance().getAllIndexes().size()); + for (IndexType<?, ?, ?> index : IndexService.getInstance().getAllIndexes()) { + if (skipIndexType(index)) { + continue; + } + if (config.getConfig(index).isEnabled()) { + tryCreateCreator(creatorsByIndex, index, context, 
config); + } } - - if (textIndexColumns.contains(columnName)) { - FSTType fstType = FSTType.LUCENE; - List<FieldConfig> fieldConfigList = _config.getTableConfig().getFieldConfigList(); - if (fieldConfigList != null) { - for (FieldConfig fieldConfig : fieldConfigList) { - if (fieldConfig.getName().equals(columnName)) { - Map<String, String> properties = fieldConfig.getProperties(); - if (TextIndexUtils.isFstTypeNative(properties)) { - fstType = FSTType.NATIVE; - } - } - } + // TODO: Remove this when values stored as ForwardIndex stop depending on TextIndex config + IndexCreator oldFwdCreator = creatorsByIndex.get(forwardIdx); + if (oldFwdCreator instanceof AbstractForwardIndexCreator) { // this implies that oldFwdCreator != null + Object fakeForwardValue = calculateAlternativeValue(dictEnabledColumn, config, fieldSpec); Review Comment: this part seems quite different from previous logic. 1. why call it fake? what's the fakeFwdValue used for? 2. `when values stored as ForwardIndex stop depending on TextIndex config`, is this about the `_rawValueForTextIndex` set in TextIndexConfig? It sounds like we are tracking raw value with the config object? ########## pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/SegmentColumnarIndexCreator.java: ########## @@ -379,340 +330,94 @@ private boolean shouldCreateDictionaryWithinThreshold(ColumnIndexCreationInfo in } /** - * Helper method that returns compression type to use based on segment creation spec and field type. - * <ul> - * <li> Returns compression type from segment creation spec, if specified there.</li> - * <li> Else, returns PASS_THROUGH for metrics, and SNAPPY for dimensions. This is because metrics are likely - * to be spread in different chunks after applying predicates. Same could be true for dimensions, but in that - * case, clients are expected to explicitly specify the appropriate compression type in the spec. 
</li> - * </ul> - * @param segmentCreationSpec Segment creation spec - * @param fieldSpec Field spec for the column - * @return Compression type to use + * @deprecated use {@link ForwardIndexType#getDefaultCompressionType(FieldType)} instead */ - private ChunkCompressionType getColumnCompressionType(SegmentGeneratorConfig segmentCreationSpec, - FieldSpec fieldSpec) { - ChunkCompressionType compressionType = segmentCreationSpec.getRawIndexCompressionType().get(fieldSpec.getName()); - if (compressionType == null) { - compressionType = getDefaultCompressionType(fieldSpec.getFieldType()); - } - - return compressionType; - } - + @Deprecated public static ChunkCompressionType getDefaultCompressionType(FieldType fieldType) { - if (fieldType == FieldSpec.FieldType.METRIC) { - return ChunkCompressionType.PASS_THROUGH; - } else { - return ChunkCompressionType.LZ4; - } + return ForwardIndexType.getDefaultCompressionType(fieldType); } @Override public void indexRow(GenericRow row) throws IOException { - for (Map.Entry<String, ForwardIndexCreator> entry : _forwardIndexCreatorMap.entrySet()) { - String columnName = entry.getKey(); - ForwardIndexCreator forwardIndexCreator = entry.getValue(); - - Object columnValueToIndex = row.getValue(columnName); - if (columnValueToIndex == null) { - throw new RuntimeException("Null value for column:" + columnName); - } + for (Map.Entry<String, Map<IndexType<?, ?, ?>, IndexCreator>> byColEntry : _creatorsByColAndIndex.entrySet()) { + String columnName = byColEntry.getKey(); + Map<IndexType<?, ?, ?>, IndexCreator> creatorsByIndex = byColEntry.getValue(); FieldSpec fieldSpec = _schema.getFieldSpecFor(columnName); - - //get dictionaryCreator, will be null if column is not dictionaryEncoded SegmentDictionaryCreator dictionaryCreator = _dictionaryCreatorMap.get(columnName); - // bloom filter - BloomFilterCreator bloomFilterCreator = _bloomFilterCreatorMap.get(columnName); - if (bloomFilterCreator != null) { - if (fieldSpec.isSingleValueField()) { - 
if (fieldSpec.getDataType() == DataType.BYTES) { - bloomFilterCreator.add(BytesUtils.toHexString((byte[]) columnValueToIndex)); - } else { - bloomFilterCreator.add(columnValueToIndex.toString()); - } - } else { - Object[] values = (Object[]) columnValueToIndex; - if (fieldSpec.getDataType() == DataType.BYTES) { - for (Object value : values) { - bloomFilterCreator.add(BytesUtils.toHexString((byte[]) value)); - } - } else { - for (Object value : values) { - bloomFilterCreator.add(value.toString()); - } - } - } - } - - // range index - CombinedInvertedIndexCreator combinedInvertedIndexCreator = _rangeIndexFilterCreatorMap.get(columnName); - if (combinedInvertedIndexCreator != null) { - if (dictionaryCreator != null) { - if (fieldSpec.isSingleValueField()) { - combinedInvertedIndexCreator.add(dictionaryCreator.indexOfSV(columnValueToIndex)); - } else { - int[] dictIds = dictionaryCreator.indexOfMV(columnValueToIndex); - combinedInvertedIndexCreator.add(dictIds, dictIds.length); - } - } else { - if (fieldSpec.isSingleValueField()) { - switch (fieldSpec.getDataType()) { - case INT: - combinedInvertedIndexCreator.add((Integer) columnValueToIndex); - break; - case LONG: - combinedInvertedIndexCreator.add((Long) columnValueToIndex); - break; - case FLOAT: - combinedInvertedIndexCreator.add((Float) columnValueToIndex); - break; - case DOUBLE: - combinedInvertedIndexCreator.add((Double) columnValueToIndex); - break; - default: - throw new RuntimeException("Unsupported data type " + fieldSpec.getDataType() + " for range index"); - } - } else { - Object[] values = (Object[]) columnValueToIndex; - switch (fieldSpec.getDataType()) { - case INT: - int[] intValues = new int[values.length]; - for (int i = 0; i < values.length; i++) { - intValues[i] = (Integer) values[i]; - } - combinedInvertedIndexCreator.add(intValues, values.length); - break; - case LONG: - long[] longValues = new long[values.length]; - for (int i = 0; i < values.length; i++) { - longValues[i] = (Long) values[i]; 
- } - combinedInvertedIndexCreator.add(longValues, values.length); - break; - case FLOAT: - float[] floatValues = new float[values.length]; - for (int i = 0; i < values.length; i++) { - floatValues[i] = (Float) values[i]; - } - combinedInvertedIndexCreator.add(floatValues, values.length); - break; - case DOUBLE: - double[] doubleValues = new double[values.length]; - for (int i = 0; i < values.length; i++) { - doubleValues[i] = (Double) values[i]; - } - combinedInvertedIndexCreator.add(doubleValues, values.length); - break; - default: - throw new RuntimeException("Unsupported data type " + fieldSpec.getDataType() + " for range index"); - } - } - } - } - - // text-index - TextIndexCreator textIndexCreator = _textIndexCreatorMap.get(columnName); - if (textIndexCreator != null) { - if (fieldSpec.isSingleValueField()) { - textIndexCreator.add((String) columnValueToIndex); - } else { - Object[] values = (Object[]) columnValueToIndex; - int length = values.length; - if (values instanceof String[]) { - textIndexCreator.add((String[]) values, length); - } else { - String[] strings = new String[length]; - for (int i = 0; i < length; i++) { - strings[i] = (String) values[i]; - } - textIndexCreator.add(strings, length); - columnValueToIndex = strings; - } - } + Object columnValueToIndex = row.getValue(columnName); + if (columnValueToIndex == null) { + throw new RuntimeException("Null value for column:" + columnName); } if (fieldSpec.isSingleValueField()) { - // Single Value column - JsonIndexCreator jsonIndexCreator = _jsonIndexCreatorMap.get(columnName); - if (jsonIndexCreator != null) { - jsonIndexCreator.add((String) columnValueToIndex); - } - GeoSpatialIndexCreator h3IndexCreator = _h3IndexCreatorMap.get(columnName); - if (h3IndexCreator != null) { - h3IndexCreator.add(GeometrySerializer.deserialize((byte[]) columnValueToIndex)); - } - if (dictionaryCreator != null) { - // dictionary encoded SV column - // get dictID from dictionary - int dictId = 
dictionaryCreator.indexOfSV(columnValueToIndex); - // store the docID -> dictID mapping in forward index - if (forwardIndexCreator != null) { - forwardIndexCreator.putDictId(dictId); - } - DictionaryBasedInvertedIndexCreator invertedIndexCreator = _invertedIndexCreatorMap.get(columnName); - if (invertedIndexCreator != null) { - // if inverted index enabled during segment creation, - // then store dictID -> docID mapping in inverted index - invertedIndexCreator.add(dictId); - } - } else { - // non-dictionary encoded SV column - // store the docId -> raw value mapping in forward index - if (textIndexCreator != null && !shouldStoreRawValueForTextIndex(columnName)) { - // for text index on raw columns, check the config to determine if actual raw value should - // be stored or not - columnValueToIndex = _columnProperties.get(columnName).get(FieldConfig.TEXT_INDEX_RAW_VALUE); - if (columnValueToIndex == null) { - columnValueToIndex = FieldConfig.TEXT_INDEX_DEFAULT_RAW_VALUE; - } - } - if (forwardIndexCreator != null) { - switch (forwardIndexCreator.getValueType()) { - case INT: - forwardIndexCreator.putInt((int) columnValueToIndex); - break; - case LONG: - forwardIndexCreator.putLong((long) columnValueToIndex); - break; - case FLOAT: - forwardIndexCreator.putFloat((float) columnValueToIndex); - break; - case DOUBLE: - forwardIndexCreator.putDouble((double) columnValueToIndex); - break; - case BIG_DECIMAL: - forwardIndexCreator.putBigDecimal((BigDecimal) columnValueToIndex); - break; - case STRING: - forwardIndexCreator.putString((String) columnValueToIndex); - break; - case BYTES: - forwardIndexCreator.putBytes((byte[]) columnValueToIndex); - break; - case JSON: - if (columnValueToIndex instanceof String) { - forwardIndexCreator.putString((String) columnValueToIndex); - } else if (columnValueToIndex instanceof byte[]) { - forwardIndexCreator.putBytes((byte[]) columnValueToIndex); - } - break; - default: - throw new IllegalStateException(); - } - } - } + 
indexSingleValueRow(dictionaryCreator, columnValueToIndex, creatorsByIndex); } else { - if (dictionaryCreator != null) { - //dictionary encoded - int[] dictIds = dictionaryCreator.indexOfMV(columnValueToIndex); - if (forwardIndexCreator != null) { - forwardIndexCreator.putDictIdMV(dictIds); - } - DictionaryBasedInvertedIndexCreator invertedIndexCreator = _invertedIndexCreatorMap.get(columnName); - if (invertedIndexCreator != null) { - invertedIndexCreator.add(dictIds, dictIds.length); - } - } else { - // for text index on raw columns, check the config to determine if actual raw value should - // be stored or not - if (forwardIndexCreator != null) { - if (textIndexCreator != null && !shouldStoreRawValueForTextIndex(columnName)) { - Object value = _columnProperties.get(columnName).get(FieldConfig.TEXT_INDEX_RAW_VALUE); - if (value == null) { - value = FieldConfig.TEXT_INDEX_DEFAULT_RAW_VALUE; - } - if (forwardIndexCreator.getValueType().getStoredType() == DataType.STRING) { - columnValueToIndex = new String[]{String.valueOf(value)}; - } else if (forwardIndexCreator.getValueType().getStoredType() == DataType.BYTES) { - columnValueToIndex = new byte[][]{String.valueOf(value).getBytes(UTF_8)}; - } else { - throw new RuntimeException("Text Index is only supported for STRING and BYTES stored type"); - } - } - Object[] values = (Object[]) columnValueToIndex; - int length = values.length; - switch (forwardIndexCreator.getValueType()) { - case INT: - int[] ints = new int[length]; - for (int i = 0; i < length; i++) { - ints[i] = (Integer) values[i]; - } - forwardIndexCreator.putIntMV(ints); - break; - case LONG: - long[] longs = new long[length]; - for (int i = 0; i < length; i++) { - longs[i] = (Long) values[i]; - } - forwardIndexCreator.putLongMV(longs); - break; - case FLOAT: - float[] floats = new float[length]; - for (int i = 0; i < length; i++) { - floats[i] = (Float) values[i]; - } - forwardIndexCreator.putFloatMV(floats); - break; - case DOUBLE: - double[] doubles = 
new double[length]; - for (int i = 0; i < length; i++) { - doubles[i] = (Double) values[i]; - } - forwardIndexCreator.putDoubleMV(doubles); - break; - case STRING: - if (values instanceof String[]) { - forwardIndexCreator.putStringMV((String[]) values); - } else { - String[] strings = new String[length]; - for (int i = 0; i < length; i++) { - strings[i] = (String) values[i]; - } - forwardIndexCreator.putStringMV(strings); - } - break; - case BYTES: - if (values instanceof byte[][]) { - forwardIndexCreator.putBytesMV((byte[][]) values); - } else { - byte[][] bytesArray = new byte[length][]; - for (int i = 0; i < length; i++) { - bytesArray[i] = (byte[]) values[i]; - } - forwardIndexCreator.putBytesMV(bytesArray); - } - break; - default: - throw new IllegalStateException(); - } - } - } + indexMultiValueRow(dictionaryCreator, (Object[]) columnValueToIndex, creatorsByIndex); } + } - if (_nullHandlingEnabled) { + if (_nullHandlingEnabled) { + for (Map.Entry<String, NullValueVectorCreator> entry : _nullValueVectorCreatorMap.entrySet()) { + String columnName = entry.getKey(); // If row has null value for given column name, add to null value vector if (row.isNullValue(columnName)) { _nullValueVectorCreatorMap.get(columnName).setNull(_docIdCounter); } } } + _docIdCounter++; } - private boolean shouldStoreRawValueForTextIndex(String column) { - if (_columnProperties != null) { - Map<String, String> props = _columnProperties.get(column); - // by default always store the raw value - // if the config is set to true, don't store the actual raw value - // there will be a dummy value - return props == null || !Boolean.parseBoolean(props.get(FieldConfig.TEXT_INDEX_NO_RAW_DATA)); + private void indexSingleValueRow(SegmentDictionaryCreator dictionaryCreator, Object value, + Map<IndexType<?, ?, ?>, IndexCreator> creatorsByIndex) + throws IOException { + int dictId = dictionaryCreator != null ? 
dictionaryCreator.indexOfSV(value) : -1; + for (IndexCreator creator : creatorsByIndex.values()) { + creator.add(value, dictId); } + } - return true; + private void indexMultiValueRow(SegmentDictionaryCreator dictionaryCreator, Object[] values, + Map<IndexType<?, ?, ?>, IndexCreator> creatorsByIndex) + throws IOException { + int[] dictId = dictionaryCreator != null ? dictionaryCreator.indexOfMV(values) : null; + for (IndexCreator creator : creatorsByIndex.values()) { + creator.add(values, dictId); + } + } + + @Nullable + private Object calculateAlternativeValue(boolean dictEnabledColumn, FieldIndexConfigs configs, FieldSpec fieldSpec) { + if (dictEnabledColumn) { + return null; + } + TextIndexConfig textDeclaration = configs.getConfig(StandardIndexes.text()); Review Comment: s/textDeclaration/config -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org