klsince commented on code in PR #10184:
URL: https://github.com/apache/pinot/pull/10184#discussion_r1153887239


##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/forward/ForwardIndexType.java:
##########
@@ -19,60 +19,183 @@
 
 package org.apache.pinot.segment.local.segment.index.forward;
 
+import com.google.common.collect.Sets;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
+import java.util.Set;
+import java.util.function.Supplier;
 import javax.annotation.Nullable;
+import 
org.apache.pinot.segment.local.segment.index.loader.ConfigurableFromIndexLoadingConfig;
+import org.apache.pinot.segment.local.segment.index.loader.ForwardIndexHandler;
+import org.apache.pinot.segment.local.segment.index.loader.IndexLoadingConfig;
 import org.apache.pinot.segment.spi.ColumnMetadata;
 import org.apache.pinot.segment.spi.V1Constants;
+import org.apache.pinot.segment.spi.compression.ChunkCompressionType;
 import org.apache.pinot.segment.spi.creator.IndexCreationContext;
+import org.apache.pinot.segment.spi.index.AbstractIndexType;
+import org.apache.pinot.segment.spi.index.ColumnConfigDeserializer;
 import org.apache.pinot.segment.spi.index.FieldIndexConfigs;
-import org.apache.pinot.segment.spi.index.IndexCreator;
+import org.apache.pinot.segment.spi.index.ForwardIndexConfig;
+import org.apache.pinot.segment.spi.index.IndexConfigDeserializer;
 import org.apache.pinot.segment.spi.index.IndexHandler;
-import org.apache.pinot.segment.spi.index.IndexReader;
+import org.apache.pinot.segment.spi.index.IndexReaderConstraintException;
 import org.apache.pinot.segment.spi.index.IndexReaderFactory;
-import org.apache.pinot.segment.spi.index.IndexType;
 import org.apache.pinot.segment.spi.index.StandardIndexes;
+import org.apache.pinot.segment.spi.index.creator.ForwardIndexCreator;
+import org.apache.pinot.segment.spi.index.reader.ForwardIndexReader;
+import org.apache.pinot.segment.spi.memory.PinotDataBuffer;
 import org.apache.pinot.segment.spi.store.SegmentDirectory;
-import org.apache.pinot.spi.config.table.IndexConfig;
+import org.apache.pinot.spi.config.table.FieldConfig;
 import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.data.FieldSpec;
 import org.apache.pinot.spi.data.Schema;
 
 
