vrajat commented on code in PR #14451:
URL: https://github.com/apache/pinot/pull/14451#discussion_r1854163236


##########
pinot-core/src/main/java/org/apache/pinot/core/data/manager/BaseTableDataManager.java:
##########
@@ -1046,6 +1064,261 @@ public boolean needReloadSegments()
     return needReload;
   }
 
+  /**
+   * Collects a refresh verdict for every segment of this table that is reported as needing a refresh when checked
+   * against the given table config and schema.
+   *
+   * <p>Every segment returned by {@code acquireAllSegments()} is handed back through {@code releaseSegment(...)} in a
+   * {@code finally} block, so the segments are released even when one of the per-segment checks throws.
+   *
+   * @param tableConfig table config each segment is compared against
+   * @param schema schema each segment is compared against
+   * @return the responses whose {@code isNeedRefresh()} is true; empty when no segment needs a refresh
+   */
+  @Override
+  public List<NeedRefreshResponse> getSegmentsForRefresh(TableConfig tableConfig, Schema schema) {
+    List<NeedRefreshResponse> segmentsRequiringRefresh = new ArrayList<>();
+    List<SegmentDataManager> segmentDataManagers = acquireAllSegments();
+    try {
+      for (SegmentDataManager segmentDataManager : segmentDataManagers) {
+        NeedRefreshResponse response = needRefresh(tableConfig, schema, segmentDataManager);
+        if (response.isNeedRefresh()) {
+          segmentsRequiringRefresh.add(response);
+        }
+      }
+    } finally {
+      // NOTE(review): assumes releaseSegment(SegmentDataManager) is the counterpart of acquireAllSegments() on this
+      // class, as declared by TableDataManager - confirm against the enclosing file.
+      for (SegmentDataManager segmentDataManager : segmentDataManagers) {
+        releaseSegment(segmentDataManager);
+      }
+    }
+
+    return segmentsRequiringRefresh;
+  }
+
+  protected NeedRefreshResponse needRefresh(TableConfig tableConfig, Schema 
schema,
+      SegmentDataManager segmentDataManager) {
+    String tableNameWithType = tableConfig.getTableName();
+    Map<String, FieldIndexConfigs> indexConfigsMap =
+        FieldIndexConfigsUtil.createIndexConfigsByColName(tableConfig, schema);
+
+    String segmentName = segmentDataManager.getSegmentName();
+    IndexSegment segment = segmentDataManager.getSegment();
+    SegmentMetadata segmentMetadata = segment.getSegmentMetadata();
+    Set<String> segmentPhysicalColumns = segment.getPhysicalColumnNames();
+
+    // Time column changed
+    String timeColumn = tableConfig.getValidationConfig().getTimeColumnName();
+    if (timeColumn != null) {
+      if (segmentMetadata.getTimeColumn() == null || 
!segmentMetadata.getTimeColumn().equals(timeColumn)) {
+        LOGGER.debug("tableNameWithType: {}, segmentName: {}, change: time 
column", tableNameWithType, segmentName);
+        return new NeedRefreshResponse(segmentName, true, "time column");
+      }
+    }
+
+    List<String> sortedColumns = 
tableConfig.getIndexingConfig().getSortedColumn();
+    String sortedColumn = CollectionUtils.isNotEmpty(sortedColumns) ? 
sortedColumns.get(0) : null;
+
+    String partitionColumn = null;
+    ColumnPartitionConfig partitionConfig = null;
+    SegmentPartitionConfig segmentPartitionConfig = 
tableConfig.getIndexingConfig().getSegmentPartitionConfig();
+    // NOTE: Partition can only be enabled on a single column
+    if (segmentPartitionConfig != null && 
segmentPartitionConfig.getColumnPartitionMap().size() == 1) {
+      Map.Entry<String, ColumnPartitionConfig> entry =
+          
segmentPartitionConfig.getColumnPartitionMap().entrySet().iterator().next();
+      partitionColumn = entry.getKey();
+      partitionConfig = entry.getValue();
+    }
+
+    Set<String> columnsInSegment = segmentMetadata.getAllColumns();
+
+    // Column is added
+    if (!columnsInSegment.containsAll(schema.getPhysicalColumnNames())) {
+      LOGGER.debug("tableNameWithType: {}, segmentName: {}, change: column 
added", tableNameWithType, segmentName);
+      return new NeedRefreshResponse(segmentName, true, "column added");
+    }
+
+    // Get Index configuration for the Table Config
+    Set<String> noDictionaryColumns =
+        
FieldIndexConfigsUtil.columnsWithIndexDisabled(StandardIndexes.dictionary(), 
indexConfigsMap);
+    Set<String> bloomFilters =
+        
FieldIndexConfigsUtil.columnsWithIndexEnabled(StandardIndexes.bloomFilter(), 
indexConfigsMap);
+    Set<String> jsonIndex = 
FieldIndexConfigsUtil.columnsWithIndexEnabled(StandardIndexes.json(), 
indexConfigsMap);
+    Set<String> invertedIndex =
+        
FieldIndexConfigsUtil.columnsWithIndexEnabled(StandardIndexes.inverted(), 
indexConfigsMap);
+    Set<String> nullValueVectorIndex =
+        
FieldIndexConfigsUtil.columnsWithIndexEnabled(StandardIndexes.nullValueVector(),
 indexConfigsMap);
+    Set<String> rangeIndex = 
FieldIndexConfigsUtil.columnsWithIndexEnabled(StandardIndexes.range(), 
indexConfigsMap);
+    Set<String> h3Indexes = 
FieldIndexConfigsUtil.columnsWithIndexEnabled(StandardIndexes.h3(), 
indexConfigsMap);
+    Set<String> fstIndexes = 
FieldIndexConfigsUtil.columnsWithIndexEnabled(StandardIndexes.fst(), 
indexConfigsMap);
+    Set<String> textIndexes = 
FieldIndexConfigsUtil.columnsWithIndexEnabled(StandardIndexes.text(), 
indexConfigsMap);
+    List<StarTreeIndexConfig> starTreeIndexConfigsFromTableConfig =
+        tableConfig.getIndexingConfig().getStarTreeIndexConfigs();
+
+    // Get the index configuration for StarTree index from segment metadata as 
JsonNode.
+    List<StarTreeV2> starTreeIndexMetadata = segment.getStarTrees();
+
+    // Generate StarTree index builder config from the segment metadata.
+    List<StarTreeV2BuilderConfig> builderConfigFromSegmentMetadata = new 
ArrayList<>();
+    if (starTreeIndexMetadata != null) {
+      for (StarTreeV2 starTreeV2 : starTreeIndexMetadata) {
+        
builderConfigFromSegmentMetadata.add(StarTreeV2BuilderConfig.fromMetadata(starTreeV2.getMetadata()));
+      }
+    }
+
+    // Generate StarTree index builder configs from the table config.
+    //TODO: RV This maybe using the wrong function. It is not using the 
table's schema
+    List<StarTreeV2BuilderConfig> builderConfigFromTableConfigs =
+        
StarTreeBuilderUtils.generateBuilderConfigs(starTreeIndexConfigsFromTableConfig,
+            tableConfig.getIndexingConfig().isEnableDefaultStarTree(), 
segmentMetadata);
+
+    // TODO: RV Test
+    // Check if there is a mismatch between the StarTree index builder configs 
from the table config and the segment
+    // metadata.
+    if 
(!StarTreeBuilderUtils.areStarTreeBuilderConfigListsEqual(builderConfigFromTableConfigs,
+        builderConfigFromSegmentMetadata)) {
+      return new NeedRefreshResponse(segmentName, true, "startree index");
+    }
+
+    for (String columnName : segmentPhysicalColumns) {
+      ColumnMetadata columnMetadata = 
segmentMetadata.getColumnMetadataFor(columnName);
+      FieldSpec fieldSpecInSchema = schema.getFieldSpecFor(columnName);
+      DataSource source = segment.getDataSource(columnName);
+      Preconditions.checkNotNull(columnMetadata);
+      Preconditions.checkNotNull(source);
+
+      // Column is deleted
+      if (fieldSpecInSchema == null) {
+        LOGGER.debug("tableNameWithType: {}, segmentName: {}, change: column 
deleted", tableNameWithType, segmentName);
+        return new NeedRefreshResponse(segmentName, true, "column deleted: " + 
columnName);
+      }
+
+      // Field type changed
+      if 
(columnMetadata.getFieldType().compareTo(fieldSpecInSchema.getFieldType()) != 
0) {
+        LOGGER.debug("tableNameWithType: {}, segmentName: {}, change: field 
type", tableNameWithType, segmentName);
+        return new NeedRefreshResponse(segmentName, true, "field type changed: 
" + columnName);
+      }
+
+      // Data type changed
+      if 
(!columnMetadata.getDataType().equals(fieldSpecInSchema.getDataType())) {
+        LOGGER.debug("tableNameWithType: {}, segmentName: {}, change: data 
type", tableNameWithType, segmentName);
+        return new NeedRefreshResponse(segmentName, true, "data type changed: 
" + columnName);
+      }
+
+      // SV/MV changed
+      if (columnMetadata.isSingleValue() != 
fieldSpecInSchema.isSingleValueField()) {
+        LOGGER.debug("tableNameWithType: {}, segmentName: {}, change: single / 
multi value", tableNameWithType,
+            segmentName);
+        return new NeedRefreshResponse(segmentName, true, "single / multi 
value changed: " + columnName);
+      }
+
+      // TODO: detect if an index changes from Dictionary to Variable Length 
Dictionary or vice versa.
+      // TODO: RV TEST
+      boolean colHasDictionary = columnMetadata.hasDictionary();
+      // Encoding changed
+      if (colHasDictionary == noDictionaryColumns.contains(columnName)) {
+        // Check if dictionary update is needed
+        // 1. If the segment metadata has dictionary enabled and table has it 
disabled, its incompatible and refresh is
+        // needed.
+        // 2. If segment metadata has dictionary disabled, check if it has to 
be overridden. If not overridden,
+        // refresh is needed, since table has it enabled.
+        boolean incompatible = colHasDictionary || 
DictionaryIndexType.ignoreDictionaryOverride(
+            tableConfig.getIndexingConfig().isOptimizeDictionary(),
+            tableConfig.getIndexingConfig().isOptimizeDictionaryForMetrics(),
+            
tableConfig.getIndexingConfig().getNoDictionarySizeRatioThreshold(),
+            
tableConfig.getIndexingConfig().getNoDictionaryCardinalityRatioThreshold(), 
fieldSpecInSchema,
+            indexConfigsMap.get(columnName), columnMetadata.getCardinality(), 
columnMetadata.getTotalNumberOfEntries());
+        if (incompatible) {
+          LOGGER.debug("tableNameWithType: {}, segmentName: {}, change: 
dictionary encoding,", tableNameWithType,
+              segmentName);
+          return new NeedRefreshResponse(segmentName, true, "dictionary 
encoding changed: " + columnName);
+        } else {
+          LOGGER.debug("tableNameWithType: {}, segmentName: {}, no change as 
dictionary overrides applied to col: {}",
+              tableNameWithType, segmentName, columnName);
+        }
+      }
+
+      // Sorted column not sorted
+      if (columnName.equals(sortedColumn) && !columnMetadata.isSorted()) {
+        LOGGER.debug("tableNameWithType: {}, segmentName: {}, change: sort 
column", tableNameWithType, segmentName);
+        return new NeedRefreshResponse(segmentName, true, "sort column 
changed: " + columnName);
+      }
+
+      if (Objects.isNull(source.getBloomFilter()) == 
bloomFilters.contains(columnName)) {
+        LOGGER.debug("tableNameWithType: {}, segmentName: {}, change: bloom 
filter changed", tableNameWithType,
+            segmentName);
+        return new NeedRefreshResponse(segmentName, true, "bloom filter 
changed: " + columnName);
+      }
+
+      if (Objects.isNull(source.getJsonIndex()) == 
jsonIndex.contains(columnName)) {
+        LOGGER.debug("tableNameWithType: {}, segmentName: {}, change: json 
index changed", tableNameWithType,
+            segmentName);
+        return new NeedRefreshResponse(segmentName, true, "json index changed: 
" + columnName);
+      }
+
+      if (Objects.isNull(source.getTextIndex()) == 
textIndexes.contains(columnName)) {
+        LOGGER.debug("tableNameWithType: {}, segmentName: {}, change: text 
index changed", tableNameWithType,
+            segmentName);
+        return new NeedRefreshResponse(segmentName, true, "text index changed: 
" + columnName);
+      }
+
+      if (Objects.isNull(source.getFSTIndex()) == 
fstIndexes.contains(columnName)) {
+        LOGGER.debug("tableNameWithType: {}, segmentName: {}, change: fst 
index changed", tableNameWithType,
+            segmentName);
+        return new NeedRefreshResponse(segmentName, true, "fst index changed: 
" + columnName);
+      }
+
+      if (Objects.isNull(source.getH3Index()) == 
h3Indexes.contains(columnName)) {
+        LOGGER.debug("tableNameWithType: {}, segmentName: {}, change: h3 index 
changed", tableNameWithType,

Review Comment:
   Added `columnName` to all segments. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org

Reply via email to