KKcorps commented on code in PR #14451: URL: https://github.com/apache/pinot/pull/14451#discussion_r1853838884
########## pinot-core/src/main/java/org/apache/pinot/core/data/manager/BaseTableDataManager.java: ########## @@ -1046,6 +1064,261 @@ public boolean needReloadSegments() return needReload; } + @Override + public List<NeedRefreshResponse> getSegmentsForRefresh(TableConfig tableConfig, Schema schema) { + List<NeedRefreshResponse> segmentsRequiringRefresh = new ArrayList<>(); + List<SegmentDataManager> segmentDataManagers = acquireAllSegments(); + for (SegmentDataManager segmentDataManager : segmentDataManagers) { + NeedRefreshResponse response = needRefresh(tableConfig, schema, segmentDataManager); + if (response.isNeedRefresh()) { + segmentsRequiringRefresh.add(response); + } + } + + return segmentsRequiringRefresh; + } + + protected NeedRefreshResponse needRefresh(TableConfig tableConfig, Schema schema, + SegmentDataManager segmentDataManager) { + String tableNameWithType = tableConfig.getTableName(); + Map<String, FieldIndexConfigs> indexConfigsMap = + FieldIndexConfigsUtil.createIndexConfigsByColName(tableConfig, schema); + + String segmentName = segmentDataManager.getSegmentName(); + IndexSegment segment = segmentDataManager.getSegment(); + SegmentMetadata segmentMetadata = segment.getSegmentMetadata(); + Set<String> segmentPhysicalColumns = segment.getPhysicalColumnNames(); + + // Time column changed + String timeColumn = tableConfig.getValidationConfig().getTimeColumnName(); + if (timeColumn != null) { + if (segmentMetadata.getTimeColumn() == null || !segmentMetadata.getTimeColumn().equals(timeColumn)) { + LOGGER.debug("tableNameWithType: {}, segmentName: {}, change: time column", tableNameWithType, segmentName); + return new NeedRefreshResponse(segmentName, true, "time column"); + } + } + + List<String> sortedColumns = tableConfig.getIndexingConfig().getSortedColumn(); + String sortedColumn = CollectionUtils.isNotEmpty(sortedColumns) ? 
sortedColumns.get(0) : null; + + String partitionColumn = null; + ColumnPartitionConfig partitionConfig = null; + SegmentPartitionConfig segmentPartitionConfig = tableConfig.getIndexingConfig().getSegmentPartitionConfig(); + // NOTE: Partition can only be enabled on a single column + if (segmentPartitionConfig != null && segmentPartitionConfig.getColumnPartitionMap().size() == 1) { + Map.Entry<String, ColumnPartitionConfig> entry = + segmentPartitionConfig.getColumnPartitionMap().entrySet().iterator().next(); + partitionColumn = entry.getKey(); + partitionConfig = entry.getValue(); + } + + Set<String> columnsInSegment = segmentMetadata.getAllColumns(); + + // Column is added + if (!columnsInSegment.containsAll(schema.getPhysicalColumnNames())) { + LOGGER.debug("tableNameWithType: {}, segmentName: {}, change: column added", tableNameWithType, segmentName); + return new NeedRefreshResponse(segmentName, true, "column added"); + } + + // Get Index configuration for the Table Config + Set<String> noDictionaryColumns = + FieldIndexConfigsUtil.columnsWithIndexDisabled(StandardIndexes.dictionary(), indexConfigsMap); + Set<String> bloomFilters = + FieldIndexConfigsUtil.columnsWithIndexEnabled(StandardIndexes.bloomFilter(), indexConfigsMap); + Set<String> jsonIndex = FieldIndexConfigsUtil.columnsWithIndexEnabled(StandardIndexes.json(), indexConfigsMap); + Set<String> invertedIndex = + FieldIndexConfigsUtil.columnsWithIndexEnabled(StandardIndexes.inverted(), indexConfigsMap); + Set<String> nullValueVectorIndex = + FieldIndexConfigsUtil.columnsWithIndexEnabled(StandardIndexes.nullValueVector(), indexConfigsMap); + Set<String> rangeIndex = FieldIndexConfigsUtil.columnsWithIndexEnabled(StandardIndexes.range(), indexConfigsMap); + Set<String> h3Indexes = FieldIndexConfigsUtil.columnsWithIndexEnabled(StandardIndexes.h3(), indexConfigsMap); + Set<String> fstIndexes = FieldIndexConfigsUtil.columnsWithIndexEnabled(StandardIndexes.fst(), indexConfigsMap); + Set<String> textIndexes = 
FieldIndexConfigsUtil.columnsWithIndexEnabled(StandardIndexes.text(), indexConfigsMap); + List<StarTreeIndexConfig> starTreeIndexConfigsFromTableConfig = + tableConfig.getIndexingConfig().getStarTreeIndexConfigs(); + + // Get the index configuration for StarTree index from segment metadata as JsonNode. + List<StarTreeV2> starTreeIndexMetadata = segment.getStarTrees(); + + // Generate StarTree index builder config from the segment metadata. + List<StarTreeV2BuilderConfig> builderConfigFromSegmentMetadata = new ArrayList<>(); + if (starTreeIndexMetadata != null) { + for (StarTreeV2 starTreeV2 : starTreeIndexMetadata) { + builderConfigFromSegmentMetadata.add(StarTreeV2BuilderConfig.fromMetadata(starTreeV2.getMetadata())); + } + } + + // Generate StarTree index builder configs from the table config. + //TODO: RV This maybe using the wrong function. It is not using the table's schema + List<StarTreeV2BuilderConfig> builderConfigFromTableConfigs = + StarTreeBuilderUtils.generateBuilderConfigs(starTreeIndexConfigsFromTableConfig, + tableConfig.getIndexingConfig().isEnableDefaultStarTree(), segmentMetadata); + + // TODO: RV Test + // Check if there is a mismatch between the StarTree index builder configs from the table config and the segment + // metadata. 
+ if (!StarTreeBuilderUtils.areStarTreeBuilderConfigListsEqual(builderConfigFromTableConfigs, + builderConfigFromSegmentMetadata)) { + return new NeedRefreshResponse(segmentName, true, "startree index"); + } + + for (String columnName : segmentPhysicalColumns) { + ColumnMetadata columnMetadata = segmentMetadata.getColumnMetadataFor(columnName); + FieldSpec fieldSpecInSchema = schema.getFieldSpecFor(columnName); + DataSource source = segment.getDataSource(columnName); + Preconditions.checkNotNull(columnMetadata); + Preconditions.checkNotNull(source); + + // Column is deleted + if (fieldSpecInSchema == null) { + LOGGER.debug("tableNameWithType: {}, segmentName: {}, change: column deleted", tableNameWithType, segmentName); + return new NeedRefreshResponse(segmentName, true, "column deleted: " + columnName); + } + + // Field type changed + if (columnMetadata.getFieldType().compareTo(fieldSpecInSchema.getFieldType()) != 0) { + LOGGER.debug("tableNameWithType: {}, segmentName: {}, change: field type", tableNameWithType, segmentName); + return new NeedRefreshResponse(segmentName, true, "field type changed: " + columnName); + } + + // Data type changed + if (!columnMetadata.getDataType().equals(fieldSpecInSchema.getDataType())) { + LOGGER.debug("tableNameWithType: {}, segmentName: {}, change: data type", tableNameWithType, segmentName); + return new NeedRefreshResponse(segmentName, true, "data type changed: " + columnName); + } + + // SV/MV changed + if (columnMetadata.isSingleValue() != fieldSpecInSchema.isSingleValueField()) { + LOGGER.debug("tableNameWithType: {}, segmentName: {}, change: single / multi value", tableNameWithType, + segmentName); + return new NeedRefreshResponse(segmentName, true, "single / multi value changed: " + columnName); + } + + // TODO: detect if an index changes from Dictionary to Variable Length Dictionary or vice versa. 
+ // TODO: RV TEST + boolean colHasDictionary = columnMetadata.hasDictionary(); + // Encoding changed + if (colHasDictionary == noDictionaryColumns.contains(columnName)) { + // Check if dictionary update is needed + // 1. If the segment metadata has dictionary enabled and table has it disabled, its incompatible and refresh is + // needed. + // 2. If segment metadata has dictionary disabled, check if it has to be overridden. If not overridden, + // refresh is needed, since table has it enabled. + boolean incompatible = colHasDictionary || DictionaryIndexType.ignoreDictionaryOverride( + tableConfig.getIndexingConfig().isOptimizeDictionary(), + tableConfig.getIndexingConfig().isOptimizeDictionaryForMetrics(), + tableConfig.getIndexingConfig().getNoDictionarySizeRatioThreshold(), + tableConfig.getIndexingConfig().getNoDictionaryCardinalityRatioThreshold(), fieldSpecInSchema, + indexConfigsMap.get(columnName), columnMetadata.getCardinality(), columnMetadata.getTotalNumberOfEntries()); + if (incompatible) { + LOGGER.debug("tableNameWithType: {}, segmentName: {}, change: dictionary encoding,", tableNameWithType, + segmentName); + return new NeedRefreshResponse(segmentName, true, "dictionary encoding changed: " + columnName); + } else { + LOGGER.debug("tableNameWithType: {}, segmentName: {}, no change as dictionary overrides applied to col: {}", + tableNameWithType, segmentName, columnName); + } + } + + // Sorted column not sorted + if (columnName.equals(sortedColumn) && !columnMetadata.isSorted()) { + LOGGER.debug("tableNameWithType: {}, segmentName: {}, change: sort column", tableNameWithType, segmentName); + return new NeedRefreshResponse(segmentName, true, "sort column changed: " + columnName); + } + + if (Objects.isNull(source.getBloomFilter()) == bloomFilters.contains(columnName)) { + LOGGER.debug("tableNameWithType: {}, segmentName: {}, change: bloom filter changed", tableNameWithType, + segmentName); + return new NeedRefreshResponse(segmentName, true, "bloom filter 
changed: " + columnName); + } + + if (Objects.isNull(source.getJsonIndex()) == jsonIndex.contains(columnName)) { + LOGGER.debug("tableNameWithType: {}, segmentName: {}, change: json index changed", tableNameWithType, + segmentName); + return new NeedRefreshResponse(segmentName, true, "json index changed: " + columnName); + } + + if (Objects.isNull(source.getTextIndex()) == textIndexes.contains(columnName)) { + LOGGER.debug("tableNameWithType: {}, segmentName: {}, change: text index changed", tableNameWithType, + segmentName); + return new NeedRefreshResponse(segmentName, true, "text index changed: " + columnName); + } + + if (Objects.isNull(source.getFSTIndex()) == fstIndexes.contains(columnName)) { + LOGGER.debug("tableNameWithType: {}, segmentName: {}, change: fst index changed", tableNameWithType, + segmentName); + return new NeedRefreshResponse(segmentName, true, "fst index changed: " + columnName); + } + + if (Objects.isNull(source.getH3Index()) == h3Indexes.contains(columnName)) { + LOGGER.debug("tableNameWithType: {}, segmentName: {}, change: h3 index changed", tableNameWithType, Review Comment: nit: Can we improve this log message to also include the columnName for which the index was changed? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org