klsince commented on code in PR #10184:
URL: https://github.com/apache/pinot/pull/10184#discussion_r1152440038


##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/dictionary/DictionaryIndexType.java:
##########
@@ -19,74 +19,265 @@
 
 package org.apache.pinot.segment.local.segment.index.dictionary;
 
+import com.google.common.collect.Sets;
+import java.io.File;
+import java.io.IOException;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
 import java.util.Map;
+import java.util.Set;
+import java.util.function.Function;
+import java.util.stream.Collectors;
 import javax.annotation.Nullable;
+import 
org.apache.pinot.segment.local.segment.creator.impl.SegmentDictionaryCreator;
+import 
org.apache.pinot.segment.local.segment.index.loader.ConfigurableFromIndexLoadingConfig;
+import org.apache.pinot.segment.local.segment.index.loader.IndexLoadingConfig;
+import 
org.apache.pinot.segment.local.segment.index.readers.BigDecimalDictionary;
+import org.apache.pinot.segment.local.segment.index.readers.BytesDictionary;
+import org.apache.pinot.segment.local.segment.index.readers.DoubleDictionary;
+import org.apache.pinot.segment.local.segment.index.readers.FloatDictionary;
+import org.apache.pinot.segment.local.segment.index.readers.IntDictionary;
+import org.apache.pinot.segment.local.segment.index.readers.LongDictionary;
+import 
org.apache.pinot.segment.local.segment.index.readers.OnHeapBigDecimalDictionary;
+import 
org.apache.pinot.segment.local.segment.index.readers.OnHeapBytesDictionary;
+import 
org.apache.pinot.segment.local.segment.index.readers.OnHeapDoubleDictionary;
+import 
org.apache.pinot.segment.local.segment.index.readers.OnHeapFloatDictionary;
+import 
org.apache.pinot.segment.local.segment.index.readers.OnHeapIntDictionary;
+import 
org.apache.pinot.segment.local.segment.index.readers.OnHeapLongDictionary;
+import 
org.apache.pinot.segment.local.segment.index.readers.OnHeapStringDictionary;
+import org.apache.pinot.segment.local.segment.index.readers.StringDictionary;
 import org.apache.pinot.segment.spi.ColumnMetadata;
 import org.apache.pinot.segment.spi.V1Constants;
+import org.apache.pinot.segment.spi.creator.ColumnStatistics;
 import org.apache.pinot.segment.spi.creator.IndexCreationContext;
+import org.apache.pinot.segment.spi.index.AbstractIndexType;
+import org.apache.pinot.segment.spi.index.ColumnConfigDeserializer;
+import org.apache.pinot.segment.spi.index.DictionaryIndexConfig;
 import org.apache.pinot.segment.spi.index.FieldIndexConfigs;
-import org.apache.pinot.segment.spi.index.IndexCreator;
+import org.apache.pinot.segment.spi.index.IndexConfigDeserializer;
 import org.apache.pinot.segment.spi.index.IndexHandler;
-import org.apache.pinot.segment.spi.index.IndexReader;
+import org.apache.pinot.segment.spi.index.IndexReaderConstraintException;
 import org.apache.pinot.segment.spi.index.IndexReaderFactory;
 import org.apache.pinot.segment.spi.index.IndexType;
 import org.apache.pinot.segment.spi.index.StandardIndexes;
+import org.apache.pinot.segment.spi.index.reader.Dictionary;
+import org.apache.pinot.segment.spi.memory.PinotDataBuffer;
 import org.apache.pinot.segment.spi.store.SegmentDirectory;
-import org.apache.pinot.spi.config.table.IndexConfig;
+import org.apache.pinot.spi.config.table.FieldConfig;
+import org.apache.pinot.spi.config.table.IndexingConfig;
 import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.data.FieldSpec;
 import org.apache.pinot.spi.data.Schema;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 
