gortiz commented on code in PR #10184:
URL: https://github.com/apache/pinot/pull/10184#discussion_r1151572494


##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/SegmentColumnarIndexCreator.java:
##########
@@ -133,183 +120,141 @@ public void init(SegmentGeneratorConfig 
segmentCreationSpec, SegmentIndexCreatio
       return;
     }
 
-    Collection<FieldSpec> fieldSpecs = schema.getAllFieldSpecs();
-    Set<String> invertedIndexColumns = new HashSet<>();
-    for (String columnName : _config.getInvertedIndexCreationColumns()) {
-      Preconditions.checkState(schema.hasColumn(columnName),
-          "Cannot create inverted index for column: %s because it is not in 
schema", columnName);
-      invertedIndexColumns.add(columnName);
-    }
+    Map<String, FieldIndexConfigs> indexConfigs = 
segmentCreationSpec.getIndexConfigsByColName();
 
-    Set<String> bloomFilterColumns = new HashSet<>();
-    for (String columnName : _config.getBloomFilterCreationColumns()) {
-      Preconditions.checkState(schema.hasColumn(columnName),
-          "Cannot create bloom filter for column: %s because it is not in 
schema", columnName);
-      bloomFilterColumns.add(columnName);
-    }
-
-    Set<String> rangeIndexColumns = new HashSet<>();
-    for (String columnName : _config.getRangeIndexCreationColumns()) {
-      Preconditions.checkState(schema.hasColumn(columnName),
-          "Cannot create range index for column: %s because it is not in 
schema", columnName);
-      rangeIndexColumns.add(columnName);
-    }
-
-    Set<String> textIndexColumns = new HashSet<>();
-    for (String columnName : _config.getTextIndexCreationColumns()) {
-      Preconditions.checkState(schema.hasColumn(columnName),
-          "Cannot create text index for column: %s because it is not in 
schema", columnName);
-      textIndexColumns.add(columnName);
-    }
-
-    Set<String> fstIndexColumns = new HashSet<>();
-    for (String columnName : _config.getFSTIndexCreationColumns()) {
-      Preconditions.checkState(schema.hasColumn(columnName),
-          "Cannot create FST index for column: %s because it is not in 
schema", columnName);
-      fstIndexColumns.add(columnName);
-    }
-
-    Map<String, JsonIndexConfig> jsonIndexConfigs = 
_config.getJsonIndexConfigs();
-    for (String columnName : jsonIndexConfigs.keySet()) {
-      Preconditions.checkState(schema.hasColumn(columnName),
-          "Cannot create json index for column: %s because it is not in 
schema", columnName);
-    }
-
-    Set<String> forwardIndexDisabledColumns = new HashSet<>();
-    for (String columnName : _config.getForwardIndexDisabledColumns()) {
-      Preconditions.checkState(schema.hasColumn(columnName), 
String.format("Invalid config. Can't disable "
-          + "forward index creation for a column: %s that does not exist in 
schema", columnName));
-      forwardIndexDisabledColumns.add(columnName);
-    }
-
-    Map<String, H3IndexConfig> h3IndexConfigs = _config.getH3IndexConfigs();
-    for (String columnName : h3IndexConfigs.keySet()) {
-      Preconditions.checkState(schema.hasColumn(columnName),
-          "Cannot create H3 index for column: %s because it is not in schema", 
columnName);
-    }
+    _creatorsByColAndIndex = 
Maps.newHashMapWithExpectedSize(indexConfigs.keySet().size());
 
-    // Initialize creators for dictionary, forward index and inverted index
-    IndexingConfig indexingConfig = 
_config.getTableConfig().getIndexingConfig();
-    int rangeIndexVersion = indexingConfig.getRangeIndexVersion();
-    for (FieldSpec fieldSpec : fieldSpecs) {
-      // Ignore virtual columns
+    for (String columnName : indexConfigs.keySet()) {
+      FieldSpec fieldSpec = schema.getFieldSpecFor(columnName);
+      if (fieldSpec == null) {
+        Preconditions.checkState(schema.hasColumn(columnName),
+            "Cannot create index for column: %s because it is not in schema", 
columnName);
+      }
       if (fieldSpec.isVirtualColumn()) {
+        LOGGER.warn("Ignoring index creation for virtual column " + 
columnName);
         continue;
       }
 
-      String columnName = fieldSpec.getName();
+      FieldIndexConfigs originalConfig = indexConfigs.get(columnName);
       ColumnIndexCreationInfo columnIndexCreationInfo = 
indexCreationInfoMap.get(columnName);
       Preconditions.checkNotNull(columnIndexCreationInfo, "Missing index 
creation info for column: %s", columnName);
       boolean dictEnabledColumn = 
createDictionaryForColumn(columnIndexCreationInfo, segmentCreationSpec, 
fieldSpec);
-      Preconditions.checkState(dictEnabledColumn || 
!invertedIndexColumns.contains(columnName),
+      Preconditions.checkState(dictEnabledColumn || 
!originalConfig.getConfig(StandardIndexes.inverted()).isEnabled(),
           "Cannot create inverted index for raw index column: %s", columnName);
 
-      boolean forwardIndexDisabled = 
forwardIndexDisabledColumns.contains(columnName);
+      IndexType<ForwardIndexConfig, ?, ForwardIndexCreator> forwardIdx = 
StandardIndexes.forward();
+      boolean forwardIndexDisabled = 
!originalConfig.getConfig(forwardIdx).isEnabled();
 
       IndexCreationContext.Common context = IndexCreationContext.builder()
           .withIndexDir(_indexDir)
-          .withCardinality(columnIndexCreationInfo.getDistinctValueCount())
           .withDictionary(dictEnabledColumn)
           .withFieldSpec(fieldSpec)
           .withTotalDocs(segmentIndexCreationInfo.getTotalDocs())
-          .withMinValue((Comparable<?>) columnIndexCreationInfo.getMin())
-          .withMaxValue((Comparable<?>) columnIndexCreationInfo.getMax())
-          
.withTotalNumberOfEntries(columnIndexCreationInfo.getTotalNumberOfEntries())
           .withColumnIndexCreationInfo(columnIndexCreationInfo)
-          .sorted(columnIndexCreationInfo.isSorted())
+          .withIsOptimizedDictionary(_config.isOptimizeDictionary()
+              || _config.isOptimizeDictionaryForMetrics() && 
fieldSpec.getFieldType() == FieldSpec.FieldType.METRIC)
           .onHeap(segmentCreationSpec.isOnHeap())
           .withForwardIndexDisabled(forwardIndexDisabled)
+          .withTextCommitOnClose(true)
           .build();
-      // Initialize forward index creator
-      ChunkCompressionType chunkCompressionType =
-          dictEnabledColumn ? null : 
getColumnCompressionType(segmentCreationSpec, fieldSpec);
-      // Sorted columns treat the 'forwardIndexDisabled' flag as a no-op
-      _forwardIndexCreatorMap.put(columnName, (forwardIndexDisabled && 
!columnIndexCreationInfo.isSorted())
-          ? null : _indexCreatorProvider.newForwardIndexCreator(
-              context.forForwardIndex(chunkCompressionType, 
segmentCreationSpec.getColumnProperties())));
-
-      // Initialize inverted index creator; skip creating inverted index if 
sorted
-      if (invertedIndexColumns.contains(columnName) && 
!columnIndexCreationInfo.isSorted()) {
-        _invertedIndexCreatorMap.put(columnName,
-            
_indexCreatorProvider.newInvertedIndexCreator(context.forInvertedIndex()));
-      }
+
+      FieldIndexConfigs config = adaptConfig(columnName, originalConfig, 
columnIndexCreationInfo, segmentCreationSpec);
+
       if (dictEnabledColumn) {
         // Create dictionary-encoded index
         // Initialize dictionary creator
         // TODO: Dictionary creator holds all unique values on heap. Consider 
keeping dictionary instead of creator
         //       which uses off-heap memory.
-        SegmentDictionaryCreator dictionaryCreator =
-            new SegmentDictionaryCreator(fieldSpec, _indexDir, 
columnIndexCreationInfo.isUseVarLengthDictionary());
-        _dictionaryCreatorMap.put(columnName, dictionaryCreator);
-        // Create dictionary
+
+        // Index conf should be present if dictEnabledColumn is true. In case 
it doesn't, getConfig will throw an
+        // exception
+        DictionaryIndexConfig dictConfig = 
config.getConfig(StandardIndexes.dictionary());
+        if (!dictConfig.isEnabled()) {
+          throw new IllegalArgumentException("Dictionary index should be 
enabled");
+        }
+        SegmentDictionaryCreator creator = new 
DictionaryIndexPlugin().getIndexType()
+            .createIndexCreator(context, dictConfig);
+
         try {
-          
dictionaryCreator.build(columnIndexCreationInfo.getSortedUniqueElementsArray());
+          creator.build(context.getSortedUniqueElementsArray());
         } catch (Exception e) {
           LOGGER.error("Error building dictionary for field: {}, cardinality: 
{}, number of bytes per entry: {}",
-              fieldSpec.getName(), 
columnIndexCreationInfo.getDistinctValueCount(),
-              dictionaryCreator.getNumBytesPerEntry());
+              context.getFieldSpec().getName(), context.getCardinality(), 
creator.getNumBytesPerEntry());
           throw e;
         }
-      }
 
-      if (bloomFilterColumns.contains(columnName)) {
-        if (indexingConfig.getBloomFilterConfigs() != null
-            && indexingConfig.getBloomFilterConfigs().containsKey(columnName)) 
{
-          _bloomFilterCreatorMap.put(columnName, 
_indexCreatorProvider.newBloomFilterCreator(
-              
context.forBloomFilter(indexingConfig.getBloomFilterConfigs().get(columnName))));
-        } else {
-          _bloomFilterCreatorMap.put(columnName, 
_indexCreatorProvider.newBloomFilterCreator(
-              context.forBloomFilter(new 
BloomFilterConfig(BloomFilterConfig.DEFAULT_FPP, 0, false))));
-        }
+        _dictionaryCreatorMap.put(columnName, creator);
       }
 
-      if (!columnIndexCreationInfo.isSorted() && 
rangeIndexColumns.contains(columnName)) {
-        _rangeIndexFilterCreatorMap.put(columnName,
-            
_indexCreatorProvider.newRangeIndexCreator(context.forRangeIndex(rangeIndexVersion)));
+      Map<IndexType<?, ?, ?>, IndexCreator> creatorsByIndex =
+          
Maps.newHashMapWithExpectedSize(IndexService.getInstance().getAllIndexes().size());
+      for (IndexType<?, ?, ?> index : 
IndexService.getInstance().getAllIndexes()) {
+        if (hasSpecialLifecycle(index)) {
+          continue;
+        }
+        tryCreateIndexCreator(creatorsByIndex, index, context, config);
       }
-
-      if (textIndexColumns.contains(columnName)) {
-        FSTType fstType = FSTType.LUCENE;
-        List<FieldConfig> fieldConfigList = 
_config.getTableConfig().getFieldConfigList();
-        if (fieldConfigList != null) {
-          for (FieldConfig fieldConfig : fieldConfigList) {
-            if (fieldConfig.getName().equals(columnName)) {
-              Map<String, String> properties = fieldConfig.getProperties();
-              if (TextIndexUtils.isFstTypeNative(properties)) {
-                fstType = FSTType.NATIVE;
-              }
-            }
-          }
+      // TODO: Remove this when values stored as ForwardIndex stop depending 
on TextIndex config
+      IndexCreator oldFwdCreator = creatorsByIndex.get(forwardIdx);
+      if (oldFwdCreator instanceof ForwardIndexCreator) { // this implies that 
oldFwdCreator != null

Review Comment:
   I don't think so, but I'm not 100% sure, and I don't think it is really worth 
analyzing the code to check that. It would mean that reviewers would also need 
to know that.
   
   In my opinion it is much better to have an instanceof check than a null check 
here because it isn't going to impact performance, it is going to make the code 
easier to read, and it is going to be more resilient to failure if somehow the 
invariant is broken in the future.
   
   But this code shouldn't be here for too long, so I'm ok with changing it.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to