klsince commented on code in PR #10184:
URL: https://github.com/apache/pinot/pull/10184#discussion_r1151133210


##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/indexsegment/immutable/ImmutableSegmentImpl.java:
##########
@@ -162,22 +165,27 @@ private File getValidDocIdsSnapshotFile() {
   }
 
   @Override
-  public Dictionary getDictionary(String column) {
+  public <I extends IndexReader> I getIndex(String column, IndexType<?, I, ?> 
type) {
     ColumnIndexContainer container = _indexContainerMap.get(column);
     if (container == null) {
       throw new NullPointerException("Invalid column: " + column);
     }
-    return container.getDictionary();
+    return type.getIndexReader(container);

Review Comment:
   directly call `container.getIndex(type)`? and remove this method from 
IndexType
   ```
      @Nullable
      default IR getIndexReader(ColumnIndexContainer indexContainer) {
           return indexContainer.getIndex(this);
      }
   ```



##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/SegmentColumnarIndexCreator.java:
##########
@@ -133,183 +120,141 @@ public void init(SegmentGeneratorConfig 
segmentCreationSpec, SegmentIndexCreatio
       return;
     }
 
-    Collection<FieldSpec> fieldSpecs = schema.getAllFieldSpecs();
-    Set<String> invertedIndexColumns = new HashSet<>();
-    for (String columnName : _config.getInvertedIndexCreationColumns()) {
-      Preconditions.checkState(schema.hasColumn(columnName),
-          "Cannot create inverted index for column: %s because it is not in 
schema", columnName);
-      invertedIndexColumns.add(columnName);
-    }
+    Map<String, FieldIndexConfigs> indexConfigs = 
segmentCreationSpec.getIndexConfigsByColName();
 
-    Set<String> bloomFilterColumns = new HashSet<>();
-    for (String columnName : _config.getBloomFilterCreationColumns()) {
-      Preconditions.checkState(schema.hasColumn(columnName),
-          "Cannot create bloom filter for column: %s because it is not in 
schema", columnName);
-      bloomFilterColumns.add(columnName);
-    }
-
-    Set<String> rangeIndexColumns = new HashSet<>();
-    for (String columnName : _config.getRangeIndexCreationColumns()) {
-      Preconditions.checkState(schema.hasColumn(columnName),
-          "Cannot create range index for column: %s because it is not in 
schema", columnName);
-      rangeIndexColumns.add(columnName);
-    }
-
-    Set<String> textIndexColumns = new HashSet<>();
-    for (String columnName : _config.getTextIndexCreationColumns()) {
-      Preconditions.checkState(schema.hasColumn(columnName),
-          "Cannot create text index for column: %s because it is not in 
schema", columnName);
-      textIndexColumns.add(columnName);
-    }
-
-    Set<String> fstIndexColumns = new HashSet<>();
-    for (String columnName : _config.getFSTIndexCreationColumns()) {
-      Preconditions.checkState(schema.hasColumn(columnName),
-          "Cannot create FST index for column: %s because it is not in 
schema", columnName);
-      fstIndexColumns.add(columnName);
-    }
-
-    Map<String, JsonIndexConfig> jsonIndexConfigs = 
_config.getJsonIndexConfigs();
-    for (String columnName : jsonIndexConfigs.keySet()) {
-      Preconditions.checkState(schema.hasColumn(columnName),
-          "Cannot create json index for column: %s because it is not in 
schema", columnName);
-    }
-
-    Set<String> forwardIndexDisabledColumns = new HashSet<>();
-    for (String columnName : _config.getForwardIndexDisabledColumns()) {
-      Preconditions.checkState(schema.hasColumn(columnName), 
String.format("Invalid config. Can't disable "
-          + "forward index creation for a column: %s that does not exist in 
schema", columnName));
-      forwardIndexDisabledColumns.add(columnName);
-    }
-
-    Map<String, H3IndexConfig> h3IndexConfigs = _config.getH3IndexConfigs();
-    for (String columnName : h3IndexConfigs.keySet()) {
-      Preconditions.checkState(schema.hasColumn(columnName),
-          "Cannot create H3 index for column: %s because it is not in schema", 
columnName);
-    }
+    _creatorsByColAndIndex = 
Maps.newHashMapWithExpectedSize(indexConfigs.keySet().size());
 
-    // Initialize creators for dictionary, forward index and inverted index
-    IndexingConfig indexingConfig = 
_config.getTableConfig().getIndexingConfig();
-    int rangeIndexVersion = indexingConfig.getRangeIndexVersion();
-    for (FieldSpec fieldSpec : fieldSpecs) {
-      // Ignore virtual columns
+    for (String columnName : indexConfigs.keySet()) {
+      FieldSpec fieldSpec = schema.getFieldSpecFor(columnName);
+      if (fieldSpec == null) {
+        Preconditions.checkState(schema.hasColumn(columnName),
+            "Cannot create index for column: %s because it is not in schema", 
columnName);
+      }
       if (fieldSpec.isVirtualColumn()) {
+        LOGGER.warn("Ignoring index creation for virtual column " + 
columnName);
         continue;
       }
 
-      String columnName = fieldSpec.getName();
+      FieldIndexConfigs originalConfig = indexConfigs.get(columnName);
       ColumnIndexCreationInfo columnIndexCreationInfo = 
indexCreationInfoMap.get(columnName);
       Preconditions.checkNotNull(columnIndexCreationInfo, "Missing index 
creation info for column: %s", columnName);
       boolean dictEnabledColumn = 
createDictionaryForColumn(columnIndexCreationInfo, segmentCreationSpec, 
fieldSpec);
-      Preconditions.checkState(dictEnabledColumn || 
!invertedIndexColumns.contains(columnName),
+      Preconditions.checkState(dictEnabledColumn || 
!originalConfig.getConfig(StandardIndexes.inverted()).isEnabled(),
           "Cannot create inverted index for raw index column: %s", columnName);
 
-      boolean forwardIndexDisabled = 
forwardIndexDisabledColumns.contains(columnName);
+      IndexType<ForwardIndexConfig, ?, ForwardIndexCreator> forwardIdx = 
StandardIndexes.forward();
+      boolean forwardIndexDisabled = 
!originalConfig.getConfig(forwardIdx).isEnabled();
 
       IndexCreationContext.Common context = IndexCreationContext.builder()
           .withIndexDir(_indexDir)
-          .withCardinality(columnIndexCreationInfo.getDistinctValueCount())
           .withDictionary(dictEnabledColumn)
           .withFieldSpec(fieldSpec)
           .withTotalDocs(segmentIndexCreationInfo.getTotalDocs())
-          .withMinValue((Comparable<?>) columnIndexCreationInfo.getMin())
-          .withMaxValue((Comparable<?>) columnIndexCreationInfo.getMax())
-          
.withTotalNumberOfEntries(columnIndexCreationInfo.getTotalNumberOfEntries())
           .withColumnIndexCreationInfo(columnIndexCreationInfo)
-          .sorted(columnIndexCreationInfo.isSorted())
+          .withIsOptimizedDictionary(_config.isOptimizeDictionary()
+              || _config.isOptimizeDictionaryForMetrics() && 
fieldSpec.getFieldType() == FieldSpec.FieldType.METRIC)
           .onHeap(segmentCreationSpec.isOnHeap())
           .withForwardIndexDisabled(forwardIndexDisabled)
+          .withTextCommitOnClose(true)

Review Comment:
   looks like `true` is set for all calls of this method, so maybe we can use 
`true` as default and save all those calls. 



##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/SegmentColumnarIndexCreator.java:
##########
@@ -133,183 +120,141 @@ public void init(SegmentGeneratorConfig 
segmentCreationSpec, SegmentIndexCreatio
       return;
     }
 
-    Collection<FieldSpec> fieldSpecs = schema.getAllFieldSpecs();
-    Set<String> invertedIndexColumns = new HashSet<>();
-    for (String columnName : _config.getInvertedIndexCreationColumns()) {
-      Preconditions.checkState(schema.hasColumn(columnName),
-          "Cannot create inverted index for column: %s because it is not in 
schema", columnName);
-      invertedIndexColumns.add(columnName);
-    }
+    Map<String, FieldIndexConfigs> indexConfigs = 
segmentCreationSpec.getIndexConfigsByColName();
 
-    Set<String> bloomFilterColumns = new HashSet<>();
-    for (String columnName : _config.getBloomFilterCreationColumns()) {
-      Preconditions.checkState(schema.hasColumn(columnName),
-          "Cannot create bloom filter for column: %s because it is not in 
schema", columnName);
-      bloomFilterColumns.add(columnName);
-    }
-
-    Set<String> rangeIndexColumns = new HashSet<>();
-    for (String columnName : _config.getRangeIndexCreationColumns()) {
-      Preconditions.checkState(schema.hasColumn(columnName),
-          "Cannot create range index for column: %s because it is not in 
schema", columnName);
-      rangeIndexColumns.add(columnName);
-    }
-
-    Set<String> textIndexColumns = new HashSet<>();
-    for (String columnName : _config.getTextIndexCreationColumns()) {
-      Preconditions.checkState(schema.hasColumn(columnName),
-          "Cannot create text index for column: %s because it is not in 
schema", columnName);
-      textIndexColumns.add(columnName);
-    }
-
-    Set<String> fstIndexColumns = new HashSet<>();
-    for (String columnName : _config.getFSTIndexCreationColumns()) {
-      Preconditions.checkState(schema.hasColumn(columnName),
-          "Cannot create FST index for column: %s because it is not in 
schema", columnName);
-      fstIndexColumns.add(columnName);
-    }
-
-    Map<String, JsonIndexConfig> jsonIndexConfigs = 
_config.getJsonIndexConfigs();
-    for (String columnName : jsonIndexConfigs.keySet()) {
-      Preconditions.checkState(schema.hasColumn(columnName),
-          "Cannot create json index for column: %s because it is not in 
schema", columnName);
-    }
-
-    Set<String> forwardIndexDisabledColumns = new HashSet<>();
-    for (String columnName : _config.getForwardIndexDisabledColumns()) {
-      Preconditions.checkState(schema.hasColumn(columnName), 
String.format("Invalid config. Can't disable "
-          + "forward index creation for a column: %s that does not exist in 
schema", columnName));
-      forwardIndexDisabledColumns.add(columnName);
-    }
-
-    Map<String, H3IndexConfig> h3IndexConfigs = _config.getH3IndexConfigs();
-    for (String columnName : h3IndexConfigs.keySet()) {
-      Preconditions.checkState(schema.hasColumn(columnName),
-          "Cannot create H3 index for column: %s because it is not in schema", 
columnName);
-    }
+    _creatorsByColAndIndex = 
Maps.newHashMapWithExpectedSize(indexConfigs.keySet().size());
 
-    // Initialize creators for dictionary, forward index and inverted index
-    IndexingConfig indexingConfig = 
_config.getTableConfig().getIndexingConfig();
-    int rangeIndexVersion = indexingConfig.getRangeIndexVersion();
-    for (FieldSpec fieldSpec : fieldSpecs) {
-      // Ignore virtual columns
+    for (String columnName : indexConfigs.keySet()) {
+      FieldSpec fieldSpec = schema.getFieldSpecFor(columnName);
+      if (fieldSpec == null) {
+        Preconditions.checkState(schema.hasColumn(columnName),
+            "Cannot create index for column: %s because it is not in schema", 
columnName);
+      }
       if (fieldSpec.isVirtualColumn()) {
+        LOGGER.warn("Ignoring index creation for virtual column " + 
columnName);
         continue;
       }
 
-      String columnName = fieldSpec.getName();
+      FieldIndexConfigs originalConfig = indexConfigs.get(columnName);
       ColumnIndexCreationInfo columnIndexCreationInfo = 
indexCreationInfoMap.get(columnName);
       Preconditions.checkNotNull(columnIndexCreationInfo, "Missing index 
creation info for column: %s", columnName);
       boolean dictEnabledColumn = 
createDictionaryForColumn(columnIndexCreationInfo, segmentCreationSpec, 
fieldSpec);
-      Preconditions.checkState(dictEnabledColumn || 
!invertedIndexColumns.contains(columnName),
+      Preconditions.checkState(dictEnabledColumn || 
!originalConfig.getConfig(StandardIndexes.inverted()).isEnabled(),
           "Cannot create inverted index for raw index column: %s", columnName);
 
-      boolean forwardIndexDisabled = 
forwardIndexDisabledColumns.contains(columnName);
+      IndexType<ForwardIndexConfig, ?, ForwardIndexCreator> forwardIdx = 
StandardIndexes.forward();
+      boolean forwardIndexDisabled = 
!originalConfig.getConfig(forwardIdx).isEnabled();
 
       IndexCreationContext.Common context = IndexCreationContext.builder()
           .withIndexDir(_indexDir)
-          .withCardinality(columnIndexCreationInfo.getDistinctValueCount())
           .withDictionary(dictEnabledColumn)
           .withFieldSpec(fieldSpec)
           .withTotalDocs(segmentIndexCreationInfo.getTotalDocs())
-          .withMinValue((Comparable<?>) columnIndexCreationInfo.getMin())
-          .withMaxValue((Comparable<?>) columnIndexCreationInfo.getMax())
-          
.withTotalNumberOfEntries(columnIndexCreationInfo.getTotalNumberOfEntries())
           .withColumnIndexCreationInfo(columnIndexCreationInfo)
-          .sorted(columnIndexCreationInfo.isSorted())
+          .withIsOptimizedDictionary(_config.isOptimizeDictionary()
+              || _config.isOptimizeDictionaryForMetrics() && 
fieldSpec.getFieldType() == FieldSpec.FieldType.METRIC)
           .onHeap(segmentCreationSpec.isOnHeap())
           .withForwardIndexDisabled(forwardIndexDisabled)
+          .withTextCommitOnClose(true)
           .build();
-      // Initialize forward index creator
-      ChunkCompressionType chunkCompressionType =
-          dictEnabledColumn ? null : 
getColumnCompressionType(segmentCreationSpec, fieldSpec);
-      // Sorted columns treat the 'forwardIndexDisabled' flag as a no-op
-      _forwardIndexCreatorMap.put(columnName, (forwardIndexDisabled && 
!columnIndexCreationInfo.isSorted())
-          ? null : _indexCreatorProvider.newForwardIndexCreator(
-              context.forForwardIndex(chunkCompressionType, 
segmentCreationSpec.getColumnProperties())));
-
-      // Initialize inverted index creator; skip creating inverted index if 
sorted
-      if (invertedIndexColumns.contains(columnName) && 
!columnIndexCreationInfo.isSorted()) {
-        _invertedIndexCreatorMap.put(columnName,
-            
_indexCreatorProvider.newInvertedIndexCreator(context.forInvertedIndex()));
-      }
+
+      FieldIndexConfigs config = adaptConfig(columnName, originalConfig, 
columnIndexCreationInfo, segmentCreationSpec);
+
       if (dictEnabledColumn) {
         // Create dictionary-encoded index
         // Initialize dictionary creator
         // TODO: Dictionary creator holds all unique values on heap. Consider 
keeping dictionary instead of creator
         //       which uses off-heap memory.
-        SegmentDictionaryCreator dictionaryCreator =
-            new SegmentDictionaryCreator(fieldSpec, _indexDir, 
columnIndexCreationInfo.isUseVarLengthDictionary());
-        _dictionaryCreatorMap.put(columnName, dictionaryCreator);
-        // Create dictionary
+
+        // Index conf should be present if dictEnabledColumn is true. In case 
it doesn't, getConfig will throw an
+        // exception
+        DictionaryIndexConfig dictConfig = 
config.getConfig(StandardIndexes.dictionary());
+        if (!dictConfig.isEnabled()) {
+          throw new IllegalArgumentException("Dictionary index should be 
enabled");
+        }
+        SegmentDictionaryCreator creator = new 
DictionaryIndexPlugin().getIndexType()
+            .createIndexCreator(context, dictConfig);
+
         try {
-          
dictionaryCreator.build(columnIndexCreationInfo.getSortedUniqueElementsArray());
+          creator.build(context.getSortedUniqueElementsArray());
         } catch (Exception e) {
           LOGGER.error("Error building dictionary for field: {}, cardinality: 
{}, number of bytes per entry: {}",
-              fieldSpec.getName(), 
columnIndexCreationInfo.getDistinctValueCount(),
-              dictionaryCreator.getNumBytesPerEntry());
+              context.getFieldSpec().getName(), context.getCardinality(), 
creator.getNumBytesPerEntry());
           throw e;
         }
-      }
 
-      if (bloomFilterColumns.contains(columnName)) {
-        if (indexingConfig.getBloomFilterConfigs() != null
-            && indexingConfig.getBloomFilterConfigs().containsKey(columnName)) 
{
-          _bloomFilterCreatorMap.put(columnName, 
_indexCreatorProvider.newBloomFilterCreator(
-              
context.forBloomFilter(indexingConfig.getBloomFilterConfigs().get(columnName))));
-        } else {
-          _bloomFilterCreatorMap.put(columnName, 
_indexCreatorProvider.newBloomFilterCreator(
-              context.forBloomFilter(new 
BloomFilterConfig(BloomFilterConfig.DEFAULT_FPP, 0, false))));
-        }
+        _dictionaryCreatorMap.put(columnName, creator);
       }
 
-      if (!columnIndexCreationInfo.isSorted() && 
rangeIndexColumns.contains(columnName)) {
-        _rangeIndexFilterCreatorMap.put(columnName,
-            
_indexCreatorProvider.newRangeIndexCreator(context.forRangeIndex(rangeIndexVersion)));
+      Map<IndexType<?, ?, ?>, IndexCreator> creatorsByIndex =
+          
Maps.newHashMapWithExpectedSize(IndexService.getInstance().getAllIndexes().size());
+      for (IndexType<?, ?, ?> index : 
IndexService.getInstance().getAllIndexes()) {
+        if (hasSpecialLifecycle(index)) {
+          continue;
+        }
+        tryCreateIndexCreator(creatorsByIndex, index, context, config);
       }
-
-      if (textIndexColumns.contains(columnName)) {
-        FSTType fstType = FSTType.LUCENE;
-        List<FieldConfig> fieldConfigList = 
_config.getTableConfig().getFieldConfigList();
-        if (fieldConfigList != null) {
-          for (FieldConfig fieldConfig : fieldConfigList) {
-            if (fieldConfig.getName().equals(columnName)) {
-              Map<String, String> properties = fieldConfig.getProperties();
-              if (TextIndexUtils.isFstTypeNative(properties)) {
-                fstType = FSTType.NATIVE;
-              }
-            }
-          }
+      // TODO: Remove this when values stored as ForwardIndex stop depending 
on TextIndex config
+      IndexCreator oldFwdCreator = creatorsByIndex.get(forwardIdx);
+      if (oldFwdCreator instanceof ForwardIndexCreator) { // this implies that 
oldFwdCreator != null

Review Comment:
   Is it possible to get a different index creator from `get(forwardIdx)`? If 
not, perhaps just need a simple null check here. 



##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/SegmentColumnarIndexCreator.java:
##########
@@ -133,183 +120,141 @@ public void init(SegmentGeneratorConfig 
segmentCreationSpec, SegmentIndexCreatio
       return;
     }
 
-    Collection<FieldSpec> fieldSpecs = schema.getAllFieldSpecs();
-    Set<String> invertedIndexColumns = new HashSet<>();
-    for (String columnName : _config.getInvertedIndexCreationColumns()) {
-      Preconditions.checkState(schema.hasColumn(columnName),
-          "Cannot create inverted index for column: %s because it is not in 
schema", columnName);
-      invertedIndexColumns.add(columnName);
-    }
+    Map<String, FieldIndexConfigs> indexConfigs = 
segmentCreationSpec.getIndexConfigsByColName();
 
-    Set<String> bloomFilterColumns = new HashSet<>();
-    for (String columnName : _config.getBloomFilterCreationColumns()) {
-      Preconditions.checkState(schema.hasColumn(columnName),
-          "Cannot create bloom filter for column: %s because it is not in 
schema", columnName);
-      bloomFilterColumns.add(columnName);
-    }
-
-    Set<String> rangeIndexColumns = new HashSet<>();
-    for (String columnName : _config.getRangeIndexCreationColumns()) {
-      Preconditions.checkState(schema.hasColumn(columnName),
-          "Cannot create range index for column: %s because it is not in 
schema", columnName);
-      rangeIndexColumns.add(columnName);
-    }
-
-    Set<String> textIndexColumns = new HashSet<>();
-    for (String columnName : _config.getTextIndexCreationColumns()) {
-      Preconditions.checkState(schema.hasColumn(columnName),
-          "Cannot create text index for column: %s because it is not in 
schema", columnName);
-      textIndexColumns.add(columnName);
-    }
-
-    Set<String> fstIndexColumns = new HashSet<>();
-    for (String columnName : _config.getFSTIndexCreationColumns()) {
-      Preconditions.checkState(schema.hasColumn(columnName),
-          "Cannot create FST index for column: %s because it is not in 
schema", columnName);
-      fstIndexColumns.add(columnName);
-    }
-
-    Map<String, JsonIndexConfig> jsonIndexConfigs = 
_config.getJsonIndexConfigs();
-    for (String columnName : jsonIndexConfigs.keySet()) {
-      Preconditions.checkState(schema.hasColumn(columnName),
-          "Cannot create json index for column: %s because it is not in 
schema", columnName);
-    }
-
-    Set<String> forwardIndexDisabledColumns = new HashSet<>();
-    for (String columnName : _config.getForwardIndexDisabledColumns()) {
-      Preconditions.checkState(schema.hasColumn(columnName), 
String.format("Invalid config. Can't disable "
-          + "forward index creation for a column: %s that does not exist in 
schema", columnName));
-      forwardIndexDisabledColumns.add(columnName);
-    }
-
-    Map<String, H3IndexConfig> h3IndexConfigs = _config.getH3IndexConfigs();
-    for (String columnName : h3IndexConfigs.keySet()) {
-      Preconditions.checkState(schema.hasColumn(columnName),
-          "Cannot create H3 index for column: %s because it is not in schema", 
columnName);
-    }
+    _creatorsByColAndIndex = 
Maps.newHashMapWithExpectedSize(indexConfigs.keySet().size());
 
-    // Initialize creators for dictionary, forward index and inverted index
-    IndexingConfig indexingConfig = 
_config.getTableConfig().getIndexingConfig();
-    int rangeIndexVersion = indexingConfig.getRangeIndexVersion();
-    for (FieldSpec fieldSpec : fieldSpecs) {
-      // Ignore virtual columns
+    for (String columnName : indexConfigs.keySet()) {
+      FieldSpec fieldSpec = schema.getFieldSpecFor(columnName);
+      if (fieldSpec == null) {
+        Preconditions.checkState(schema.hasColumn(columnName),
+            "Cannot create index for column: %s because it is not in schema", 
columnName);
+      }
       if (fieldSpec.isVirtualColumn()) {
+        LOGGER.warn("Ignoring index creation for virtual column " + 
columnName);
         continue;
       }
 
-      String columnName = fieldSpec.getName();
+      FieldIndexConfigs originalConfig = indexConfigs.get(columnName);
       ColumnIndexCreationInfo columnIndexCreationInfo = 
indexCreationInfoMap.get(columnName);
       Preconditions.checkNotNull(columnIndexCreationInfo, "Missing index 
creation info for column: %s", columnName);
       boolean dictEnabledColumn = 
createDictionaryForColumn(columnIndexCreationInfo, segmentCreationSpec, 
fieldSpec);
-      Preconditions.checkState(dictEnabledColumn || 
!invertedIndexColumns.contains(columnName),
+      Preconditions.checkState(dictEnabledColumn || 
!originalConfig.getConfig(StandardIndexes.inverted()).isEnabled(),
           "Cannot create inverted index for raw index column: %s", columnName);
 
-      boolean forwardIndexDisabled = 
forwardIndexDisabledColumns.contains(columnName);
+      IndexType<ForwardIndexConfig, ?, ForwardIndexCreator> forwardIdx = 
StandardIndexes.forward();
+      boolean forwardIndexDisabled = 
!originalConfig.getConfig(forwardIdx).isEnabled();
 
       IndexCreationContext.Common context = IndexCreationContext.builder()
           .withIndexDir(_indexDir)
-          .withCardinality(columnIndexCreationInfo.getDistinctValueCount())
           .withDictionary(dictEnabledColumn)
           .withFieldSpec(fieldSpec)
           .withTotalDocs(segmentIndexCreationInfo.getTotalDocs())
-          .withMinValue((Comparable<?>) columnIndexCreationInfo.getMin())
-          .withMaxValue((Comparable<?>) columnIndexCreationInfo.getMax())
-          
.withTotalNumberOfEntries(columnIndexCreationInfo.getTotalNumberOfEntries())
           .withColumnIndexCreationInfo(columnIndexCreationInfo)
-          .sorted(columnIndexCreationInfo.isSorted())
+          .withIsOptimizedDictionary(_config.isOptimizeDictionary()
+              || _config.isOptimizeDictionaryForMetrics() && 
fieldSpec.getFieldType() == FieldSpec.FieldType.METRIC)
           .onHeap(segmentCreationSpec.isOnHeap())
           .withForwardIndexDisabled(forwardIndexDisabled)
+          .withTextCommitOnClose(true)
           .build();
-      // Initialize forward index creator
-      ChunkCompressionType chunkCompressionType =
-          dictEnabledColumn ? null : 
getColumnCompressionType(segmentCreationSpec, fieldSpec);
-      // Sorted columns treat the 'forwardIndexDisabled' flag as a no-op
-      _forwardIndexCreatorMap.put(columnName, (forwardIndexDisabled && 
!columnIndexCreationInfo.isSorted())
-          ? null : _indexCreatorProvider.newForwardIndexCreator(
-              context.forForwardIndex(chunkCompressionType, 
segmentCreationSpec.getColumnProperties())));
-
-      // Initialize inverted index creator; skip creating inverted index if 
sorted
-      if (invertedIndexColumns.contains(columnName) && 
!columnIndexCreationInfo.isSorted()) {
-        _invertedIndexCreatorMap.put(columnName,
-            
_indexCreatorProvider.newInvertedIndexCreator(context.forInvertedIndex()));
-      }
+
+      FieldIndexConfigs config = adaptConfig(columnName, originalConfig, 
columnIndexCreationInfo, segmentCreationSpec);
+
       if (dictEnabledColumn) {
         // Create dictionary-encoded index
         // Initialize dictionary creator
         // TODO: Dictionary creator holds all unique values on heap. Consider 
keeping dictionary instead of creator
         //       which uses off-heap memory.
-        SegmentDictionaryCreator dictionaryCreator =
-            new SegmentDictionaryCreator(fieldSpec, _indexDir, 
columnIndexCreationInfo.isUseVarLengthDictionary());
-        _dictionaryCreatorMap.put(columnName, dictionaryCreator);
-        // Create dictionary
+
+        // Index conf should be present if dictEnabledColumn is true. In case 
it doesn't, getConfig will throw an
+        // exception
+        DictionaryIndexConfig dictConfig = 
config.getConfig(StandardIndexes.dictionary());
+        if (!dictConfig.isEnabled()) {

Review Comment:
   can this `isEnabled` check be done when calculating `dictEnabledColumn` 
boolean?



##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/forward/ForwardIndexCreatorFactory.java:
##########
@@ -0,0 +1,148 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.pinot.segment.local.segment.index.forward;
+
+import java.io.File;
+import java.io.IOException;
+import 
org.apache.pinot.segment.local.segment.creator.impl.fwd.MultiValueFixedByteRawIndexCreator;
+import 
org.apache.pinot.segment.local.segment.creator.impl.fwd.MultiValueUnsortedForwardIndexCreator;
+import 
org.apache.pinot.segment.local.segment.creator.impl.fwd.MultiValueVarByteRawIndexCreator;
+import 
org.apache.pinot.segment.local.segment.creator.impl.fwd.SingleValueFixedByteRawIndexCreator;
+import 
org.apache.pinot.segment.local.segment.creator.impl.fwd.SingleValueSortedForwardIndexCreator;
+import 
org.apache.pinot.segment.local.segment.creator.impl.fwd.SingleValueUnsortedForwardIndexCreator;
+import 
org.apache.pinot.segment.local.segment.creator.impl.fwd.SingleValueVarByteRawIndexCreator;
+import org.apache.pinot.segment.spi.compression.ChunkCompressionType;
+import org.apache.pinot.segment.spi.creator.IndexCreationContext;
+import org.apache.pinot.segment.spi.index.ForwardIndexConfig;
+import org.apache.pinot.segment.spi.index.creator.ForwardIndexCreator;
+import org.apache.pinot.spi.data.FieldSpec;
+
+
+public class ForwardIndexCreatorFactory {
+  private ForwardIndexCreatorFactory() {
+  }
+
+  public static ForwardIndexCreator createIndexCreator(IndexCreationContext 
context, ForwardIndexConfig indexConfig)
+      throws Exception {
+    String colName = context.getFieldSpec().getName();
+
+    if (!context.hasDictionary()) {
+      ChunkCompressionType chunkCompressionType = 
indexConfig.getChunkCompressionType();
+      if (chunkCompressionType == null) {
+        chunkCompressionType = 
ForwardIndexType.getDefaultCompressionType(context.getFieldSpec().getFieldType());
+      }
+
+      // Dictionary disabled columns
+      boolean deriveNumDocsPerChunk = indexConfig.isDeriveNumDocsPerChunk();
+      int writerVersion = indexConfig.getRawIndexWriterVersion();
+      if (context.getFieldSpec().isSingleValueField()) {
+        return getRawIndexCreatorForSVColumn(context.getIndexDir(), 
chunkCompressionType, colName,
+            context.getFieldSpec().getDataType().getStoredType(),
+            context.getTotalDocs(), context.getLengthOfLongestEntry(), 
deriveNumDocsPerChunk, writerVersion);
+      } else {
+        return getRawIndexCreatorForMVColumn(context.getIndexDir(), 
chunkCompressionType, colName,
+            context.getFieldSpec().getDataType().getStoredType(),
+            context.getTotalDocs(), 
context.getMaxNumberOfMultiValueElements(), deriveNumDocsPerChunk, 
writerVersion,
+            context.getMaxRowLengthInBytes());
+      }
+    } else {
+      // Dictionary enabled columns
+      if (context.getFieldSpec().isSingleValueField()) {
+        if (context.isSorted()) {
+          return new 
SingleValueSortedForwardIndexCreator(context.getIndexDir(), colName,
+              context.getCardinality());
+        } else {
+          return new 
SingleValueUnsortedForwardIndexCreator(context.getIndexDir(), colName,
+              context.getCardinality(), context.getTotalDocs());
+        }
+      } else {
+        return new 
MultiValueUnsortedForwardIndexCreator(context.getIndexDir(), colName,
+            context.getCardinality(), context.getTotalDocs(), 
context.getTotalNumberOfEntries());
+      }
+    }
+  }
+
+  /**
+   * Helper method to build the raw index creator for the column.
+   * Assumes that column to be indexed is single valued.
+   *
+   * @param file Output index file
+   * @param column Column name
+   * @param totalDocs Total number of documents to index
+   * @param lengthOfLongestEntry Length of longest entry
+   * @param deriveNumDocsPerChunk true if varbyte writer should auto-derive 
the number of rows per chunk
+   * @param writerVersion version to use for the raw index writer
+   * @return raw index creator
+   */
+  public static ForwardIndexCreator getRawIndexCreatorForSVColumn(File file, 
ChunkCompressionType compressionType,
+      String column, FieldSpec.DataType dataType, int totalDocs, int 
lengthOfLongestEntry,
+      boolean deriveNumDocsPerChunk,
+      int writerVersion)

Review Comment:
   format (interesting this was not caught by linter)



##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/SegmentColumnarIndexCreator.java:
##########
@@ -379,340 +325,95 @@ private boolean 
shouldCreateDictionaryWithinThreshold(ColumnIndexCreationInfo in
   }
 
   /**
-   * Helper method that returns compression type to use based on segment 
creation spec and field type.
-   * <ul>
-   *   <li> Returns compression type from segment creation spec, if specified 
there.</li>
-   *   <li> Else, returns PASS_THROUGH for metrics, and SNAPPY for dimensions. 
This is because metrics are likely
-   *        to be spread in different chunks after applying predicates. Same 
could be true for dimensions, but in that
-   *        case, clients are expected to explicitly specify the appropriate 
compression type in the spec. </li>
-   * </ul>
-   * @param segmentCreationSpec Segment creation spec
-   * @param fieldSpec Field spec for the column
-   * @return Compression type to use
+   * @deprecated use {@link 
ForwardIndexType#getDefaultCompressionType(FieldType)} instead
    */
-  private ChunkCompressionType getColumnCompressionType(SegmentGeneratorConfig 
segmentCreationSpec,
-      FieldSpec fieldSpec) {
-    ChunkCompressionType compressionType = 
segmentCreationSpec.getRawIndexCompressionType().get(fieldSpec.getName());
-    if (compressionType == null) {
-      compressionType = getDefaultCompressionType(fieldSpec.getFieldType());
-    }
-
-    return compressionType;
-  }
-
+  @Deprecated
   public static ChunkCompressionType getDefaultCompressionType(FieldType 
fieldType) {
-    if (fieldType == FieldSpec.FieldType.METRIC) {
-      return ChunkCompressionType.PASS_THROUGH;
-    } else {
-      return ChunkCompressionType.LZ4;
-    }
+    return ForwardIndexType.getDefaultCompressionType(fieldType);
   }
 
   @Override
   public void indexRow(GenericRow row)
       throws IOException {
-    for (Map.Entry<String, ForwardIndexCreator> entry : 
_forwardIndexCreatorMap.entrySet()) {
-      String columnName = entry.getKey();
-      ForwardIndexCreator forwardIndexCreator = entry.getValue();
+    for (Map.Entry<String, Map<IndexType<?, ?, ?>, IndexCreator>> byColEntry : 
_creatorsByColAndIndex.entrySet()) {
+      String columnName = byColEntry.getKey();
 
       Object columnValueToIndex = row.getValue(columnName);
       if (columnValueToIndex == null) {
         throw new RuntimeException("Null value for column:" + columnName);
       }
 
-      FieldSpec fieldSpec = _schema.getFieldSpecFor(columnName);
+      Map<IndexType<?, ?, ?>, IndexCreator> creatorsByIndex = 
byColEntry.getValue();
 
-      //get dictionaryCreator, will be null if column is not dictionaryEncoded
+      FieldSpec fieldSpec = _schema.getFieldSpecFor(columnName);
       SegmentDictionaryCreator dictionaryCreator = 
_dictionaryCreatorMap.get(columnName);
 
-      // bloom filter
-      BloomFilterCreator bloomFilterCreator = 
_bloomFilterCreatorMap.get(columnName);
-      if (bloomFilterCreator != null) {
-        if (fieldSpec.isSingleValueField()) {
-          if (fieldSpec.getDataType() == DataType.BYTES) {
-            bloomFilterCreator.add(BytesUtils.toHexString((byte[]) 
columnValueToIndex));
-          } else {
-            bloomFilterCreator.add(columnValueToIndex.toString());
-          }
-        } else {
-          Object[] values = (Object[]) columnValueToIndex;
-          if (fieldSpec.getDataType() == DataType.BYTES) {
-            for (Object value : values) {
-              bloomFilterCreator.add(BytesUtils.toHexString((byte[]) value));
-            }
-          } else {
-            for (Object value : values) {
-              bloomFilterCreator.add(value.toString());
-            }
-          }
-        }
-      }
-
-      // range index
-      CombinedInvertedIndexCreator combinedInvertedIndexCreator = 
_rangeIndexFilterCreatorMap.get(columnName);
-      if (combinedInvertedIndexCreator != null) {
-        if (dictionaryCreator != null) {
-          if (fieldSpec.isSingleValueField()) {
-            
combinedInvertedIndexCreator.add(dictionaryCreator.indexOfSV(columnValueToIndex));
-          } else {
-            int[] dictIds = dictionaryCreator.indexOfMV(columnValueToIndex);
-            combinedInvertedIndexCreator.add(dictIds, dictIds.length);
-          }
-        } else {
-          if (fieldSpec.isSingleValueField()) {
-            switch (fieldSpec.getDataType()) {
-              case INT:
-                combinedInvertedIndexCreator.add((Integer) columnValueToIndex);
-                break;
-              case LONG:
-                combinedInvertedIndexCreator.add((Long) columnValueToIndex);
-                break;
-              case FLOAT:
-                combinedInvertedIndexCreator.add((Float) columnValueToIndex);
-                break;
-              case DOUBLE:
-                combinedInvertedIndexCreator.add((Double) columnValueToIndex);
-                break;
-              default:
-                throw new RuntimeException("Unsupported data type " + 
fieldSpec.getDataType() + " for range index");
-            }
-          } else {
-            Object[] values = (Object[]) columnValueToIndex;
-            switch (fieldSpec.getDataType()) {
-              case INT:
-                int[] intValues = new int[values.length];
-                for (int i = 0; i < values.length; i++) {
-                  intValues[i] = (Integer) values[i];
-                }
-                combinedInvertedIndexCreator.add(intValues, values.length);
-                break;
-              case LONG:
-                long[] longValues = new long[values.length];
-                for (int i = 0; i < values.length; i++) {
-                  longValues[i] = (Long) values[i];
-                }
-                combinedInvertedIndexCreator.add(longValues, values.length);
-                break;
-              case FLOAT:
-                float[] floatValues = new float[values.length];
-                for (int i = 0; i < values.length; i++) {
-                  floatValues[i] = (Float) values[i];
-                }
-                combinedInvertedIndexCreator.add(floatValues, values.length);
-                break;
-              case DOUBLE:
-                double[] doubleValues = new double[values.length];
-                for (int i = 0; i < values.length; i++) {
-                  doubleValues[i] = (Double) values[i];
-                }
-                combinedInvertedIndexCreator.add(doubleValues, values.length);
-                break;
-              default:
-                throw new RuntimeException("Unsupported data type " + 
fieldSpec.getDataType() + " for range index");
-            }
-          }
-        }
-      }
-
-      // text-index
-      TextIndexCreator textIndexCreator = _textIndexCreatorMap.get(columnName);
-      if (textIndexCreator != null) {
-        if (fieldSpec.isSingleValueField()) {
-          textIndexCreator.add((String) columnValueToIndex);
-        } else {
-          Object[] values = (Object[]) columnValueToIndex;
-          int length = values.length;
-          if (values instanceof String[]) {
-            textIndexCreator.add((String[]) values, length);
-          } else {
-            String[] strings = new String[length];
-            for (int i = 0; i < length; i++) {
-              strings[i] = (String) values[i];
-            }
-            textIndexCreator.add(strings, length);
-            columnValueToIndex = strings;
-          }
-        }
-      }
-
       if (fieldSpec.isSingleValueField()) {
-        // Single Value column
-        JsonIndexCreator jsonIndexCreator = 
_jsonIndexCreatorMap.get(columnName);
-        if (jsonIndexCreator != null) {
-          jsonIndexCreator.add((String) columnValueToIndex);
-        }
-        GeoSpatialIndexCreator h3IndexCreator = 
_h3IndexCreatorMap.get(columnName);
-        if (h3IndexCreator != null) {
-          h3IndexCreator.add(GeometrySerializer.deserialize((byte[]) 
columnValueToIndex));
-        }
-        if (dictionaryCreator != null) {
-          // dictionary encoded SV column
-          // get dictID from dictionary
-          int dictId = dictionaryCreator.indexOfSV(columnValueToIndex);
-          // store the docID -> dictID mapping in forward index
-          if (forwardIndexCreator != null) {
-            forwardIndexCreator.putDictId(dictId);
-          }
-          DictionaryBasedInvertedIndexCreator invertedIndexCreator = 
_invertedIndexCreatorMap.get(columnName);
-          if (invertedIndexCreator != null) {
-            // if inverted index enabled during segment creation,
-            // then store dictID -> docID mapping in inverted index
-            invertedIndexCreator.add(dictId);
-          }
-        } else {
-          // non-dictionary encoded SV column
-          // store the docId -> raw value mapping in forward index
-          if (textIndexCreator != null && 
!shouldStoreRawValueForTextIndex(columnName)) {
-            // for text index on raw columns, check the config to determine if 
actual raw value should
-            // be stored or not
-            columnValueToIndex = 
_columnProperties.get(columnName).get(FieldConfig.TEXT_INDEX_RAW_VALUE);
-            if (columnValueToIndex == null) {
-              columnValueToIndex = FieldConfig.TEXT_INDEX_DEFAULT_RAW_VALUE;
-            }
-          }
-          if (forwardIndexCreator != null) {
-            switch (forwardIndexCreator.getValueType()) {
-              case INT:
-                forwardIndexCreator.putInt((int) columnValueToIndex);
-                break;
-              case LONG:
-                forwardIndexCreator.putLong((long) columnValueToIndex);
-                break;
-              case FLOAT:
-                forwardIndexCreator.putFloat((float) columnValueToIndex);
-                break;
-              case DOUBLE:
-                forwardIndexCreator.putDouble((double) columnValueToIndex);
-                break;
-              case BIG_DECIMAL:
-                forwardIndexCreator.putBigDecimal((BigDecimal) 
columnValueToIndex);
-                break;
-              case STRING:
-                forwardIndexCreator.putString((String) columnValueToIndex);
-                break;
-              case BYTES:
-                forwardIndexCreator.putBytes((byte[]) columnValueToIndex);
-                break;
-              case JSON:
-                if (columnValueToIndex instanceof String) {
-                  forwardIndexCreator.putString((String) columnValueToIndex);
-                } else if (columnValueToIndex instanceof byte[]) {
-                  forwardIndexCreator.putBytes((byte[]) columnValueToIndex);
-                }
-                break;
-              default:
-                throw new IllegalStateException();
-            }
-          }
-        }
+        indexSingleValueRow(dictionaryCreator, columnValueToIndex, 
creatorsByIndex);
       } else {
-        if (dictionaryCreator != null) {
-          //dictionary encoded
-          int[] dictIds = dictionaryCreator.indexOfMV(columnValueToIndex);
-          if (forwardIndexCreator != null) {
-            forwardIndexCreator.putDictIdMV(dictIds);
-          }
-          DictionaryBasedInvertedIndexCreator invertedIndexCreator = 
_invertedIndexCreatorMap.get(columnName);
-          if (invertedIndexCreator != null) {
-            invertedIndexCreator.add(dictIds, dictIds.length);
-          }
-        } else {
-          // for text index on raw columns, check the config to determine if 
actual raw value should
-          // be stored or not
-          if (forwardIndexCreator != null) {
-            if (textIndexCreator != null && 
!shouldStoreRawValueForTextIndex(columnName)) {
-              Object value = 
_columnProperties.get(columnName).get(FieldConfig.TEXT_INDEX_RAW_VALUE);
-              if (value == null) {
-                value = FieldConfig.TEXT_INDEX_DEFAULT_RAW_VALUE;
-              }
-              if (forwardIndexCreator.getValueType().getStoredType() == 
DataType.STRING) {
-                columnValueToIndex = new String[]{String.valueOf(value)};
-              } else if (forwardIndexCreator.getValueType().getStoredType() == 
DataType.BYTES) {
-                columnValueToIndex = new 
byte[][]{String.valueOf(value).getBytes(UTF_8)};
-              } else {
-                throw new RuntimeException("Text Index is only supported for 
STRING and BYTES stored type");
-              }
-            }
-            Object[] values = (Object[]) columnValueToIndex;
-            int length = values.length;
-            switch (forwardIndexCreator.getValueType()) {
-              case INT:
-                int[] ints = new int[length];
-                for (int i = 0; i < length; i++) {
-                  ints[i] = (Integer) values[i];
-                }
-                forwardIndexCreator.putIntMV(ints);
-                break;
-              case LONG:
-                long[] longs = new long[length];
-                for (int i = 0; i < length; i++) {
-                  longs[i] = (Long) values[i];
-                }
-                forwardIndexCreator.putLongMV(longs);
-                break;
-              case FLOAT:
-                float[] floats = new float[length];
-                for (int i = 0; i < length; i++) {
-                  floats[i] = (Float) values[i];
-                }
-                forwardIndexCreator.putFloatMV(floats);
-                break;
-              case DOUBLE:
-                double[] doubles = new double[length];
-                for (int i = 0; i < length; i++) {
-                  doubles[i] = (Double) values[i];
-                }
-                forwardIndexCreator.putDoubleMV(doubles);
-                break;
-              case STRING:
-                if (values instanceof String[]) {
-                  forwardIndexCreator.putStringMV((String[]) values);
-                } else {
-                  String[] strings = new String[length];
-                  for (int i = 0; i < length; i++) {
-                    strings[i] = (String) values[i];
-                  }
-                  forwardIndexCreator.putStringMV(strings);
-                }
-                break;
-              case BYTES:
-                if (values instanceof byte[][]) {
-                  forwardIndexCreator.putBytesMV((byte[][]) values);
-                } else {
-                  byte[][] bytesArray = new byte[length][];
-                  for (int i = 0; i < length; i++) {
-                    bytesArray[i] = (byte[]) values[i];
-                  }
-                  forwardIndexCreator.putBytesMV(bytesArray);
-                }
-                break;
-              default:
-                throw new IllegalStateException();
-            }
-          }
-        }
+        indexMultiValueRow(dictionaryCreator, (Object[]) columnValueToIndex, 
creatorsByIndex);
       }
+    }
 
-      if (_nullHandlingEnabled) {
+    if (_nullHandlingEnabled) {
+      for (Map.Entry<String, NullValueVectorCreator> entry : 
_nullValueVectorCreatorMap.entrySet()) {
+        String columnName = entry.getKey();
         // If row has null value for given column name, add to null value 
vector
         if (row.isNullValue(columnName)) {
           _nullValueVectorCreatorMap.get(columnName).setNull(_docIdCounter);
         }
       }
     }
+
     _docIdCounter++;
   }
 
-  private boolean shouldStoreRawValueForTextIndex(String column) {
-    if (_columnProperties != null) {
-      Map<String, String> props = _columnProperties.get(column);
-      // by default always store the raw value
-      // if the config is set to true, don't store the actual raw value
-      // there will be a dummy value
-      return props == null || 
!Boolean.parseBoolean(props.get(FieldConfig.TEXT_INDEX_NO_RAW_DATA));
+  private void indexSingleValueRow(SegmentDictionaryCreator dictionaryCreator, 
Object value,
+      Map<IndexType<?, ?, ?>, IndexCreator> creatorsByIndex)
+      throws IOException {
+    int dictId = dictionaryCreator != null ? 
dictionaryCreator.indexOfSV(value) : -1;
+    for (IndexCreator creator : creatorsByIndex.values()) {
+      creator.add(value, dictId);
+    }
+  }
+
+  private void indexMultiValueRow(SegmentDictionaryCreator dictionaryCreator, 
Object[] values,
+      Map<IndexType<?, ?, ?>, IndexCreator> creatorsByIndex)
+      throws IOException {
+    int[] dictId = dictionaryCreator != null ? 
dictionaryCreator.indexOfMV(values) : null;
+    for (IndexCreator creator : creatorsByIndex.values()) {
+      creator.add(values, dictId);
+    }
+  }
+
+  @Nullable
+  private Object calculateAlternativeValue(boolean dictEnabledColumn, 
FieldIndexConfigs configs, FieldSpec fieldSpec) {
+    if (dictEnabledColumn) {
+      return null;
     }
+    TextIndexConfig textConfig = configs.getConfig(StandardIndexes.text());
+    if (!textConfig.isEnabled()) {
+      return null;
+    }
+
+    Object alternativeValue = textConfig.getRawValueForTextIndex();
 
-    return true;
+    if (alternativeValue == null) {
+      return null;
+    } else if (!fieldSpec.isSingleValueField()) {

Review Comment:
   nit: rename this method calculateRawValueForTextIndex?



##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/bloom/BloomIndexType.java:
##########
@@ -40,27 +46,43 @@
 import org.apache.pinot.spi.config.table.TableConfig;
 import org.apache.pinot.spi.data.Schema;
 
-public class BloomIndexType implements IndexType<BloomFilterConfig, 
BloomFilterReader, BloomFilterCreator> {
-  public static final BloomIndexType INSTANCE = new BloomIndexType();
 
-  @Override
-  public String getId() {
-    return StandardIndexes.BLOOM_FILTER_ID;
+public class BloomIndexType
+    extends AbstractIndexType<BloomFilterConfig, BloomFilterReader, 
BloomFilterCreator>
+    implements ConfigurableFromIndexLoadingConfig<BloomFilterConfig> {
+
+  protected BloomIndexType() {
+    super(StandardIndexes.BLOOM_FILTER_ID);
   }
 
   @Override
   public Class<BloomFilterConfig> getIndexConfigClass() {
     return BloomFilterConfig.class;
   }
 
+  @Override
+  public Map<String, BloomFilterConfig> 
fromIndexLoadingConfig(IndexLoadingConfig indexLoadingConfig) {
+    return indexLoadingConfig.getBloomFilterConfigs();
+  }
+
   @Override
   public BloomFilterConfig getDefaultConfig() {
     return BloomFilterConfig.DISABLED;
   }
 
   @Override
-  public BloomFilterConfig getConfig(TableConfig tableConfig, Schema schema) {
-    throw new UnsupportedOperationException("To be implemented in a future 
PR");
+  public ColumnConfigDeserializer<BloomFilterConfig> getDeserializer() {
+    return IndexConfigDeserializer.fromIndexes("bloom", getIndexConfigClass())
+        .withExclusiveAlternative(
+            IndexConfigDeserializer.ifIndexingConfig(
+                    // reads tableConfig.indexingConfig.bloomFilterConfigs

Review Comment:
   nit: format the comments more consistently?



##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/fst/FstIndexType.java:
##########
@@ -19,74 +19,152 @@
 
 package org.apache.pinot.segment.local.segment.index.fst;
 
+import com.google.common.base.Preconditions;
+import java.io.IOException;
+import java.util.HashMap;
 import java.util.Map;
+import java.util.Set;
 import javax.annotation.Nullable;
+import 
org.apache.pinot.segment.local.segment.creator.impl.inv.text.LuceneFSTIndexCreator;
+import 
org.apache.pinot.segment.local.segment.index.loader.ConfigurableFromIndexLoadingConfig;
+import org.apache.pinot.segment.local.segment.index.loader.IndexLoadingConfig;
+import 
org.apache.pinot.segment.local.segment.index.loader.invertedindex.FSTIndexHandler;
+import 
org.apache.pinot.segment.local.segment.index.readers.LuceneFSTIndexReader;
+import org.apache.pinot.segment.local.utils.nativefst.FSTHeader;
+import org.apache.pinot.segment.local.utils.nativefst.NativeFSTIndexCreator;
+import org.apache.pinot.segment.local.utils.nativefst.NativeFSTIndexReader;
 import org.apache.pinot.segment.spi.ColumnMetadata;
 import org.apache.pinot.segment.spi.V1Constants;
 import org.apache.pinot.segment.spi.creator.IndexCreationContext;
+import org.apache.pinot.segment.spi.index.AbstractIndexType;
+import org.apache.pinot.segment.spi.index.ColumnConfigDeserializer;
 import org.apache.pinot.segment.spi.index.FieldIndexConfigs;
-import org.apache.pinot.segment.spi.index.IndexCreator;
+import org.apache.pinot.segment.spi.index.FstIndexConfig;
+import org.apache.pinot.segment.spi.index.IndexConfigDeserializer;
 import org.apache.pinot.segment.spi.index.IndexHandler;
-import org.apache.pinot.segment.spi.index.IndexReader;
+import org.apache.pinot.segment.spi.index.IndexReaderConstraintException;
 import org.apache.pinot.segment.spi.index.IndexReaderFactory;
 import org.apache.pinot.segment.spi.index.IndexType;
 import org.apache.pinot.segment.spi.index.StandardIndexes;
+import org.apache.pinot.segment.spi.index.creator.FSTIndexCreator;
+import org.apache.pinot.segment.spi.index.reader.TextIndexReader;
+import org.apache.pinot.segment.spi.memory.PinotDataBuffer;
 import org.apache.pinot.segment.spi.store.SegmentDirectory;
-import org.apache.pinot.spi.config.table.IndexConfig;
+import org.apache.pinot.spi.config.table.FSTType;
+import org.apache.pinot.spi.config.table.FieldConfig;
+import org.apache.pinot.spi.config.table.IndexingConfig;
 import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.data.FieldSpec;
 import org.apache.pinot.spi.data.Schema;
 
 
-public class FstIndexType implements IndexType<IndexConfig, IndexReader, 
IndexCreator> {
-  public static final FstIndexType INSTANCE = new FstIndexType();
+public class FstIndexType extends AbstractIndexType<FstIndexConfig, 
TextIndexReader, FSTIndexCreator>
+    implements ConfigurableFromIndexLoadingConfig<FstIndexConfig> {
 
-  private FstIndexType() {
+  protected FstIndexType() {
+    super(StandardIndexes.FST_ID);
   }
 
   @Override
-  public String getId() {
-    return StandardIndexes.FST_ID;
+  public Class<FstIndexConfig> getIndexConfigClass() {
+    return FstIndexConfig.class;
   }
 
   @Override
-  public Class<IndexConfig> getIndexConfigClass() {
-    return IndexConfig.class;
+  public Map<String, FstIndexConfig> fromIndexLoadingConfig(IndexLoadingConfig 
indexLoadingConfig) {
+    Map<String, FstIndexConfig> result = new HashMap<>();
+    Set<String> fstIndexColumns = indexLoadingConfig.getFSTIndexColumns();
+    for (String column : indexLoadingConfig.getAllKnownColumns()) {
+      if (fstIndexColumns.contains(column)) {
+        FstIndexConfig conf = new 
FstIndexConfig(indexLoadingConfig.getFSTIndexType());
+        result.put(column, conf);
+      } else {
+        result.put(column, FstIndexConfig.DISABLED);
+      }
+    }
+    return result;
   }
 
   @Override
-  public IndexConfig getDefaultConfig() {
-    return IndexConfig.DISABLED;
+  public FstIndexConfig getDefaultConfig() {
+    return FstIndexConfig.DISABLED;
   }
 
   @Override
-  public IndexConfig getConfig(TableConfig tableConfig, Schema schema) {
-    throw new UnsupportedOperationException();
+  public ColumnConfigDeserializer<FstIndexConfig> getDeserializer() {
+    return IndexConfigDeserializer.fromIndexes("fst", getIndexConfigClass())
+        .withExclusiveAlternative(IndexConfigDeserializer.fromIndexTypes(
+            FieldConfig.IndexType.FST,
+            (tableConfig, fieldConfig) -> {
+              IndexingConfig indexingConfig = tableConfig.getIndexingConfig();
+              FSTType fstIndexType = indexingConfig != null ? 
indexingConfig.getFSTIndexType() : null;
+              return new FstIndexConfig(fstIndexType);
+            }));
   }
 
   @Override
-  public IndexCreator createIndexCreator(IndexCreationContext context, 
IndexConfig indexConfig)
-      throws Exception {
-    throw new UnsupportedOperationException();
+  public FSTIndexCreator createIndexCreator(IndexCreationContext context, 
FstIndexConfig indexConfig)
+      throws IOException {
+    Preconditions.checkState(context.getFieldSpec().isSingleValueField(),
+        "FST index is currently only supported on single-value columns");
+    
Preconditions.checkState(context.getFieldSpec().getDataType().getStoredType() 
== FieldSpec.DataType.STRING,
+        "FST index is currently only supported on STRING type columns");
+    Preconditions.checkState(context.hasDictionary(),
+        "FST index is currently only supported on dictionary-encoded columns");
+    if (indexConfig.getFstType() == FSTType.NATIVE) {
+      return new NativeFSTIndexCreator(context);
+    } else {
+      return new LuceneFSTIndexCreator(context);
+    }
   }
 
   @Override
-  public IndexReaderFactory<IndexReader> getReaderFactory() {
-    throw new UnsupportedOperationException();
+  public ReaderFactory getReaderFactory() {
+    return ReaderFactory.INSTANCE;
+  }
+
+  public static TextIndexReader read(PinotDataBuffer dataBuffer, 
ColumnMetadata metadata)
+      throws IndexReaderConstraintException, IOException {
+    return ReaderFactory.INSTANCE.createIndexReader(dataBuffer, metadata);
   }
 
   @Override
   public IndexHandler createIndexHandler(SegmentDirectory segmentDirectory, 
Map<String, FieldIndexConfigs> configsByCol,
       @Nullable Schema schema, @Nullable TableConfig tableConfig) {
-    throw new UnsupportedOperationException();
+    return new FSTIndexHandler(segmentDirectory, configsByCol, tableConfig);
   }
 
   @Override
   public String getFileExtension(ColumnMetadata columnMetadata) {
     return V1Constants.Indexes.FST_INDEX_FILE_EXTENSION;
   }
 
-  @Override
-  public String toString() {
-    return getId();
+  public static class ReaderFactory extends 
IndexReaderFactory.Default<FstIndexConfig, TextIndexReader> {
+    public static final ReaderFactory INSTANCE = new ReaderFactory();
+    @Override
+    protected IndexType<FstIndexConfig, TextIndexReader, ?> getIndexType() {
+      return StandardIndexes.fst();
+    }
+
+    protected TextIndexReader createIndexReader(PinotDataBuffer dataBuffer, 
ColumnMetadata metadata)
+        throws IndexReaderConstraintException, IOException {
+      if (!metadata.hasDictionary()) {
+        throw new IndexReaderConstraintException(metadata.getColumnName(), 
StandardIndexes.fst(),
+            "This index requires a dictionary");

Review Comment:
   I didn't find this error msg in current logic. could you point me back to 
where this would be needed in the current logic?



##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/dictionary/DictionaryIndexType.java:
##########
@@ -19,74 +19,265 @@
 
 package org.apache.pinot.segment.local.segment.index.dictionary;
 
+import com.google.common.collect.Sets;
+import java.io.File;
+import java.io.IOException;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
 import java.util.Map;
+import java.util.Set;
+import java.util.function.Function;
+import java.util.stream.Collectors;
 import javax.annotation.Nullable;
+import 
org.apache.pinot.segment.local.segment.creator.impl.SegmentDictionaryCreator;
+import 
org.apache.pinot.segment.local.segment.index.loader.ConfigurableFromIndexLoadingConfig;
+import org.apache.pinot.segment.local.segment.index.loader.IndexLoadingConfig;
+import 
org.apache.pinot.segment.local.segment.index.readers.BigDecimalDictionary;
+import org.apache.pinot.segment.local.segment.index.readers.BytesDictionary;
+import org.apache.pinot.segment.local.segment.index.readers.DoubleDictionary;
+import org.apache.pinot.segment.local.segment.index.readers.FloatDictionary;
+import org.apache.pinot.segment.local.segment.index.readers.IntDictionary;
+import org.apache.pinot.segment.local.segment.index.readers.LongDictionary;
+import 
org.apache.pinot.segment.local.segment.index.readers.OnHeapBigDecimalDictionary;
+import 
org.apache.pinot.segment.local.segment.index.readers.OnHeapBytesDictionary;
+import 
org.apache.pinot.segment.local.segment.index.readers.OnHeapDoubleDictionary;
+import 
org.apache.pinot.segment.local.segment.index.readers.OnHeapFloatDictionary;
+import 
org.apache.pinot.segment.local.segment.index.readers.OnHeapIntDictionary;
+import 
org.apache.pinot.segment.local.segment.index.readers.OnHeapLongDictionary;
+import 
org.apache.pinot.segment.local.segment.index.readers.OnHeapStringDictionary;
+import org.apache.pinot.segment.local.segment.index.readers.StringDictionary;
 import org.apache.pinot.segment.spi.ColumnMetadata;
 import org.apache.pinot.segment.spi.V1Constants;
+import org.apache.pinot.segment.spi.creator.ColumnStatistics;
 import org.apache.pinot.segment.spi.creator.IndexCreationContext;
+import org.apache.pinot.segment.spi.index.AbstractIndexType;
+import org.apache.pinot.segment.spi.index.ColumnConfigDeserializer;
+import org.apache.pinot.segment.spi.index.DictionaryIndexConfig;
 import org.apache.pinot.segment.spi.index.FieldIndexConfigs;
-import org.apache.pinot.segment.spi.index.IndexCreator;
+import org.apache.pinot.segment.spi.index.IndexConfigDeserializer;
 import org.apache.pinot.segment.spi.index.IndexHandler;
-import org.apache.pinot.segment.spi.index.IndexReader;
+import org.apache.pinot.segment.spi.index.IndexReaderConstraintException;
 import org.apache.pinot.segment.spi.index.IndexReaderFactory;
 import org.apache.pinot.segment.spi.index.IndexType;
 import org.apache.pinot.segment.spi.index.StandardIndexes;
+import org.apache.pinot.segment.spi.index.reader.Dictionary;
+import org.apache.pinot.segment.spi.memory.PinotDataBuffer;
 import org.apache.pinot.segment.spi.store.SegmentDirectory;
-import org.apache.pinot.spi.config.table.IndexConfig;
+import org.apache.pinot.spi.config.table.FieldConfig;
+import org.apache.pinot.spi.config.table.IndexingConfig;
 import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.data.FieldSpec;
 import org.apache.pinot.spi.data.Schema;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 
-public class DictionaryIndexType implements IndexType<IndexConfig, 
IndexReader, IndexCreator> {
-  public static final DictionaryIndexType INSTANCE = new DictionaryIndexType();
+public class DictionaryIndexType
+    extends AbstractIndexType<DictionaryIndexConfig, Dictionary, 
SegmentDictionaryCreator>
+    implements ConfigurableFromIndexLoadingConfig<DictionaryIndexConfig> {
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(DictionaryIndexType.class);
 
-  private DictionaryIndexType() {
+  protected DictionaryIndexType() {
+    super(StandardIndexes.DICTIONARY_ID);
   }
 
   @Override
-  public String getId() {
-    return StandardIndexes.DICTIONARY_ID;
+  public Class<DictionaryIndexConfig> getIndexConfigClass() {
+    return DictionaryIndexConfig.class;
   }
 
   @Override
-  public Class<IndexConfig> getIndexConfigClass() {
-    return IndexConfig.class;
+  public Map<String, DictionaryIndexConfig> fromIndexLoadingConfig(
+      IndexLoadingConfig indexLoadingConfig) {
+    Map<String, DictionaryIndexConfig> result = new HashMap<>();
+    Set<String> noDictionaryColumns = 
indexLoadingConfig.getNoDictionaryColumns();
+    Set<String> onHeapCols = indexLoadingConfig.getOnHeapDictionaryColumns();
+    Set<String> varLengthCols = 
indexLoadingConfig.getVarLengthDictionaryColumns();
+    for (String column : indexLoadingConfig.getAllKnownColumns()) {
+      if (noDictionaryColumns.contains(column)) {
+        result.put(column, DictionaryIndexConfig.disabled());
+      } else {
+        result.put(column, new 
DictionaryIndexConfig(onHeapCols.contains(column), 
varLengthCols.contains(column)));
+      }
+    }
+    return result;
   }
 
   @Override
-  public IndexConfig getDefaultConfig() {
-    return IndexConfig.DISABLED;
+  public DictionaryIndexConfig getDefaultConfig() {
+    return DictionaryIndexConfig.DEFAULT;
   }
 
   @Override
-  public IndexConfig getConfig(TableConfig tableConfig, Schema schema) {
-    throw new UnsupportedOperationException();
+  public ColumnConfigDeserializer<DictionaryIndexConfig> getDeserializer() {
+    // reads tableConfig.indexingConfig.noDictionaryConfig
+    ColumnConfigDeserializer<DictionaryIndexConfig> fromNoDictConf = 
IndexConfigDeserializer.fromMap(
+        tableConfig -> tableConfig.getIndexingConfig() == null ? 
Collections.emptyMap()
+            : tableConfig.getIndexingConfig().getNoDictionaryConfig(),
+        (accum, col, value) -> accum.put(col, 
DictionaryIndexConfig.disabled()));
+
+    // reads tableConfig.indexingConfig.noDictionaryColumns
+    ColumnConfigDeserializer<DictionaryIndexConfig> fromNoDictCol = 
IndexConfigDeserializer.fromCollection(
+        tableConfig -> tableConfig.getIndexingConfig() == null ? 
Collections.emptyList()
+            : tableConfig.getIndexingConfig().getNoDictionaryColumns(),
+        (accum, noDictionaryCol) -> accum.put(noDictionaryCol, 
DictionaryIndexConfig.disabled()));
+
+    // reads tableConfig.fieldConfigList.encodingType
+    ColumnConfigDeserializer<DictionaryIndexConfig> fromFieldConfigList = 
IndexConfigDeserializer.fromCollection(
+        TableConfig::getFieldConfigList,
+        (accum, fieldConfig) -> {
+          if (fieldConfig.getEncodingType() == FieldConfig.EncodingType.RAW) {
+            accum.put(fieldConfig.getName(), DictionaryIndexConfig.disabled());
+          }
+        });
+
+    // reads tableConfig.indexingConfig.onHeapDictionaryColumns and
+    // tableConfig.indexingConfig.varLengthDictionaryColumns
+    ColumnConfigDeserializer<DictionaryIndexConfig> fromIndexingConfig = 
(tableConfig, schema) -> {
+      IndexingConfig ic = tableConfig.getIndexingConfig();
+      if (ic == null) {
+        return Collections.emptyMap();
+      }
+      Set<String> onHeap = new HashSet<>(
+          ic.getOnHeapDictionaryColumns() == null ? Collections.emptyList() : 
ic.getOnHeapDictionaryColumns());
+      Set<String> varLength = new HashSet<>(
+          ic.getVarLengthDictionaryColumns() == null ? Collections.emptyList() 
: ic.getVarLengthDictionaryColumns()
+      );
+      Function<String, DictionaryIndexConfig> valueCalculator =
+          column -> new DictionaryIndexConfig(onHeap.contains(column), 
varLength.contains(column));
+      return Sets.union(onHeap, varLength).stream()
+          .collect(Collectors.toMap(Function.identity(), valueCalculator));
+    };
+
+    return fromNoDictConf
+        .withFallbackAlternative(fromNoDictCol)
+        .withFallbackAlternative(fromFieldConfigList)
+        .withFallbackAlternative(fromIndexingConfig)
+        .withExclusiveAlternative(IndexConfigDeserializer.fromIndexes(getId(), 
getIndexConfigClass()));
   }
 
   @Override
-  public IndexCreator createIndexCreator(IndexCreationContext context, 
IndexConfig indexConfig)
+  public SegmentDictionaryCreator createIndexCreator(IndexCreationContext 
context, DictionaryIndexConfig indexConfig) {
+    boolean useVarLengthDictionary = shouldUseVarLengthDictionary(context, 
indexConfig);
+    return new SegmentDictionaryCreator(context.getFieldSpec(), 
context.getIndexDir(), useVarLengthDictionary);
+  }
+
+  public boolean shouldUseVarLengthDictionary(IndexCreationContext context, 
DictionaryIndexConfig indexConfig) {
+    if (indexConfig.getUseVarLengthDictionary()) {
+      return true;
+    }
+    FieldSpec.DataType storedType = 
context.getFieldSpec().getDataType().getStoredType();
+    if (storedType != FieldSpec.DataType.BYTES && storedType != 
FieldSpec.DataType.BIG_DECIMAL) {
+      return false;
+    }
+    return !context.isFixedLength();
+  }
+
+  public static boolean shouldUseVarLengthDictionary(String columnName, 
Set<String> varLengthDictColumns,
+      FieldSpec.DataType columnStoredType, ColumnStatistics columnProfile) {
+    if (varLengthDictColumns.contains(columnName)) {
+      return true;
+    }
+
+    return shouldUseVarLengthDictionary(columnStoredType, columnProfile);
+  }
+
+  public static boolean shouldUseVarLengthDictionary(FieldSpec.DataType 
columnStoredType, ColumnStatistics profile) {
+    if (columnStoredType == FieldSpec.DataType.BYTES || columnStoredType == 
FieldSpec.DataType.BIG_DECIMAL) {
+      return !profile.isFixedLength();
+    }
+
+    return false;
+  }
+
+  public SegmentDictionaryCreator createIndexCreator(FieldSpec fieldSpec, File 
indexDir, boolean useVarLengthDictionary)
       throws Exception {
-    throw new UnsupportedOperationException();
+    return new SegmentDictionaryCreator(fieldSpec, indexDir, 
useVarLengthDictionary);
+  }
+
+  public static Dictionary read(SegmentDirectory.Reader segmentReader, 
ColumnMetadata columnMetadata)
+      throws IOException {
+    PinotDataBuffer dataBuffer =
+        segmentReader.getIndexFor(columnMetadata.getColumnName(), 
StandardIndexes.dictionary());
+    return read(dataBuffer, columnMetadata, new DictionaryIndexConfig(false, 
true));
+  }
+
+  public static Dictionary read(PinotDataBuffer dataBuffer, ColumnMetadata 
metadata, DictionaryIndexConfig indexConfig)
+      throws IOException {
+    FieldSpec.DataType dataType = metadata.getDataType();
+    boolean loadOnHeap = indexConfig.isOnHeap();
+    if (loadOnHeap) {
+      String columnName = metadata.getColumnName();
+      LOGGER.info("Loading on-heap dictionary for column: {}", columnName);
+    }
+
+    int length = metadata.getCardinality();
+    switch (dataType.getStoredType()) {
+      case INT:
+        return loadOnHeap ? new OnHeapIntDictionary(dataBuffer, length)
+            : new IntDictionary(dataBuffer, length);
+      case LONG:
+        return loadOnHeap ? new OnHeapLongDictionary(dataBuffer, length)
+            : new LongDictionary(dataBuffer, length);
+      case FLOAT:
+        return loadOnHeap ? new OnHeapFloatDictionary(dataBuffer, length)
+            : new FloatDictionary(dataBuffer, length);
+      case DOUBLE:
+        return loadOnHeap ? new OnHeapDoubleDictionary(dataBuffer, length)
+            : new DoubleDictionary(dataBuffer, length);
+      case BIG_DECIMAL:
+        int numBytesPerValue = metadata.getColumnMaxLength();
+        return loadOnHeap ? new OnHeapBigDecimalDictionary(dataBuffer, length, 
numBytesPerValue)
+            : new BigDecimalDictionary(dataBuffer, length, numBytesPerValue);
+      case STRING:
+        numBytesPerValue = metadata.getColumnMaxLength();
+        return loadOnHeap ? new OnHeapStringDictionary(dataBuffer, length, 
numBytesPerValue)
+            : new StringDictionary(dataBuffer, length, numBytesPerValue);
+      case BYTES:
+        numBytesPerValue = metadata.getColumnMaxLength();
+        return loadOnHeap ? new OnHeapBytesDictionary(dataBuffer, length, 
numBytesPerValue)
+            : new BytesDictionary(dataBuffer, length, numBytesPerValue);
+      default:
+        throw new IllegalStateException("Unsupported data type for dictionary: 
" + dataType);
+    }
   }
 
   @Override
-  public IndexReaderFactory<IndexReader> getReaderFactory() {
-    throw new UnsupportedOperationException();
+  public IndexReaderFactory<Dictionary> getReaderFactory() {
+    return ReaderFactory.INSTANCE;
   }
 
   @Override
   public IndexHandler createIndexHandler(SegmentDirectory segmentDirectory, 
Map<String, FieldIndexConfigs> configsByCol,
       @Nullable Schema schema, @Nullable TableConfig tableConfig) {
-    throw new UnsupportedOperationException();
+    return IndexHandler.NoOp.INSTANCE;
   }
 
-  @Override
-  public String getFileExtension(ColumnMetadata columnMetadata) {
+  public static String getFileExtension() {
     return V1Constants.Dict.FILE_EXTENSION;
   }
 
   @Override
-  public String toString() {
-    return getId();
+  public String getFileExtension(ColumnMetadata columnMetadata) {
+    return getFileExtension();
+  }
+
+  public static class ReaderFactory extends 
IndexReaderFactory.Default<DictionaryIndexConfig, Dictionary> {

Review Comment:
   is there need to make this inner class `public`? from a quick search in this 
PR, I didn't see direct use of it outside the class. Some get reference of it 
via getReaderFactory() above.
   
   Same question for the other ReaderFactory classes for other index types. 
Looks like they don't need to be `public`.



##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/inverted/InvertedIndexType.java:
##########
@@ -86,7 +130,46 @@ public String getFileExtension(ColumnMetadata 
columnMetadata) {
   }
 
   @Override
-  public String toString() {
-    return getId();
+  public IndexHandler createIndexHandler(SegmentDirectory segmentDirectory, 
Map<String, FieldIndexConfigs> configsByCol,
+      @Nullable Schema schema, @Nullable TableConfig tableConfig) {
+    return new InvertedIndexHandler(segmentDirectory, configsByCol, 
tableConfig);
+  }
+
+  public static class ReaderFactory implements 
IndexReaderFactory<InvertedIndexReader> {
+    public static final ReaderFactory INSTANCE = new ReaderFactory();
+
+    private ReaderFactory() {
+    }
+
+    @Override
+    public InvertedIndexReader createIndexReader(SegmentDirectory.Reader 
segmentReader,
+        FieldIndexConfigs fieldIndexConfigs, ColumnMetadata metadata)
+        throws IOException, IndexReaderConstraintException {
+      if (fieldIndexConfigs == null || 
!fieldIndexConfigs.getConfig(StandardIndexes.inverted()).isEnabled()) {
+        return null;
+      }
+      if (!metadata.hasDictionary()) {
+        throw new IllegalStateException("Column " + metadata.getColumnName() + 
" cannot be indexed by an inverted "
+            + "index if it has no dictionary");
+      }
+      if (metadata.isSorted() && metadata.isSingleValue()) {
+        ForwardIndexReader fwdReader = 
StandardIndexes.forward().getReaderFactory()
+            .createIndexReader(segmentReader, fieldIndexConfigs, metadata);
+        Preconditions.checkState(fwdReader instanceof SortedIndexReader);
+        return (SortedIndexReader) fwdReader;
+      } else {
+        return createSkippingForward(segmentReader, metadata);
+      }
+    }
+
+    public InvertedIndexReader createSkippingForward(SegmentDirectory.Reader 
segmentReader, ColumnMetadata metadata)

Review Comment:
   why the method is called create`SkippingForward`? looks like we're creating 
the ordinary bitmap-based inverted index here.



##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/BaseIndexHandler.java:
##########
@@ -40,14 +46,31 @@
  */
 public abstract class BaseIndexHandler implements IndexHandler {
   private static final Logger LOGGER = 
LoggerFactory.getLogger(BaseIndexHandler.class);
-
-  protected final IndexLoadingConfig _indexLoadingConfig;
   protected final Set<String> _tmpForwardIndexColumns;
   protected final SegmentDirectory _segmentDirectory;
+  protected final Map<String, FieldIndexConfigs> _fieldIndexConfigs;
+  @Nullable
+  protected final TableConfig _tableConfig;
 
   public BaseIndexHandler(SegmentDirectory segmentDirectory, 
IndexLoadingConfig indexLoadingConfig) {
+    this(segmentDirectory, indexLoadingConfig.getFieldIndexConfigByColName(), 
indexLoadingConfig.getTableConfig());
+  }
+
+  public BaseIndexHandler(SegmentDirectory segmentDirectory, Map<String, 
FieldIndexConfigs> fieldIndexConfigs,
+      @Nullable TableConfig tableConfig) {
     _segmentDirectory = segmentDirectory;
-    _indexLoadingConfig = indexLoadingConfig;
+    SegmentMetadataImpl segmentMetadata = 
segmentDirectory.getSegmentMetadata();
+    if (fieldIndexConfigs.keySet().equals(segmentMetadata.getAllColumns())) {
+      _fieldIndexConfigs = fieldIndexConfigs;
+    } else {

Review Comment:
   save this else branch, and do `_fieldIndexConfigs.getOrDefault(..., 
FieldIndexConfigs.EMPTY)`? 



##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/fst/FstIndexType.java:
##########
@@ -19,74 +19,152 @@
 
 package org.apache.pinot.segment.local.segment.index.fst;
 
+import com.google.common.base.Preconditions;
+import java.io.IOException;
+import java.util.HashMap;
 import java.util.Map;
+import java.util.Set;
 import javax.annotation.Nullable;
+import 
org.apache.pinot.segment.local.segment.creator.impl.inv.text.LuceneFSTIndexCreator;
+import 
org.apache.pinot.segment.local.segment.index.loader.ConfigurableFromIndexLoadingConfig;
+import org.apache.pinot.segment.local.segment.index.loader.IndexLoadingConfig;
+import 
org.apache.pinot.segment.local.segment.index.loader.invertedindex.FSTIndexHandler;
+import 
org.apache.pinot.segment.local.segment.index.readers.LuceneFSTIndexReader;
+import org.apache.pinot.segment.local.utils.nativefst.FSTHeader;
+import org.apache.pinot.segment.local.utils.nativefst.NativeFSTIndexCreator;
+import org.apache.pinot.segment.local.utils.nativefst.NativeFSTIndexReader;
 import org.apache.pinot.segment.spi.ColumnMetadata;
 import org.apache.pinot.segment.spi.V1Constants;
 import org.apache.pinot.segment.spi.creator.IndexCreationContext;
+import org.apache.pinot.segment.spi.index.AbstractIndexType;
+import org.apache.pinot.segment.spi.index.ColumnConfigDeserializer;
 import org.apache.pinot.segment.spi.index.FieldIndexConfigs;
-import org.apache.pinot.segment.spi.index.IndexCreator;
+import org.apache.pinot.segment.spi.index.FstIndexConfig;
+import org.apache.pinot.segment.spi.index.IndexConfigDeserializer;
 import org.apache.pinot.segment.spi.index.IndexHandler;
-import org.apache.pinot.segment.spi.index.IndexReader;
+import org.apache.pinot.segment.spi.index.IndexReaderConstraintException;
 import org.apache.pinot.segment.spi.index.IndexReaderFactory;
 import org.apache.pinot.segment.spi.index.IndexType;
 import org.apache.pinot.segment.spi.index.StandardIndexes;
+import org.apache.pinot.segment.spi.index.creator.FSTIndexCreator;
+import org.apache.pinot.segment.spi.index.reader.TextIndexReader;
+import org.apache.pinot.segment.spi.memory.PinotDataBuffer;
 import org.apache.pinot.segment.spi.store.SegmentDirectory;
-import org.apache.pinot.spi.config.table.IndexConfig;
+import org.apache.pinot.spi.config.table.FSTType;
+import org.apache.pinot.spi.config.table.FieldConfig;
+import org.apache.pinot.spi.config.table.IndexingConfig;
 import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.data.FieldSpec;
 import org.apache.pinot.spi.data.Schema;
 
 
-public class FstIndexType implements IndexType<IndexConfig, IndexReader, 
IndexCreator> {
-  public static final FstIndexType INSTANCE = new FstIndexType();
+public class FstIndexType extends AbstractIndexType<FstIndexConfig, 
TextIndexReader, FSTIndexCreator>
+    implements ConfigurableFromIndexLoadingConfig<FstIndexConfig> {
 
-  private FstIndexType() {
+  protected FstIndexType() {
+    super(StandardIndexes.FST_ID);
   }
 
   @Override
-  public String getId() {
-    return StandardIndexes.FST_ID;
+  public Class<FstIndexConfig> getIndexConfigClass() {
+    return FstIndexConfig.class;
   }
 
   @Override
-  public Class<IndexConfig> getIndexConfigClass() {
-    return IndexConfig.class;
+  public Map<String, FstIndexConfig> fromIndexLoadingConfig(IndexLoadingConfig 
indexLoadingConfig) {
+    Map<String, FstIndexConfig> result = new HashMap<>();
+    Set<String> fstIndexColumns = indexLoadingConfig.getFSTIndexColumns();
+    for (String column : indexLoadingConfig.getAllKnownColumns()) {
+      if (fstIndexColumns.contains(column)) {
+        FstIndexConfig conf = new 
FstIndexConfig(indexLoadingConfig.getFSTIndexType());

Review Comment:
   inline `conf = ..` with result.put(), or probably just shorten this if-else 
with ternary operator



##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/ConfigurableFromIndexLoadingConfig.java:
##########
@@ -0,0 +1,45 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.pinot.segment.local.segment.index.loader;
+
+import java.util.Map;
+import org.apache.pinot.segment.spi.index.ColumnConfigDeserializer;
+import org.apache.pinot.spi.config.table.IndexConfig;
+
+
+/**
+ * This interface can be optionally implemented by {@link 
org.apache.pinot.segment.spi.index.IndexType index types} to
+ * indicate that they can extract their configuration from an older {@link 
IndexLoadingConfig} object.
+ */
+public interface ConfigurableFromIndexLoadingConfig<C extends IndexConfig> {
+
+
+  /**
+   * Returns a {@link ColumnConfigDeserializer} that can be used to 
deserialize the index config.

Review Comment:
   The method comment and the return type seem not consistent. 



##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/InvertedIndexAndDictionaryBasedForwardIndexCreator.java:
##########
@@ -112,15 +113,13 @@ public class 
InvertedIndexAndDictionaryBasedForwardIndexCreator implements AutoC
   private PinotDataBuffer _forwardIndexMaxSizeBuffer;
 
   public InvertedIndexAndDictionaryBasedForwardIndexCreator(String columnName, 
SegmentDirectory segmentDirectory,
-      IndexLoadingConfig indexLoadingConfig, SegmentDirectory.Writer 
segmentWriter,
-      IndexCreatorProvider indexCreatorProvider, boolean 
isTemporaryForwardIndex)
+      boolean dictionaryEnabled, ForwardIndexConfig forwConf, 
SegmentDirectory.Writer segmentWriter,

Review Comment:
   nit: fwdConf



##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/IndexLoadingConfig.java:
##########
@@ -102,6 +118,8 @@ public class IndexLoadingConfig {
 
   private String _instanceId;
   private Map<String, Map<String, String>> _instanceTierConfigs;
+  private boolean _dirty = true;

Review Comment:
   could you comment a bit why the need of _dirty flag?



##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/ForwardIndexHandler.java:
##########
@@ -455,12 +458,16 @@ private void 
rewriteRawMVForwardIndexForCompressionChange(String column, ColumnM
       int maxRowLengthInBytes =
           
MultiValueVarByteRawIndexCreator.getMaxRowDataLengthInBytes(lengthOfLongestEntry,
 maxNumberOfMVEntries);
 
-      IndexCreationContext.Forward context =
-          
IndexCreationContext.builder().withIndexDir(indexDir).withColumnMetadata(existingColMetadata)
-              
.withLengthOfLongestEntry(lengthOfLongestEntry).withMaxRowLengthInBytes(maxRowLengthInBytes).build()
-              .forForwardIndex(newCompressionType, 
_indexLoadingConfig.getColumnProperties());
+      IndexCreationContext context = IndexCreationContext.builder()

Review Comment:
   guess your IDE is using a different code style? but I'd assume linter should 
catch those. It seems like skipping those 'newline' changes. 



##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/bloomfilter/BloomFilterHandler.java:
##########
@@ -264,51 +265,42 @@ private void 
createBloomFilterForColumn(SegmentDirectory.Writer segmentWriter, C
 
     if (!columnMetadata.hasDictionary()) {
       // Create a temporary forward index if it is disabled and does not exist
-      columnMetadata = createForwardIndexIfNeeded(segmentWriter, columnName, 
indexCreatorProvider, true);
+      columnMetadata = createForwardIndexIfNeeded(segmentWriter, columnName, 
true);
     }
 
     // Create new bloom filter for the column.
     BloomFilterConfig bloomFilterConfig = _bloomFilterConfigs.get(columnName);
     LOGGER.info("Creating new bloom filter for segment: {}, column: {} with 
config: {}", segmentName, columnName,
         bloomFilterConfig);
     if (columnMetadata.hasDictionary()) {
-      createAndSealBloomFilterForDictionaryColumn(bloomFilterCreatorProvider, 
indexDir, columnMetadata,
-          bloomFilterConfig, segmentWriter);
+      createAndSealBloomFilterForDictionaryColumn(indexDir, columnMetadata, 
bloomFilterConfig, segmentWriter);
     } else {
-      
createAndSealBloomFilterForNonDictionaryColumn(bloomFilterCreatorProvider, 
indexDir, columnMetadata,
-          bloomFilterConfig, segmentWriter);
+      createAndSealBloomFilterForNonDictionaryColumn(indexDir, columnMetadata, 
bloomFilterConfig, segmentWriter);
     }
 
     // For v3, write the generated bloom filter file into the single file and 
remove it.
     if (_segmentDirectory.getSegmentMetadata().getVersion() == 
SegmentVersion.v3) {
-      LoaderUtils.writeIndexToV3Format(segmentWriter, columnName, 
bloomFilterFile, ColumnIndexType.BLOOM_FILTER);
+      LoaderUtils.writeIndexToV3Format(segmentWriter, columnName, 
bloomFilterFile, StandardIndexes.bloomFilter());
     }
 
     // Delete the marker file.
     FileUtils.deleteQuietly(bloomFilterFileInProgress);
     LOGGER.info("Created bloom filter for segment: {}, column: {}", 
segmentName, columnName);
   }
 
-  private BaseImmutableDictionary getDictionaryReader(ColumnMetadata 
columnMetadata,
-      SegmentDirectory.Writer segmentWriter)
+  private Dictionary getDictionaryReader(ColumnMetadata columnMetadata, 
SegmentDirectory.Writer segmentWriter)
       throws IOException {
-    PinotDataBuffer dictionaryBuffer =
-        segmentWriter.getIndexFor(columnMetadata.getColumnName(), 
ColumnIndexType.DICTIONARY);
-    int cardinality = columnMetadata.getCardinality();
     DataType dataType = columnMetadata.getDataType();
+
     switch (dataType) {
       case INT:
-        return new IntDictionary(dictionaryBuffer, cardinality);
       case LONG:
-        return new LongDictionary(dictionaryBuffer, cardinality);
       case FLOAT:
-        return new FloatDictionary(dictionaryBuffer, cardinality);
       case DOUBLE:
-        return new DoubleDictionary(dictionaryBuffer, cardinality);
       case STRING:
-        return new StringDictionary(dictionaryBuffer, cardinality, 
columnMetadata.getColumnMaxLength());
       case BYTES:
-        return new BytesDictionary(dictionaryBuffer, cardinality, 
columnMetadata.getColumnMaxLength());
+        PinotDataBuffer buf = 
segmentWriter.getIndexFor(columnMetadata.getColumnName(), 
StandardIndexes.dictionary());
+        return DictionaryIndexType.read(buf, columnMetadata, new 
DictionaryIndexConfig(false, true));

Review Comment:
   define DictionaryIndexConfig.OFFHEAP constant for a bit more readability here



##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/SegmentPreProcessor.java:
##########
@@ -104,20 +104,28 @@ public void process()
       }
 
       // Update single-column indices, like inverted index, json index etc.
-      IndexCreatorProvider indexCreatorProvider = 
IndexingOverrides.getIndexCreatorProvider();
+
+      // ForwardIndexHandler may modify the segment metadata while rewriting 
forward index to create a dictionary
+      // This new metadata is needed to construct other indexes like 
RangeIndex.

Review Comment:
   broken comment? 



##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/nullvalue/NullValueIndexType.java:
##########
@@ -60,34 +66,56 @@ public IndexConfig getDefaultConfig() {
   }
 
   @Override
-  public IndexConfig getConfig(TableConfig tableConfig, Schema schema) {
-    throw new UnsupportedOperationException();
+  public ColumnConfigDeserializer<IndexConfig> getDeserializer() {
+    return IndexConfigDeserializer.fromIndexes("null", getIndexConfigClass())
+        .withFallbackAlternative(
+            IndexConfigDeserializer.ifIndexingConfig(
+                IndexConfigDeserializer.alwaysCall((TableConfig tableConfig, 
Schema schema) ->
+                  tableConfig.getIndexingConfig().isNullHandlingEnabled()
+                      ? IndexConfig.ENABLED
+                      : IndexConfig.DISABLED))

Review Comment:
   unnecessary newline missed by linter



##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/SegmentDictionaryCreator.java:
##########
@@ -62,9 +73,20 @@ public class SegmentDictionaryCreator implements Closeable {
   public SegmentDictionaryCreator(FieldSpec fieldSpec, File indexDir, boolean 
useVarLengthDictionary) {
     _columnName = fieldSpec.getName();
     _storedType = fieldSpec.getDataType().getStoredType();
-    _dictionaryFile = new File(indexDir, _columnName + 
V1Constants.Dict.FILE_EXTENSION);
+    _dictionaryFile = new File(indexDir, _columnName + 
DictionaryIndexType.getFileExtension());
     _useVarLengthDictionary = useVarLengthDictionary;
   }
+  @Override
+  public void add(@Nonnull Object value, int dictId)
+      throws IOException {
+    throw new UnsupportedOperationException("Dictionaries should not be build 
as a normal index");

Review Comment:
   s/build/built
   
   found same typo in a few other classes, you can grep and replace. 



##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/inverted/InvertedIndexType.java:
##########
@@ -59,25 +79,49 @@ public IndexConfig getDefaultConfig() {
   }
 
   @Override
-  public IndexConfig getConfig(TableConfig tableConfig, Schema schema) {
-    throw new UnsupportedOperationException();
+  public ColumnConfigDeserializer<IndexConfig> getDeserializer() {
+    ColumnConfigDeserializer<IndexConfig> fromInvertedCols = 
IndexConfigDeserializer.fromCollection(
+        tableConfig -> 
tableConfig.getIndexingConfig().getInvertedIndexColumns(),
+        (acum, column) -> acum.put(column, IndexConfig.ENABLED));
+    return IndexConfigDeserializer.fromIndexes("inverted", 
getIndexConfigClass())
+        
.withExclusiveAlternative(IndexConfigDeserializer.ifIndexingConfig(fromInvertedCols));
+  }
+
+  public DictionaryBasedInvertedIndexCreator 
createIndexCreator(IndexCreationContext context)
+      throws IOException {
+    if (context.isOnHeap()) {
+      return new OnHeapBitmapInvertedIndexCreator(context.getIndexDir(), 
context.getFieldSpec().getName(),
+          context.getCardinality());
+    } else {
+      return new OffHeapBitmapInvertedIndexCreator(context.getIndexDir(), 
context.getFieldSpec(),
+          context.getCardinality(), context.getTotalDocs(), 
context.getTotalNumberOfEntries());
+    }
   }
 
   @Override
-  public IndexCreator createIndexCreator(IndexCreationContext context, 
IndexConfig indexConfig)
-      throws Exception {
-    throw new UnsupportedOperationException();
+  public DictionaryBasedInvertedIndexCreator 
createIndexCreator(IndexCreationContext context,
+      IndexConfig indexConfig)
+      throws IOException {
+    return createIndexCreator(context);
   }
 
   @Override
-  public IndexReaderFactory<IndexReader> getReaderFactory() {
-    throw new UnsupportedOperationException();
+  public IndexReaderFactory<InvertedIndexReader> getReaderFactory() {
+    return ReaderFactory.INSTANCE;
   }
 
   @Override
-  public IndexHandler createIndexHandler(SegmentDirectory segmentDirectory, 
Map<String, FieldIndexConfigs> configsByCol,
-      @Nullable Schema schema, @Nullable TableConfig tableConfig) {
-    throw new UnsupportedOperationException();
+  @Nullable
+  public InvertedIndexReader getIndexReader(ColumnIndexContainer 
indexContainer) {
+    InvertedIndexReader reader = super.getIndexReader(indexContainer);
+    if (reader != null) {
+      return reader;
+    }
+    ForwardIndexReader fwdReader = 
indexContainer.getIndex(StandardIndexes.forward());
+    if (fwdReader != null && fwdReader instanceof SortedIndexReader) {

Review Comment:
   just do the `instanceof` check? as it does null check too. and probably 
shorten here with ternary operator



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org


Reply via email to