klsince commented on code in PR #10184: URL: https://github.com/apache/pinot/pull/10184#discussion_r1153887239
########## pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/forward/ForwardIndexType.java: ########## @@ -19,60 +19,183 @@ package org.apache.pinot.segment.local.segment.index.forward; +import com.google.common.collect.Sets; +import java.io.IOException; +import java.util.HashMap; +import java.util.List; import java.util.Map; +import java.util.Set; +import java.util.function.Supplier; import javax.annotation.Nullable; +import org.apache.pinot.segment.local.segment.index.loader.ConfigurableFromIndexLoadingConfig; +import org.apache.pinot.segment.local.segment.index.loader.ForwardIndexHandler; +import org.apache.pinot.segment.local.segment.index.loader.IndexLoadingConfig; import org.apache.pinot.segment.spi.ColumnMetadata; import org.apache.pinot.segment.spi.V1Constants; +import org.apache.pinot.segment.spi.compression.ChunkCompressionType; import org.apache.pinot.segment.spi.creator.IndexCreationContext; +import org.apache.pinot.segment.spi.index.AbstractIndexType; +import org.apache.pinot.segment.spi.index.ColumnConfigDeserializer; import org.apache.pinot.segment.spi.index.FieldIndexConfigs; -import org.apache.pinot.segment.spi.index.IndexCreator; +import org.apache.pinot.segment.spi.index.ForwardIndexConfig; +import org.apache.pinot.segment.spi.index.IndexConfigDeserializer; import org.apache.pinot.segment.spi.index.IndexHandler; -import org.apache.pinot.segment.spi.index.IndexReader; +import org.apache.pinot.segment.spi.index.IndexReaderConstraintException; import org.apache.pinot.segment.spi.index.IndexReaderFactory; -import org.apache.pinot.segment.spi.index.IndexType; import org.apache.pinot.segment.spi.index.StandardIndexes; +import org.apache.pinot.segment.spi.index.creator.ForwardIndexCreator; +import org.apache.pinot.segment.spi.index.reader.ForwardIndexReader; +import org.apache.pinot.segment.spi.memory.PinotDataBuffer; import org.apache.pinot.segment.spi.store.SegmentDirectory; -import 
org.apache.pinot.spi.config.table.IndexConfig; +import org.apache.pinot.spi.config.table.FieldConfig; import org.apache.pinot.spi.config.table.TableConfig; +import org.apache.pinot.spi.data.FieldSpec; import org.apache.pinot.spi.data.Schema; -public class ForwardIndexType implements IndexType<IndexConfig, IndexReader, IndexCreator> { +public class ForwardIndexType + extends AbstractIndexType<ForwardIndexConfig, ForwardIndexReader, ForwardIndexCreator> + implements ConfigurableFromIndexLoadingConfig<ForwardIndexConfig> { - public static final ForwardIndexType INSTANCE = new ForwardIndexType(); - - private ForwardIndexType() { + protected ForwardIndexType() { + super(StandardIndexes.FORWARD_ID); } @Override - public String getId() { - return StandardIndexes.FORWARD_ID; + public Class<ForwardIndexConfig> getIndexConfigClass() { + return ForwardIndexConfig.class; } @Override - public Class<IndexConfig> getIndexConfigClass() { - return null; + public Map<String, ForwardIndexConfig> fromIndexLoadingConfig(IndexLoadingConfig indexLoadingConfig) { + Set<String> disabledCols = indexLoadingConfig.getForwardIndexDisabledColumns(); + Map<String, ForwardIndexConfig> result = new HashMap<>(); + Set<String> allColumns = Sets.union(disabledCols, indexLoadingConfig.getAllKnownColumns()); + for (String column : allColumns) { + ChunkCompressionType compressionType = + indexLoadingConfig.getCompressionConfigs() != null + ? 
indexLoadingConfig.getCompressionConfigs().get(column) + : null; + Supplier<ForwardIndexConfig> defaultConfig = () -> { + if (compressionType == null) { + return ForwardIndexConfig.DEFAULT; + } else { + return new ForwardIndexConfig.Builder().withCompressionType(compressionType).build(); + } + }; + if (!disabledCols.contains(column)) { + TableConfig tableConfig = indexLoadingConfig.getTableConfig(); + if (tableConfig == null) { + result.put(column, defaultConfig.get()); + } else { + List<FieldConfig> fieldConfigList = tableConfig.getFieldConfigList(); + if (fieldConfigList == null) { + result.put(column, defaultConfig.get()); + continue; + } + FieldConfig fieldConfig = fieldConfigList.stream() + .filter(fc -> fc.getName().equals(column)) + .findAny() + .orElse(null); + if (fieldConfig == null) { + result.put(column, defaultConfig.get()); + continue; + } + ForwardIndexConfig.Builder builder = new ForwardIndexConfig.Builder(); + if (compressionType != null) { + builder.withCompressionType(compressionType); + } else { + FieldConfig.CompressionCodec compressionCodec = fieldConfig.getCompressionCodec(); + if (compressionCodec != null) { + builder.withCompressionType(ChunkCompressionType.valueOf(compressionCodec.name())); + } + } + + result.put(column, builder.build()); + } + } else { + result.put(column, ForwardIndexConfig.DISABLED); + } + } + return result; } @Override - public IndexConfig getDefaultConfig() { - return IndexConfig.ENABLED; + public ForwardIndexConfig getDefaultConfig() { + return ForwardIndexConfig.DEFAULT; } @Override - public IndexConfig getConfig(TableConfig tableConfig, Schema schema) { - throw new UnsupportedOperationException(); + public ColumnConfigDeserializer<ForwardIndexConfig> getDeserializer() { + // reads tableConfig.fieldConfigList and decides what to create using the FieldConfig properties and encoding + ColumnConfigDeserializer<ForwardIndexConfig> fromFieldConfig = IndexConfigDeserializer.fromCollection( + 
TableConfig::getFieldConfigList, + (accum, fieldConfig) -> { + Map<String, String> properties = fieldConfig.getProperties(); + if (properties != null && isDisabled(properties)) { + accum.put(fieldConfig.getName(), ForwardIndexConfig.DISABLED); + } else if (fieldConfig.getEncodingType() == FieldConfig.EncodingType.RAW) { + accum.put(fieldConfig.getName(), createConfigFromFieldConfig(fieldConfig)); + } + } + ); + return IndexConfigDeserializer.fromIndexes("forward", getIndexConfigClass()) + .withExclusiveAlternative(fromFieldConfig); + } + + private boolean isDisabled(Map<String, String> props) { + return Boolean.parseBoolean( + props.getOrDefault(FieldConfig.FORWARD_INDEX_DISABLED, FieldConfig.DEFAULT_FORWARD_INDEX_DISABLED)); + } + + private ForwardIndexConfig createConfigFromFieldConfig(FieldConfig fieldConfig) { + if (fieldConfig.getEncodingType() != FieldConfig.EncodingType.RAW) { + throw new IllegalArgumentException("Cannot build a forward index on a field whose encoding is "

Review Comment:
Not a new feature, but we might be talking about different things. A column can be dictionary-encoded and have its forward index made of dictionary ids, or it can have its forward index simply made of raw values (i.e. encodingType=RAW). When I saw this if-check, I was curious why this method only deals with RAW.

```
public enum EncodingType {
  RAW, DICTIONARY
}
```

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org

For queries about this service, please contact Infrastructure at: us...@infra.apache.org

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org