klsince commented on code in PR #10184: URL: https://github.com/apache/pinot/pull/10184#discussion_r1152440038
########## pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/dictionary/DictionaryIndexType.java: ########## @@ -19,74 +19,265 @@ package org.apache.pinot.segment.local.segment.index.dictionary; +import com.google.common.collect.Sets; +import java.io.File; +import java.io.IOException; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; import java.util.Map; +import java.util.Set; +import java.util.function.Function; +import java.util.stream.Collectors; import javax.annotation.Nullable; +import org.apache.pinot.segment.local.segment.creator.impl.SegmentDictionaryCreator; +import org.apache.pinot.segment.local.segment.index.loader.ConfigurableFromIndexLoadingConfig; +import org.apache.pinot.segment.local.segment.index.loader.IndexLoadingConfig; +import org.apache.pinot.segment.local.segment.index.readers.BigDecimalDictionary; +import org.apache.pinot.segment.local.segment.index.readers.BytesDictionary; +import org.apache.pinot.segment.local.segment.index.readers.DoubleDictionary; +import org.apache.pinot.segment.local.segment.index.readers.FloatDictionary; +import org.apache.pinot.segment.local.segment.index.readers.IntDictionary; +import org.apache.pinot.segment.local.segment.index.readers.LongDictionary; +import org.apache.pinot.segment.local.segment.index.readers.OnHeapBigDecimalDictionary; +import org.apache.pinot.segment.local.segment.index.readers.OnHeapBytesDictionary; +import org.apache.pinot.segment.local.segment.index.readers.OnHeapDoubleDictionary; +import org.apache.pinot.segment.local.segment.index.readers.OnHeapFloatDictionary; +import org.apache.pinot.segment.local.segment.index.readers.OnHeapIntDictionary; +import org.apache.pinot.segment.local.segment.index.readers.OnHeapLongDictionary; +import org.apache.pinot.segment.local.segment.index.readers.OnHeapStringDictionary; +import org.apache.pinot.segment.local.segment.index.readers.StringDictionary; import 
org.apache.pinot.segment.spi.ColumnMetadata; import org.apache.pinot.segment.spi.V1Constants; +import org.apache.pinot.segment.spi.creator.ColumnStatistics; import org.apache.pinot.segment.spi.creator.IndexCreationContext; +import org.apache.pinot.segment.spi.index.AbstractIndexType; +import org.apache.pinot.segment.spi.index.ColumnConfigDeserializer; +import org.apache.pinot.segment.spi.index.DictionaryIndexConfig; import org.apache.pinot.segment.spi.index.FieldIndexConfigs; -import org.apache.pinot.segment.spi.index.IndexCreator; +import org.apache.pinot.segment.spi.index.IndexConfigDeserializer; import org.apache.pinot.segment.spi.index.IndexHandler; -import org.apache.pinot.segment.spi.index.IndexReader; +import org.apache.pinot.segment.spi.index.IndexReaderConstraintException; import org.apache.pinot.segment.spi.index.IndexReaderFactory; import org.apache.pinot.segment.spi.index.IndexType; import org.apache.pinot.segment.spi.index.StandardIndexes; +import org.apache.pinot.segment.spi.index.reader.Dictionary; +import org.apache.pinot.segment.spi.memory.PinotDataBuffer; import org.apache.pinot.segment.spi.store.SegmentDirectory; -import org.apache.pinot.spi.config.table.IndexConfig; +import org.apache.pinot.spi.config.table.FieldConfig; +import org.apache.pinot.spi.config.table.IndexingConfig; import org.apache.pinot.spi.config.table.TableConfig; +import org.apache.pinot.spi.data.FieldSpec; import org.apache.pinot.spi.data.Schema; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; -public class DictionaryIndexType implements IndexType<IndexConfig, IndexReader, IndexCreator> { - public static final DictionaryIndexType INSTANCE = new DictionaryIndexType(); +public class DictionaryIndexType + extends AbstractIndexType<DictionaryIndexConfig, Dictionary, SegmentDictionaryCreator> + implements ConfigurableFromIndexLoadingConfig<DictionaryIndexConfig> { + private static final Logger LOGGER = LoggerFactory.getLogger(DictionaryIndexType.class); - private 
DictionaryIndexType() { + protected DictionaryIndexType() { + super(StandardIndexes.DICTIONARY_ID); } @Override - public String getId() { - return StandardIndexes.DICTIONARY_ID; + public Class<DictionaryIndexConfig> getIndexConfigClass() { + return DictionaryIndexConfig.class; } @Override - public Class<IndexConfig> getIndexConfigClass() { - return IndexConfig.class; + public Map<String, DictionaryIndexConfig> fromIndexLoadingConfig( + IndexLoadingConfig indexLoadingConfig) { + Map<String, DictionaryIndexConfig> result = new HashMap<>(); + Set<String> noDictionaryColumns = indexLoadingConfig.getNoDictionaryColumns(); Review Comment: s/noDictionaryColumns/noDictionaryCols to be consistent with the other two ########## pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/SegmentColumnarIndexCreator.java: ########## @@ -379,340 +327,96 @@ private boolean shouldCreateDictionaryWithinThreshold(ColumnIndexCreationInfo in } /** - * Helper method that returns compression type to use based on segment creation spec and field type. - * <ul> - * <li> Returns compression type from segment creation spec, if specified there.</li> - * <li> Else, returns PASS_THROUGH for metrics, and SNAPPY for dimensions. This is because metrics are likely - * to be spread in different chunks after applying predicates. Same could be true for dimensions, but in that - * case, clients are expected to explicitly specify the appropriate compression type in the spec. 
</li> - * </ul> - * @param segmentCreationSpec Segment creation spec - * @param fieldSpec Field spec for the column - * @return Compression type to use + * @deprecated use {@link ForwardIndexType#getDefaultCompressionType(FieldType)} instead */ - private ChunkCompressionType getColumnCompressionType(SegmentGeneratorConfig segmentCreationSpec, - FieldSpec fieldSpec) { - ChunkCompressionType compressionType = segmentCreationSpec.getRawIndexCompressionType().get(fieldSpec.getName()); - if (compressionType == null) { - compressionType = getDefaultCompressionType(fieldSpec.getFieldType()); - } - - return compressionType; - } - + @Deprecated public static ChunkCompressionType getDefaultCompressionType(FieldType fieldType) { - if (fieldType == FieldSpec.FieldType.METRIC) { - return ChunkCompressionType.PASS_THROUGH; - } else { - return ChunkCompressionType.LZ4; - } + return ForwardIndexType.getDefaultCompressionType(fieldType); } @Override public void indexRow(GenericRow row) throws IOException { - for (Map.Entry<String, ForwardIndexCreator> entry : _forwardIndexCreatorMap.entrySet()) { - String columnName = entry.getKey(); - ForwardIndexCreator forwardIndexCreator = entry.getValue(); + for (Map.Entry<String, Map<IndexType<?, ?, ?>, IndexCreator>> byColEntry : _creatorsByColAndIndex.entrySet()) { + String columnName = byColEntry.getKey(); Object columnValueToIndex = row.getValue(columnName); if (columnValueToIndex == null) { throw new RuntimeException("Null value for column:" + columnName); } - FieldSpec fieldSpec = _schema.getFieldSpecFor(columnName); + Map<IndexType<?, ?, ?>, IndexCreator> creatorsByIndex = byColEntry.getValue(); - //get dictionaryCreator, will be null if column is not dictionaryEncoded + FieldSpec fieldSpec = _schema.getFieldSpecFor(columnName); SegmentDictionaryCreator dictionaryCreator = _dictionaryCreatorMap.get(columnName); - // bloom filter - BloomFilterCreator bloomFilterCreator = _bloomFilterCreatorMap.get(columnName); - if (bloomFilterCreator 
!= null) { - if (fieldSpec.isSingleValueField()) { - if (fieldSpec.getDataType() == DataType.BYTES) { - bloomFilterCreator.add(BytesUtils.toHexString((byte[]) columnValueToIndex)); - } else { - bloomFilterCreator.add(columnValueToIndex.toString()); - } - } else { - Object[] values = (Object[]) columnValueToIndex; - if (fieldSpec.getDataType() == DataType.BYTES) { - for (Object value : values) { - bloomFilterCreator.add(BytesUtils.toHexString((byte[]) value)); - } - } else { - for (Object value : values) { - bloomFilterCreator.add(value.toString()); - } - } - } - } - - // range index - CombinedInvertedIndexCreator combinedInvertedIndexCreator = _rangeIndexFilterCreatorMap.get(columnName); - if (combinedInvertedIndexCreator != null) { - if (dictionaryCreator != null) { - if (fieldSpec.isSingleValueField()) { - combinedInvertedIndexCreator.add(dictionaryCreator.indexOfSV(columnValueToIndex)); - } else { - int[] dictIds = dictionaryCreator.indexOfMV(columnValueToIndex); - combinedInvertedIndexCreator.add(dictIds, dictIds.length); - } - } else { - if (fieldSpec.isSingleValueField()) { - switch (fieldSpec.getDataType()) { - case INT: - combinedInvertedIndexCreator.add((Integer) columnValueToIndex); - break; - case LONG: - combinedInvertedIndexCreator.add((Long) columnValueToIndex); - break; - case FLOAT: - combinedInvertedIndexCreator.add((Float) columnValueToIndex); - break; - case DOUBLE: - combinedInvertedIndexCreator.add((Double) columnValueToIndex); - break; - default: - throw new RuntimeException("Unsupported data type " + fieldSpec.getDataType() + " for range index"); - } - } else { - Object[] values = (Object[]) columnValueToIndex; - switch (fieldSpec.getDataType()) { - case INT: - int[] intValues = new int[values.length]; - for (int i = 0; i < values.length; i++) { - intValues[i] = (Integer) values[i]; - } - combinedInvertedIndexCreator.add(intValues, values.length); - break; - case LONG: - long[] longValues = new long[values.length]; - for (int i = 0; i < 
values.length; i++) { - longValues[i] = (Long) values[i]; - } - combinedInvertedIndexCreator.add(longValues, values.length); - break; - case FLOAT: - float[] floatValues = new float[values.length]; - for (int i = 0; i < values.length; i++) { - floatValues[i] = (Float) values[i]; - } - combinedInvertedIndexCreator.add(floatValues, values.length); - break; - case DOUBLE: - double[] doubleValues = new double[values.length]; - for (int i = 0; i < values.length; i++) { - doubleValues[i] = (Double) values[i]; - } - combinedInvertedIndexCreator.add(doubleValues, values.length); - break; - default: - throw new RuntimeException("Unsupported data type " + fieldSpec.getDataType() + " for range index"); - } - } - } - } - - // text-index - TextIndexCreator textIndexCreator = _textIndexCreatorMap.get(columnName); - if (textIndexCreator != null) { - if (fieldSpec.isSingleValueField()) { - textIndexCreator.add((String) columnValueToIndex); - } else { - Object[] values = (Object[]) columnValueToIndex; - int length = values.length; - if (values instanceof String[]) { - textIndexCreator.add((String[]) values, length); - } else { - String[] strings = new String[length]; - for (int i = 0; i < length; i++) { - strings[i] = (String) values[i]; - } - textIndexCreator.add(strings, length); - columnValueToIndex = strings; - } - } - } - if (fieldSpec.isSingleValueField()) { - // Single Value column - JsonIndexCreator jsonIndexCreator = _jsonIndexCreatorMap.get(columnName); - if (jsonIndexCreator != null) { - jsonIndexCreator.add((String) columnValueToIndex); - } - GeoSpatialIndexCreator h3IndexCreator = _h3IndexCreatorMap.get(columnName); - if (h3IndexCreator != null) { - h3IndexCreator.add(GeometrySerializer.deserialize((byte[]) columnValueToIndex)); - } - if (dictionaryCreator != null) { - // dictionary encoded SV column - // get dictID from dictionary - int dictId = dictionaryCreator.indexOfSV(columnValueToIndex); - // store the docID -> dictID mapping in forward index - if 
(forwardIndexCreator != null) { - forwardIndexCreator.putDictId(dictId); - } - DictionaryBasedInvertedIndexCreator invertedIndexCreator = _invertedIndexCreatorMap.get(columnName); - if (invertedIndexCreator != null) { - // if inverted index enabled during segment creation, - // then store dictID -> docID mapping in inverted index - invertedIndexCreator.add(dictId); - } - } else { - // non-dictionary encoded SV column - // store the docId -> raw value mapping in forward index - if (textIndexCreator != null && !shouldStoreRawValueForTextIndex(columnName)) { - // for text index on raw columns, check the config to determine if actual raw value should - // be stored or not - columnValueToIndex = _columnProperties.get(columnName).get(FieldConfig.TEXT_INDEX_RAW_VALUE); - if (columnValueToIndex == null) { - columnValueToIndex = FieldConfig.TEXT_INDEX_DEFAULT_RAW_VALUE; - } - } - if (forwardIndexCreator != null) { - switch (forwardIndexCreator.getValueType()) { - case INT: - forwardIndexCreator.putInt((int) columnValueToIndex); - break; - case LONG: - forwardIndexCreator.putLong((long) columnValueToIndex); - break; - case FLOAT: - forwardIndexCreator.putFloat((float) columnValueToIndex); - break; - case DOUBLE: - forwardIndexCreator.putDouble((double) columnValueToIndex); - break; - case BIG_DECIMAL: - forwardIndexCreator.putBigDecimal((BigDecimal) columnValueToIndex); - break; - case STRING: - forwardIndexCreator.putString((String) columnValueToIndex); - break; - case BYTES: - forwardIndexCreator.putBytes((byte[]) columnValueToIndex); - break; - case JSON: - if (columnValueToIndex instanceof String) { - forwardIndexCreator.putString((String) columnValueToIndex); - } else if (columnValueToIndex instanceof byte[]) { - forwardIndexCreator.putBytes((byte[]) columnValueToIndex); - } - break; - default: - throw new IllegalStateException(); - } - } - } + indexSingleValueRow(dictionaryCreator, columnValueToIndex, creatorsByIndex); } else { - if (dictionaryCreator != null) { - 
//dictionary encoded - int[] dictIds = dictionaryCreator.indexOfMV(columnValueToIndex); - if (forwardIndexCreator != null) { - forwardIndexCreator.putDictIdMV(dictIds); - } - DictionaryBasedInvertedIndexCreator invertedIndexCreator = _invertedIndexCreatorMap.get(columnName); - if (invertedIndexCreator != null) { - invertedIndexCreator.add(dictIds, dictIds.length); - } - } else { - // for text index on raw columns, check the config to determine if actual raw value should - // be stored or not - if (forwardIndexCreator != null) { - if (textIndexCreator != null && !shouldStoreRawValueForTextIndex(columnName)) { - Object value = _columnProperties.get(columnName).get(FieldConfig.TEXT_INDEX_RAW_VALUE); - if (value == null) { - value = FieldConfig.TEXT_INDEX_DEFAULT_RAW_VALUE; - } - if (forwardIndexCreator.getValueType().getStoredType() == DataType.STRING) { - columnValueToIndex = new String[]{String.valueOf(value)}; - } else if (forwardIndexCreator.getValueType().getStoredType() == DataType.BYTES) { - columnValueToIndex = new byte[][]{String.valueOf(value).getBytes(UTF_8)}; - } else { - throw new RuntimeException("Text Index is only supported for STRING and BYTES stored type"); - } - } - Object[] values = (Object[]) columnValueToIndex; - int length = values.length; - switch (forwardIndexCreator.getValueType()) { - case INT: - int[] ints = new int[length]; - for (int i = 0; i < length; i++) { - ints[i] = (Integer) values[i]; - } - forwardIndexCreator.putIntMV(ints); - break; - case LONG: - long[] longs = new long[length]; - for (int i = 0; i < length; i++) { - longs[i] = (Long) values[i]; - } - forwardIndexCreator.putLongMV(longs); - break; - case FLOAT: - float[] floats = new float[length]; - for (int i = 0; i < length; i++) { - floats[i] = (Float) values[i]; - } - forwardIndexCreator.putFloatMV(floats); - break; - case DOUBLE: - double[] doubles = new double[length]; - for (int i = 0; i < length; i++) { - doubles[i] = (Double) values[i]; - } - 
forwardIndexCreator.putDoubleMV(doubles); - break; - case STRING: - if (values instanceof String[]) { - forwardIndexCreator.putStringMV((String[]) values); - } else { - String[] strings = new String[length]; - for (int i = 0; i < length; i++) { - strings[i] = (String) values[i]; - } - forwardIndexCreator.putStringMV(strings); - } - break; - case BYTES: - if (values instanceof byte[][]) { - forwardIndexCreator.putBytesMV((byte[][]) values); - } else { - byte[][] bytesArray = new byte[length][]; - for (int i = 0; i < length; i++) { - bytesArray[i] = (byte[]) values[i]; - } - forwardIndexCreator.putBytesMV(bytesArray); - } - break; - default: - throw new IllegalStateException(); - } - } - } + indexMultiValueRow(dictionaryCreator, (Object[]) columnValueToIndex, creatorsByIndex); } + } - if (_nullHandlingEnabled) { + if (_nullHandlingEnabled) { + for (Map.Entry<String, NullValueVectorCreator> entry : _nullValueVectorCreatorMap.entrySet()) { + String columnName = entry.getKey(); // If row has null value for given column name, add to null value vector if (row.isNullValue(columnName)) { _nullValueVectorCreatorMap.get(columnName).setNull(_docIdCounter); } } } + _docIdCounter++; } - private boolean shouldStoreRawValueForTextIndex(String column) { - if (_columnProperties != null) { - Map<String, String> props = _columnProperties.get(column); - // by default always store the raw value - // if the config is set to true, don't store the actual raw value - // there will be a dummy value - return props == null || !Boolean.parseBoolean(props.get(FieldConfig.TEXT_INDEX_NO_RAW_DATA)); + private void indexSingleValueRow(SegmentDictionaryCreator dictionaryCreator, Object value, + Map<IndexType<?, ?, ?>, IndexCreator> creatorsByIndex) + throws IOException { + int dictId = dictionaryCreator != null ? 
dictionaryCreator.indexOfSV(value) : -1; + for (IndexCreator creator : creatorsByIndex.values()) { + creator.add(value, dictId); + } + } + + private void indexMultiValueRow(SegmentDictionaryCreator dictionaryCreator, Object[] values, + Map<IndexType<?, ?, ?>, IndexCreator> creatorsByIndex) + throws IOException { + int[] dictId = dictionaryCreator != null ? dictionaryCreator.indexOfMV(values) : null; + for (IndexCreator creator : creatorsByIndex.values()) { + creator.add(values, dictId); + } + } + + @Nullable + private Object calculateRawValueForTextIndex(boolean dictEnabledColumn, FieldIndexConfigs configs, + FieldSpec fieldSpec) { + if (dictEnabledColumn) { + return null; } + TextIndexConfig textConfig = configs.getConfig(StandardIndexes.text()); + if (!textConfig.isEnabled()) { + return null; + } + + Object alternativeValue = textConfig.getRawValueForTextIndex(); Review Comment: s/alternativeValue/rawValue? ########## pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/ForwardIndexHandler.java: ########## @@ -132,41 +144,42 @@ public void updateIndices(SegmentDirectory.Writer segmentWriter, IndexCreatorPro _tmpForwardIndexColumns.add(column); break; case ENABLE_FORWARD_INDEX: - ColumnMetadata columnMetadata = createForwardIndexIfNeeded(segmentWriter, column, indexCreatorProvider, + ColumnMetadata columnMetadata = createForwardIndexIfNeeded(segmentWriter, column, Review Comment: format: fold `false` up to L147 ########## pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/IndexLoadingConfig.java: ########## @@ -444,26 +526,31 @@ public void setColumnProperties(Map<String, Map<String, String>> columnPropertie @VisibleForTesting public void setInvertedIndexColumns(Set<String> invertedIndexColumns) { _invertedIndexColumns = new HashSet<>(invertedIndexColumns); + _dirty = true; } @VisibleForTesting public void addInvertedIndexColumns(String... 
invertedIndexColumns) { _invertedIndexColumns.addAll(Arrays.asList(invertedIndexColumns)); + _dirty = true; } @VisibleForTesting public void addInvertedIndexColumns(Collection<String> invertedIndexColumns) { _invertedIndexColumns.addAll(invertedIndexColumns); + _dirty = true; } @VisibleForTesting public void removeInvertedIndexColumns(String... invertedIndexColumns) { removeInvertedIndexColumns(Arrays.asList(invertedIndexColumns)); + assert _dirty; Review Comment: just to confirm whether this assert was added intentionally, as it seems it will always pass after calling removeInvertedIndexColumns(Collection...) ########## pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/InvertedIndexAndDictionaryBasedForwardIndexCreator.java: ########## @@ -266,8 +247,9 @@ public void regenerateForwardIndex() private Map<String, String> createForwardIndexForSVColumn() throws IOException { try (BitmapInvertedIndexReader invertedIndexReader = - (BitmapInvertedIndexReader) LoaderUtils.getInvertedIndexReader(_segmentWriter, _columnMetadata); - Dictionary dictionary = LoaderUtils.getDictionary(_segmentWriter, _columnMetadata)) { + (BitmapInvertedIndexReader) InvertedIndexType.ReaderFactory Review Comment: use StandardIndexes.inverted() and .dictionary() to get their index reader? and the same for another place a few lines below. 
########## pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/text/TextIndexType.java: ########## @@ -19,75 +19,178 @@ package org.apache.pinot.segment.local.segment.index.text; +import com.google.common.base.Preconditions; +import java.io.File; +import java.io.IOException; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; import java.util.Map; +import java.util.function.Function; +import java.util.stream.Collectors; import javax.annotation.Nullable; +import org.apache.pinot.segment.local.segment.creator.impl.text.LuceneTextIndexCreator; +import org.apache.pinot.segment.local.segment.creator.impl.text.NativeTextIndexCreator; +import org.apache.pinot.segment.local.segment.index.loader.ConfigurableFromIndexLoadingConfig; +import org.apache.pinot.segment.local.segment.index.loader.IndexLoadingConfig; +import org.apache.pinot.segment.local.segment.index.loader.invertedindex.TextIndexHandler; +import org.apache.pinot.segment.local.segment.index.readers.text.LuceneTextIndexReader; +import org.apache.pinot.segment.local.segment.index.readers.text.NativeTextIndexReader; +import org.apache.pinot.segment.local.segment.store.TextIndexUtils; import org.apache.pinot.segment.spi.ColumnMetadata; import org.apache.pinot.segment.spi.V1Constants; import org.apache.pinot.segment.spi.creator.IndexCreationContext; +import org.apache.pinot.segment.spi.index.AbstractIndexType; +import org.apache.pinot.segment.spi.index.ColumnConfigDeserializer; import org.apache.pinot.segment.spi.index.FieldIndexConfigs; -import org.apache.pinot.segment.spi.index.IndexCreator; +import org.apache.pinot.segment.spi.index.IndexConfigDeserializer; import org.apache.pinot.segment.spi.index.IndexHandler; -import org.apache.pinot.segment.spi.index.IndexReader; +import org.apache.pinot.segment.spi.index.IndexReaderConstraintException; import org.apache.pinot.segment.spi.index.IndexReaderFactory; -import org.apache.pinot.segment.spi.index.IndexType; 
import org.apache.pinot.segment.spi.index.StandardIndexes; +import org.apache.pinot.segment.spi.index.TextIndexConfig; +import org.apache.pinot.segment.spi.index.creator.TextIndexCreator; +import org.apache.pinot.segment.spi.index.reader.TextIndexReader; import org.apache.pinot.segment.spi.store.SegmentDirectory; -import org.apache.pinot.spi.config.table.IndexConfig; +import org.apache.pinot.spi.config.table.FSTType; +import org.apache.pinot.spi.config.table.FieldConfig; import org.apache.pinot.spi.config.table.TableConfig; +import org.apache.pinot.spi.data.FieldSpec; import org.apache.pinot.spi.data.Schema; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; -public class TextIndexType implements IndexType<IndexConfig, IndexReader, IndexCreator> { +public class TextIndexType extends AbstractIndexType<TextIndexConfig, TextIndexReader, TextIndexCreator> + implements ConfigurableFromIndexLoadingConfig<TextIndexConfig> { + protected static final Logger LOGGER = LoggerFactory.getLogger(TextIndexType.class); - public static final TextIndexType INSTANCE = new TextIndexType(); + protected TextIndexType() { + super(StandardIndexes.TEXT_ID); + } - private TextIndexType() { + @Override + public Class<TextIndexConfig> getIndexConfigClass() { + return TextIndexConfig.class; } @Override - public String getId() { - return StandardIndexes.TEXT_ID; + public Map<String, TextIndexConfig> fromIndexLoadingConfig(IndexLoadingConfig indexLoadingConfig) { + Map<String, Map<String, String>> allColProps = indexLoadingConfig.getColumnProperties(); + return indexLoadingConfig.getTextIndexColumns().stream().collect(Collectors.toMap( + Function.identity(), + colName -> new TextIndexConfigBuilder(indexLoadingConfig.getFSTIndexType()) + .withProperties(allColProps.get(colName)) + .build() + )); } @Override - public Class<IndexConfig> getIndexConfigClass() { - return IndexConfig.class; + public TextIndexConfig getDefaultConfig() { + return TextIndexConfig.DISABLED; } @Override - public 
IndexConfig getDefaultConfig() { - return IndexConfig.DISABLED; + public ColumnConfigDeserializer<TextIndexConfig> getDeserializer() { + return IndexConfigDeserializer.fromIndexes("text", getIndexConfigClass()) + .withExclusiveAlternative((tableConfig, schema) -> { + List<FieldConfig> fieldConfigList = tableConfig.getFieldConfigList(); + if (fieldConfigList == null) { + return Collections.emptyMap(); + } + // TODO(index-spi): This doesn't feel right, but it is here to keep backward compatibility. + // If there are two text indexes in two different columns and one explicitly specifies LUCENE but + // the other specifies native, both are created as native. No error is thrown and no log is shown. Review Comment: since you've added a warning log for this now, perhaps rephrase the last sentence a bit. ########## pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/forward/ForwardIndexType.java: ########## @@ -19,60 +19,183 @@ package org.apache.pinot.segment.local.segment.index.forward; +import com.google.common.collect.Sets; +import java.io.IOException; +import java.util.HashMap; +import java.util.List; import java.util.Map; +import java.util.Set; +import java.util.function.Supplier; import javax.annotation.Nullable; +import org.apache.pinot.segment.local.segment.index.loader.ConfigurableFromIndexLoadingConfig; +import org.apache.pinot.segment.local.segment.index.loader.ForwardIndexHandler; +import org.apache.pinot.segment.local.segment.index.loader.IndexLoadingConfig; import org.apache.pinot.segment.spi.ColumnMetadata; import org.apache.pinot.segment.spi.V1Constants; +import org.apache.pinot.segment.spi.compression.ChunkCompressionType; import org.apache.pinot.segment.spi.creator.IndexCreationContext; +import org.apache.pinot.segment.spi.index.AbstractIndexType; +import org.apache.pinot.segment.spi.index.ColumnConfigDeserializer; import org.apache.pinot.segment.spi.index.FieldIndexConfigs; -import 
org.apache.pinot.segment.spi.index.IndexCreator; +import org.apache.pinot.segment.spi.index.ForwardIndexConfig; +import org.apache.pinot.segment.spi.index.IndexConfigDeserializer; import org.apache.pinot.segment.spi.index.IndexHandler; -import org.apache.pinot.segment.spi.index.IndexReader; +import org.apache.pinot.segment.spi.index.IndexReaderConstraintException; import org.apache.pinot.segment.spi.index.IndexReaderFactory; -import org.apache.pinot.segment.spi.index.IndexType; import org.apache.pinot.segment.spi.index.StandardIndexes; +import org.apache.pinot.segment.spi.index.creator.ForwardIndexCreator; +import org.apache.pinot.segment.spi.index.reader.ForwardIndexReader; +import org.apache.pinot.segment.spi.memory.PinotDataBuffer; import org.apache.pinot.segment.spi.store.SegmentDirectory; -import org.apache.pinot.spi.config.table.IndexConfig; +import org.apache.pinot.spi.config.table.FieldConfig; import org.apache.pinot.spi.config.table.TableConfig; +import org.apache.pinot.spi.data.FieldSpec; import org.apache.pinot.spi.data.Schema; -public class ForwardIndexType implements IndexType<IndexConfig, IndexReader, IndexCreator> { +public class ForwardIndexType + extends AbstractIndexType<ForwardIndexConfig, ForwardIndexReader, ForwardIndexCreator> + implements ConfigurableFromIndexLoadingConfig<ForwardIndexConfig> { - public static final ForwardIndexType INSTANCE = new ForwardIndexType(); - - private ForwardIndexType() { + protected ForwardIndexType() { + super(StandardIndexes.FORWARD_ID); } @Override - public String getId() { - return StandardIndexes.FORWARD_ID; + public Class<ForwardIndexConfig> getIndexConfigClass() { + return ForwardIndexConfig.class; } @Override - public Class<IndexConfig> getIndexConfigClass() { - return null; + public Map<String, ForwardIndexConfig> fromIndexLoadingConfig(IndexLoadingConfig indexLoadingConfig) { + Set<String> disabledCols = indexLoadingConfig.getForwardIndexDisabledColumns(); + Map<String, ForwardIndexConfig> result = new 
HashMap<>(); + Set<String> allColumns = Sets.union(disabledCols, indexLoadingConfig.getAllKnownColumns()); + for (String column : allColumns) { + ChunkCompressionType compressionType = + indexLoadingConfig.getCompressionConfigs() != null + ? indexLoadingConfig.getCompressionConfigs().get(column) + : null; + Supplier<ForwardIndexConfig> defaultConfig = () -> { + if (compressionType == null) { + return ForwardIndexConfig.DEFAULT; + } else { + return new ForwardIndexConfig.Builder().withCompressionType(compressionType).build(); + } + }; + if (!disabledCols.contains(column)) { + TableConfig tableConfig = indexLoadingConfig.getTableConfig(); + if (tableConfig == null) { + result.put(column, defaultConfig.get()); + } else { + List<FieldConfig> fieldConfigList = tableConfig.getFieldConfigList(); + if (fieldConfigList == null) { + result.put(column, defaultConfig.get()); + continue; + } + FieldConfig fieldConfig = fieldConfigList.stream() + .filter(fc -> fc.getName().equals(column)) + .findAny() + .orElse(null); + if (fieldConfig == null) { + result.put(column, defaultConfig.get()); + continue; + } + ForwardIndexConfig.Builder builder = new ForwardIndexConfig.Builder(); + if (compressionType != null) { + builder.withCompressionType(compressionType); + } else { + FieldConfig.CompressionCodec compressionCodec = fieldConfig.getCompressionCodec(); + if (compressionCodec != null) { + builder.withCompressionType(ChunkCompressionType.valueOf(compressionCodec.name())); + } + } + + result.put(column, builder.build()); + } + } else { + result.put(column, ForwardIndexConfig.DISABLED); + } + } + return result; } @Override - public IndexConfig getDefaultConfig() { - return IndexConfig.ENABLED; + public ForwardIndexConfig getDefaultConfig() { + return ForwardIndexConfig.DEFAULT; } @Override - public IndexConfig getConfig(TableConfig tableConfig, Schema schema) { - throw new UnsupportedOperationException(); + public ColumnConfigDeserializer<ForwardIndexConfig> getDeserializer() { + // 
reads tableConfig.fieldConfigList and decides what to create using the FieldConfig properties and encoding + ColumnConfigDeserializer<ForwardIndexConfig> fromFieldConfig = IndexConfigDeserializer.fromCollection( + TableConfig::getFieldConfigList, + (accum, fieldConfig) -> { + Map<String, String> properties = fieldConfig.getProperties(); + if (properties != null && isDisabled(properties)) { + accum.put(fieldConfig.getName(), ForwardIndexConfig.DISABLED); + } else if (fieldConfig.getEncodingType() == FieldConfig.EncodingType.RAW) { + accum.put(fieldConfig.getName(), createConfigFromFieldConfig(fieldConfig)); + } + } + ); + return IndexConfigDeserializer.fromIndexes("forward", getIndexConfigClass()) + .withExclusiveAlternative(fromFieldConfig); + } + + private boolean isDisabled(Map<String, String> props) { + return Boolean.parseBoolean( + props.getOrDefault(FieldConfig.FORWARD_INDEX_DISABLED, FieldConfig.DEFAULT_FORWARD_INDEX_DISABLED)); + } + + private ForwardIndexConfig createConfigFromFieldConfig(FieldConfig fieldConfig) { + if (fieldConfig.getEncodingType() != FieldConfig.EncodingType.RAW) { + throw new IllegalArgumentException("Cannot build a forward index on a field whose encoding is " + + fieldConfig.getEncodingType()); + } + FieldConfig.CompressionCodec compressionCodec = fieldConfig.getCompressionCodec(); + ForwardIndexConfig.Builder builder = new ForwardIndexConfig.Builder(); + if (compressionCodec != null) { + builder.withCompressionType(ChunkCompressionType.valueOf(compressionCodec.name())); + } + + Map<String, String> properties = fieldConfig.getProperties(); + if (properties != null) { + builder.withLegacyProperties(properties); + } + + return builder.build(); + } + + public static ChunkCompressionType getDefaultCompressionType(FieldSpec.FieldType fieldType) { + if (fieldType == FieldSpec.FieldType.METRIC) { + return ChunkCompressionType.PASS_THROUGH; + } else { + return ChunkCompressionType.LZ4; + } } @Override - public IndexCreator 
createIndexCreator(IndexCreationContext context, IndexConfig indexConfig) + public ForwardIndexCreator createIndexCreator(IndexCreationContext context, ForwardIndexConfig indexConfig) throws Exception { - throw new UnsupportedOperationException(); + return ForwardIndexCreatorFactory.createIndexCreator(context, indexConfig); } @Override - public IndexReaderFactory<IndexReader> getReaderFactory() { - throw new UnsupportedOperationException(); + public IndexHandler createIndexHandler(SegmentDirectory segmentDirectory, Map<String, FieldIndexConfigs> configsByCol, + @Nullable Schema schema, @Nullable TableConfig tableConfig) { + return new ForwardIndexHandler(segmentDirectory, configsByCol, schema, tableConfig); + } + + @Override + public IndexReaderFactory<ForwardIndexReader> getReaderFactory() { + return ForwardIndexReaderFactory.INSTANCE; + } + + public static ForwardIndexReader<?> read(SegmentDirectory.Reader segmentReader, FieldIndexConfigs fieldIndexConfigs, + ColumnMetadata metadata) + throws IndexReaderConstraintException, IOException { + return StandardIndexes.forward().getReaderFactory().createIndexReader(segmentReader, fieldIndexConfigs, metadata); Review Comment: just call ForwardIndexReaderFactory.createIndexReader? as in last two read() methods below. 
########## pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/json/JsonIndexType.java: ########## @@ -82,11 +118,38 @@ public String getFileExtension(ColumnMetadata columnMetadata) { @Override public IndexHandler createIndexHandler(SegmentDirectory segmentDirectory, Map<String, FieldIndexConfigs> configsByCol, @Nullable Schema schema, @Nullable TableConfig tableConfig) { - throw new UnsupportedOperationException(); + return new JsonIndexHandler(segmentDirectory, configsByCol, tableConfig); } - @Override - public String toString() { - return getId(); + private static class ReaderFactory extends IndexReaderFactory.Default<JsonIndexConfig, JsonIndexReader> { Review Comment: +1 to make this private, which I believe can be applied to all other ReaderFactory classes. ########## pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/forward/ForwardIndexType.java: ########## @@ -92,14 +215,23 @@ public String getFileExtension(ColumnMetadata columnMetadata) { } } - @Override - public IndexHandler createIndexHandler(SegmentDirectory segmentDirectory, Map<String, FieldIndexConfigs> configsByCol, - @Nullable Schema schema, @Nullable TableConfig tableConfig) { - throw new UnsupportedOperationException(); + /** + * Returns the forward index reader for the given column. + */ + public static ForwardIndexReader<?> getReader(SegmentDirectory.Reader segmentReader, + ColumnMetadata columnMetadata) Review Comment: format? also, what's the method naming convention? I see read() and getReader() both return ForwardIndexReader<?>.
########## pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/ForwardIndexHandler.java: ########## @@ -198,20 +211,21 @@ Map<String, List<Operation>> computeOperations(SegmentDirectory.Reader segmentRe // Get list of columns with forward index and those without forward index Set<String> existingForwardIndexColumns = - segmentReader.toSegmentDirectory().getColumnsWithIndex(ColumnIndexType.FORWARD_INDEX); + segmentReader.toSegmentDirectory().getColumnsWithIndex(StandardIndexes.forward()); Set<String> existingForwardIndexDisabledColumns = new HashSet<>(); for (String column : existingAllColumns) { if (!existingForwardIndexColumns.contains(column)) { existingForwardIndexDisabledColumns.add(column); } } - // From new column config. - Set<String> newNoDictColumns = _indexLoadingConfig.getNoDictionaryColumns(); - Set<String> newForwardIndexDisabledColumns = _indexLoadingConfig.getForwardIndexDisabledColumns(); - for (String column : existingAllColumns) { - if (existingForwardIndexColumns.contains(column) && newForwardIndexDisabledColumns.contains(column)) { + FieldIndexConfigs newConf = _fieldIndexConfigs.get(column); + boolean newIsFwd = newConf.getConfig(StandardIndexes.forward()).isEnabled(); + boolean newIsDict = newConf.getConfig(StandardIndexes.dictionary()).isEnabled(); + boolean newIsRange = newConf.getConfig(StandardIndexes.range()).isEnabled(); Review Comment: nit: fwdIsEnabled, dictIsEnabled, rangeIsEnabled? which I feel more straightforward to read. 
########## pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/index/creator/CombinedInvertedIndexCreator.java: ########## @@ -18,6 +18,80 @@ */ package org.apache.pinot.segment.spi.index.creator; +import javax.annotation.Nonnull; +import javax.annotation.Nullable; +import org.apache.pinot.spi.data.FieldSpec; + + +/** + * This is the index used to create range indexes Review Comment: s/range/inverted ########## pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/forward/ForwardIndexCreatorFactory.java: ########## @@ -0,0 +1,147 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.pinot.segment.local.segment.index.forward; + +import java.io.File; +import java.io.IOException; +import org.apache.pinot.segment.local.segment.creator.impl.fwd.MultiValueFixedByteRawIndexCreator; +import org.apache.pinot.segment.local.segment.creator.impl.fwd.MultiValueUnsortedForwardIndexCreator; +import org.apache.pinot.segment.local.segment.creator.impl.fwd.MultiValueVarByteRawIndexCreator; +import org.apache.pinot.segment.local.segment.creator.impl.fwd.SingleValueFixedByteRawIndexCreator; +import org.apache.pinot.segment.local.segment.creator.impl.fwd.SingleValueSortedForwardIndexCreator; +import org.apache.pinot.segment.local.segment.creator.impl.fwd.SingleValueUnsortedForwardIndexCreator; +import org.apache.pinot.segment.local.segment.creator.impl.fwd.SingleValueVarByteRawIndexCreator; +import org.apache.pinot.segment.spi.compression.ChunkCompressionType; +import org.apache.pinot.segment.spi.creator.IndexCreationContext; +import org.apache.pinot.segment.spi.index.ForwardIndexConfig; +import org.apache.pinot.segment.spi.index.creator.ForwardIndexCreator; +import org.apache.pinot.spi.data.FieldSpec; + + +public class ForwardIndexCreatorFactory { + private ForwardIndexCreatorFactory() { + } + + public static ForwardIndexCreator createIndexCreator(IndexCreationContext context, ForwardIndexConfig indexConfig) + throws Exception { + String colName = context.getFieldSpec().getName(); + + if (!context.hasDictionary()) { + ChunkCompressionType chunkCompressionType = indexConfig.getChunkCompressionType(); + if (chunkCompressionType == null) { + chunkCompressionType = ForwardIndexType.getDefaultCompressionType(context.getFieldSpec().getFieldType()); + } + + // Dictionary disabled columns + boolean deriveNumDocsPerChunk = indexConfig.isDeriveNumDocsPerChunk(); + int writerVersion = indexConfig.getRawIndexWriterVersion(); + if (context.getFieldSpec().isSingleValueField()) { + return getRawIndexCreatorForSVColumn(context.getIndexDir(), 
chunkCompressionType, colName, + context.getFieldSpec().getDataType().getStoredType(), + context.getTotalDocs(), context.getLengthOfLongestEntry(), deriveNumDocsPerChunk, writerVersion); + } else { + return getRawIndexCreatorForMVColumn(context.getIndexDir(), chunkCompressionType, colName, + context.getFieldSpec().getDataType().getStoredType(), + context.getTotalDocs(), context.getMaxNumberOfMultiValueElements(), deriveNumDocsPerChunk, writerVersion, + context.getMaxRowLengthInBytes()); + } + } else { + // Dictionary enabled columns + if (context.getFieldSpec().isSingleValueField()) { + if (context.isSorted()) { + return new SingleValueSortedForwardIndexCreator(context.getIndexDir(), colName, + context.getCardinality()); + } else { + return new SingleValueUnsortedForwardIndexCreator(context.getIndexDir(), colName, + context.getCardinality(), context.getTotalDocs()); + } + } else { + return new MultiValueUnsortedForwardIndexCreator(context.getIndexDir(), colName, + context.getCardinality(), context.getTotalDocs(), context.getTotalNumberOfEntries()); + } + } + } + + /** + * Helper method to build the raw index creator for the column. + * Assumes that column to be indexed is single valued. 
+ * + * @param file Output index file + * @param column Column name + * @param totalDocs Total number of documents to index + * @param lengthOfLongestEntry Length of longest entry + * @param deriveNumDocsPerChunk true if varbyte writer should auto-derive the number of rows per chunk + * @param writerVersion version to use for the raw index writer + * @return raw index creator + */ + public static ForwardIndexCreator getRawIndexCreatorForSVColumn(File file, ChunkCompressionType compressionType, + String column, FieldSpec.DataType dataType, int totalDocs, int lengthOfLongestEntry, + boolean deriveNumDocsPerChunk, int writerVersion) + throws IOException { + switch (dataType.getStoredType()) { + case INT: + case LONG: + case FLOAT: + case DOUBLE: + return new SingleValueFixedByteRawIndexCreator(file, compressionType, column, totalDocs, dataType, + writerVersion); + case BIG_DECIMAL: + case STRING: + case BYTES: + return new SingleValueVarByteRawIndexCreator(file, compressionType, column, totalDocs, dataType, + lengthOfLongestEntry, deriveNumDocsPerChunk, writerVersion); + default: + throw new UnsupportedOperationException("Data type not supported for raw indexing: " + dataType); + } + } + + /** + * Helper method to build the raw index creator for the column. + * Assumes that column to be indexed is single valued. Review Comment: adjust the method comment, as it's for the single-value method (copied from above?) 
########## pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/h3/H3IndexType.java: ########## @@ -19,75 +19,115 @@ package org.apache.pinot.segment.local.segment.index.h3; +import com.google.common.base.Preconditions; +import java.io.IOException; import java.util.Map; +import java.util.Objects; import javax.annotation.Nullable; +import org.apache.pinot.segment.local.segment.creator.impl.inv.geospatial.OffHeapH3IndexCreator; +import org.apache.pinot.segment.local.segment.creator.impl.inv.geospatial.OnHeapH3IndexCreator; +import org.apache.pinot.segment.local.segment.index.loader.ConfigurableFromIndexLoadingConfig; +import org.apache.pinot.segment.local.segment.index.loader.IndexLoadingConfig; +import org.apache.pinot.segment.local.segment.index.loader.invertedindex.H3IndexHandler; +import org.apache.pinot.segment.local.segment.index.readers.geospatial.ImmutableH3IndexReader; import org.apache.pinot.segment.spi.ColumnMetadata; import org.apache.pinot.segment.spi.V1Constants; import org.apache.pinot.segment.spi.creator.IndexCreationContext; +import org.apache.pinot.segment.spi.index.AbstractIndexType; +import org.apache.pinot.segment.spi.index.ColumnConfigDeserializer; import org.apache.pinot.segment.spi.index.FieldIndexConfigs; -import org.apache.pinot.segment.spi.index.IndexCreator; +import org.apache.pinot.segment.spi.index.IndexConfigDeserializer; import org.apache.pinot.segment.spi.index.IndexHandler; -import org.apache.pinot.segment.spi.index.IndexReader; import org.apache.pinot.segment.spi.index.IndexReaderFactory; import org.apache.pinot.segment.spi.index.IndexType; import org.apache.pinot.segment.spi.index.StandardIndexes; +import org.apache.pinot.segment.spi.index.creator.GeoSpatialIndexCreator; +import org.apache.pinot.segment.spi.index.creator.H3IndexConfig; +import org.apache.pinot.segment.spi.index.reader.H3IndexReader; +import org.apache.pinot.segment.spi.index.reader.H3IndexResolution; +import 
org.apache.pinot.segment.spi.memory.PinotDataBuffer; import org.apache.pinot.segment.spi.store.SegmentDirectory; -import org.apache.pinot.spi.config.table.IndexConfig; +import org.apache.pinot.spi.config.table.FieldConfig; import org.apache.pinot.spi.config.table.TableConfig; +import org.apache.pinot.spi.data.FieldSpec; import org.apache.pinot.spi.data.Schema; -public class H3IndexType implements IndexType<IndexConfig, IndexReader, IndexCreator> { +public class H3IndexType extends AbstractIndexType<H3IndexConfig, H3IndexReader, GeoSpatialIndexCreator> + implements ConfigurableFromIndexLoadingConfig<H3IndexConfig> { - public static final H3IndexType INSTANCE = new H3IndexType(); - - private H3IndexType() { + protected H3IndexType() { + super(StandardIndexes.H3_ID); } @Override - public String getId() { - return StandardIndexes.H3_ID; + public Class<H3IndexConfig> getIndexConfigClass() { + return H3IndexConfig.class; } @Override - public Class<IndexConfig> getIndexConfigClass() { - return IndexConfig.class; + public Map<String, H3IndexConfig> fromIndexLoadingConfig(IndexLoadingConfig indexLoadingConfig) { + return indexLoadingConfig.getH3IndexConfigs(); } @Override - public IndexConfig getDefaultConfig() { - return IndexConfig.DISABLED; + public H3IndexConfig getDefaultConfig() { + return H3IndexConfig.DISABLED; } @Override - public IndexConfig getConfig(TableConfig tableConfig, Schema schema) { - throw new UnsupportedOperationException(); + public ColumnConfigDeserializer<H3IndexConfig> getDeserializer() { + return IndexConfigDeserializer.fromIndexes("h3", getIndexConfigClass()) + .withExclusiveAlternative(IndexConfigDeserializer.fromIndexTypes( + FieldConfig.IndexType.H3, + ((tableConfig, fieldConfig) -> new H3IndexConfig(fieldConfig.getProperties())))); } @Override - public IndexCreator createIndexCreator(IndexCreationContext context, IndexConfig indexConfig) - throws Exception { - throw new UnsupportedOperationException(); + public GeoSpatialIndexCreator 
createIndexCreator(IndexCreationContext context, H3IndexConfig indexConfig) + throws IOException { + Preconditions.checkState(context.getFieldSpec().isSingleValueField(), + "H3 index is currently only supported on single-value columns"); + Preconditions.checkState(context.getFieldSpec().getDataType().getStoredType() == FieldSpec.DataType.BYTES, + "H3 index is currently only supported on BYTES columns"); + H3IndexResolution resolution = Objects.requireNonNull(indexConfig).getResolution(); + return context.isOnHeap() + ? new OnHeapH3IndexCreator(context.getIndexDir(), context.getFieldSpec().getName(), resolution) + : new OffHeapH3IndexCreator(context.getIndexDir(), context.getFieldSpec().getName(), resolution); } @Override - public IndexReaderFactory<IndexReader> getReaderFactory() { - throw new UnsupportedOperationException(); + public IndexReaderFactory<H3IndexReader> getReaderFactory() { + return ReaderFactory.INSTANCE; } @Override public IndexHandler createIndexHandler(SegmentDirectory segmentDirectory, Map<String, FieldIndexConfigs> configsByCol, @Nullable Schema schema, @Nullable TableConfig tableConfig) { - throw new UnsupportedOperationException(); + return new H3IndexHandler(segmentDirectory, configsByCol, tableConfig); } @Override public String getFileExtension(ColumnMetadata columnMetadata) { return V1Constants.Indexes.H3_INDEX_FILE_EXTENSION; } - @Override - public String toString() { - return getId(); + public static class ReaderFactory extends IndexReaderFactory.Default<H3IndexConfig, H3IndexReader> { Review Comment: no need for a read() method on H3IndexType? 
########## pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/forward/ForwardIndexType.java: ########## @@ -19,60 +19,183 @@ package org.apache.pinot.segment.local.segment.index.forward; +import com.google.common.collect.Sets; +import java.io.IOException; +import java.util.HashMap; +import java.util.List; import java.util.Map; +import java.util.Set; +import java.util.function.Supplier; import javax.annotation.Nullable; +import org.apache.pinot.segment.local.segment.index.loader.ConfigurableFromIndexLoadingConfig; +import org.apache.pinot.segment.local.segment.index.loader.ForwardIndexHandler; +import org.apache.pinot.segment.local.segment.index.loader.IndexLoadingConfig; import org.apache.pinot.segment.spi.ColumnMetadata; import org.apache.pinot.segment.spi.V1Constants; +import org.apache.pinot.segment.spi.compression.ChunkCompressionType; import org.apache.pinot.segment.spi.creator.IndexCreationContext; +import org.apache.pinot.segment.spi.index.AbstractIndexType; +import org.apache.pinot.segment.spi.index.ColumnConfigDeserializer; import org.apache.pinot.segment.spi.index.FieldIndexConfigs; -import org.apache.pinot.segment.spi.index.IndexCreator; +import org.apache.pinot.segment.spi.index.ForwardIndexConfig; +import org.apache.pinot.segment.spi.index.IndexConfigDeserializer; import org.apache.pinot.segment.spi.index.IndexHandler; -import org.apache.pinot.segment.spi.index.IndexReader; +import org.apache.pinot.segment.spi.index.IndexReaderConstraintException; import org.apache.pinot.segment.spi.index.IndexReaderFactory; -import org.apache.pinot.segment.spi.index.IndexType; import org.apache.pinot.segment.spi.index.StandardIndexes; +import org.apache.pinot.segment.spi.index.creator.ForwardIndexCreator; +import org.apache.pinot.segment.spi.index.reader.ForwardIndexReader; +import org.apache.pinot.segment.spi.memory.PinotDataBuffer; import org.apache.pinot.segment.spi.store.SegmentDirectory; -import 
org.apache.pinot.spi.config.table.IndexConfig; +import org.apache.pinot.spi.config.table.FieldConfig; import org.apache.pinot.spi.config.table.TableConfig; +import org.apache.pinot.spi.data.FieldSpec; import org.apache.pinot.spi.data.Schema; -public class ForwardIndexType implements IndexType<IndexConfig, IndexReader, IndexCreator> { +public class ForwardIndexType + extends AbstractIndexType<ForwardIndexConfig, ForwardIndexReader, ForwardIndexCreator> + implements ConfigurableFromIndexLoadingConfig<ForwardIndexConfig> { - public static final ForwardIndexType INSTANCE = new ForwardIndexType(); - - private ForwardIndexType() { + protected ForwardIndexType() { + super(StandardIndexes.FORWARD_ID); } @Override - public String getId() { - return StandardIndexes.FORWARD_ID; + public Class<ForwardIndexConfig> getIndexConfigClass() { + return ForwardIndexConfig.class; } @Override - public Class<IndexConfig> getIndexConfigClass() { - return null; + public Map<String, ForwardIndexConfig> fromIndexLoadingConfig(IndexLoadingConfig indexLoadingConfig) { + Set<String> disabledCols = indexLoadingConfig.getForwardIndexDisabledColumns(); + Map<String, ForwardIndexConfig> result = new HashMap<>(); + Set<String> allColumns = Sets.union(disabledCols, indexLoadingConfig.getAllKnownColumns()); + for (String column : allColumns) { + ChunkCompressionType compressionType = + indexLoadingConfig.getCompressionConfigs() != null + ? indexLoadingConfig.getCompressionConfigs().get(column) + : null; + Supplier<ForwardIndexConfig> defaultConfig = () -> { Review Comment: why the need for a Supplier? it looks like the config value is only used once in the following if-else branches. 
########## pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/ForwardIndexHandler.java: ########## @@ -920,10 +929,10 @@ private void disableDictionaryAndCreateRawForwardIndex(String column, SegmentDir } private void rewriteDictToRawForwardIndex(String column, ColumnMetadata existingColMetadata, - SegmentDirectory.Writer segmentWriter, File indexDir, IndexCreatorProvider indexCreatorProvider) + SegmentDirectory.Writer segmentWriter, File indexDir) throws Exception { - try (ForwardIndexReader reader = LoaderUtils.getForwardIndexReader(segmentWriter, existingColMetadata)) { - Dictionary dictionary = LoaderUtils.getDictionary(segmentWriter, existingColMetadata); + try (ForwardIndexReader<?> reader = ForwardIndexType.getReader(segmentWriter, existingColMetadata)) { + Dictionary dictionary = DictionaryIndexType.read(segmentWriter, existingColMetadata); Review Comment: use StandardIndexes.forward() and dictionary() here too? Also, should we make ForwardIndexType and the other concrete index type implementation classes package-private to avoid accidental direct references?
########## pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/forward/ForwardIndexType.java: ########## @@ -19,60 +19,183 @@ package org.apache.pinot.segment.local.segment.index.forward; +import com.google.common.collect.Sets; +import java.io.IOException; +import java.util.HashMap; +import java.util.List; import java.util.Map; +import java.util.Set; +import java.util.function.Supplier; import javax.annotation.Nullable; +import org.apache.pinot.segment.local.segment.index.loader.ConfigurableFromIndexLoadingConfig; +import org.apache.pinot.segment.local.segment.index.loader.ForwardIndexHandler; +import org.apache.pinot.segment.local.segment.index.loader.IndexLoadingConfig; import org.apache.pinot.segment.spi.ColumnMetadata; import org.apache.pinot.segment.spi.V1Constants; +import org.apache.pinot.segment.spi.compression.ChunkCompressionType; import org.apache.pinot.segment.spi.creator.IndexCreationContext; +import org.apache.pinot.segment.spi.index.AbstractIndexType; +import org.apache.pinot.segment.spi.index.ColumnConfigDeserializer; import org.apache.pinot.segment.spi.index.FieldIndexConfigs; -import org.apache.pinot.segment.spi.index.IndexCreator; +import org.apache.pinot.segment.spi.index.ForwardIndexConfig; +import org.apache.pinot.segment.spi.index.IndexConfigDeserializer; import org.apache.pinot.segment.spi.index.IndexHandler; -import org.apache.pinot.segment.spi.index.IndexReader; +import org.apache.pinot.segment.spi.index.IndexReaderConstraintException; import org.apache.pinot.segment.spi.index.IndexReaderFactory; -import org.apache.pinot.segment.spi.index.IndexType; import org.apache.pinot.segment.spi.index.StandardIndexes; +import org.apache.pinot.segment.spi.index.creator.ForwardIndexCreator; +import org.apache.pinot.segment.spi.index.reader.ForwardIndexReader; +import org.apache.pinot.segment.spi.memory.PinotDataBuffer; import org.apache.pinot.segment.spi.store.SegmentDirectory; -import 
org.apache.pinot.spi.config.table.IndexConfig; +import org.apache.pinot.spi.config.table.FieldConfig; import org.apache.pinot.spi.config.table.TableConfig; +import org.apache.pinot.spi.data.FieldSpec; import org.apache.pinot.spi.data.Schema; -public class ForwardIndexType implements IndexType<IndexConfig, IndexReader, IndexCreator> { +public class ForwardIndexType + extends AbstractIndexType<ForwardIndexConfig, ForwardIndexReader, ForwardIndexCreator> + implements ConfigurableFromIndexLoadingConfig<ForwardIndexConfig> { - public static final ForwardIndexType INSTANCE = new ForwardIndexType(); - - private ForwardIndexType() { + protected ForwardIndexType() { + super(StandardIndexes.FORWARD_ID); } @Override - public String getId() { - return StandardIndexes.FORWARD_ID; + public Class<ForwardIndexConfig> getIndexConfigClass() { + return ForwardIndexConfig.class; } @Override - public Class<IndexConfig> getIndexConfigClass() { - return null; + public Map<String, ForwardIndexConfig> fromIndexLoadingConfig(IndexLoadingConfig indexLoadingConfig) { + Set<String> disabledCols = indexLoadingConfig.getForwardIndexDisabledColumns(); + Map<String, ForwardIndexConfig> result = new HashMap<>(); + Set<String> allColumns = Sets.union(disabledCols, indexLoadingConfig.getAllKnownColumns()); + for (String column : allColumns) { + ChunkCompressionType compressionType = + indexLoadingConfig.getCompressionConfigs() != null + ? 
indexLoadingConfig.getCompressionConfigs().get(column) + : null; + Supplier<ForwardIndexConfig> defaultConfig = () -> { + if (compressionType == null) { + return ForwardIndexConfig.DEFAULT; + } else { + return new ForwardIndexConfig.Builder().withCompressionType(compressionType).build(); + } + }; + if (!disabledCols.contains(column)) { + TableConfig tableConfig = indexLoadingConfig.getTableConfig(); + if (tableConfig == null) { + result.put(column, defaultConfig.get()); + } else { + List<FieldConfig> fieldConfigList = tableConfig.getFieldConfigList(); + if (fieldConfigList == null) { + result.put(column, defaultConfig.get()); + continue; + } + FieldConfig fieldConfig = fieldConfigList.stream() + .filter(fc -> fc.getName().equals(column)) + .findAny() + .orElse(null); + if (fieldConfig == null) { + result.put(column, defaultConfig.get()); + continue; + } + ForwardIndexConfig.Builder builder = new ForwardIndexConfig.Builder(); + if (compressionType != null) { + builder.withCompressionType(compressionType); + } else { + FieldConfig.CompressionCodec compressionCodec = fieldConfig.getCompressionCodec(); + if (compressionCodec != null) { + builder.withCompressionType(ChunkCompressionType.valueOf(compressionCodec.name())); + } + } + + result.put(column, builder.build()); + } + } else { + result.put(column, ForwardIndexConfig.DISABLED); + } + } + return result; } @Override - public IndexConfig getDefaultConfig() { - return IndexConfig.ENABLED; + public ForwardIndexConfig getDefaultConfig() { + return ForwardIndexConfig.DEFAULT; } @Override - public IndexConfig getConfig(TableConfig tableConfig, Schema schema) { - throw new UnsupportedOperationException(); + public ColumnConfigDeserializer<ForwardIndexConfig> getDeserializer() { + // reads tableConfig.fieldConfigList and decides what to create using the FieldConfig properties and encoding + ColumnConfigDeserializer<ForwardIndexConfig> fromFieldConfig = IndexConfigDeserializer.fromCollection( + 
TableConfig::getFieldConfigList, + (accum, fieldConfig) -> { + Map<String, String> properties = fieldConfig.getProperties(); + if (properties != null && isDisabled(properties)) { + accum.put(fieldConfig.getName(), ForwardIndexConfig.DISABLED); + } else if (fieldConfig.getEncodingType() == FieldConfig.EncodingType.RAW) { + accum.put(fieldConfig.getName(), createConfigFromFieldConfig(fieldConfig)); + } + } + ); + return IndexConfigDeserializer.fromIndexes("forward", getIndexConfigClass()) + .withExclusiveAlternative(fromFieldConfig); + } + + private boolean isDisabled(Map<String, String> props) { + return Boolean.parseBoolean( + props.getOrDefault(FieldConfig.FORWARD_INDEX_DISABLED, FieldConfig.DEFAULT_FORWARD_INDEX_DISABLED)); + } + + private ForwardIndexConfig createConfigFromFieldConfig(FieldConfig fieldConfig) { + if (fieldConfig.getEncodingType() != FieldConfig.EncodingType.RAW) { + throw new IllegalArgumentException("Cannot build a forward index on a field whose encoding is " Review Comment: hmm.. what about DICTIONARY? fwd index can be built with dict encoded column ########## pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/ForwardIndexHandler.java: ########## @@ -438,10 +442,9 @@ private void rewriteRawForwardIndexForCompressionChange(String column, SegmentDi } private void rewriteRawMVForwardIndexForCompressionChange(String column, ColumnMetadata existingColMetadata, - File indexDir, SegmentDirectory.Writer segmentWriter, IndexCreatorProvider indexCreatorProvider, - ChunkCompressionType newCompressionType) + File indexDir, SegmentDirectory.Writer segmentWriter, ChunkCompressionType newCompressionType) throws Exception { - try (ForwardIndexReader reader = LoaderUtils.getForwardIndexReader(segmentWriter, existingColMetadata)) { + try (ForwardIndexReader<?> reader = ForwardIndexType.getReader(segmentWriter, existingColMetadata)) { Review Comment: should we use StandardIndexes.forward() to access index reader? 
########## pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/index/ForwardIndexConfig.java: ########## @@ -0,0 +1,157 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.pinot.segment.spi.index; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import java.io.IOException; +import java.util.Map; +import java.util.Objects; +import javax.annotation.Nullable; +import org.apache.pinot.segment.spi.compression.ChunkCompressionType; +import org.apache.pinot.spi.config.table.FieldConfig; +import org.apache.pinot.spi.config.table.IndexConfig; +import org.apache.pinot.spi.utils.JsonUtils; + + +public class ForwardIndexConfig extends IndexConfig { + public static final int DEFAULT_RAW_WRITER_VERSION = 2; + public static final ForwardIndexConfig DISABLED = new ForwardIndexConfig(false, null, null, null); + + @Nullable + private final ChunkCompressionType _chunkCompressionType; + private final boolean _deriveNumDocsPerChunk; + private final int _rawIndexWriterVersion; + + public static final ForwardIndexConfig DEFAULT = new Builder().build(); Review Comment: move up to be close with other constant values. 
########## pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/index/creator/TextIndexCreator.java: ########## @@ -18,14 +18,22 @@ */ package org.apache.pinot.segment.spi.index.creator; -import java.io.Closeable; import java.io.IOException; +import org.apache.pinot.segment.spi.index.IndexCreator; /** - * Index creator for text index. + * Index creator for both text and FST indexes. Review Comment: update comment as we have added FSTIndexCreator separately ########## pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/store/SegmentDirectory.java: ########## @@ -55,7 +56,7 @@ * SegmentDirectory.Reader reader = * segmentDir.createReader(); * try { - * PinotDataBufferOld col1Dictionary = reader.getIndexFor("col1Dictionary", ColumnIndexType.DICTIONARY); + * PinotDataBufferOld col1Dictionary = reader.getIndexFor("col1Dictionary", DictoionaryIndexType.INSTANCE); Review Comment: typo — but should this be StandardIndexes.dictionary()? Or did you leave this one here as an alternative way to access the index?
########## pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/bloomfilter/BloomFilterHandler.java: ########## @@ -129,15 +129,17 @@ private void createAndSealBloomFilterForDictionaryColumn(BloomFilterCreatorProvi } } - private void createAndSealBloomFilterForNonDictionaryColumn(BloomFilterCreatorProvider indexCreatorProvider, - File indexDir, ColumnMetadata columnMetadata, BloomFilterConfig bloomFilterConfig, - SegmentDirectory.Writer segmentWriter) + private void createAndSealBloomFilterForNonDictionaryColumn(File indexDir, ColumnMetadata columnMetadata, + BloomFilterConfig bloomFilterConfig, SegmentDirectory.Writer segmentWriter) throws Exception { int numDocs = columnMetadata.getTotalDocs(); - try (BloomFilterCreator bloomFilterCreator = indexCreatorProvider.newBloomFilterCreator( - IndexCreationContext.builder().withIndexDir(indexDir).withColumnMetadata(columnMetadata).build() - .forBloomFilter(bloomFilterConfig)); - ForwardIndexReader forwardIndexReader = LoaderUtils.getForwardIndexReader(segmentWriter, columnMetadata); + IndexCreationContext context = IndexCreationContext.builder() + .withIndexDir(indexDir) + .withColumnMetadata(columnMetadata) + .build(); + try (BloomFilterCreator bloomFilterCreator = StandardIndexes.bloomFilter() + .createIndexCreator(context, bloomFilterConfig); + ForwardIndexReader forwardIndexReader = ForwardIndexType.getReader(segmentWriter, columnMetadata); Review Comment: use StandardIndexes.forward() instead? 
########## pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/inverted/InvertedIndexType.java: ########## @@ -86,7 +130,46 @@ public String getFileExtension(ColumnMetadata columnMetadata) { } @Override - public String toString() { - return getId(); + public IndexHandler createIndexHandler(SegmentDirectory segmentDirectory, Map<String, FieldIndexConfigs> configsByCol, + @Nullable Schema schema, @Nullable TableConfig tableConfig) { + return new InvertedIndexHandler(segmentDirectory, configsByCol, tableConfig); + } + + public static class ReaderFactory implements IndexReaderFactory<InvertedIndexReader> { + public static final ReaderFactory INSTANCE = new ReaderFactory(); + + private ReaderFactory() { + } + + @Override + public InvertedIndexReader createIndexReader(SegmentDirectory.Reader segmentReader, + FieldIndexConfigs fieldIndexConfigs, ColumnMetadata metadata) + throws IOException, IndexReaderConstraintException { + if (fieldIndexConfigs == null || !fieldIndexConfigs.getConfig(StandardIndexes.inverted()).isEnabled()) { + return null; + } + if (!metadata.hasDictionary()) { + throw new IllegalStateException("Column " + metadata.getColumnName() + " cannot be indexed by an inverted " + + "index if it has no dictionary"); + } + if (metadata.isSorted() && metadata.isSingleValue()) { + ForwardIndexReader fwdReader = StandardIndexes.forward().getReaderFactory() + .createIndexReader(segmentReader, fieldIndexConfigs, metadata); + Preconditions.checkState(fwdReader instanceof SortedIndexReader); + return (SortedIndexReader) fwdReader; + } else { + return createSkippingForward(segmentReader, metadata); + } + } + + public InvertedIndexReader createSkippingForward(SegmentDirectory.Reader segmentReader, ColumnMetadata metadata) Review Comment: got it. I'd prefer to just call it createIndexReader, overloading the method, hiding impl details from the method name. 
########## pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/SegmentPreProcessor.java: ########## @@ -104,20 +104,31 @@ public void process() } // Update single-column indices, like inverted index, json index etc. - IndexCreatorProvider indexCreatorProvider = IndexingOverrides.getIndexCreatorProvider(); List<IndexHandler> indexHandlers = new ArrayList<>(); - for (ColumnIndexType type : ColumnIndexType.values()) { - IndexHandler handler = - IndexHandlerFactory.getIndexHandler(type, _segmentDirectory, _indexLoadingConfig, _schema); - indexHandlers.add(handler); - // TODO: Find a way to ensure ForwardIndexHandler is always executed before other handlers instead of - // relying on enum ordering. - handler.updateIndices(segmentWriter, indexCreatorProvider); - // ForwardIndexHandler may modify the segment metadata while rewriting forward index to create / remove a - // dictionary. Other IndexHandler classes may modify the segment metadata while creating a temporary forward - // index to generate their respective indexes from if the forward index was disabled. This new metadata is - // needed to construct other indexes like RangeIndex. - _segmentMetadata = _segmentDirectory.getSegmentMetadata(); + + // We cannot just create all the index handlers in a random order. + // Specifically, ForwardIndexHandler needs to be executed first. This is because it modifies the segment metadata + // while rewriting forward index to create a dictionary. Some other handlers (like the range one) assume that + // metadata was already been modified by ForwardIndexHandler. 
+ IndexHandler forwardHandler = createHandler(StandardIndexes.forward()); + indexHandlers.add(forwardHandler); + forwardHandler.updateIndices(segmentWriter); + + // Now that ForwardIndexHandler.updateIndeces has been updated, we can run all other indexes in any order Review Comment: s/updateIndeces/updateIndices ########## pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/SegmentPreProcessor.java: ########## @@ -104,20 +104,31 @@ public void process() } // Update single-column indices, like inverted index, json index etc. - IndexCreatorProvider indexCreatorProvider = IndexingOverrides.getIndexCreatorProvider(); List<IndexHandler> indexHandlers = new ArrayList<>(); - for (ColumnIndexType type : ColumnIndexType.values()) { - IndexHandler handler = - IndexHandlerFactory.getIndexHandler(type, _segmentDirectory, _indexLoadingConfig, _schema); - indexHandlers.add(handler); - // TODO: Find a way to ensure ForwardIndexHandler is always executed before other handlers instead of - // relying on enum ordering. - handler.updateIndices(segmentWriter, indexCreatorProvider); - // ForwardIndexHandler may modify the segment metadata while rewriting forward index to create / remove a - // dictionary. Other IndexHandler classes may modify the segment metadata while creating a temporary forward - // index to generate their respective indexes from if the forward index was disabled. This new metadata is - // needed to construct other indexes like RangeIndex. - _segmentMetadata = _segmentDirectory.getSegmentMetadata(); + + // We cannot just create all the index handlers in a random order. + // Specifically, ForwardIndexHandler needs to be executed first. This is because it modifies the segment metadata + // while rewriting forward index to create a dictionary. Some other handlers (like the range one) assume that + // metadata was already been modified by ForwardIndexHandler. 
+ IndexHandler forwardHandler = createHandler(StandardIndexes.forward()); + indexHandlers.add(forwardHandler); + forwardHandler.updateIndices(segmentWriter); + + // Now that ForwardIndexHandler.updateIndeces has been updated, we can run all other indexes in any order + + _segmentMetadata = new SegmentMetadataImpl(indexDir); + _segmentDirectory.reloadMetadata(); Review Comment: curious why reloadMetadata() is required now? ########## pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/bloomfilter/BloomFilterHandler.java: ########## @@ -264,51 +265,42 @@ private void createBloomFilterForColumn(SegmentDirectory.Writer segmentWriter, C if (!columnMetadata.hasDictionary()) { // Create a temporary forward index if it is disabled and does not exist - columnMetadata = createForwardIndexIfNeeded(segmentWriter, columnName, indexCreatorProvider, true); + columnMetadata = createForwardIndexIfNeeded(segmentWriter, columnName, true); } // Create new bloom filter for the column. BloomFilterConfig bloomFilterConfig = _bloomFilterConfigs.get(columnName); LOGGER.info("Creating new bloom filter for segment: {}, column: {} with config: {}", segmentName, columnName, bloomFilterConfig); if (columnMetadata.hasDictionary()) { - createAndSealBloomFilterForDictionaryColumn(bloomFilterCreatorProvider, indexDir, columnMetadata, - bloomFilterConfig, segmentWriter); + createAndSealBloomFilterForDictionaryColumn(indexDir, columnMetadata, bloomFilterConfig, segmentWriter); } else { - createAndSealBloomFilterForNonDictionaryColumn(bloomFilterCreatorProvider, indexDir, columnMetadata, - bloomFilterConfig, segmentWriter); + createAndSealBloomFilterForNonDictionaryColumn(indexDir, columnMetadata, bloomFilterConfig, segmentWriter); } // For v3, write the generated bloom filter file into the single file and remove it. 
if (_segmentDirectory.getSegmentMetadata().getVersion() == SegmentVersion.v3) { - LoaderUtils.writeIndexToV3Format(segmentWriter, columnName, bloomFilterFile, ColumnIndexType.BLOOM_FILTER); + LoaderUtils.writeIndexToV3Format(segmentWriter, columnName, bloomFilterFile, StandardIndexes.bloomFilter()); } // Delete the marker file. FileUtils.deleteQuietly(bloomFilterFileInProgress); LOGGER.info("Created bloom filter for segment: {}, column: {}", segmentName, columnName); } - private BaseImmutableDictionary getDictionaryReader(ColumnMetadata columnMetadata, - SegmentDirectory.Writer segmentWriter) + private Dictionary getDictionaryReader(ColumnMetadata columnMetadata, SegmentDirectory.Writer segmentWriter) throws IOException { - PinotDataBuffer dictionaryBuffer = - segmentWriter.getIndexFor(columnMetadata.getColumnName(), ColumnIndexType.DICTIONARY); - int cardinality = columnMetadata.getCardinality(); DataType dataType = columnMetadata.getDataType(); + switch (dataType) { case INT: - return new IntDictionary(dictionaryBuffer, cardinality); case LONG: - return new LongDictionary(dictionaryBuffer, cardinality); case FLOAT: - return new FloatDictionary(dictionaryBuffer, cardinality); case DOUBLE: - return new DoubleDictionary(dictionaryBuffer, cardinality); case STRING: - return new StringDictionary(dictionaryBuffer, cardinality, columnMetadata.getColumnMaxLength()); case BYTES: - return new BytesDictionary(dictionaryBuffer, cardinality, columnMetadata.getColumnMaxLength()); + PinotDataBuffer buf = segmentWriter.getIndexFor(columnMetadata.getColumnName(), StandardIndexes.dictionary()); + return DictionaryIndexType.read(buf, columnMetadata, DictionaryIndexConfig.DEFAULT_OFFHEAP); Review Comment: StandardIndexes.dictionary() to access the index reader? and same for a few places in other index handler classes, to use StandardIndexes.xxx() to access the index handler? 
########## pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/nullvalue/NullValueIndexType.java: ########## @@ -60,34 +66,56 @@ public IndexConfig getDefaultConfig() { } @Override - public IndexConfig getConfig(TableConfig tableConfig, Schema schema) { - throw new UnsupportedOperationException(); + public ColumnConfigDeserializer<IndexConfig> getDeserializer() { + return IndexConfigDeserializer.fromIndexes("null", getIndexConfigClass()) + .withFallbackAlternative( + IndexConfigDeserializer.ifIndexingConfig( + IndexConfigDeserializer.alwaysCall((TableConfig tableConfig, Schema schema) -> + tableConfig.getIndexingConfig().isNullHandlingEnabled() + ? IndexConfig.ENABLED + : IndexConfig.DISABLED)) + ); } - @Override - public IndexCreator createIndexCreator(IndexCreationContext context, IndexConfig indexConfig) - throws Exception { - throw new UnsupportedOperationException(); + public NullValueVectorCreator createIndexCreator(File indexDir, String columnName) { + return new NullValueVectorCreator(indexDir, columnName); } @Override - public IndexReaderFactory<IndexReader> getReaderFactory() { - throw new UnsupportedOperationException(); + public IndexReaderFactory<NullValueVectorReader> getReaderFactory() { + return ReaderFactory.INSTANCE; } @Override public IndexHandler createIndexHandler(SegmentDirectory segmentDirectory, Map<String, FieldIndexConfigs> configsByCol, @Nullable Schema schema, @Nullable TableConfig tableConfig) { - throw new UnsupportedOperationException(); + return IndexHandler.NoOp.INSTANCE; } @Override public String getFileExtension(ColumnMetadata columnMetadata) { return V1Constants.Indexes.NULLVALUE_VECTOR_FILE_EXTENSION; } - @Override - public String toString() { - return getId(); + public static class ReaderFactory implements IndexReaderFactory<NullValueVectorReader> { + + public static final ReaderFactory INSTANCE = new ReaderFactory(); + + private ReaderFactory() { + } + + @Nullable + @Override + public 
NullValueVectorReader createIndexReader(SegmentDirectory.Reader segmentReader, + FieldIndexConfigs fieldIndexConfigs, ColumnMetadata metadata) + throws IOException { + // For historical and test reasons, NullValueIndexType doesn't really care about its config + // if there is a buffer for this index, it is read even if the config explicitly ask to disable it. Review Comment: add a TODO to fix the legacy logic? ########## pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/creator/IndexCreationContext.java: ########## @@ -112,6 +143,16 @@ public Builder withColumnMetadata(ColumnMetadata columnMetadata) { .withMaxNumberOfMultiValueElements(columnMetadata.getMaxNumberOfMultiValues()); } + public Builder withIsOptimizedDictionary(boolean optimized) { Review Comment: nit: should this be called withOptimizeDictionary? i.e. a hint to optimize dictionary ########## pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/text/TextIndexConfigBuilder.java: ########## @@ -0,0 +1,75 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.pinot.segment.local.segment.index.text; + +import java.util.List; +import java.util.Map; +import javax.annotation.Nullable; +import org.apache.pinot.segment.local.segment.store.TextIndexUtils; +import org.apache.pinot.segment.spi.index.TextIndexConfig; +import org.apache.pinot.spi.config.table.FSTType; +import org.apache.pinot.spi.config.table.FieldConfig; + + +public class TextIndexConfigBuilder extends TextIndexConfig.AbstractBuilder { + public TextIndexConfigBuilder(@Nullable FSTType fstType) { + super(fstType); + } + + public TextIndexConfigBuilder(TextIndexConfig other) { + super(other); + } + + @Override + public TextIndexConfig.AbstractBuilder withProperties(@Nullable Map<String, String> textIndexProperties) { + if (textIndexProperties != null) { + if (Boolean.parseBoolean(textIndexProperties.get(FieldConfig.TEXT_INDEX_NO_RAW_DATA))) { + _rawValueForTextIndex = textIndexProperties.get(FieldConfig.TEXT_INDEX_RAW_VALUE); + if (_rawValueForTextIndex == null) { + _rawValueForTextIndex = FieldConfig.TEXT_INDEX_DEFAULT_RAW_VALUE; + } + } + _enableQueryCache = Boolean.parseBoolean(textIndexProperties.get(FieldConfig.TEXT_INDEX_ENABLE_QUERY_CACHE)); + _useANDForMultiTermQueries = Boolean.parseBoolean( + textIndexProperties.get(FieldConfig.TEXT_INDEX_USE_AND_FOR_MULTI_TERM_QUERIES)); + _stopWordsInclude = TextIndexUtils.extractStopWordsInclude(textIndexProperties); + _stopWordsExclude = TextIndexUtils.extractStopWordsExclude(textIndexProperties); + } + return this; + } + + @Override + public TextIndexConfigBuilder withStopWordsInclude(List<String> stopWordsInclude) { Review Comment: looks like there is no need to override this and the next two methods. 
########## pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/creator/SegmentGeneratorConfig.java: ########## @@ -250,6 +208,47 @@ public SegmentGeneratorConfig(TableConfig tableConfig, Schema schema) { _rowTimeValueCheck = ingestionConfig.isRowTimeValueCheck(); _segmentTimeValueCheck = ingestionConfig.isSegmentTimeValueCheck(); } + + _indexConfigsByColName = FieldIndexConfigsUtil.createIndexConfigsByColName(tableConfig, schema); + + if (indexingConfig != null) { + // NOTE: There are 2 ways to configure creating inverted index during segment generation: + // - Set 'generate.inverted.index.before.push' to 'true' in custom config (deprecated) + // - Enable 'createInvertedIndexDuringSegmentGeneration' in indexing config + // TODO: Clean up the table configs with the deprecated settings, and always use the one in the indexing config + // TODO 2: Decide what to do with this. Index-spi is based on the idea that TableConfig is the source of truth + if (indexingConfig.getInvertedIndexColumns() != null) { + Map<String, String> customConfigs = tableConfig.getCustomConfig().getCustomConfigs(); + if ((customConfigs != null && Boolean.parseBoolean(customConfigs.get("generate.inverted.index.before.push"))) + || indexingConfig.isCreateInvertedIndexDuringSegmentGeneration()) { + setIndexOn(StandardIndexes.inverted(), IndexConfig.ENABLED, indexingConfig.getInvertedIndexColumns()); + } + } + } + } + + public <C extends IndexConfig> void setIndexOn(IndexType<C, ?, ?> indexType, C config, String... 
columns) { + setIndexOn(indexType, config, Arrays.asList(columns)); + } + + @VisibleForTesting + public <C extends IndexConfig> void setIndexOn(IndexType<C, ?, ?> indexType, C config, + @Nullable Iterable<String> columns) { + if (columns == null) { + return; + } + for (String column : columns) { + _indexConfigsByColName.compute(column, (key, old) -> { + FieldIndexConfigs.Builder builder; + if (old == null) { + builder = new FieldIndexConfigs.Builder(); + } else { + builder = new FieldIndexConfigs.Builder(old); + } + return builder.add(indexType, config) + .build(); Review Comment: fold `.build()` up? ########## pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/creator/SegmentGeneratorConfig.java: ########## @@ -250,6 +208,47 @@ public SegmentGeneratorConfig(TableConfig tableConfig, Schema schema) { _rowTimeValueCheck = ingestionConfig.isRowTimeValueCheck(); _segmentTimeValueCheck = ingestionConfig.isSegmentTimeValueCheck(); } + + _indexConfigsByColName = FieldIndexConfigsUtil.createIndexConfigsByColName(tableConfig, schema); + + if (indexingConfig != null) { + // NOTE: There are 2 ways to configure creating inverted index during segment generation: + // - Set 'generate.inverted.index.before.push' to 'true' in custom config (deprecated) + // - Enable 'createInvertedIndexDuringSegmentGeneration' in indexing config + // TODO: Clean up the table configs with the deprecated settings, and always use the one in the indexing config + // TODO 2: Decide what to do with this. Index-spi is based on the idea that TableConfig is the source of truth + if (indexingConfig.getInvertedIndexColumns() != null) { + Map<String, String> customConfigs = tableConfig.getCustomConfig().getCustomConfigs(); + if ((customConfigs != null && Boolean.parseBoolean(customConfigs.get("generate.inverted.index.before.push"))) Review Comment: define this config name in some existing Config or Constants util classes? 
########## pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/creator/SegmentGeneratorConfig.java: ########## @@ -441,53 +336,23 @@ public void setRawIndexCreationColumns(List<String> rawIndexCreationColumns) { _rawIndexCreationColumns.addAll(rawIndexCreationColumns); } - // NOTE: Should always be extracted from the table config - @Deprecated - public void setInvertedIndexCreationColumns(List<String> indexCreationColumns) { - Preconditions.checkNotNull(indexCreationColumns); - _invertedIndexCreationColumns.addAll(indexCreationColumns); - } - - /** - * Used by org.apache.pinot.core.realtime.converter.RealtimeSegmentConverter - * and text search functional tests - * @param textIndexCreationColumns list of columns with text index creation enabled - */ - public void setTextIndexCreationColumns(List<String> textIndexCreationColumns) { - if (textIndexCreationColumns != null) { - _textIndexCreationColumns.addAll(textIndexCreationColumns); - } - } - @VisibleForTesting public void setRangeIndexCreationColumns(List<String> rangeIndexCreationColumns) { if (rangeIndexCreationColumns != null) { - _rangeIndexCreationColumns.addAll(rangeIndexCreationColumns); + setIndexOn(StandardIndexes.range(), new RangeIndexConfig(2), rangeIndexCreationColumns); Review Comment: s/2/the_version_constant ########## pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/index/reader/RangeIndexReader.java: ########## @@ -18,15 +18,16 @@ */ package org.apache.pinot.segment.spi.index.reader; -import java.io.Closeable; import javax.annotation.Nullable; +import org.apache.pinot.segment.spi.index.IndexReader; + Review Comment: extra space? ########## pinot-tools/src/main/java/org/apache/pinot/tools/segment/converter/DictionaryToRawIndexConverter.java: ########## @@ -317,7 +317,7 @@ private void convertOneColumn(IndexSegment segment, String column, File newSegme int numDocs = segment.getSegmentMetadata().getTotalDocs(); int lengthOfLongestEntry = (storedType == DataType.STRING) ? 
getLengthOfLongestEntry(dictionary) : -1; - try (ForwardIndexCreator rawIndexCreator = DefaultIndexCreatorProvider + try (ForwardIndexCreator rawIndexCreator = ForwardIndexCreatorFactory Review Comment: should we use StandardIndexes.forward() to get a reference to the index creator, instead of referring to the ForwardIndexCreatorFactory class directly? perhaps just make those index-specific factories pkg private? ########## pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/index/mutable/provider/MutableInvertedIndexProvider.java: ########## @@ -22,5 +22,6 @@ public interface MutableInvertedIndexProvider { + @Deprecated Review Comment: as this is deprecated, what's the new way? maybe comment it here for later reference -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org