-public class ForwardIndexType implements IndexType<IndexConfig, IndexReader, 
IndexCreator> {
+public class ForwardIndexType
+    extends AbstractIndexType<ForwardIndexConfig, ForwardIndexReader, 
ForwardIndexCreator>
+    implements ConfigurableFromIndexLoadingConfig<ForwardIndexConfig> {
 
-  public static final ForwardIndexType INSTANCE = new ForwardIndexType();
-
-  private ForwardIndexType() {
+  protected ForwardIndexType() {
+    super(StandardIndexes.FORWARD_ID);
   }
 
   @Override
-  public String getId() {
-    return StandardIndexes.FORWARD_ID;
+  public Class<ForwardIndexConfig> getIndexConfigClass() {
+    return ForwardIndexConfig.class;
   }
 
   @Override
-  public Class<IndexConfig> getIndexConfigClass() {
-    return null;
+  public Map<String, ForwardIndexConfig> 
fromIndexLoadingConfig(IndexLoadingConfig indexLoadingConfig) {
+    Set<String> disabledCols = 
indexLoadingConfig.getForwardIndexDisabledColumns();
+    Map<String, ForwardIndexConfig> result = new HashMap<>();
+    Set<String> allColumns = Sets.union(disabledCols, 
indexLoadingConfig.getAllKnownColumns());
+    for (String column : allColumns) {
+      ChunkCompressionType compressionType =
+          indexLoadingConfig.getCompressionConfigs() != null
+              ? indexLoadingConfig.getCompressionConfigs().get(column)
+              : null;
+      Supplier<ForwardIndexConfig> defaultConfig = () -> {
+        if (compressionType == null) {
+          return ForwardIndexConfig.DEFAULT;
+        } else {
+          return new 
ForwardIndexConfig.Builder().withCompressionType(compressionType).build();
+        }
+      };
+      if (!disabledCols.contains(column)) {
+        TableConfig tableConfig = indexLoadingConfig.getTableConfig();
+        if (tableConfig == null) {
+          result.put(column, defaultConfig.get());
+        } else {
+          List<FieldConfig> fieldConfigList = tableConfig.getFieldConfigList();
+          if (fieldConfigList == null) {
+            result.put(column, defaultConfig.get());
+            continue;
+          }
+          FieldConfig fieldConfig = fieldConfigList.stream()
+              .filter(fc -> fc.getName().equals(column))
+              .findAny()
+              .orElse(null);
+          if (fieldConfig == null) {
+            result.put(column, defaultConfig.get());
+            continue;
+          }
+          ForwardIndexConfig.Builder builder = new 
ForwardIndexConfig.Builder();
+          if (compressionType != null) {
+            builder.withCompressionType(compressionType);
+          } else {
+            FieldConfig.CompressionCodec compressionCodec = 
fieldConfig.getCompressionCodec();
+            if (compressionCodec != null) {
+              
builder.withCompressionType(ChunkCompressionType.valueOf(compressionCodec.name()));
+            }
+          }
+
+          result.put(column, builder.build());
+        }
+      } else {
+        result.put(column, ForwardIndexConfig.DISABLED);
+      }
+    }
+    return result;
   }
 
   @Override
-  public IndexConfig getDefaultConfig() {
-    return IndexConfig.ENABLED;
+  public ForwardIndexConfig getDefaultConfig() {
+    return ForwardIndexConfig.DEFAULT;
   }
 
   @Override
-  public IndexConfig getConfig(TableConfig tableConfig, Schema schema) {
-    throw new UnsupportedOperationException();
+  public ColumnConfigDeserializer<ForwardIndexConfig> getDeserializer() {
+    // reads tableConfig.fieldConfigList and decides what to create using the 
FieldConfig properties and encoding
+    ColumnConfigDeserializer<ForwardIndexConfig> fromFieldConfig = 
IndexConfigDeserializer.fromCollection(
+        TableConfig::getFieldConfigList,
+        (accum, fieldConfig) -> {
+          Map<String, String> properties = fieldConfig.getProperties();
+          if (properties != null && isDisabled(properties)) {
+            accum.put(fieldConfig.getName(), ForwardIndexConfig.DISABLED);
+          } else if (fieldConfig.getEncodingType() == 
FieldConfig.EncodingType.RAW) {
+            accum.put(fieldConfig.getName(), 
createConfigFromFieldConfig(fieldConfig));
+          }
+        }
+    );
+    return IndexConfigDeserializer.fromIndexes("forward", 
getIndexConfigClass())
+        .withExclusiveAlternative(fromFieldConfig);
+  }
+
+  private boolean isDisabled(Map<String, String> props) {
+    return Boolean.parseBoolean(
+        props.getOrDefault(FieldConfig.FORWARD_INDEX_DISABLED, 
FieldConfig.DEFAULT_FORWARD_INDEX_DISABLED));
+  }
+
+  private ForwardIndexConfig createConfigFromFieldConfig(FieldConfig 
fieldConfig) {
+    if (fieldConfig.getEncodingType() != FieldConfig.EncodingType.RAW) {
+      throw new IllegalArgumentException("Cannot build a forward index on a 
field whose encoding is "

Review Comment:
   Not a new feature, but we might be talking about different things. 
   
   A column can be dict-encoded, with its forward index made of dict ids, or 
it can have its forward index made of raw values (i.e. encodingType=RAW). When 
I saw this if-check, I was curious why this method only deals with RAW.
   ```
   public enum EncodingType {
       RAW, DICTIONARY
     }
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org

Reply via email to