-public class DictionaryIndexType implements IndexType<IndexConfig, 
IndexReader, IndexCreator> {
-  public static final DictionaryIndexType INSTANCE = new DictionaryIndexType();
+public class DictionaryIndexType
+    extends AbstractIndexType<DictionaryIndexConfig, Dictionary, 
SegmentDictionaryCreator>
+    implements ConfigurableFromIndexLoadingConfig<DictionaryIndexConfig> {
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(DictionaryIndexType.class);
 
-  private DictionaryIndexType() {
+  protected DictionaryIndexType() {
+    super(StandardIndexes.DICTIONARY_ID);
   }
 
   @Override
-  public String getId() {
-    return StandardIndexes.DICTIONARY_ID;
+  public Class<DictionaryIndexConfig> getIndexConfigClass() {
+    return DictionaryIndexConfig.class;
   }
 
   @Override
-  public Class<IndexConfig> getIndexConfigClass() {
-    return IndexConfig.class;
+  public Map<String, DictionaryIndexConfig> fromIndexLoadingConfig(
+      IndexLoadingConfig indexLoadingConfig) {
+    Map<String, DictionaryIndexConfig> result = new HashMap<>();
+    Set<String> noDictionaryColumns = 
indexLoadingConfig.getNoDictionaryColumns();

Review Comment:
   Rename `noDictionaryColumns` to `noDictionaryCols` to be consistent with the other two variables.



##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/SegmentColumnarIndexCreator.java:
##########
@@ -379,340 +327,96 @@ private boolean 
shouldCreateDictionaryWithinThreshold(ColumnIndexCreationInfo in
   }
 
   /**
-   * Helper method that returns compression type to use based on segment 
creation spec and field type.
-   * <ul>
-   *   <li> Returns compression type from segment creation spec, if specified 
there.</li>
-   *   <li> Else, returns PASS_THROUGH for metrics, and SNAPPY for dimensions. 
This is because metrics are likely
-   *        to be spread in different chunks after applying predicates. Same 
could be true for dimensions, but in that
-   *        case, clients are expected to explicitly specify the appropriate 
compression type in the spec. </li>
-   * </ul>
-   * @param segmentCreationSpec Segment creation spec
-   * @param fieldSpec Field spec for the column
-   * @return Compression type to use
+   * @deprecated use {@link 
ForwardIndexType#getDefaultCompressionType(FieldType)} instead
    */
-  private ChunkCompressionType getColumnCompressionType(SegmentGeneratorConfig 
segmentCreationSpec,
-      FieldSpec fieldSpec) {
-    ChunkCompressionType compressionType = 
segmentCreationSpec.getRawIndexCompressionType().get(fieldSpec.getName());
-    if (compressionType == null) {
-      compressionType = getDefaultCompressionType(fieldSpec.getFieldType());
-    }
-
-    return compressionType;
-  }
-
+  @Deprecated
   public static ChunkCompressionType getDefaultCompressionType(FieldType 
fieldType) {
-    if (fieldType == FieldSpec.FieldType.METRIC) {
-      return ChunkCompressionType.PASS_THROUGH;
-    } else {
-      return ChunkCompressionType.LZ4;
-    }
+    return ForwardIndexType.getDefaultCompressionType(fieldType);
   }
 
   @Override
   public void indexRow(GenericRow row)
       throws IOException {
-    for (Map.Entry<String, ForwardIndexCreator> entry : 
_forwardIndexCreatorMap.entrySet()) {
-      String columnName = entry.getKey();
-      ForwardIndexCreator forwardIndexCreator = entry.getValue();
+    for (Map.Entry<String, Map<IndexType<?, ?, ?>, IndexCreator>> byColEntry : 
_creatorsByColAndIndex.entrySet()) {
+      String columnName = byColEntry.getKey();
 
       Object columnValueToIndex = row.getValue(columnName);
       if (columnValueToIndex == null) {
         throw new RuntimeException("Null value for column:" + columnName);
       }
 
-      FieldSpec fieldSpec = _schema.getFieldSpecFor(columnName);
+      Map<IndexType<?, ?, ?>, IndexCreator> creatorsByIndex = 
byColEntry.getValue();
 
-      //get dictionaryCreator, will be null if column is not dictionaryEncoded
+      FieldSpec fieldSpec = _schema.getFieldSpecFor(columnName);
       SegmentDictionaryCreator dictionaryCreator = 
_dictionaryCreatorMap.get(columnName);
 
-      // bloom filter
-      BloomFilterCreator bloomFilterCreator = 
_bloomFilterCreatorMap.get(columnName);
-      if (bloomFilterCreator != null) {
-        if (fieldSpec.isSingleValueField()) {
-          if (fieldSpec.getDataType() == DataType.BYTES) {
-            bloomFilterCreator.add(BytesUtils.toHexString((byte[]) 
columnValueToIndex));
-          } else {
-            bloomFilterCreator.add(columnValueToIndex.toString());
-          }
-        } else {
-          Object[] values = (Object[]) columnValueToIndex;
-          if (fieldSpec.getDataType() == DataType.BYTES) {
-            for (Object value : values) {
-              bloomFilterCreator.add(BytesUtils.toHexString((byte[]) value));
-            }
-          } else {
-            for (Object value : values) {
-              bloomFilterCreator.add(value.toString());
-            }
-          }
-        }
-      }
-
-      // range index
-      CombinedInvertedIndexCreator combinedInvertedIndexCreator = 
_rangeIndexFilterCreatorMap.get(columnName);
-      if (combinedInvertedIndexCreator != null) {
-        if (dictionaryCreator != null) {
-          if (fieldSpec.isSingleValueField()) {
-            
combinedInvertedIndexCreator.add(dictionaryCreator.indexOfSV(columnValueToIndex));
-          } else {
-            int[] dictIds = dictionaryCreator.indexOfMV(columnValueToIndex);
-            combinedInvertedIndexCreator.add(dictIds, dictIds.length);
-          }
-        } else {
-          if (fieldSpec.isSingleValueField()) {
-            switch (fieldSpec.getDataType()) {
-              case INT:
-                combinedInvertedIndexCreator.add((Integer) columnValueToIndex);
-                break;
-              case LONG:
-                combinedInvertedIndexCreator.add((Long) columnValueToIndex);
-                break;
-              case FLOAT:
-                combinedInvertedIndexCreator.add((Float) columnValueToIndex);
-                break;
-              case DOUBLE:
-                combinedInvertedIndexCreator.add((Double) columnValueToIndex);
-                break;
-              default:
-                throw new RuntimeException("Unsupported data type " + 
fieldSpec.getDataType() + " for range index");
-            }
-          } else {
-            Object[] values = (Object[]) columnValueToIndex;
-            switch (fieldSpec.getDataType()) {
-              case INT:
-                int[] intValues = new int[values.length];
-                for (int i = 0; i < values.length; i++) {
-                  intValues[i] = (Integer) values[i];
-                }
-                combinedInvertedIndexCreator.add(intValues, values.length);
-                break;
-              case LONG:
-                long[] longValues = new long[values.length];
-                for (int i = 0; i < values.length; i++) {
-                  longValues[i] = (Long) values[i];
-                }
-                combinedInvertedIndexCreator.add(longValues, values.length);
-                break;
-              case FLOAT:
-                float[] floatValues = new float[values.length];
-                for (int i = 0; i < values.length; i++) {
-                  floatValues[i] = (Float) values[i];
-                }
-                combinedInvertedIndexCreator.add(floatValues, values.length);
-                break;
-              case DOUBLE:
-                double[] doubleValues = new double[values.length];
-                for (int i = 0; i < values.length; i++) {
-                  doubleValues[i] = (Double) values[i];
-                }
-                combinedInvertedIndexCreator.add(doubleValues, values.length);
-                break;
-              default:
-                throw new RuntimeException("Unsupported data type " + 
fieldSpec.getDataType() + " for range index");
-            }
-          }
-        }
-      }
-
-      // text-index
-      TextIndexCreator textIndexCreator = _textIndexCreatorMap.get(columnName);
-      if (textIndexCreator != null) {
-        if (fieldSpec.isSingleValueField()) {
-          textIndexCreator.add((String) columnValueToIndex);
-        } else {
-          Object[] values = (Object[]) columnValueToIndex;
-          int length = values.length;
-          if (values instanceof String[]) {
-            textIndexCreator.add((String[]) values, length);
-          } else {
-            String[] strings = new String[length];
-            for (int i = 0; i < length; i++) {
-              strings[i] = (String) values[i];
-            }
-            textIndexCreator.add(strings, length);
-            columnValueToIndex = strings;
-          }
-        }
-      }
-
       if (fieldSpec.isSingleValueField()) {
-        // Single Value column
-        JsonIndexCreator jsonIndexCreator = 
_jsonIndexCreatorMap.get(columnName);
-        if (jsonIndexCreator != null) {
-          jsonIndexCreator.add((String) columnValueToIndex);
-        }
-        GeoSpatialIndexCreator h3IndexCreator = 
_h3IndexCreatorMap.get(columnName);
-        if (h3IndexCreator != null) {
-          h3IndexCreator.add(GeometrySerializer.deserialize((byte[]) 
columnValueToIndex));
-        }
-        if (dictionaryCreator != null) {
-          // dictionary encoded SV column
-          // get dictID from dictionary
-          int dictId = dictionaryCreator.indexOfSV(columnValueToIndex);
-          // store the docID -> dictID mapping in forward index
-          if (forwardIndexCreator != null) {
-            forwardIndexCreator.putDictId(dictId);
-          }
-          DictionaryBasedInvertedIndexCreator invertedIndexCreator = 
_invertedIndexCreatorMap.get(columnName);
-          if (invertedIndexCreator != null) {
-            // if inverted index enabled during segment creation,
-            // then store dictID -> docID mapping in inverted index
-            invertedIndexCreator.add(dictId);
-          }
-        } else {
-          // non-dictionary encoded SV column
-          // store the docId -> raw value mapping in forward index
-          if (textIndexCreator != null && 
!shouldStoreRawValueForTextIndex(columnName)) {
-            // for text index on raw columns, check the config to determine if 
actual raw value should
-            // be stored or not
-            columnValueToIndex = 
_columnProperties.get(columnName).get(FieldConfig.TEXT_INDEX_RAW_VALUE);
-            if (columnValueToIndex == null) {
-              columnValueToIndex = FieldConfig.TEXT_INDEX_DEFAULT_RAW_VALUE;
-            }
-          }
-          if (forwardIndexCreator != null) {
-            switch (forwardIndexCreator.getValueType()) {
-              case INT:
-                forwardIndexCreator.putInt((int) columnValueToIndex);
-                break;
-              case LONG:
-                forwardIndexCreator.putLong((long) columnValueToIndex);
-                break;
-              case FLOAT:
-                forwardIndexCreator.putFloat((float) columnValueToIndex);
-                break;
-              case DOUBLE:
-                forwardIndexCreator.putDouble((double) columnValueToIndex);
-                break;
-              case BIG_DECIMAL:
-                forwardIndexCreator.putBigDecimal((BigDecimal) 
columnValueToIndex);
-                break;
-              case STRING:
-                forwardIndexCreator.putString((String) columnValueToIndex);
-                break;
-              case BYTES:
-                forwardIndexCreator.putBytes((byte[]) columnValueToIndex);
-                break;
-              case JSON:
-                if (columnValueToIndex instanceof String) {
-                  forwardIndexCreator.putString((String) columnValueToIndex);
-                } else if (columnValueToIndex instanceof byte[]) {
-                  forwardIndexCreator.putBytes((byte[]) columnValueToIndex);
-                }
-                break;
-              default:
-                throw new IllegalStateException();
-            }
-          }
-        }
+        indexSingleValueRow(dictionaryCreator, columnValueToIndex, 
creatorsByIndex);
       } else {
-        if (dictionaryCreator != null) {
-          //dictionary encoded
-          int[] dictIds = dictionaryCreator.indexOfMV(columnValueToIndex);
-          if (forwardIndexCreator != null) {
-            forwardIndexCreator.putDictIdMV(dictIds);
-          }
-          DictionaryBasedInvertedIndexCreator invertedIndexCreator = 
_invertedIndexCreatorMap.get(columnName);
-          if (invertedIndexCreator != null) {
-            invertedIndexCreator.add(dictIds, dictIds.length);
-          }
-        } else {
-          // for text index on raw columns, check the config to determine if 
actual raw value should
-          // be stored or not
-          if (forwardIndexCreator != null) {
-            if (textIndexCreator != null && 
!shouldStoreRawValueForTextIndex(columnName)) {
-              Object value = 
_columnProperties.get(columnName).get(FieldConfig.TEXT_INDEX_RAW_VALUE);
-              if (value == null) {
-                value = FieldConfig.TEXT_INDEX_DEFAULT_RAW_VALUE;
-              }
-              if (forwardIndexCreator.getValueType().getStoredType() == 
DataType.STRING) {
-                columnValueToIndex = new String[]{String.valueOf(value)};
-              } else if (forwardIndexCreator.getValueType().getStoredType() == 
DataType.BYTES) {
-                columnValueToIndex = new 
byte[][]{String.valueOf(value).getBytes(UTF_8)};
-              } else {
-                throw new RuntimeException("Text Index is only supported for 
STRING and BYTES stored type");
-              }
-            }
-            Object[] values = (Object[]) columnValueToIndex;
-            int length = values.length;
-            switch (forwardIndexCreator.getValueType()) {
-              case INT:
-                int[] ints = new int[length];
-                for (int i = 0; i < length; i++) {
-                  ints[i] = (Integer) values[i];
-                }
-                forwardIndexCreator.putIntMV(ints);
-                break;
-              case LONG:
-                long[] longs = new long[length];
-                for (int i = 0; i < length; i++) {
-                  longs[i] = (Long) values[i];
-                }
-                forwardIndexCreator.putLongMV(longs);
-                break;
-              case FLOAT:
-                float[] floats = new float[length];
-                for (int i = 0; i < length; i++) {
-                  floats[i] = (Float) values[i];
-                }
-                forwardIndexCreator.putFloatMV(floats);
-                break;
-              case DOUBLE:
-                double[] doubles = new double[length];
-                for (int i = 0; i < length; i++) {
-                  doubles[i] = (Double) values[i];
-                }
-                forwardIndexCreator.putDoubleMV(doubles);
-                break;
-              case STRING:
-                if (values instanceof String[]) {
-                  forwardIndexCreator.putStringMV((String[]) values);
-                } else {
-                  String[] strings = new String[length];
-                  for (int i = 0; i < length; i++) {
-                    strings[i] = (String) values[i];
-                  }
-                  forwardIndexCreator.putStringMV(strings);
-                }
-                break;
-              case BYTES:
-                if (values instanceof byte[][]) {
-                  forwardIndexCreator.putBytesMV((byte[][]) values);
-                } else {
-                  byte[][] bytesArray = new byte[length][];
-                  for (int i = 0; i < length; i++) {
-                    bytesArray[i] = (byte[]) values[i];
-                  }
-                  forwardIndexCreator.putBytesMV(bytesArray);
-                }
-                break;
-              default:
-                throw new IllegalStateException();
-            }
-          }
-        }
+        indexMultiValueRow(dictionaryCreator, (Object[]) columnValueToIndex, 
creatorsByIndex);
       }
+    }
 
-      if (_nullHandlingEnabled) {
+    if (_nullHandlingEnabled) {
+      for (Map.Entry<String, NullValueVectorCreator> entry : 
_nullValueVectorCreatorMap.entrySet()) {
+        String columnName = entry.getKey();
         // If row has null value for given column name, add to null value 
vector
         if (row.isNullValue(columnName)) {
           _nullValueVectorCreatorMap.get(columnName).setNull(_docIdCounter);
         }
       }
     }
+
     _docIdCounter++;
   }
 
-  private boolean shouldStoreRawValueForTextIndex(String column) {
-    if (_columnProperties != null) {
-      Map<String, String> props = _columnProperties.get(column);
-      // by default always store the raw value
-      // if the config is set to true, don't store the actual raw value
-      // there will be a dummy value
-      return props == null || 
!Boolean.parseBoolean(props.get(FieldConfig.TEXT_INDEX_NO_RAW_DATA));
+  private void indexSingleValueRow(SegmentDictionaryCreator dictionaryCreator, 
Object value,
+      Map<IndexType<?, ?, ?>, IndexCreator> creatorsByIndex)
+      throws IOException {
+    int dictId = dictionaryCreator != null ? 
dictionaryCreator.indexOfSV(value) : -1;
+    for (IndexCreator creator : creatorsByIndex.values()) {
+      creator.add(value, dictId);
+    }
+  }
+
+  private void indexMultiValueRow(SegmentDictionaryCreator dictionaryCreator, 
Object[] values,
+      Map<IndexType<?, ?, ?>, IndexCreator> creatorsByIndex)
+      throws IOException {
+    int[] dictId = dictionaryCreator != null ? 
dictionaryCreator.indexOfMV(values) : null;
+    for (IndexCreator creator : creatorsByIndex.values()) {
+      creator.add(values, dictId);
+    }
+  }
+
+  @Nullable
+  private Object calculateRawValueForTextIndex(boolean dictEnabledColumn, 
FieldIndexConfigs configs,
+      FieldSpec fieldSpec) {
+    if (dictEnabledColumn) {
+      return null;
     }
+    TextIndexConfig textConfig = configs.getConfig(StandardIndexes.text());
+    if (!textConfig.isEnabled()) {
+      return null;
+    }
+
+    Object alternativeValue = textConfig.getRawValueForTextIndex();

Review Comment:
   Consider renaming `alternativeValue` to `rawValue`?



##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/ForwardIndexHandler.java:
##########
@@ -132,41 +144,42 @@ public void updateIndices(SegmentDirectory.Writer 
segmentWriter, IndexCreatorPro
             _tmpForwardIndexColumns.add(column);
             break;
           case ENABLE_FORWARD_INDEX:
-            ColumnMetadata columnMetadata = 
createForwardIndexIfNeeded(segmentWriter, column, indexCreatorProvider,
+            ColumnMetadata columnMetadata = 
createForwardIndexIfNeeded(segmentWriter, column,

Review Comment:
   Formatting: fold the `false` argument up onto line 147.



##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/IndexLoadingConfig.java:
##########
@@ -444,26 +526,31 @@ public void setColumnProperties(Map<String, Map<String, 
String>> columnPropertie
   @VisibleForTesting
   public void setInvertedIndexColumns(Set<String> invertedIndexColumns) {
     _invertedIndexColumns = new HashSet<>(invertedIndexColumns);
+    _dirty = true;
   }
 
   @VisibleForTesting
   public void addInvertedIndexColumns(String... invertedIndexColumns) {
     _invertedIndexColumns.addAll(Arrays.asList(invertedIndexColumns));
+    _dirty = true;
   }
 
   @VisibleForTesting
   public void addInvertedIndexColumns(Collection<String> invertedIndexColumns) 
{
     _invertedIndexColumns.addAll(invertedIndexColumns);
+    _dirty = true;
   }
 
   @VisibleForTesting
   public void removeInvertedIndexColumns(String... invertedIndexColumns) {
     removeInvertedIndexColumns(Arrays.asList(invertedIndexColumns));
+    assert _dirty;

Review Comment:
   Just checking whether this assert was added intentionally — it seems it would always 
pass after calling removeInvertedIndexColumns(Collection...).



##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/InvertedIndexAndDictionaryBasedForwardIndexCreator.java:
##########
@@ -266,8 +247,9 @@ public void regenerateForwardIndex()
   private Map<String, String> createForwardIndexForSVColumn()
       throws IOException {
     try (BitmapInvertedIndexReader invertedIndexReader =
-        (BitmapInvertedIndexReader) 
LoaderUtils.getInvertedIndexReader(_segmentWriter, _columnMetadata);
-        Dictionary dictionary = LoaderUtils.getDictionary(_segmentWriter, 
_columnMetadata)) {
+        (BitmapInvertedIndexReader) InvertedIndexType.ReaderFactory

Review Comment:
   Could we use StandardIndexes.inverted() and StandardIndexes.dictionary() to get their index readers? 
The same applies to another place a few lines below.



##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/text/TextIndexType.java:
##########
@@ -19,75 +19,178 @@
 
 package org.apache.pinot.segment.local.segment.index.text;
 
+import com.google.common.base.Preconditions;
+import java.io.File;
+import java.io.IOException;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
+import java.util.function.Function;
+import java.util.stream.Collectors;
 import javax.annotation.Nullable;
+import 
org.apache.pinot.segment.local.segment.creator.impl.text.LuceneTextIndexCreator;
+import 
org.apache.pinot.segment.local.segment.creator.impl.text.NativeTextIndexCreator;
+import 
org.apache.pinot.segment.local.segment.index.loader.ConfigurableFromIndexLoadingConfig;
+import org.apache.pinot.segment.local.segment.index.loader.IndexLoadingConfig;
+import 
org.apache.pinot.segment.local.segment.index.loader.invertedindex.TextIndexHandler;
+import 
org.apache.pinot.segment.local.segment.index.readers.text.LuceneTextIndexReader;
+import 
org.apache.pinot.segment.local.segment.index.readers.text.NativeTextIndexReader;
+import org.apache.pinot.segment.local.segment.store.TextIndexUtils;
 import org.apache.pinot.segment.spi.ColumnMetadata;
 import org.apache.pinot.segment.spi.V1Constants;
 import org.apache.pinot.segment.spi.creator.IndexCreationContext;
+import org.apache.pinot.segment.spi.index.AbstractIndexType;
+import org.apache.pinot.segment.spi.index.ColumnConfigDeserializer;
 import org.apache.pinot.segment.spi.index.FieldIndexConfigs;
-import org.apache.pinot.segment.spi.index.IndexCreator;
+import org.apache.pinot.segment.spi.index.IndexConfigDeserializer;
 import org.apache.pinot.segment.spi.index.IndexHandler;
-import org.apache.pinot.segment.spi.index.IndexReader;
+import org.apache.pinot.segment.spi.index.IndexReaderConstraintException;
 import org.apache.pinot.segment.spi.index.IndexReaderFactory;
-import org.apache.pinot.segment.spi.index.IndexType;
 import org.apache.pinot.segment.spi.index.StandardIndexes;
+import org.apache.pinot.segment.spi.index.TextIndexConfig;
+import org.apache.pinot.segment.spi.index.creator.TextIndexCreator;
+import org.apache.pinot.segment.spi.index.reader.TextIndexReader;
 import org.apache.pinot.segment.spi.store.SegmentDirectory;
-import org.apache.pinot.spi.config.table.IndexConfig;
+import org.apache.pinot.spi.config.table.FSTType;
+import org.apache.pinot.spi.config.table.FieldConfig;
 import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.data.FieldSpec;
 import org.apache.pinot.spi.data.Schema;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 
-public class TextIndexType implements IndexType<IndexConfig, IndexReader, 
IndexCreator> {
+public class TextIndexType extends AbstractIndexType<TextIndexConfig, 
TextIndexReader, TextIndexCreator>
+    implements ConfigurableFromIndexLoadingConfig<TextIndexConfig> {
+  protected static final Logger LOGGER = 
LoggerFactory.getLogger(TextIndexType.class);
 
-  public static final TextIndexType INSTANCE = new TextIndexType();
+  protected TextIndexType() {
+    super(StandardIndexes.TEXT_ID);
+  }
 
-  private TextIndexType() {
+  @Override
+  public Class<TextIndexConfig> getIndexConfigClass() {
+    return TextIndexConfig.class;
   }
 
   @Override
-  public String getId() {
-    return StandardIndexes.TEXT_ID;
+  public Map<String, TextIndexConfig> 
fromIndexLoadingConfig(IndexLoadingConfig indexLoadingConfig) {
+    Map<String, Map<String, String>> allColProps = 
indexLoadingConfig.getColumnProperties();
+    return 
indexLoadingConfig.getTextIndexColumns().stream().collect(Collectors.toMap(
+        Function.identity(),
+        colName -> new 
TextIndexConfigBuilder(indexLoadingConfig.getFSTIndexType())
+            .withProperties(allColProps.get(colName))
+            .build()
+    ));
   }
 
   @Override
-  public Class<IndexConfig> getIndexConfigClass() {
-    return IndexConfig.class;
+  public TextIndexConfig getDefaultConfig() {
+    return TextIndexConfig.DISABLED;
   }
 
   @Override
-  public IndexConfig getDefaultConfig() {
-    return IndexConfig.DISABLED;
+  public ColumnConfigDeserializer<TextIndexConfig> getDeserializer() {
+    return IndexConfigDeserializer.fromIndexes("text", getIndexConfigClass())
+        .withExclusiveAlternative((tableConfig, schema) -> {
+          List<FieldConfig> fieldConfigList = tableConfig.getFieldConfigList();
+          if (fieldConfigList == null) {
+            return Collections.emptyMap();
+          }
+          // TODO(index-spi): This doesn't feel right, but it is here to keep 
backward compatibility.
+          //  If there are two text indexes in two different columns and one 
explicitly specifies LUCENE but
+          //  the other specifies native, both are created as native. No error 
is thrown and no log is shown.

Review Comment:
   Since you've added a warning log for this now, perhaps rephrase the last 
sentence a bit.



##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/forward/ForwardIndexType.java:
##########
@@ -19,60 +19,183 @@
 
 package org.apache.pinot.segment.local.segment.index.forward;
 
+import com.google.common.collect.Sets;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
+import java.util.Set;
+import java.util.function.Supplier;
 import javax.annotation.Nullable;
+import 
org.apache.pinot.segment.local.segment.index.loader.ConfigurableFromIndexLoadingConfig;
+import org.apache.pinot.segment.local.segment.index.loader.ForwardIndexHandler;
+import org.apache.pinot.segment.local.segment.index.loader.IndexLoadingConfig;
 import org.apache.pinot.segment.spi.ColumnMetadata;
 import org.apache.pinot.segment.spi.V1Constants;
+import org.apache.pinot.segment.spi.compression.ChunkCompressionType;
 import org.apache.pinot.segment.spi.creator.IndexCreationContext;
+import org.apache.pinot.segment.spi.index.AbstractIndexType;
+import org.apache.pinot.segment.spi.index.ColumnConfigDeserializer;
 import org.apache.pinot.segment.spi.index.FieldIndexConfigs;
-import org.apache.pinot.segment.spi.index.IndexCreator;
+import org.apache.pinot.segment.spi.index.ForwardIndexConfig;
+import org.apache.pinot.segment.spi.index.IndexConfigDeserializer;
 import org.apache.pinot.segment.spi.index.IndexHandler;
-import org.apache.pinot.segment.spi.index.IndexReader;
+import org.apache.pinot.segment.spi.index.IndexReaderConstraintException;
 import org.apache.pinot.segment.spi.index.IndexReaderFactory;
-import org.apache.pinot.segment.spi.index.IndexType;
 import org.apache.pinot.segment.spi.index.StandardIndexes;
+import org.apache.pinot.segment.spi.index.creator.ForwardIndexCreator;
+import org.apache.pinot.segment.spi.index.reader.ForwardIndexReader;
+import org.apache.pinot.segment.spi.memory.PinotDataBuffer;
 import org.apache.pinot.segment.spi.store.SegmentDirectory;
-import org.apache.pinot.spi.config.table.IndexConfig;
+import org.apache.pinot.spi.config.table.FieldConfig;
 import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.data.FieldSpec;
 import org.apache.pinot.spi.data.Schema;
 
 
-public class ForwardIndexType implements IndexType<IndexConfig, IndexReader, 
IndexCreator> {
+public class ForwardIndexType
+    extends AbstractIndexType<ForwardIndexConfig, ForwardIndexReader, 
ForwardIndexCreator>
+    implements ConfigurableFromIndexLoadingConfig<ForwardIndexConfig> {
 
-  public static final ForwardIndexType INSTANCE = new ForwardIndexType();
-
-  private ForwardIndexType() {
+  protected ForwardIndexType() {
+    super(StandardIndexes.FORWARD_ID);
   }
 
   @Override
-  public String getId() {
-    return StandardIndexes.FORWARD_ID;
+  public Class<ForwardIndexConfig> getIndexConfigClass() {
+    return ForwardIndexConfig.class;
   }
 
   @Override
-  public Class<IndexConfig> getIndexConfigClass() {
-    return null;
+  public Map<String, ForwardIndexConfig> 
fromIndexLoadingConfig(IndexLoadingConfig indexLoadingConfig) {
+    Set<String> disabledCols = 
indexLoadingConfig.getForwardIndexDisabledColumns();
+    Map<String, ForwardIndexConfig> result = new HashMap<>();
+    Set<String> allColumns = Sets.union(disabledCols, 
indexLoadingConfig.getAllKnownColumns());
+    for (String column : allColumns) {
+      ChunkCompressionType compressionType =
+          indexLoadingConfig.getCompressionConfigs() != null
+              ? indexLoadingConfig.getCompressionConfigs().get(column)
+              : null;
+      Supplier<ForwardIndexConfig> defaultConfig = () -> {
+        if (compressionType == null) {
+          return ForwardIndexConfig.DEFAULT;
+        } else {
+          return new 
ForwardIndexConfig.Builder().withCompressionType(compressionType).build();
+        }
+      };
+      if (!disabledCols.contains(column)) {
+        TableConfig tableConfig = indexLoadingConfig.getTableConfig();
+        if (tableConfig == null) {
+          result.put(column, defaultConfig.get());
+        } else {
+          List<FieldConfig> fieldConfigList = tableConfig.getFieldConfigList();
+          if (fieldConfigList == null) {
+            result.put(column, defaultConfig.get());
+            continue;
+          }
+          FieldConfig fieldConfig = fieldConfigList.stream()
+              .filter(fc -> fc.getName().equals(column))
+              .findAny()
+              .orElse(null);
+          if (fieldConfig == null) {
+            result.put(column, defaultConfig.get());
+            continue;
+          }
+          ForwardIndexConfig.Builder builder = new 
ForwardIndexConfig.Builder();
+          if (compressionType != null) {
+            builder.withCompressionType(compressionType);
+          } else {
+            FieldConfig.CompressionCodec compressionCodec = 
fieldConfig.getCompressionCodec();
+            if (compressionCodec != null) {
+              
builder.withCompressionType(ChunkCompressionType.valueOf(compressionCodec.name()));
+            }
+          }
+
+          result.put(column, builder.build());
+        }
+      } else {
+        result.put(column, ForwardIndexConfig.DISABLED);
+      }
+    }
+    return result;
   }
 
   @Override
-  public IndexConfig getDefaultConfig() {
-    return IndexConfig.ENABLED;
+  public ForwardIndexConfig getDefaultConfig() {
+    return ForwardIndexConfig.DEFAULT;
   }
 
   @Override
-  public IndexConfig getConfig(TableConfig tableConfig, Schema schema) {
-    throw new UnsupportedOperationException();
+  public ColumnConfigDeserializer<ForwardIndexConfig> getDeserializer() {
+    // reads tableConfig.fieldConfigList and decides what to create using the 
FieldConfig properties and encoding
+    ColumnConfigDeserializer<ForwardIndexConfig> fromFieldConfig = 
IndexConfigDeserializer.fromCollection(
+        TableConfig::getFieldConfigList,
+        (accum, fieldConfig) -> {
+          Map<String, String> properties = fieldConfig.getProperties();
+          if (properties != null && isDisabled(properties)) {
+            accum.put(fieldConfig.getName(), ForwardIndexConfig.DISABLED);
+          } else if (fieldConfig.getEncodingType() == 
FieldConfig.EncodingType.RAW) {
+            accum.put(fieldConfig.getName(), 
createConfigFromFieldConfig(fieldConfig));
+          }
+        }
+    );
+    return IndexConfigDeserializer.fromIndexes("forward", 
getIndexConfigClass())
+        .withExclusiveAlternative(fromFieldConfig);
+  }
+
+  private boolean isDisabled(Map<String, String> props) {
+    return Boolean.parseBoolean(
+        props.getOrDefault(FieldConfig.FORWARD_INDEX_DISABLED, 
FieldConfig.DEFAULT_FORWARD_INDEX_DISABLED));
+  }
+
+  private ForwardIndexConfig createConfigFromFieldConfig(FieldConfig 
fieldConfig) {
+    if (fieldConfig.getEncodingType() != FieldConfig.EncodingType.RAW) {
+      throw new IllegalArgumentException("Cannot build a forward index on a 
field whose encoding is "
+          + fieldConfig.getEncodingType());
+    }
+    FieldConfig.CompressionCodec compressionCodec = 
fieldConfig.getCompressionCodec();
+    ForwardIndexConfig.Builder builder = new ForwardIndexConfig.Builder();
+    if (compressionCodec != null) {
+      
builder.withCompressionType(ChunkCompressionType.valueOf(compressionCodec.name()));
+    }
+
+    Map<String, String> properties = fieldConfig.getProperties();
+    if (properties != null) {
+      builder.withLegacyProperties(properties);
+    }
+
+    return builder.build();
+  }
+
+  public static ChunkCompressionType 
getDefaultCompressionType(FieldSpec.FieldType fieldType) {
+    if (fieldType == FieldSpec.FieldType.METRIC) {
+      return ChunkCompressionType.PASS_THROUGH;
+    } else {
+      return ChunkCompressionType.LZ4;
+    }
   }
 
   @Override
-  public IndexCreator createIndexCreator(IndexCreationContext context, 
IndexConfig indexConfig)
+  public ForwardIndexCreator createIndexCreator(IndexCreationContext context, 
ForwardIndexConfig indexConfig)
       throws Exception {
-    throw new UnsupportedOperationException();
+    return ForwardIndexCreatorFactory.createIndexCreator(context, indexConfig);
   }
 
   @Override
-  public IndexReaderFactory<IndexReader> getReaderFactory() {
-    throw new UnsupportedOperationException();
+  public IndexHandler createIndexHandler(SegmentDirectory segmentDirectory, 
Map<String, FieldIndexConfigs> configsByCol,
+      @Nullable Schema schema, @Nullable TableConfig tableConfig) {
+    return new ForwardIndexHandler(segmentDirectory, configsByCol, schema, 
tableConfig);
+  }
+
+  @Override
+  public IndexReaderFactory<ForwardIndexReader> getReaderFactory() {
+    return ForwardIndexReaderFactory.INSTANCE;
+  }
+
+  public static ForwardIndexReader<?> read(SegmentDirectory.Reader 
segmentReader, FieldIndexConfigs fieldIndexConfigs,
+      ColumnMetadata metadata)
+      throws IndexReaderConstraintException, IOException {
+    return 
StandardIndexes.forward().getReaderFactory().createIndexReader(segmentReader, 
fieldIndexConfigs, metadata);

Review Comment:
   just call ForwardIndexReaderFactory.createIndexReader? as in the last two 
read() methods below.



##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/json/JsonIndexType.java:
##########
@@ -82,11 +118,38 @@ public String getFileExtension(ColumnMetadata 
columnMetadata) {
   @Override
   public IndexHandler createIndexHandler(SegmentDirectory segmentDirectory, 
Map<String, FieldIndexConfigs> configsByCol,
       @Nullable Schema schema, @Nullable TableConfig tableConfig) {
-    throw new UnsupportedOperationException();
+    return new JsonIndexHandler(segmentDirectory, configsByCol, tableConfig);
   }
 
-  @Override
-  public String toString() {
-    return getId();
+  private static class ReaderFactory extends 
IndexReaderFactory.Default<JsonIndexConfig, JsonIndexReader> {

Review Comment:
   +1 to make this private, which I believe can be applied to all other 
ReaderFactory classes. 



##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/forward/ForwardIndexType.java:
##########
@@ -92,14 +215,23 @@ public String getFileExtension(ColumnMetadata 
columnMetadata) {
     }
   }
 
-  @Override
-  public IndexHandler createIndexHandler(SegmentDirectory segmentDirectory, 
Map<String, FieldIndexConfigs> configsByCol,
-      @Nullable Schema schema, @Nullable TableConfig tableConfig) {
-    throw new UnsupportedOperationException();
+  /**
+   * Returns the forward index reader for the given column.
+   */
+  public static ForwardIndexReader<?> getReader(SegmentDirectory.Reader 
segmentReader,
+      ColumnMetadata columnMetadata)

Review Comment:
   format? 
   
   also, what's the method naming convention? I see read() and getReader() both 
return ForwardIndexReader<?>. 



##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/ForwardIndexHandler.java:
##########
@@ -198,20 +211,21 @@ Map<String, List<Operation>> 
computeOperations(SegmentDirectory.Reader segmentRe
 
     // Get list of columns with forward index and those without forward index
     Set<String> existingForwardIndexColumns =
-        
segmentReader.toSegmentDirectory().getColumnsWithIndex(ColumnIndexType.FORWARD_INDEX);
+        
segmentReader.toSegmentDirectory().getColumnsWithIndex(StandardIndexes.forward());
     Set<String> existingForwardIndexDisabledColumns = new HashSet<>();
     for (String column : existingAllColumns) {
       if (!existingForwardIndexColumns.contains(column)) {
         existingForwardIndexDisabledColumns.add(column);
       }
     }
 
-    // From new column config.
-    Set<String> newNoDictColumns = 
_indexLoadingConfig.getNoDictionaryColumns();
-    Set<String> newForwardIndexDisabledColumns = 
_indexLoadingConfig.getForwardIndexDisabledColumns();
-
     for (String column : existingAllColumns) {
-      if (existingForwardIndexColumns.contains(column) && 
newForwardIndexDisabledColumns.contains(column)) {
+      FieldIndexConfigs newConf = _fieldIndexConfigs.get(column);
+      boolean newIsFwd = 
newConf.getConfig(StandardIndexes.forward()).isEnabled();
+      boolean newIsDict = 
newConf.getConfig(StandardIndexes.dictionary()).isEnabled();
+      boolean newIsRange = 
newConf.getConfig(StandardIndexes.range()).isEnabled();

Review Comment:
   nit: fwdIsEnabled, dictIsEnabled, rangeIsEnabled? which I feel is more 
straightforward to read. 



##########
pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/index/creator/CombinedInvertedIndexCreator.java:
##########
@@ -18,6 +18,80 @@
  */
 package org.apache.pinot.segment.spi.index.creator;
 
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+import org.apache.pinot.spi.data.FieldSpec;
+
+
+/**
+ * This is the index used to create range indexes

Review Comment:
   s/range/inverted



##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/forward/ForwardIndexCreatorFactory.java:
##########
@@ -0,0 +1,147 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.pinot.segment.local.segment.index.forward;
+
+import java.io.File;
+import java.io.IOException;
+import 
org.apache.pinot.segment.local.segment.creator.impl.fwd.MultiValueFixedByteRawIndexCreator;
+import 
org.apache.pinot.segment.local.segment.creator.impl.fwd.MultiValueUnsortedForwardIndexCreator;
+import 
org.apache.pinot.segment.local.segment.creator.impl.fwd.MultiValueVarByteRawIndexCreator;
+import 
org.apache.pinot.segment.local.segment.creator.impl.fwd.SingleValueFixedByteRawIndexCreator;
+import 
org.apache.pinot.segment.local.segment.creator.impl.fwd.SingleValueSortedForwardIndexCreator;
+import 
org.apache.pinot.segment.local.segment.creator.impl.fwd.SingleValueUnsortedForwardIndexCreator;
+import 
org.apache.pinot.segment.local.segment.creator.impl.fwd.SingleValueVarByteRawIndexCreator;
+import org.apache.pinot.segment.spi.compression.ChunkCompressionType;
+import org.apache.pinot.segment.spi.creator.IndexCreationContext;
+import org.apache.pinot.segment.spi.index.ForwardIndexConfig;
+import org.apache.pinot.segment.spi.index.creator.ForwardIndexCreator;
+import org.apache.pinot.spi.data.FieldSpec;
+
+
+public class ForwardIndexCreatorFactory {
+  private ForwardIndexCreatorFactory() {
+  }
+
+  public static ForwardIndexCreator createIndexCreator(IndexCreationContext 
context, ForwardIndexConfig indexConfig)
+      throws Exception {
+    String colName = context.getFieldSpec().getName();
+
+    if (!context.hasDictionary()) {
+      ChunkCompressionType chunkCompressionType = 
indexConfig.getChunkCompressionType();
+      if (chunkCompressionType == null) {
+        chunkCompressionType = 
ForwardIndexType.getDefaultCompressionType(context.getFieldSpec().getFieldType());
+      }
+
+      // Dictionary disabled columns
+      boolean deriveNumDocsPerChunk = indexConfig.isDeriveNumDocsPerChunk();
+      int writerVersion = indexConfig.getRawIndexWriterVersion();
+      if (context.getFieldSpec().isSingleValueField()) {
+        return getRawIndexCreatorForSVColumn(context.getIndexDir(), 
chunkCompressionType, colName,
+            context.getFieldSpec().getDataType().getStoredType(),
+            context.getTotalDocs(), context.getLengthOfLongestEntry(), 
deriveNumDocsPerChunk, writerVersion);
+      } else {
+        return getRawIndexCreatorForMVColumn(context.getIndexDir(), 
chunkCompressionType, colName,
+            context.getFieldSpec().getDataType().getStoredType(),
+            context.getTotalDocs(), 
context.getMaxNumberOfMultiValueElements(), deriveNumDocsPerChunk, 
writerVersion,
+            context.getMaxRowLengthInBytes());
+      }
+    } else {
+      // Dictionary enabled columns
+      if (context.getFieldSpec().isSingleValueField()) {
+        if (context.isSorted()) {
+          return new 
SingleValueSortedForwardIndexCreator(context.getIndexDir(), colName,
+              context.getCardinality());
+        } else {
+          return new 
SingleValueUnsortedForwardIndexCreator(context.getIndexDir(), colName,
+              context.getCardinality(), context.getTotalDocs());
+        }
+      } else {
+        return new 
MultiValueUnsortedForwardIndexCreator(context.getIndexDir(), colName,
+            context.getCardinality(), context.getTotalDocs(), 
context.getTotalNumberOfEntries());
+      }
+    }
+  }
+
+  /**
+   * Helper method to build the raw index creator for the column.
+   * Assumes that column to be indexed is single valued.
+   *
+   * @param file Output index file
+   * @param column Column name
+   * @param totalDocs Total number of documents to index
+   * @param lengthOfLongestEntry Length of longest entry
+   * @param deriveNumDocsPerChunk true if varbyte writer should auto-derive 
the number of rows per chunk
+   * @param writerVersion version to use for the raw index writer
+   * @return raw index creator
+   */
+  public static ForwardIndexCreator getRawIndexCreatorForSVColumn(File file, 
ChunkCompressionType compressionType,
+      String column, FieldSpec.DataType dataType, int totalDocs, int 
lengthOfLongestEntry,
+      boolean deriveNumDocsPerChunk, int writerVersion)
+      throws IOException {
+    switch (dataType.getStoredType()) {
+      case INT:
+      case LONG:
+      case FLOAT:
+      case DOUBLE:
+        return new SingleValueFixedByteRawIndexCreator(file, compressionType, 
column, totalDocs, dataType,
+            writerVersion);
+      case BIG_DECIMAL:
+      case STRING:
+      case BYTES:
+        return new SingleValueVarByteRawIndexCreator(file, compressionType, 
column, totalDocs, dataType,
+            lengthOfLongestEntry, deriveNumDocsPerChunk, writerVersion);
+      default:
+        throw new UnsupportedOperationException("Data type not supported for 
raw indexing: " + dataType);
+    }
+  }
+
+  /**
+   * Helper method to build the raw index creator for the column.
+   * Assumes that column to be indexed is single valued.

Review Comment:
   adjust the method comment, as it's for the single-value method (copied from 
above?)



##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/h3/H3IndexType.java:
##########
@@ -19,75 +19,115 @@
 
 package org.apache.pinot.segment.local.segment.index.h3;
 
+import com.google.common.base.Preconditions;
+import java.io.IOException;
 import java.util.Map;
+import java.util.Objects;
 import javax.annotation.Nullable;
+import 
org.apache.pinot.segment.local.segment.creator.impl.inv.geospatial.OffHeapH3IndexCreator;
+import 
org.apache.pinot.segment.local.segment.creator.impl.inv.geospatial.OnHeapH3IndexCreator;
+import 
org.apache.pinot.segment.local.segment.index.loader.ConfigurableFromIndexLoadingConfig;
+import org.apache.pinot.segment.local.segment.index.loader.IndexLoadingConfig;
+import 
org.apache.pinot.segment.local.segment.index.loader.invertedindex.H3IndexHandler;
+import 
org.apache.pinot.segment.local.segment.index.readers.geospatial.ImmutableH3IndexReader;
 import org.apache.pinot.segment.spi.ColumnMetadata;
 import org.apache.pinot.segment.spi.V1Constants;
 import org.apache.pinot.segment.spi.creator.IndexCreationContext;
+import org.apache.pinot.segment.spi.index.AbstractIndexType;
+import org.apache.pinot.segment.spi.index.ColumnConfigDeserializer;
 import org.apache.pinot.segment.spi.index.FieldIndexConfigs;
-import org.apache.pinot.segment.spi.index.IndexCreator;
+import org.apache.pinot.segment.spi.index.IndexConfigDeserializer;
 import org.apache.pinot.segment.spi.index.IndexHandler;
-import org.apache.pinot.segment.spi.index.IndexReader;
 import org.apache.pinot.segment.spi.index.IndexReaderFactory;
 import org.apache.pinot.segment.spi.index.IndexType;
 import org.apache.pinot.segment.spi.index.StandardIndexes;
+import org.apache.pinot.segment.spi.index.creator.GeoSpatialIndexCreator;
+import org.apache.pinot.segment.spi.index.creator.H3IndexConfig;
+import org.apache.pinot.segment.spi.index.reader.H3IndexReader;
+import org.apache.pinot.segment.spi.index.reader.H3IndexResolution;
+import org.apache.pinot.segment.spi.memory.PinotDataBuffer;
 import org.apache.pinot.segment.spi.store.SegmentDirectory;
-import org.apache.pinot.spi.config.table.IndexConfig;
+import org.apache.pinot.spi.config.table.FieldConfig;
 import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.data.FieldSpec;
 import org.apache.pinot.spi.data.Schema;
 
 
-public class H3IndexType implements IndexType<IndexConfig, IndexReader, 
IndexCreator> {
+public class H3IndexType extends AbstractIndexType<H3IndexConfig, 
H3IndexReader, GeoSpatialIndexCreator>
+  implements ConfigurableFromIndexLoadingConfig<H3IndexConfig> {
 
-  public static final H3IndexType INSTANCE = new H3IndexType();
-
-  private H3IndexType() {
+  protected H3IndexType() {
+    super(StandardIndexes.H3_ID);
   }
 
   @Override
-  public String getId() {
-    return StandardIndexes.H3_ID;
+  public Class<H3IndexConfig> getIndexConfigClass() {
+    return H3IndexConfig.class;
   }
 
   @Override
-  public Class<IndexConfig> getIndexConfigClass() {
-    return IndexConfig.class;
+  public Map<String, H3IndexConfig> fromIndexLoadingConfig(IndexLoadingConfig 
indexLoadingConfig) {
+    return indexLoadingConfig.getH3IndexConfigs();
   }
 
   @Override
-  public IndexConfig getDefaultConfig() {
-    return IndexConfig.DISABLED;
+  public H3IndexConfig getDefaultConfig() {
+    return H3IndexConfig.DISABLED;
   }
 
   @Override
-  public IndexConfig getConfig(TableConfig tableConfig, Schema schema) {
-    throw new UnsupportedOperationException();
+  public ColumnConfigDeserializer<H3IndexConfig> getDeserializer() {
+    return IndexConfigDeserializer.fromIndexes("h3", getIndexConfigClass())
+        .withExclusiveAlternative(IndexConfigDeserializer.fromIndexTypes(
+            FieldConfig.IndexType.H3,
+            ((tableConfig, fieldConfig) -> new 
H3IndexConfig(fieldConfig.getProperties()))));
   }
 
   @Override
-  public IndexCreator createIndexCreator(IndexCreationContext context, 
IndexConfig indexConfig)
-      throws Exception {
-    throw new UnsupportedOperationException();
+  public GeoSpatialIndexCreator createIndexCreator(IndexCreationContext 
context, H3IndexConfig indexConfig)
+      throws IOException {
+    Preconditions.checkState(context.getFieldSpec().isSingleValueField(),
+        "H3 index is currently only supported on single-value columns");
+    
Preconditions.checkState(context.getFieldSpec().getDataType().getStoredType() 
== FieldSpec.DataType.BYTES,
+        "H3 index is currently only supported on BYTES columns");
+    H3IndexResolution resolution = 
Objects.requireNonNull(indexConfig).getResolution();
+    return context.isOnHeap()
+        ? new OnHeapH3IndexCreator(context.getIndexDir(), 
context.getFieldSpec().getName(), resolution)
+        : new OffHeapH3IndexCreator(context.getIndexDir(), 
context.getFieldSpec().getName(), resolution);
   }
 
   @Override
-  public IndexReaderFactory<IndexReader> getReaderFactory() {
-    throw new UnsupportedOperationException();
+  public IndexReaderFactory<H3IndexReader> getReaderFactory() {
+    return ReaderFactory.INSTANCE;
   }
 
   @Override
   public IndexHandler createIndexHandler(SegmentDirectory segmentDirectory, 
Map<String, FieldIndexConfigs> configsByCol,
       @Nullable Schema schema, @Nullable TableConfig tableConfig) {
-    throw new UnsupportedOperationException();
+    return new H3IndexHandler(segmentDirectory, configsByCol, tableConfig);
   }
 
   @Override
   public String getFileExtension(ColumnMetadata columnMetadata) {
     return V1Constants.Indexes.H3_INDEX_FILE_EXTENSION;
   }
 
-  @Override
-  public String toString() {
-    return getId();
+  public static class ReaderFactory extends 
IndexReaderFactory.Default<H3IndexConfig, H3IndexReader> {

Review Comment:
   no need for a read() method on H3IndexType?



##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/forward/ForwardIndexType.java:
##########
@@ -19,60 +19,183 @@
 
 package org.apache.pinot.segment.local.segment.index.forward;
 
+import com.google.common.collect.Sets;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
+import java.util.Set;
+import java.util.function.Supplier;
 import javax.annotation.Nullable;
+import 
org.apache.pinot.segment.local.segment.index.loader.ConfigurableFromIndexLoadingConfig;
+import org.apache.pinot.segment.local.segment.index.loader.ForwardIndexHandler;
+import org.apache.pinot.segment.local.segment.index.loader.IndexLoadingConfig;
 import org.apache.pinot.segment.spi.ColumnMetadata;
 import org.apache.pinot.segment.spi.V1Constants;
+import org.apache.pinot.segment.spi.compression.ChunkCompressionType;
 import org.apache.pinot.segment.spi.creator.IndexCreationContext;
+import org.apache.pinot.segment.spi.index.AbstractIndexType;
+import org.apache.pinot.segment.spi.index.ColumnConfigDeserializer;
 import org.apache.pinot.segment.spi.index.FieldIndexConfigs;
-import org.apache.pinot.segment.spi.index.IndexCreator;
+import org.apache.pinot.segment.spi.index.ForwardIndexConfig;
+import org.apache.pinot.segment.spi.index.IndexConfigDeserializer;
 import org.apache.pinot.segment.spi.index.IndexHandler;
-import org.apache.pinot.segment.spi.index.IndexReader;
+import org.apache.pinot.segment.spi.index.IndexReaderConstraintException;
 import org.apache.pinot.segment.spi.index.IndexReaderFactory;
-import org.apache.pinot.segment.spi.index.IndexType;
 import org.apache.pinot.segment.spi.index.StandardIndexes;
+import org.apache.pinot.segment.spi.index.creator.ForwardIndexCreator;
+import org.apache.pinot.segment.spi.index.reader.ForwardIndexReader;
+import org.apache.pinot.segment.spi.memory.PinotDataBuffer;
 import org.apache.pinot.segment.spi.store.SegmentDirectory;
-import org.apache.pinot.spi.config.table.IndexConfig;
+import org.apache.pinot.spi.config.table.FieldConfig;
 import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.data.FieldSpec;
 import org.apache.pinot.spi.data.Schema;
 
 
-public class ForwardIndexType implements IndexType<IndexConfig, IndexReader, 
IndexCreator> {
+public class ForwardIndexType
+    extends AbstractIndexType<ForwardIndexConfig, ForwardIndexReader, 
ForwardIndexCreator>
+    implements ConfigurableFromIndexLoadingConfig<ForwardIndexConfig> {
 
-  public static final ForwardIndexType INSTANCE = new ForwardIndexType();
-
-  private ForwardIndexType() {
+  protected ForwardIndexType() {
+    super(StandardIndexes.FORWARD_ID);
   }
 
   @Override
-  public String getId() {
-    return StandardIndexes.FORWARD_ID;
+  public Class<ForwardIndexConfig> getIndexConfigClass() {
+    return ForwardIndexConfig.class;
   }
 
   @Override
-  public Class<IndexConfig> getIndexConfigClass() {
-    return null;
+  public Map<String, ForwardIndexConfig> 
fromIndexLoadingConfig(IndexLoadingConfig indexLoadingConfig) {
+    Set<String> disabledCols = 
indexLoadingConfig.getForwardIndexDisabledColumns();
+    Map<String, ForwardIndexConfig> result = new HashMap<>();
+    Set<String> allColumns = Sets.union(disabledCols, 
indexLoadingConfig.getAllKnownColumns());
+    for (String column : allColumns) {
+      ChunkCompressionType compressionType =
+          indexLoadingConfig.getCompressionConfigs() != null
+              ? indexLoadingConfig.getCompressionConfigs().get(column)
+              : null;
+      Supplier<ForwardIndexConfig> defaultConfig = () -> {

Review Comment:
   why the need for a Supplier? it looks like the config value is only used 
once in the following if-else branches.



##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/ForwardIndexHandler.java:
##########
@@ -920,10 +929,10 @@ private void 
disableDictionaryAndCreateRawForwardIndex(String column, SegmentDir
   }
 
   private void rewriteDictToRawForwardIndex(String column, ColumnMetadata 
existingColMetadata,
-      SegmentDirectory.Writer segmentWriter, File indexDir, 
IndexCreatorProvider indexCreatorProvider)
+      SegmentDirectory.Writer segmentWriter, File indexDir)
       throws Exception {
-    try (ForwardIndexReader reader = 
LoaderUtils.getForwardIndexReader(segmentWriter, existingColMetadata)) {
-      Dictionary dictionary = LoaderUtils.getDictionary(segmentWriter, 
existingColMetadata);
+    try (ForwardIndexReader<?> reader = 
ForwardIndexType.getReader(segmentWriter, existingColMetadata)) {
+      Dictionary dictionary = DictionaryIndexType.read(segmentWriter, 
existingColMetadata);

Review Comment:
   use StandardIndexes.forward() and dictionary() here too? Make 
ForwardIndexType etc. — these concrete index type implementation classes — pkg 
private to avoid accidental direct reference?



##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/forward/ForwardIndexType.java:
##########
@@ -19,60 +19,183 @@
 
 package org.apache.pinot.segment.local.segment.index.forward;
 
+import com.google.common.collect.Sets;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
+import java.util.Set;
+import java.util.function.Supplier;
 import javax.annotation.Nullable;
+import 
org.apache.pinot.segment.local.segment.index.loader.ConfigurableFromIndexLoadingConfig;
+import org.apache.pinot.segment.local.segment.index.loader.ForwardIndexHandler;
+import org.apache.pinot.segment.local.segment.index.loader.IndexLoadingConfig;
 import org.apache.pinot.segment.spi.ColumnMetadata;
 import org.apache.pinot.segment.spi.V1Constants;
+import org.apache.pinot.segment.spi.compression.ChunkCompressionType;
 import org.apache.pinot.segment.spi.creator.IndexCreationContext;
+import org.apache.pinot.segment.spi.index.AbstractIndexType;
+import org.apache.pinot.segment.spi.index.ColumnConfigDeserializer;
 import org.apache.pinot.segment.spi.index.FieldIndexConfigs;
-import org.apache.pinot.segment.spi.index.IndexCreator;
+import org.apache.pinot.segment.spi.index.ForwardIndexConfig;
+import org.apache.pinot.segment.spi.index.IndexConfigDeserializer;
 import org.apache.pinot.segment.spi.index.IndexHandler;
-import org.apache.pinot.segment.spi.index.IndexReader;
+import org.apache.pinot.segment.spi.index.IndexReaderConstraintException;
 import org.apache.pinot.segment.spi.index.IndexReaderFactory;
-import org.apache.pinot.segment.spi.index.IndexType;
 import org.apache.pinot.segment.spi.index.StandardIndexes;
+import org.apache.pinot.segment.spi.index.creator.ForwardIndexCreator;
+import org.apache.pinot.segment.spi.index.reader.ForwardIndexReader;
+import org.apache.pinot.segment.spi.memory.PinotDataBuffer;
 import org.apache.pinot.segment.spi.store.SegmentDirectory;
-import org.apache.pinot.spi.config.table.IndexConfig;
+import org.apache.pinot.spi.config.table.FieldConfig;
 import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.data.FieldSpec;
 import org.apache.pinot.spi.data.Schema;
 
 
-public class ForwardIndexType implements IndexType<IndexConfig, IndexReader, 
IndexCreator> {
+public class ForwardIndexType
+    extends AbstractIndexType<ForwardIndexConfig, ForwardIndexReader, 
ForwardIndexCreator>
+    implements ConfigurableFromIndexLoadingConfig<ForwardIndexConfig> {
 
-  public static final ForwardIndexType INSTANCE = new ForwardIndexType();
-
-  private ForwardIndexType() {
+  protected ForwardIndexType() {
+    super(StandardIndexes.FORWARD_ID);
   }
 
   @Override
-  public String getId() {
-    return StandardIndexes.FORWARD_ID;
+  public Class<ForwardIndexConfig> getIndexConfigClass() {
+    return ForwardIndexConfig.class;
   }
 
   @Override
-  public Class<IndexConfig> getIndexConfigClass() {
-    return null;
+  public Map<String, ForwardIndexConfig> 
fromIndexLoadingConfig(IndexLoadingConfig indexLoadingConfig) {
+    Set<String> disabledCols = 
indexLoadingConfig.getForwardIndexDisabledColumns();
+    Map<String, ForwardIndexConfig> result = new HashMap<>();
+    Set<String> allColumns = Sets.union(disabledCols, 
indexLoadingConfig.getAllKnownColumns());
+    for (String column : allColumns) {
+      ChunkCompressionType compressionType =
+          indexLoadingConfig.getCompressionConfigs() != null
+              ? indexLoadingConfig.getCompressionConfigs().get(column)
+              : null;
+      Supplier<ForwardIndexConfig> defaultConfig = () -> {
+        if (compressionType == null) {
+          return ForwardIndexConfig.DEFAULT;
+        } else {
+          return new 
ForwardIndexConfig.Builder().withCompressionType(compressionType).build();
+        }
+      };
+      if (!disabledCols.contains(column)) {
+        TableConfig tableConfig = indexLoadingConfig.getTableConfig();
+        if (tableConfig == null) {
+          result.put(column, defaultConfig.get());
+        } else {
+          List<FieldConfig> fieldConfigList = tableConfig.getFieldConfigList();
+          if (fieldConfigList == null) {
+            result.put(column, defaultConfig.get());
+            continue;
+          }
+          FieldConfig fieldConfig = fieldConfigList.stream()
+              .filter(fc -> fc.getName().equals(column))
+              .findAny()
+              .orElse(null);
+          if (fieldConfig == null) {
+            result.put(column, defaultConfig.get());
+            continue;
+          }
+          ForwardIndexConfig.Builder builder = new 
ForwardIndexConfig.Builder();
+          if (compressionType != null) {
+            builder.withCompressionType(compressionType);
+          } else {
+            FieldConfig.CompressionCodec compressionCodec = 
fieldConfig.getCompressionCodec();
+            if (compressionCodec != null) {
+              
builder.withCompressionType(ChunkCompressionType.valueOf(compressionCodec.name()));
+            }
+          }
+
+          result.put(column, builder.build());
+        }
+      } else {
+        result.put(column, ForwardIndexConfig.DISABLED);
+      }
+    }
+    return result;
   }
 
   @Override
-  public IndexConfig getDefaultConfig() {
-    return IndexConfig.ENABLED;
+  public ForwardIndexConfig getDefaultConfig() {
+    return ForwardIndexConfig.DEFAULT;
   }
 
   @Override
-  public IndexConfig getConfig(TableConfig tableConfig, Schema schema) {
-    throw new UnsupportedOperationException();
+  public ColumnConfigDeserializer<ForwardIndexConfig> getDeserializer() {
+    // reads tableConfig.fieldConfigList and decides what to create using the 
FieldConfig properties and encoding
+    ColumnConfigDeserializer<ForwardIndexConfig> fromFieldConfig = 
IndexConfigDeserializer.fromCollection(
+        TableConfig::getFieldConfigList,
+        (accum, fieldConfig) -> {
+          Map<String, String> properties = fieldConfig.getProperties();
+          if (properties != null && isDisabled(properties)) {
+            accum.put(fieldConfig.getName(), ForwardIndexConfig.DISABLED);
+          } else if (fieldConfig.getEncodingType() == 
FieldConfig.EncodingType.RAW) {
+            accum.put(fieldConfig.getName(), 
createConfigFromFieldConfig(fieldConfig));
+          }
+        }
+    );
+    return IndexConfigDeserializer.fromIndexes("forward", 
getIndexConfigClass())
+        .withExclusiveAlternative(fromFieldConfig);
+  }
+
+  private boolean isDisabled(Map<String, String> props) {
+    return Boolean.parseBoolean(
+        props.getOrDefault(FieldConfig.FORWARD_INDEX_DISABLED, 
FieldConfig.DEFAULT_FORWARD_INDEX_DISABLED));
+  }
+
+  private ForwardIndexConfig createConfigFromFieldConfig(FieldConfig 
fieldConfig) {
+    if (fieldConfig.getEncodingType() != FieldConfig.EncodingType.RAW) {
+      throw new IllegalArgumentException("Cannot build a forward index on a 
field whose encoding is "

Review Comment:
   hmm... what about DICTIONARY? A forward index can also be built on a dict-encoded column.



##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/ForwardIndexHandler.java:
##########
@@ -438,10 +442,9 @@ private void 
rewriteRawForwardIndexForCompressionChange(String column, SegmentDi
   }
 
   private void rewriteRawMVForwardIndexForCompressionChange(String column, 
ColumnMetadata existingColMetadata,
-      File indexDir, SegmentDirectory.Writer segmentWriter, 
IndexCreatorProvider indexCreatorProvider,
-      ChunkCompressionType newCompressionType)
+      File indexDir, SegmentDirectory.Writer segmentWriter, 
ChunkCompressionType newCompressionType)
       throws Exception {
-    try (ForwardIndexReader reader = 
LoaderUtils.getForwardIndexReader(segmentWriter, existingColMetadata)) {
+    try (ForwardIndexReader<?> reader = 
ForwardIndexType.getReader(segmentWriter, existingColMetadata)) {

Review Comment:
   should we use StandardIndexes.forward() to access index reader? 



##########
pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/index/ForwardIndexConfig.java:
##########
@@ -0,0 +1,157 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.pinot.segment.spi.index;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import java.io.IOException;
+import java.util.Map;
+import java.util.Objects;
+import javax.annotation.Nullable;
+import org.apache.pinot.segment.spi.compression.ChunkCompressionType;
+import org.apache.pinot.spi.config.table.FieldConfig;
+import org.apache.pinot.spi.config.table.IndexConfig;
+import org.apache.pinot.spi.utils.JsonUtils;
+
+
+public class ForwardIndexConfig extends IndexConfig {
+  public static final int DEFAULT_RAW_WRITER_VERSION = 2;
+  public static final ForwardIndexConfig DISABLED = new 
ForwardIndexConfig(false, null, null, null);
+
+  @Nullable
+  private final ChunkCompressionType _chunkCompressionType;
+  private final boolean _deriveNumDocsPerChunk;
+  private final int _rawIndexWriterVersion;
+
+  public static final ForwardIndexConfig DEFAULT = new Builder().build();

Review Comment:
   move up to be close with other constant values.



##########
pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/index/creator/TextIndexCreator.java:
##########
@@ -18,14 +18,22 @@
  */
 package org.apache.pinot.segment.spi.index.creator;
 
-import java.io.Closeable;
 import java.io.IOException;
+import org.apache.pinot.segment.spi.index.IndexCreator;
 
 
 /**
- * Index creator for text index.
+ * Index creator for both text and FST indexes.

Review Comment:
   update comment as we have added FSTIndexCreator separately



##########
pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/store/SegmentDirectory.java:
##########
@@ -55,7 +56,7 @@
  *     SegmentDirectory.Reader reader =
  *           segmentDir.createReader();
  *     try {
- *       PinotDataBufferOld col1Dictionary = 
reader.getIndexFor("col1Dictionary", ColumnIndexType.DICTIONARY);
+ *       PinotDataBufferOld col1Dictionary = 
reader.getIndexFor("col1Dictionary", DictoionaryIndexType.INSTANCE);

Review Comment:
   typo
   
   but should this be StandardIndexes.dictionary()? Or did you leave this one 
here as an alternative way to access the index?



##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/bloomfilter/BloomFilterHandler.java:
##########
@@ -129,15 +129,17 @@ private void 
createAndSealBloomFilterForDictionaryColumn(BloomFilterCreatorProvi
     }
   }
 
-  private void 
createAndSealBloomFilterForNonDictionaryColumn(BloomFilterCreatorProvider 
indexCreatorProvider,
-      File indexDir, ColumnMetadata columnMetadata, BloomFilterConfig 
bloomFilterConfig,
-      SegmentDirectory.Writer segmentWriter)
+  private void createAndSealBloomFilterForNonDictionaryColumn(File indexDir, 
ColumnMetadata columnMetadata,
+      BloomFilterConfig bloomFilterConfig, SegmentDirectory.Writer 
segmentWriter)
       throws Exception {
     int numDocs = columnMetadata.getTotalDocs();
-    try (BloomFilterCreator bloomFilterCreator = 
indexCreatorProvider.newBloomFilterCreator(
-        
IndexCreationContext.builder().withIndexDir(indexDir).withColumnMetadata(columnMetadata).build()
-            .forBloomFilter(bloomFilterConfig));
-        ForwardIndexReader forwardIndexReader = 
LoaderUtils.getForwardIndexReader(segmentWriter, columnMetadata);
+    IndexCreationContext context = IndexCreationContext.builder()
+        .withIndexDir(indexDir)
+        .withColumnMetadata(columnMetadata)
+        .build();
+    try (BloomFilterCreator bloomFilterCreator = StandardIndexes.bloomFilter()
+        .createIndexCreator(context, bloomFilterConfig);
+        ForwardIndexReader forwardIndexReader = 
ForwardIndexType.getReader(segmentWriter, columnMetadata);

Review Comment:
   use StandardIndexes.forward() instead?



##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/inverted/InvertedIndexType.java:
##########
@@ -86,7 +130,46 @@ public String getFileExtension(ColumnMetadata 
columnMetadata) {
   }
 
   @Override
-  public String toString() {
-    return getId();
+  public IndexHandler createIndexHandler(SegmentDirectory segmentDirectory, 
Map<String, FieldIndexConfigs> configsByCol,
+      @Nullable Schema schema, @Nullable TableConfig tableConfig) {
+    return new InvertedIndexHandler(segmentDirectory, configsByCol, 
tableConfig);
+  }
+
+  public static class ReaderFactory implements 
IndexReaderFactory<InvertedIndexReader> {
+    public static final ReaderFactory INSTANCE = new ReaderFactory();
+
+    private ReaderFactory() {
+    }
+
+    @Override
+    public InvertedIndexReader createIndexReader(SegmentDirectory.Reader 
segmentReader,
+        FieldIndexConfigs fieldIndexConfigs, ColumnMetadata metadata)
+        throws IOException, IndexReaderConstraintException {
+      if (fieldIndexConfigs == null || 
!fieldIndexConfigs.getConfig(StandardIndexes.inverted()).isEnabled()) {
+        return null;
+      }
+      if (!metadata.hasDictionary()) {
+        throw new IllegalStateException("Column " + metadata.getColumnName() + 
" cannot be indexed by an inverted "
+            + "index if it has no dictionary");
+      }
+      if (metadata.isSorted() && metadata.isSingleValue()) {
+        ForwardIndexReader fwdReader = 
StandardIndexes.forward().getReaderFactory()
+            .createIndexReader(segmentReader, fieldIndexConfigs, metadata);
+        Preconditions.checkState(fwdReader instanceof SortedIndexReader);
+        return (SortedIndexReader) fwdReader;
+      } else {
+        return createSkippingForward(segmentReader, metadata);
+      }
+    }
+
+    public InvertedIndexReader createSkippingForward(SegmentDirectory.Reader 
segmentReader, ColumnMetadata metadata)

Review Comment:
   got it. I'd prefer to just call it createIndexReader, overloading the 
method, hiding impl details from the method name. 



##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/SegmentPreProcessor.java:
##########
@@ -104,20 +104,31 @@ public void process()
       }
 
       // Update single-column indices, like inverted index, json index etc.
-      IndexCreatorProvider indexCreatorProvider = 
IndexingOverrides.getIndexCreatorProvider();
       List<IndexHandler> indexHandlers = new ArrayList<>();
-      for (ColumnIndexType type : ColumnIndexType.values()) {
-        IndexHandler handler =
-            IndexHandlerFactory.getIndexHandler(type, _segmentDirectory, 
_indexLoadingConfig, _schema);
-        indexHandlers.add(handler);
-        // TODO: Find a way to ensure ForwardIndexHandler is always executed 
before other handlers instead of
-        // relying on enum ordering.
-        handler.updateIndices(segmentWriter, indexCreatorProvider);
-        // ForwardIndexHandler may modify the segment metadata while rewriting 
forward index to create / remove a
-        // dictionary. Other IndexHandler classes may modify the segment 
metadata while creating a temporary forward
-        // index to generate their respective indexes from if the forward 
index was disabled. This new metadata is
-        // needed to construct other indexes like RangeIndex.
-        _segmentMetadata = _segmentDirectory.getSegmentMetadata();
+
+      // We cannot just create all the index handlers in a random order.
+      // Specifically, ForwardIndexHandler needs to be executed first. This is 
because it modifies the segment metadata
+      // while rewriting forward index to create a dictionary. Some other 
handlers (like the range one) assume that
+      // metadata was already been modified by ForwardIndexHandler.
+      IndexHandler forwardHandler = createHandler(StandardIndexes.forward());
+      indexHandlers.add(forwardHandler);
+      forwardHandler.updateIndices(segmentWriter);
+
+      // Now that ForwardIndexHandler.updateIndeces has been updated, we can 
run all other indexes in any order

Review Comment:
   s/updateIndeces/updateIndices



##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/SegmentPreProcessor.java:
##########
@@ -104,20 +104,31 @@ public void process()
       }
 
       // Update single-column indices, like inverted index, json index etc.
-      IndexCreatorProvider indexCreatorProvider = 
IndexingOverrides.getIndexCreatorProvider();
       List<IndexHandler> indexHandlers = new ArrayList<>();
-      for (ColumnIndexType type : ColumnIndexType.values()) {
-        IndexHandler handler =
-            IndexHandlerFactory.getIndexHandler(type, _segmentDirectory, 
_indexLoadingConfig, _schema);
-        indexHandlers.add(handler);
-        // TODO: Find a way to ensure ForwardIndexHandler is always executed 
before other handlers instead of
-        // relying on enum ordering.
-        handler.updateIndices(segmentWriter, indexCreatorProvider);
-        // ForwardIndexHandler may modify the segment metadata while rewriting 
forward index to create / remove a
-        // dictionary. Other IndexHandler classes may modify the segment 
metadata while creating a temporary forward
-        // index to generate their respective indexes from if the forward 
index was disabled. This new metadata is
-        // needed to construct other indexes like RangeIndex.
-        _segmentMetadata = _segmentDirectory.getSegmentMetadata();
+
+      // We cannot just create all the index handlers in a random order.
+      // Specifically, ForwardIndexHandler needs to be executed first. This is 
because it modifies the segment metadata
+      // while rewriting forward index to create a dictionary. Some other 
handlers (like the range one) assume that
+      // metadata was already been modified by ForwardIndexHandler.
+      IndexHandler forwardHandler = createHandler(StandardIndexes.forward());
+      indexHandlers.add(forwardHandler);
+      forwardHandler.updateIndices(segmentWriter);
+
+      // Now that ForwardIndexHandler.updateIndeces has been updated, we can 
run all other indexes in any order
+
+      _segmentMetadata = new SegmentMetadataImpl(indexDir);
+      _segmentDirectory.reloadMetadata();

Review Comment:
   curious why reloadMetadata() is required now? 



##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/bloomfilter/BloomFilterHandler.java:
##########
@@ -264,51 +265,42 @@ private void 
createBloomFilterForColumn(SegmentDirectory.Writer segmentWriter, C
 
     if (!columnMetadata.hasDictionary()) {
       // Create a temporary forward index if it is disabled and does not exist
-      columnMetadata = createForwardIndexIfNeeded(segmentWriter, columnName, 
indexCreatorProvider, true);
+      columnMetadata = createForwardIndexIfNeeded(segmentWriter, columnName, 
true);
     }
 
     // Create new bloom filter for the column.
     BloomFilterConfig bloomFilterConfig = _bloomFilterConfigs.get(columnName);
     LOGGER.info("Creating new bloom filter for segment: {}, column: {} with 
config: {}", segmentName, columnName,
         bloomFilterConfig);
     if (columnMetadata.hasDictionary()) {
-      createAndSealBloomFilterForDictionaryColumn(bloomFilterCreatorProvider, 
indexDir, columnMetadata,
-          bloomFilterConfig, segmentWriter);
+      createAndSealBloomFilterForDictionaryColumn(indexDir, columnMetadata, 
bloomFilterConfig, segmentWriter);
     } else {
-      
createAndSealBloomFilterForNonDictionaryColumn(bloomFilterCreatorProvider, 
indexDir, columnMetadata,
-          bloomFilterConfig, segmentWriter);
+      createAndSealBloomFilterForNonDictionaryColumn(indexDir, columnMetadata, 
bloomFilterConfig, segmentWriter);
     }
 
     // For v3, write the generated bloom filter file into the single file and 
remove it.
     if (_segmentDirectory.getSegmentMetadata().getVersion() == 
SegmentVersion.v3) {
-      LoaderUtils.writeIndexToV3Format(segmentWriter, columnName, 
bloomFilterFile, ColumnIndexType.BLOOM_FILTER);
+      LoaderUtils.writeIndexToV3Format(segmentWriter, columnName, 
bloomFilterFile, StandardIndexes.bloomFilter());
     }
 
     // Delete the marker file.
     FileUtils.deleteQuietly(bloomFilterFileInProgress);
     LOGGER.info("Created bloom filter for segment: {}, column: {}", 
segmentName, columnName);
   }
 
-  private BaseImmutableDictionary getDictionaryReader(ColumnMetadata 
columnMetadata,
-      SegmentDirectory.Writer segmentWriter)
+  private Dictionary getDictionaryReader(ColumnMetadata columnMetadata, 
SegmentDirectory.Writer segmentWriter)
       throws IOException {
-    PinotDataBuffer dictionaryBuffer =
-        segmentWriter.getIndexFor(columnMetadata.getColumnName(), 
ColumnIndexType.DICTIONARY);
-    int cardinality = columnMetadata.getCardinality();
     DataType dataType = columnMetadata.getDataType();
+
     switch (dataType) {
       case INT:
-        return new IntDictionary(dictionaryBuffer, cardinality);
       case LONG:
-        return new LongDictionary(dictionaryBuffer, cardinality);
       case FLOAT:
-        return new FloatDictionary(dictionaryBuffer, cardinality);
       case DOUBLE:
-        return new DoubleDictionary(dictionaryBuffer, cardinality);
       case STRING:
-        return new StringDictionary(dictionaryBuffer, cardinality, 
columnMetadata.getColumnMaxLength());
       case BYTES:
-        return new BytesDictionary(dictionaryBuffer, cardinality, 
columnMetadata.getColumnMaxLength());
+        PinotDataBuffer buf = 
segmentWriter.getIndexFor(columnMetadata.getColumnName(), 
StandardIndexes.dictionary());
+        return DictionaryIndexType.read(buf, columnMetadata, 
DictionaryIndexConfig.DEFAULT_OFFHEAP);

Review Comment:
   StandardIndexes.dictionary() to access the index reader?
   
   and same for a few places in other index handler classes, to use 
StandardIndexes.xxx() to access the index handler?



##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/nullvalue/NullValueIndexType.java:
##########
@@ -60,34 +66,56 @@ public IndexConfig getDefaultConfig() {
   }
 
   @Override
-  public IndexConfig getConfig(TableConfig tableConfig, Schema schema) {
-    throw new UnsupportedOperationException();
+  public ColumnConfigDeserializer<IndexConfig> getDeserializer() {
+    return IndexConfigDeserializer.fromIndexes("null", getIndexConfigClass())
+        .withFallbackAlternative(
+            IndexConfigDeserializer.ifIndexingConfig(
+                IndexConfigDeserializer.alwaysCall((TableConfig tableConfig, 
Schema schema) ->
+                  tableConfig.getIndexingConfig().isNullHandlingEnabled()
+                      ? IndexConfig.ENABLED
+                      : IndexConfig.DISABLED))
+        );
   }
 
-  @Override
-  public IndexCreator createIndexCreator(IndexCreationContext context, 
IndexConfig indexConfig)
-      throws Exception {
-    throw new UnsupportedOperationException();
+  public NullValueVectorCreator createIndexCreator(File indexDir, String 
columnName) {
+    return new NullValueVectorCreator(indexDir, columnName);
   }
 
   @Override
-  public IndexReaderFactory<IndexReader> getReaderFactory() {
-    throw new UnsupportedOperationException();
+  public IndexReaderFactory<NullValueVectorReader> getReaderFactory() {
+    return ReaderFactory.INSTANCE;
   }
 
   @Override
   public IndexHandler createIndexHandler(SegmentDirectory segmentDirectory, 
Map<String, FieldIndexConfigs> configsByCol,
       @Nullable Schema schema, @Nullable TableConfig tableConfig) {
-    throw new UnsupportedOperationException();
+    return IndexHandler.NoOp.INSTANCE;
   }
 
   @Override
   public String getFileExtension(ColumnMetadata columnMetadata) {
     return V1Constants.Indexes.NULLVALUE_VECTOR_FILE_EXTENSION;
   }
 
-  @Override
-  public String toString() {
-    return getId();
+  public static class ReaderFactory implements 
IndexReaderFactory<NullValueVectorReader> {
+
+    public static final ReaderFactory INSTANCE = new ReaderFactory();
+
+    private ReaderFactory() {
+    }
+
+    @Nullable
+    @Override
+    public NullValueVectorReader createIndexReader(SegmentDirectory.Reader 
segmentReader,
+        FieldIndexConfigs fieldIndexConfigs, ColumnMetadata metadata)
+          throws IOException {
+      // For historical and test reasons, NullValueIndexType doesn't really 
care about its config
+      // if there is a buffer for this index, it is read even if the config 
explicitly ask to disable it.

Review Comment:
   add a TODO to fix the legacy logic? 



##########
pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/creator/IndexCreationContext.java:
##########
@@ -112,6 +143,16 @@ public Builder withColumnMetadata(ColumnMetadata 
columnMetadata) {
           
.withMaxNumberOfMultiValueElements(columnMetadata.getMaxNumberOfMultiValues());
     }
 
+    public Builder withIsOptimizedDictionary(boolean optimized) {

Review Comment:
   nit: should this be called withOptimizeDictionary? i.e. a hint to optimize 
dictionary



##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/text/TextIndexConfigBuilder.java:
##########
@@ -0,0 +1,75 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.pinot.segment.local.segment.index.text;
+
+import java.util.List;
+import java.util.Map;
+import javax.annotation.Nullable;
+import org.apache.pinot.segment.local.segment.store.TextIndexUtils;
+import org.apache.pinot.segment.spi.index.TextIndexConfig;
+import org.apache.pinot.spi.config.table.FSTType;
+import org.apache.pinot.spi.config.table.FieldConfig;
+
+
+public class TextIndexConfigBuilder extends TextIndexConfig.AbstractBuilder {
+  public TextIndexConfigBuilder(@Nullable FSTType fstType) {
+    super(fstType);
+  }
+
+  public TextIndexConfigBuilder(TextIndexConfig other) {
+    super(other);
+  }
+
+  @Override
+  public TextIndexConfig.AbstractBuilder withProperties(@Nullable Map<String, 
String> textIndexProperties) {
+    if (textIndexProperties != null) {
+      if 
(Boolean.parseBoolean(textIndexProperties.get(FieldConfig.TEXT_INDEX_NO_RAW_DATA)))
 {
+        _rawValueForTextIndex = 
textIndexProperties.get(FieldConfig.TEXT_INDEX_RAW_VALUE);
+        if (_rawValueForTextIndex == null) {
+          _rawValueForTextIndex = FieldConfig.TEXT_INDEX_DEFAULT_RAW_VALUE;
+        }
+      }
+      _enableQueryCache = 
Boolean.parseBoolean(textIndexProperties.get(FieldConfig.TEXT_INDEX_ENABLE_QUERY_CACHE));
+      _useANDForMultiTermQueries = Boolean.parseBoolean(
+          
textIndexProperties.get(FieldConfig.TEXT_INDEX_USE_AND_FOR_MULTI_TERM_QUERIES));
+      _stopWordsInclude = 
TextIndexUtils.extractStopWordsInclude(textIndexProperties);
+      _stopWordsExclude = 
TextIndexUtils.extractStopWordsExclude(textIndexProperties);
+    }
+    return this;
+  }
+
+  @Override
+  public TextIndexConfigBuilder withStopWordsInclude(List<String> 
stopWordsInclude) {

Review Comment:
   looks like there is no need to override this and the next two methods.



##########
pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/creator/SegmentGeneratorConfig.java:
##########
@@ -250,6 +208,47 @@ public SegmentGeneratorConfig(TableConfig tableConfig, 
Schema schema) {
       _rowTimeValueCheck = ingestionConfig.isRowTimeValueCheck();
       _segmentTimeValueCheck = ingestionConfig.isSegmentTimeValueCheck();
     }
+
+    _indexConfigsByColName = 
FieldIndexConfigsUtil.createIndexConfigsByColName(tableConfig, schema);
+
+    if (indexingConfig != null) {
+      // NOTE: There are 2 ways to configure creating inverted index during 
segment generation:
+      //       - Set 'generate.inverted.index.before.push' to 'true' in custom 
config (deprecated)
+      //       - Enable 'createInvertedIndexDuringSegmentGeneration' in 
indexing config
+      // TODO: Clean up the table configs with the deprecated settings, and 
always use the one in the indexing config
+      // TODO 2: Decide what to do with this. Index-spi is based on the idea 
that TableConfig is the source of truth
+      if (indexingConfig.getInvertedIndexColumns() != null) {
+        Map<String, String> customConfigs = 
tableConfig.getCustomConfig().getCustomConfigs();
+        if ((customConfigs != null && 
Boolean.parseBoolean(customConfigs.get("generate.inverted.index.before.push")))
+            || indexingConfig.isCreateInvertedIndexDuringSegmentGeneration()) {
+          setIndexOn(StandardIndexes.inverted(), IndexConfig.ENABLED, 
indexingConfig.getInvertedIndexColumns());
+        }
+      }
+    }
+  }
+
+  public <C extends IndexConfig> void setIndexOn(IndexType<C, ?, ?> indexType, 
C config, String... columns) {
+    setIndexOn(indexType, config, Arrays.asList(columns));
+  }
+
+  @VisibleForTesting
+  public <C extends IndexConfig> void setIndexOn(IndexType<C, ?, ?> indexType, 
C config,
+      @Nullable Iterable<String> columns) {
+    if (columns == null) {
+      return;
+    }
+    for (String column : columns) {
+      _indexConfigsByColName.compute(column, (key, old) -> {
+        FieldIndexConfigs.Builder builder;
+        if (old == null) {
+          builder = new FieldIndexConfigs.Builder();
+        } else {
+          builder = new FieldIndexConfigs.Builder(old);
+        }
+        return builder.add(indexType, config)
+            .build();

Review Comment:
   fold `.build()` up?



##########
pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/creator/SegmentGeneratorConfig.java:
##########
@@ -250,6 +208,47 @@ public SegmentGeneratorConfig(TableConfig tableConfig, 
Schema schema) {
       _rowTimeValueCheck = ingestionConfig.isRowTimeValueCheck();
       _segmentTimeValueCheck = ingestionConfig.isSegmentTimeValueCheck();
     }
+
+    _indexConfigsByColName = 
FieldIndexConfigsUtil.createIndexConfigsByColName(tableConfig, schema);
+
+    if (indexingConfig != null) {
+      // NOTE: There are 2 ways to configure creating inverted index during 
segment generation:
+      //       - Set 'generate.inverted.index.before.push' to 'true' in custom 
config (deprecated)
+      //       - Enable 'createInvertedIndexDuringSegmentGeneration' in 
indexing config
+      // TODO: Clean up the table configs with the deprecated settings, and 
always use the one in the indexing config
+      // TODO 2: Decide what to do with this. Index-spi is based on the idea 
that TableConfig is the source of truth
+      if (indexingConfig.getInvertedIndexColumns() != null) {
+        Map<String, String> customConfigs = 
tableConfig.getCustomConfig().getCustomConfigs();
+        if ((customConfigs != null && 
Boolean.parseBoolean(customConfigs.get("generate.inverted.index.before.push")))

Review Comment:
   define this config name in some existing Config or Constants util classes? 



##########
pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/creator/SegmentGeneratorConfig.java:
##########
@@ -441,53 +336,23 @@ public void setRawIndexCreationColumns(List<String> 
rawIndexCreationColumns) {
     _rawIndexCreationColumns.addAll(rawIndexCreationColumns);
   }
 
-  // NOTE: Should always be extracted from the table config
-  @Deprecated
-  public void setInvertedIndexCreationColumns(List<String> 
indexCreationColumns) {
-    Preconditions.checkNotNull(indexCreationColumns);
-    _invertedIndexCreationColumns.addAll(indexCreationColumns);
-  }
-
-  /**
-   * Used by org.apache.pinot.core.realtime.converter.RealtimeSegmentConverter
-   * and text search functional tests
-   * @param textIndexCreationColumns list of columns with text index creation 
enabled
-   */
-  public void setTextIndexCreationColumns(List<String> 
textIndexCreationColumns) {
-    if (textIndexCreationColumns != null) {
-      _textIndexCreationColumns.addAll(textIndexCreationColumns);
-    }
-  }
-
   @VisibleForTesting
   public void setRangeIndexCreationColumns(List<String> 
rangeIndexCreationColumns) {
     if (rangeIndexCreationColumns != null) {
-      _rangeIndexCreationColumns.addAll(rangeIndexCreationColumns);
+      setIndexOn(StandardIndexes.range(), new RangeIndexConfig(2), 
rangeIndexCreationColumns);

Review Comment:
   Replace the magic number `2` with the named version constant, i.e. `new RangeIndexConfig(the_version_constant)`.



##########
pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/index/reader/RangeIndexReader.java:
##########
@@ -18,15 +18,16 @@
  */
 package org.apache.pinot.segment.spi.index.reader;
 
-import java.io.Closeable;
 import javax.annotation.Nullable;
+import org.apache.pinot.segment.spi.index.IndexReader;
+

Review Comment:
   extra blank line added here?



##########
pinot-tools/src/main/java/org/apache/pinot/tools/segment/converter/DictionaryToRawIndexConverter.java:
##########
@@ -317,7 +317,7 @@ private void convertOneColumn(IndexSegment segment, String 
column, File newSegme
     int numDocs = segment.getSegmentMetadata().getTotalDocs();
     int lengthOfLongestEntry = (storedType == DataType.STRING) ? 
getLengthOfLongestEntry(dictionary) : -1;
 
-    try (ForwardIndexCreator rawIndexCreator = DefaultIndexCreatorProvider
+    try (ForwardIndexCreator rawIndexCreator = ForwardIndexCreatorFactory

Review Comment:
   Should we use `StandardIndexes.forward()` to get a reference to the index creator, 
instead of referring to the `ForwardIndexCreatorFactory` class directly? Perhaps just 
make those index-specific factories package-private?



##########
pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/index/mutable/provider/MutableInvertedIndexProvider.java:
##########
@@ -22,5 +22,6 @@
 
 
 public interface MutableInvertedIndexProvider {
+  @Deprecated

Review Comment:
   Since this is deprecated, what is the new way to do this? Maybe document it here for 
later reference.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org


Reply via email to