yihua commented on code in PR #18348:
URL: https://github.com/apache/hudi/pull/18348#discussion_r3048419158


##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/index/columnstats/ColumnStatsIndexer.java:
##########
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.metadata.index.columnstats;
+
+import org.apache.hudi.common.data.HoodieData;
+import org.apache.hudi.common.engine.HoodieEngineContext;
+import org.apache.hudi.metadata.model.FileInfo;
+import org.apache.hudi.metadata.model.FileSliceAndPartition;
+import org.apache.hudi.common.model.HoodieIndexDefinition;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.metadata.index.model.IndexPartitionInitialization;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.metadata.HoodieTableMetadataUtil;
+import org.apache.hudi.metadata.MetadataPartitionType;
+import org.apache.hudi.metadata.index.BaseIndexer;
+import org.apache.hudi.util.Lazy;
+
+import lombok.extern.slf4j.Slf4j;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.hudi.index.HoodieIndexUtils.register;
+import static 
org.apache.hudi.metadata.HoodieTableMetadataUtil.PARTITION_NAME_COLUMN_STATS;
+import static 
org.apache.hudi.metadata.HoodieTableMetadataUtil.existingIndexVersionOrDefault;
+import static org.apache.hudi.metadata.MetadataPartitionType.COLUMN_STATS;
+
+/**
+ * Implementation of {@link MetadataPartitionType#COLUMN_STATS} metadata
+ */
+@Slf4j
+public class ColumnStatsIndexer extends BaseIndexer {
+  private final Lazy<List<String>> columnsToIndex;
+
+  public ColumnStatsIndexer(HoodieEngineContext engineContext,
+                               HoodieWriteConfig dataTableWriteConfig,
+                               HoodieTableMetaClient dataTableMetaClient) {
+    super(engineContext, dataTableWriteConfig, dataTableMetaClient);
+
+    this.columnsToIndex = Lazy.lazily(() ->
+        new ArrayList<>(HoodieTableMetadataUtil.getColumnsToIndex(
+            dataTableMetaClient.getTableConfig(),
+            dataTableWriteConfig.getMetadataConfig(),
+            Lazy.lazily(() -> 
HoodieTableMetadataUtil.tryResolveSchemaForTable(dataTableMetaClient)),
+            true,
+            Option.of(dataTableWriteConfig.getRecordMerger().getRecordType()),
+            existingIndexVersionOrDefault(PARTITION_NAME_COLUMN_STATS, 
dataTableMetaClient)).keySet()));
+  }
+
+  @Override
+  public List<IndexPartitionInitialization> buildInitialization(
+      String dataTableInstantTime,
+      String instantTimeForPartition,
+      Map<String, List<FileInfo>> partitionToAllFilesMap,
+      Lazy<List<FileSliceAndPartition>> lazyPartitionFileSlices) throws 
IOException {
+    final int fileGroupCount = 
dataTableWriteConfig.getMetadataConfig().getColumnStatsIndexFileGroupCount();
+    if (partitionToAllFilesMap.isEmpty() || columnsToIndex.get().isEmpty()) {
+      return 
Collections.singletonList(IndexPartitionInitialization.of(fileGroupCount, 
COLUMN_STATS.getPartitionPath(), engineContext.emptyHoodieData()));
+    }
+
+    log.info("Indexing {} columns for column stats index", 
columnsToIndex.get().size());
+    // during initialization, we need stats for base and log files.
+    HoodieData<HoodieRecord> records = 
HoodieTableMetadataUtil.convertFilesToColumnStatsRecords(
+        engineContext, Collections.emptyMap(), partitionToAllFilesMap,
+        dataTableMetaClient, 
dataTableWriteConfig.getColumnStatsIndexParallelism(),
+        dataTableWriteConfig.getMetadataConfig().getMaxReaderBufferSize(),
+        columnsToIndex.get());
+    return 
Collections.singletonList(IndexPartitionInitialization.of(fileGroupCount, 
COLUMN_STATS.getPartitionPath(), records));
+  }

Review Comment:
   🤖 When `columnsToIndex` is empty and `buildInitialization` returns empty 
data, `postInitialization` will still be called and will register an index 
definition with empty source fields. Is that intentional? It seems like 
registering a column stats index with no source fields could confuse downstream 
readers.



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java:
##########
@@ -432,107 +412,78 @@ private boolean initializeFromFilesystem(String 
dataTableInstantTime, List<Metad
         partitionInfoList = Collections.emptyList();
       }
     }
-    Map<String, Map<String, Long>> partitionIdToAllFilesMap = 
partitionInfoList.stream()
-        .map(p -> {
-          String partitionName = 
HoodieTableMetadataUtil.getPartitionIdentifierForFilesPartition(p.getRelativePath());
-          return Pair.of(partitionName, p.getFilenameToSizeMap());
-        })
-        .collect(Collectors.toMap(Pair::getKey, Pair::getValue));
-
-    // validate that each index is eligible to be initialized
-    Iterator<MetadataPartitionType> iterator = partitionsToInit.iterator();
-    while (iterator.hasNext()) {
-      MetadataPartitionType partitionType = iterator.next();
-      if (partitionType == PARTITION_STATS && 
!dataMetaClient.getTableConfig().isTablePartitioned()) {
-        // Partition stats index cannot be enabled for a non-partitioned table
-        iterator.remove();
-        this.enabledPartitionTypes.remove(partitionType);
-      }
-    }
+    Map<String, List<FileInfo>> partitionIdToAllFilesMap = 
DirectoryInfo.getPartitionToFileInfo(partitionInfoList);
+    Lazy<List<FileSliceAndPartition>> lazyLatestMergedPartitionFileSliceList = 
getLazyLatestMergedPartitionFileSliceList();
 
-    // For a fresh table, defer RLI initialization
-    if (dataWriteConfig.getMetadataConfig().shouldDeferRliInitForFreshTable() 
&& this.enabledPartitionTypes.contains(RECORD_INDEX)
-        && 
dataMetaClient.getActiveTimeline().filterCompletedInstants().countInstants() == 
0) {
-      this.enabledPartitionTypes.remove(RECORD_INDEX);
-      partitionsToInit.remove(RECORD_INDEX);
+    // FILES partition should always be initialized first if enabled
+    if (!filesPartitionAvailable) {
+      initializeMetadataPartition(FILES, 
indexerMapForPartitionsToInit.get(FILES),
+          dataTableInstantTime, partitionIdToAllFilesMap, 
lazyLatestMergedPartitionFileSliceList);
+      hasPartitionsStateChanged = true;
     }
 
-    Lazy<List<Pair<String, FileSlice>>> lazyLatestMergedPartitionFileSliceList 
= getLazyLatestMergedPartitionFileSliceList();
-    for (MetadataPartitionType partitionType : partitionsToInit) {
-      // Find the commit timestamp to use for this partition. Each 
initialization should use its own unique commit time.
-      String instantTimeForPartition = 
generateUniqueInstantTime(dataTableInstantTime);
-      String partitionTypeName = partitionType.name();
-      LOG.info("Initializing MDT partition {} at instant {}", 
partitionTypeName, instantTimeForPartition);
-      String relativePartitionPath;
-      Pair<Integer, HoodieData<HoodieRecord>> fileGroupCountAndRecordsPair;
-      Lazy<Option<HoodieSchema>> tableSchema = Lazy.lazily(() -> 
HoodieTableMetadataUtil.tryResolveSchemaForTable(dataMetaClient));
-      try {
-        switch (partitionType) {
-          case FILES:
-            fileGroupCountAndRecordsPair = 
initializeFilesPartition(partitionIdToAllFilesMap);
-            initializeFilegroupsAndCommit(partitionType, 
FILES.getPartitionPath(), fileGroupCountAndRecordsPair, 
instantTimeForPartition);
-            break;
-          case BLOOM_FILTERS:
-            fileGroupCountAndRecordsPair = 
initializeBloomFiltersPartition(dataTableInstantTime, partitionIdToAllFilesMap);
-            initializeFilegroupsAndCommit(partitionType, 
BLOOM_FILTERS.getPartitionPath(), fileGroupCountAndRecordsPair, 
instantTimeForPartition);
-            break;
-          case COLUMN_STATS:
-            Pair<List<String>, Pair<Integer, HoodieData<HoodieRecord>>> 
colStatsColumnsAndRecord = 
initializeColumnStatsPartition(partitionIdToAllFilesMap, tableSchema);
-            fileGroupCountAndRecordsPair = colStatsColumnsAndRecord.getValue();
-            initializeFilegroupsAndCommit(partitionType, 
COLUMN_STATS.getPartitionPath(), fileGroupCountAndRecordsPair, 
instantTimeForPartition, colStatsColumnsAndRecord.getKey());
-            break;
-          case RECORD_INDEX:
-            boolean isPartitionedRLI = 
dataWriteConfig.isRecordLevelIndexEnabled();
-            
initializeFilegroupsAndCommitToRecordIndexPartition(instantTimeForPartition, 
lazyLatestMergedPartitionFileSliceList, isPartitionedRLI);
-            break;
-          case EXPRESSION_INDEX:
-            Set<String> expressionIndexPartitionsToInit = 
getExpressionIndexPartitionsToInit(partitionType, 
dataWriteConfig.getMetadataConfig(), dataMetaClient);
-            if (expressionIndexPartitionsToInit.size() != 1) {
-              if (expressionIndexPartitionsToInit.size() > 1) {
-                LOG.warn("Skipping expression index initialization as only one 
expression index bootstrap at a time is supported for now. Provided: {}", 
expressionIndexPartitionsToInit);
-              }
-              continue;
-            }
-            relativePartitionPath = 
expressionIndexPartitionsToInit.iterator().next();
-            fileGroupCountAndRecordsPair = 
initializeExpressionIndexPartition(relativePartitionPath, dataTableInstantTime, 
lazyLatestMergedPartitionFileSliceList, tableSchema);
-            initializeFilegroupsAndCommit(partitionType, 
relativePartitionPath, fileGroupCountAndRecordsPair, instantTimeForPartition);
-            break;
-          case PARTITION_STATS:
-            // For PARTITION_STATS, COLUMN_STATS should also be enabled
-            if (!dataWriteConfig.isMetadataColumnStatsIndexEnabled()) {
-              LOG.debug("Skipping partition stats initialization as column 
stats index is not enabled. Please enable {}",
-                  
HoodieMetadataConfig.ENABLE_METADATA_INDEX_COLUMN_STATS.key());
-              continue;
-            }
-            fileGroupCountAndRecordsPair = 
initializePartitionStatsIndex(lazyLatestMergedPartitionFileSliceList, 
tableSchema);
-            initializeFilegroupsAndCommit(partitionType, 
PARTITION_STATS.getPartitionPath(), fileGroupCountAndRecordsPair, 
instantTimeForPartition);
-            break;
-          case SECONDARY_INDEX:
-            Set<String> secondaryIndexPartitionsToInit = 
getSecondaryIndexPartitionsToInit(partitionType, 
dataWriteConfig.getMetadataConfig(), dataMetaClient);
-            if (secondaryIndexPartitionsToInit.size() != 1) {
-              if (secondaryIndexPartitionsToInit.size() > 1) {
-                LOG.warn("Skipping secondary index initialization as only one 
secondary index bootstrap at a time is supported for now. Provided: {}", 
secondaryIndexPartitionsToInit);
-              }
-              continue;
-            }
-            relativePartitionPath = 
secondaryIndexPartitionsToInit.iterator().next();
-            fileGroupCountAndRecordsPair = 
initializeSecondaryIndexPartition(relativePartitionPath, 
lazyLatestMergedPartitionFileSliceList);
-            initializeFilegroupsAndCommit(partitionType, 
relativePartitionPath, fileGroupCountAndRecordsPair, instantTimeForPartition);
-            break;
-          default:
-            throw new HoodieMetadataException(String.format("Unsupported MDT 
partition type: %s", partitionType));
-        }
-      } catch (Exception e) {
-        String metricKey = partitionType.getPartitionPath() + "_" + 
HoodieMetadataMetrics.BOOTSTRAP_ERR_STR;
-        metrics.ifPresent(m -> m.setMetric(metricKey, 1));
-        String errMsg = String.format("Bootstrap on %s partition failed for 
%s",
-            partitionType.getPartitionPath(), 
metadataMetaClient.getBasePath());
-        LOG.error(errMsg, e);
-        throw new HoodieMetadataException(errMsg, e);
+    indexerMapForPartitionsToInit.entrySet().stream().filter(e -> e.getKey() 
!= FILES).forEach(
+        e -> {
+          try {
+            initializeMetadataPartition(e.getKey(), e.getValue(), 
dataTableInstantTime, partitionIdToAllFilesMap, 
lazyLatestMergedPartitionFileSliceList);
+          } catch (IOException ex) {
+            throw new HoodieMetadataException("Failed to initialize metadata 
partition: " + e.getKey(), ex);
+          }
+          hasPartitionsStateChanged = true;
+        });
+    return true;
+  }
+
+  private void initializeMetadataPartition(
+      MetadataPartitionType partitionType,
+      Indexer indexer,

Review Comment:
   🤖 `hasPartitionsStateChanged = true` is set after 
`initializeMetadataPartition` returns, even when `buildInitialization` returns 
an empty list and the method returns early without actually initializing 
anything. In the old code, `continue` in the for-loop would skip setting this 
flag. Could you move `hasPartitionsStateChanged = true` inside 
`initializeMetadataPartition` so it's only set when initialization actually 
happened?



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/index/secondary/SecondaryIndexer.java:
##########
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.metadata.index.secondary;
+
+import org.apache.hudi.common.data.HoodieData;
+import org.apache.hudi.common.engine.HoodieEngineContext;
+import org.apache.hudi.metadata.model.FileInfo;
+import org.apache.hudi.metadata.model.FileSliceAndPartition;
+import org.apache.hudi.common.model.HoodieIndexDefinition;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.metadata.index.model.IndexPartitionInitialization;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.util.ValidationUtils;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.metadata.HoodieTableMetadataUtil;
+import org.apache.hudi.metadata.MetadataPartitionType;
+import org.apache.hudi.metadata.index.BaseIndexer;
+import org.apache.hudi.util.Lazy;
+
+import lombok.extern.slf4j.Slf4j;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import static 
org.apache.hudi.metadata.HoodieTableMetadataUtil.RECORD_INDEX_AVERAGE_RECORD_SIZE;
+import static 
org.apache.hudi.metadata.HoodieTableMetadataUtil.getSecondaryIndexPartitionsToInit;
+import static org.apache.hudi.metadata.MetadataPartitionType.RECORD_INDEX;
+import static org.apache.hudi.metadata.MetadataPartitionType.SECONDARY_INDEX;
+import static 
org.apache.hudi.metadata.SecondaryIndexRecordGenerationUtils.readSecondaryKeysFromFileSlices;
+
+/**
+ * Implementation of {@link MetadataPartitionType#SECONDARY_INDEX} index
+ */
+@Slf4j
+public class SecondaryIndexer extends BaseIndexer {
+
+  public SecondaryIndexer(
+      HoodieEngineContext engineContext,
+      HoodieWriteConfig dataTableWriteConfig,
+      HoodieTableMetaClient dataTableMetaClient) {
+    super(engineContext, dataTableWriteConfig, dataTableMetaClient);
+  }
+
+  @Override
+  public List<IndexPartitionInitialization> buildInitialization(String 
dataTableInstantTime, String instantTimeForPartition, Map<String, 
List<FileInfo>> partitionToAllFilesMap,
+                                                                
Lazy<List<FileSliceAndPartition>> lazyPartitionFileSlices) throws IOException {
+    Set<String> secondaryIndexPartitionsToInit = 
getSecondaryIndexPartitionsToInit(SECONDARY_INDEX, 
dataTableWriteConfig.getMetadataConfig(), dataTableMetaClient);
+    if (secondaryIndexPartitionsToInit.size() != 1) {
+      if (secondaryIndexPartitionsToInit.size() > 1) {
+        log.warn("Skipping secondary index initialization as only one 
secondary index bootstrap at a time is supported for now. Provided: {}", 
secondaryIndexPartitionsToInit);
+      }
+      return Collections.emptyList();
+    }
+
+    String indexName = secondaryIndexPartitionsToInit.iterator().next();
+    HoodieIndexDefinition indexDefinition = 
HoodieTableMetadataUtil.getHoodieIndexDefinition(indexName, 
dataTableMetaClient);
+    ValidationUtils.checkState(indexDefinition != null, "Secondary Index 
definition is not present for index " + indexName);
+
+    List<FileSliceAndPartition> partitionFileSlicePairs = 
lazyPartitionFileSlices.get();
+
+    int parallelism = Math.min(partitionFileSlicePairs.size(), 
dataTableWriteConfig.getMetadataConfig().getSecondaryIndexParallelism());
+    HoodieData<HoodieRecord> records = readSecondaryKeysFromFileSlices(
+        engineContext,
+        partitionFileSlicePairs,
+        parallelism,
+        this.getClass().getSimpleName(),
+        dataTableMetaClient,
+        indexDefinition,
+        dataTableWriteConfig.getProps());

Review Comment:
   🤖 This passes `RECORD_INDEX` as the partition type to 
`estimateFileGroupCount`, but this is for a secondary index partition. The 
partition type is used in logging, so this will produce misleading log messages 
(e.g. "Estimated file group count for RECORD_INDEX: ..."). Should this be the 
actual secondary index partition type instead?



##########
hudi-common/src/main/java/org/apache/hudi/metadata/MetadataPartitionType.java:
##########
@@ -439,22 +439,22 @@ public static List<MetadataPartitionType> 
getMetadataPartitionsNeedingWriteStatu
     return Collections.singletonList(MetadataPartitionType.RECORD_INDEX);
   }
 
-  /**
-   * Returns the set of all metadata partition names.
-   */
-  public static Set<String> getAllPartitionPaths() {
-    return Arrays.stream(getValidValues())
-        .map(MetadataPartitionType::getPartitionPath)
-        .collect(Collectors.toSet());
-  }
-
   /**
    * Returns the set of all valid metadata partition types. Prefer using this 
method over {@link #values()}.
    */
-  public static MetadataPartitionType[] getValidValues() {
-    // ALL_PARTITIONS is just another record type in FILES partition
+  public static MetadataPartitionType[] getValidValues(HoodieTableVersion 
tableVersion) {
+    if (tableVersion.greaterThanOrEquals(HoodieTableVersion.EIGHT)) {
+      // ALL_PARTITIONS is just another record type in FILES partition
+      return EnumSet.complementOf(EnumSet.of(

Review Comment:
   🤖 For table versions < 8, this now filters out `SECONDARY_INDEX`, 
`EXPRESSION_INDEX`, and `PARTITION_STATS`. Previously `getValidValues()` 
returned all types regardless of version. Is this intentional — were these 
partition types never actually functional on pre-v8 tables? I want to make sure 
this doesn't accidentally break enabling these indexes on older tables that 
haven't upgraded.



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java:
##########
@@ -432,107 +412,78 @@ private boolean initializeFromFilesystem(String 
dataTableInstantTime, List<Metad
         partitionInfoList = Collections.emptyList();
       }
     }
-    Map<String, Map<String, Long>> partitionIdToAllFilesMap = 
partitionInfoList.stream()
-        .map(p -> {
-          String partitionName = 
HoodieTableMetadataUtil.getPartitionIdentifierForFilesPartition(p.getRelativePath());
-          return Pair.of(partitionName, p.getFilenameToSizeMap());
-        })
-        .collect(Collectors.toMap(Pair::getKey, Pair::getValue));
-
-    // validate that each index is eligible to be initialized
-    Iterator<MetadataPartitionType> iterator = partitionsToInit.iterator();
-    while (iterator.hasNext()) {
-      MetadataPartitionType partitionType = iterator.next();
-      if (partitionType == PARTITION_STATS && 
!dataMetaClient.getTableConfig().isTablePartitioned()) {
-        // Partition stats index cannot be enabled for a non-partitioned table
-        iterator.remove();
-        this.enabledPartitionTypes.remove(partitionType);
-      }
-    }
+    Map<String, List<FileInfo>> partitionIdToAllFilesMap = 
DirectoryInfo.getPartitionToFileInfo(partitionInfoList);
+    Lazy<List<FileSliceAndPartition>> lazyLatestMergedPartitionFileSliceList = 
getLazyLatestMergedPartitionFileSliceList();
 
-    // For a fresh table, defer RLI initialization
-    if (dataWriteConfig.getMetadataConfig().shouldDeferRliInitForFreshTable() 
&& this.enabledPartitionTypes.contains(RECORD_INDEX)
-        && 
dataMetaClient.getActiveTimeline().filterCompletedInstants().countInstants() == 
0) {
-      this.enabledPartitionTypes.remove(RECORD_INDEX);
-      partitionsToInit.remove(RECORD_INDEX);
+    // FILES partition should always be initialized first if enabled
+    if (!filesPartitionAvailable) {
+      initializeMetadataPartition(FILES, 
indexerMapForPartitionsToInit.get(FILES),
+          dataTableInstantTime, partitionIdToAllFilesMap, 
lazyLatestMergedPartitionFileSliceList);
+      hasPartitionsStateChanged = true;
     }
 
-    Lazy<List<Pair<String, FileSlice>>> lazyLatestMergedPartitionFileSliceList 
= getLazyLatestMergedPartitionFileSliceList();
-    for (MetadataPartitionType partitionType : partitionsToInit) {
-      // Find the commit timestamp to use for this partition. Each 
initialization should use its own unique commit time.
-      String instantTimeForPartition = 
generateUniqueInstantTime(dataTableInstantTime);
-      String partitionTypeName = partitionType.name();
-      LOG.info("Initializing MDT partition {} at instant {}", 
partitionTypeName, instantTimeForPartition);
-      String relativePartitionPath;
-      Pair<Integer, HoodieData<HoodieRecord>> fileGroupCountAndRecordsPair;
-      Lazy<Option<HoodieSchema>> tableSchema = Lazy.lazily(() -> 
HoodieTableMetadataUtil.tryResolveSchemaForTable(dataMetaClient));
-      try {
-        switch (partitionType) {
-          case FILES:
-            fileGroupCountAndRecordsPair = 
initializeFilesPartition(partitionIdToAllFilesMap);
-            initializeFilegroupsAndCommit(partitionType, 
FILES.getPartitionPath(), fileGroupCountAndRecordsPair, 
instantTimeForPartition);
-            break;
-          case BLOOM_FILTERS:
-            fileGroupCountAndRecordsPair = 
initializeBloomFiltersPartition(dataTableInstantTime, partitionIdToAllFilesMap);
-            initializeFilegroupsAndCommit(partitionType, 
BLOOM_FILTERS.getPartitionPath(), fileGroupCountAndRecordsPair, 
instantTimeForPartition);
-            break;
-          case COLUMN_STATS:
-            Pair<List<String>, Pair<Integer, HoodieData<HoodieRecord>>> 
colStatsColumnsAndRecord = 
initializeColumnStatsPartition(partitionIdToAllFilesMap, tableSchema);
-            fileGroupCountAndRecordsPair = colStatsColumnsAndRecord.getValue();
-            initializeFilegroupsAndCommit(partitionType, 
COLUMN_STATS.getPartitionPath(), fileGroupCountAndRecordsPair, 
instantTimeForPartition, colStatsColumnsAndRecord.getKey());
-            break;
-          case RECORD_INDEX:
-            boolean isPartitionedRLI = 
dataWriteConfig.isRecordLevelIndexEnabled();
-            
initializeFilegroupsAndCommitToRecordIndexPartition(instantTimeForPartition, 
lazyLatestMergedPartitionFileSliceList, isPartitionedRLI);
-            break;
-          case EXPRESSION_INDEX:
-            Set<String> expressionIndexPartitionsToInit = 
getExpressionIndexPartitionsToInit(partitionType, 
dataWriteConfig.getMetadataConfig(), dataMetaClient);
-            if (expressionIndexPartitionsToInit.size() != 1) {
-              if (expressionIndexPartitionsToInit.size() > 1) {
-                LOG.warn("Skipping expression index initialization as only one 
expression index bootstrap at a time is supported for now. Provided: {}", 
expressionIndexPartitionsToInit);
-              }
-              continue;
-            }
-            relativePartitionPath = 
expressionIndexPartitionsToInit.iterator().next();
-            fileGroupCountAndRecordsPair = 
initializeExpressionIndexPartition(relativePartitionPath, dataTableInstantTime, 
lazyLatestMergedPartitionFileSliceList, tableSchema);
-            initializeFilegroupsAndCommit(partitionType, 
relativePartitionPath, fileGroupCountAndRecordsPair, instantTimeForPartition);
-            break;
-          case PARTITION_STATS:
-            // For PARTITION_STATS, COLUMN_STATS should also be enabled
-            if (!dataWriteConfig.isMetadataColumnStatsIndexEnabled()) {
-              LOG.debug("Skipping partition stats initialization as column 
stats index is not enabled. Please enable {}",
-                  
HoodieMetadataConfig.ENABLE_METADATA_INDEX_COLUMN_STATS.key());
-              continue;
-            }
-            fileGroupCountAndRecordsPair = 
initializePartitionStatsIndex(lazyLatestMergedPartitionFileSliceList, 
tableSchema);
-            initializeFilegroupsAndCommit(partitionType, 
PARTITION_STATS.getPartitionPath(), fileGroupCountAndRecordsPair, 
instantTimeForPartition);
-            break;
-          case SECONDARY_INDEX:
-            Set<String> secondaryIndexPartitionsToInit = 
getSecondaryIndexPartitionsToInit(partitionType, 
dataWriteConfig.getMetadataConfig(), dataMetaClient);
-            if (secondaryIndexPartitionsToInit.size() != 1) {
-              if (secondaryIndexPartitionsToInit.size() > 1) {
-                LOG.warn("Skipping secondary index initialization as only one 
secondary index bootstrap at a time is supported for now. Provided: {}", 
secondaryIndexPartitionsToInit);
-              }
-              continue;
-            }
-            relativePartitionPath = 
secondaryIndexPartitionsToInit.iterator().next();
-            fileGroupCountAndRecordsPair = 
initializeSecondaryIndexPartition(relativePartitionPath, 
lazyLatestMergedPartitionFileSliceList);
-            initializeFilegroupsAndCommit(partitionType, 
relativePartitionPath, fileGroupCountAndRecordsPair, instantTimeForPartition);
-            break;
-          default:
-            throw new HoodieMetadataException(String.format("Unsupported MDT 
partition type: %s", partitionType));
-        }
-      } catch (Exception e) {
-        String metricKey = partitionType.getPartitionPath() + "_" + 
HoodieMetadataMetrics.BOOTSTRAP_ERR_STR;
-        metrics.ifPresent(m -> m.setMetric(metricKey, 1));
-        String errMsg = String.format("Bootstrap on %s partition failed for 
%s",
-            partitionType.getPartitionPath(), 
metadataMetaClient.getBasePath());
-        LOG.error(errMsg, e);
-        throw new HoodieMetadataException(errMsg, e);
+    indexerMapForPartitionsToInit.entrySet().stream().filter(e -> e.getKey() 
!= FILES).forEach(
+        e -> {
+          try {
+            initializeMetadataPartition(e.getKey(), e.getValue(), 
dataTableInstantTime, partitionIdToAllFilesMap, 
lazyLatestMergedPartitionFileSliceList);
+          } catch (IOException ex) {
+            throw new HoodieMetadataException("Failed to initialize metadata 
partition: " + e.getKey(), ex);
+          }
+          hasPartitionsStateChanged = true;

Review Comment:
   🤖 The old code used a `List` for `partitionsToInit`, which preserved 
ordering (FILES always first, then the rest in enum order). The new code uses 
`HashMap` with FILES handled separately, but remaining partitions iterate in 
non-deterministic order. Have you verified there are no implicit ordering 
dependencies between non-FILES partition initializations? For example, does 
PARTITION_STATS initialization ever depend on COLUMN_STATS being initialized 
first?



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/index/expression/ExpressionIndexer.java:
##########
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.metadata.index.expression;
+
+import org.apache.hudi.common.data.HoodieData;
+import org.apache.hudi.common.engine.HoodieEngineContext;
+import org.apache.hudi.common.model.HoodieIndexDefinition;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.metadata.index.ExpressionIndexRecordGenerator;
+import org.apache.hudi.metadata.model.FileInfo;
+import org.apache.hudi.metadata.model.FileSliceAndPartition;
+import org.apache.hudi.metadata.model.FileInfoAndPartition;
+import org.apache.hudi.metadata.index.model.IndexPartitionInitialization;
+import org.apache.hudi.common.schema.HoodieSchema;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.util.ValidationUtils;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.exception.HoodieMetadataException;
+import org.apache.hudi.metadata.HoodieTableMetadataUtil;
+import org.apache.hudi.metadata.MetadataPartitionType;
+import org.apache.hudi.metadata.index.BaseIndexer;
+import org.apache.hudi.util.Lazy;
+
+import lombok.extern.slf4j.Slf4j;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import static 
org.apache.hudi.metadata.HoodieTableMetadataUtil.getExpressionIndexPartitionsToInit;
+import static 
org.apache.hudi.metadata.HoodieTableMetadataUtil.getProjectedSchemaForExpressionIndex;
+import static org.apache.hudi.metadata.MetadataPartitionType.EXPRESSION_INDEX;
+
+/**
+ * Implementation of {@link MetadataPartitionType#EXPRESSION_INDEX} index
+ */
+@Slf4j
+public class ExpressionIndexer extends BaseIndexer {
+
+  private final ExpressionIndexRecordGenerator expressionIndexRecordGenerator;
+
+  public ExpressionIndexer(
+      HoodieEngineContext engineContext,
+      HoodieWriteConfig dataTableWriteConfig,
+      HoodieTableMetaClient dataTableMetaClient,
+      ExpressionIndexRecordGenerator expressionIndexRecordGenerator) {
+    super(engineContext, dataTableWriteConfig, dataTableMetaClient);
+
+    this.expressionIndexRecordGenerator = expressionIndexRecordGenerator;
+  }
+
+  @Override
+  public List<IndexPartitionInitialization> buildInitialization(
+      String dataTableInstantTime,
+      String instantTimeForPartition,
+      Map<String, List<FileInfo>> partitionToAllFilesMap,
+      Lazy<List<FileSliceAndPartition>> lazyPartitionFileSlices) throws 
IOException {
+    Set<String> expressionIndexPartitionsToInit = 
getExpressionIndexPartitionsToInit(
+        EXPRESSION_INDEX, dataTableWriteConfig.getMetadataConfig(), 
dataTableMetaClient);
+    if (expressionIndexPartitionsToInit.size() != 1) {
+      if (expressionIndexPartitionsToInit.size() > 1) {
+        log.warn("Skipping expression index initialization as only one 
expression index "
+            + "bootstrap at a time is supported for now. Provided: {}", 
expressionIndexPartitionsToInit);
+      }
+      return Collections.emptyList();
+    }
+
+    String indexName = expressionIndexPartitionsToInit.iterator().next();
+    HoodieIndexDefinition indexDefinition = 
HoodieTableMetadataUtil.getHoodieIndexDefinition(indexName, 
dataTableMetaClient);
+    ValidationUtils.checkState(indexDefinition != null, "Expression Index 
definition is not present for index " + indexName);
+
+    List<FileSliceAndPartition> partitionFileSlicePairs = 
lazyPartitionFileSlices.get();
+    List<FileInfoAndPartition> filesToIndex = new ArrayList<>();
+    partitionFileSlicePairs.forEach(fsp -> {
+      if (fsp.fileSlice().getBaseFile().isPresent()) {
+        filesToIndex.add(FileInfoAndPartition.of(fsp.partitionPath(), 
fsp.fileSlice().getBaseFile().get().getPath(), 
fsp.fileSlice().getBaseFile().get().getFileSize()));
+      }
+      fsp.fileSlice().getLogFiles()
+          .forEach(hoodieLogFile
+              -> filesToIndex.add(FileInfoAndPartition.of(fsp.partitionPath(), 
hoodieLogFile.getPath().toString(), hoodieLogFile.getFileSize())));
+    });
+
+    int fileGroupCount = 
dataTableWriteConfig.getMetadataConfig().getExpressionIndexFileGroupCount();
+    if (partitionFileSlicePairs.isEmpty()) {
+      return 
Collections.singletonList(IndexPartitionInitialization.of(fileGroupCount, 
indexName, engineContext.emptyHoodieData()));
+    }
+
+    int parallelism = Math.min(filesToIndex.size(), 
dataTableWriteConfig.getMetadataConfig().getExpressionIndexParallelism());

Review Comment:
   🤖 If `partitionFileSlicePairs` is non-empty but every file slice happens to 
have no base file and no log files, `filesToIndex` would be empty and 
`parallelism` becomes 0. A zero parallelism value can cause issues downstream 
(e.g., Spark's `parallelize` with 0 partitions). Could you add an emptiness 
check on `filesToIndex` similar to the one on `partitionFileSlicePairs`?



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/index/record/BaseRecordIndexer.java:
##########
@@ -0,0 +1,281 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.metadata.index.record;
+
+import org.apache.hudi.common.config.HoodieConfig;
+import org.apache.hudi.common.data.HoodieData;
+import org.apache.hudi.common.engine.HoodieEngineContext;
+import org.apache.hudi.common.engine.HoodieReaderContext;
+import org.apache.hudi.common.engine.ReaderContextFactory;
+import org.apache.hudi.common.model.FileSlice;
+import org.apache.hudi.common.model.HoodieFileFormat;
+import org.apache.hudi.common.table.view.HoodieTableFileSystemView;
+import org.apache.hudi.common.util.ValidationUtils;
+import org.apache.hudi.exception.HoodieIOException;
+import org.apache.hudi.io.storage.HoodieAvroFileReader;
+import org.apache.hudi.io.storage.HoodieIOFactory;
+import org.apache.hudi.metadata.index.model.DataPartitionAndRecords;
+import org.apache.hudi.metadata.model.FileSliceAndPartition;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.schema.HoodieSchema;
+import org.apache.hudi.common.schema.HoodieSchemaCache;
+import org.apache.hudi.common.schema.HoodieSchemaUtils;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.read.HoodieFileGroupReader;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.collection.CloseableMappingIterator;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.internal.schema.InternalSchema;
+import org.apache.hudi.internal.schema.utils.SerDeHelper;
+import org.apache.hudi.metadata.HoodieMetadataPayload;
+import org.apache.hudi.metadata.HoodieTableMetadataUtil;
+import org.apache.hudi.metadata.MetadataPartitionType;
+import org.apache.hudi.metadata.index.BaseIndexer;
+import org.apache.hudi.storage.HoodieStorage;
+import org.apache.hudi.storage.HoodieStorageUtils;
+import org.apache.hudi.storage.StorageConfiguration;
+import org.apache.hudi.storage.StoragePath;
+
+import lombok.extern.slf4j.Slf4j;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.function.Supplier;
+import java.util.stream.Collectors;
+
+import static 
org.apache.hudi.common.schema.HoodieSchemaUtils.getRecordKeySchema;
+import static 
org.apache.hudi.metadata.HoodieTableMetadataUtil.RECORD_INDEX_AVERAGE_RECORD_SIZE;
+
+/**
+ * Base implementation of {@link MetadataPartitionType#RECORD_INDEX} index.
+ */
+@Slf4j
+public abstract class BaseRecordIndexer extends BaseIndexer {
+
+  protected BaseRecordIndexer(HoodieEngineContext engineContext, 
HoodieWriteConfig dataTableWriteConfig, HoodieTableMetaClient 
dataTableMetaClient) {
+    super(engineContext, dataTableWriteConfig, dataTableMetaClient);
+  }
+
+  protected DataPartitionAndRecords initializeRecordIndexPartition(
+      List<FileSliceAndPartition> latestMergedPartitionFileSliceList,
+      int recordIndexMaxParallelism) {
+    return initializeRecordIndexPartition(null, 
latestMergedPartitionFileSliceList, recordIndexMaxParallelism);
+  }
+
+  protected DataPartitionAndRecords initializeRecordIndexPartition(
+      String dataPartition,
+      List<FileSliceAndPartition> latestMergedPartitionFileSliceList,
+      int recordIndexMaxParallelism) {
+    log.info("Initializing record index from {} file slices", 
latestMergedPartitionFileSliceList.size());
+    HoodieData<HoodieRecord> records = readRecordKeysFromFileSliceSnapshot(
+        engineContext,
+        latestMergedPartitionFileSliceList,
+        recordIndexMaxParallelism,
+        this.getClass().getSimpleName(),
+        dataTableMetaClient,
+        dataTableWriteConfig);
+
+    // Initialize the file groups
+    final int fileGroupCount = estimateFileGroupCount(records);
+    log.info("Initializing record index with {} file groups.", fileGroupCount);
+    return new DataPartitionAndRecords(fileGroupCount, 
Option.ofNullable(dataPartition), records);
+  }
+
+  @Override
+  public void postInitialization(HoodieTableMetaClient metadataMetaClient, 
HoodieData<HoodieRecord> records, int fileGroupCount, String 
relativePartitionPath) {
+    super.postInitialization(metadataMetaClient, records, fileGroupCount, 
relativePartitionPath);
+    // Validate record index after commit if validation is enabled

Review Comment:
   🤖 The `postInitialization` method calls `records.unpersist()` on the union records, 
but each per-partition HoodieData was individually persisted inside 
`estimateFileGroupCount` (via `records.persist("MEMORY_AND_DISK_SER")`). For 
the `PartitionedRecordIndexer` case, are these per-partition persisted datasets 
also being unpersisted somewhere, or could they leak?



##########
hudi-common/src/main/java/org/apache/hudi/metadata/model/FileAndPartitionFlag.java:
##########
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.metadata.model;
+
+import lombok.AllArgsConstructor;
+import lombok.Getter;
+import lombok.experimental.Accessors;
+
+/**
+ * Data class representing a file and partition with a flag.
+ */
+@Getter
+@AllArgsConstructor
+@Accessors(fluent = true)

Review Comment:
   🤖 `FileAndPartitionFlag` is passed to `engineContext.parallelize()` in 
`fetchPartitionFileInfo` (used by both bloom filter and column stats record 
conversion), which requires Spark serialization. The old `Tuple3` it replaces 
implements `Serializable`, but this class does not. This will cause a runtime 
`NotSerializableException` when running on Spark. Could you add `implements 
Serializable`?



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/index/record/PartitionedRecordIndexer.java:
##########
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.metadata.index.record;
+
+import org.apache.hudi.common.engine.HoodieEngineContext;
+import org.apache.hudi.metadata.MetadataPartitionType;
+import org.apache.hudi.metadata.index.model.DataPartitionAndRecords;
+import org.apache.hudi.metadata.model.FileInfo;
+import org.apache.hudi.metadata.model.FileSliceAndPartition;
+import org.apache.hudi.metadata.index.model.IndexPartitionInitialization;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.util.ValidationUtils;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.index.record.HoodieRecordIndex;
+import org.apache.hudi.metadata.BucketizedMetadataTableFileGroupIndexParser;
+import org.apache.hudi.util.Lazy;
+
+import lombok.extern.slf4j.Slf4j;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
+import java.util.stream.Collectors;
+
+import static 
org.apache.hudi.metadata.HoodieTableMetadataUtil.createRecordIndexDefinition;
+import static org.apache.hudi.metadata.MetadataPartitionType.RECORD_INDEX;
+
+/**
+ * Implementation of the partitioned {@link 
MetadataPartitionType#RECORD_INDEX} index
+ */
+@Slf4j
+public class PartitionedRecordIndexer extends BaseRecordIndexer {
+  public PartitionedRecordIndexer(HoodieEngineContext engineContext, 
HoodieWriteConfig dataTableWriteConfig,
+                                     HoodieTableMetaClient 
dataTableMetaClient) {
+    super(engineContext, dataTableWriteConfig, dataTableMetaClient);
+  }
+
+  @Override
+  public List<IndexPartitionInitialization> buildInitialization(String 
dataTableInstantTime, String instantTimeForPartition, Map<String, 
List<FileInfo>> partitionToAllFilesMap,
+                                                                
Lazy<List<FileSliceAndPartition>> lazyPartitionFileSlices) throws IOException {
+    createRecordIndexDefinition(dataTableMetaClient, 
Collections.singletonMap(HoodieRecordIndex.IS_PARTITIONED_OPTION, "true"));
+    Map<String, List<FileSliceAndPartition>> partitionFileSlicePairsMap = 
lazyPartitionFileSlices.get().stream()
+        .collect(Collectors.groupingBy(FileSliceAndPartition::partitionPath));
+    Map<String, DataPartitionAndRecords> fileGroupCountAndRecordsPairMap = new 
HashMap<>(partitionFileSlicePairsMap.size());
+    int maxParallelismPerHudiPartition = partitionFileSlicePairsMap.isEmpty()
+        ? 1 : Math.max(1, 
dataTableWriteConfig.getMetadataConfig().getRecordIndexMaxParallelism() / 
partitionFileSlicePairsMap.size());
+    int totalFileGroupCount = 0;
+    for (Map.Entry<String, List<FileSliceAndPartition>> entry: 
partitionFileSlicePairsMap.entrySet()) {
+      String partition = entry.getKey();
+      List<FileSliceAndPartition> fileSliceAndPartitions = entry.getValue();
+      log.info("Initializing partitioned record index from data partition {}", 
partition);
+      DataPartitionAndRecords dataPartitionAndRecords = 
initializeRecordIndexPartition(partition, fileSliceAndPartitions, 
maxParallelismPerHudiPartition);
+      fileGroupCountAndRecordsPairMap.put(partition, dataPartitionAndRecords);
+      totalFileGroupCount += dataPartitionAndRecords.numFileGroups();

Review Comment:
   🤖 When `partitionFileSlicePairsMap` is empty (e.g., empty table), 
`totalFileGroupCount` will be 0 and `initializationList` will be empty, but the 
code still returns an `IndexPartitionInitialization` with 0 file groups. Could 
this cause issues downstream? The non-partitioned `RecordIndexer` path goes 
through `estimateFileGroupCount` which guarantees at least 1 file group via 
`Math.max(1, ...)`. Is an empty-table scenario expected to be gated earlier in 
the call chain?



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/index/record/BaseRecordIndexer.java:
##########
@@ -0,0 +1,282 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.metadata.index.record;
+
+import org.apache.hudi.common.config.HoodieConfig;
+import org.apache.hudi.common.data.HoodieData;
+import org.apache.hudi.common.engine.HoodieEngineContext;
+import org.apache.hudi.common.engine.HoodieReaderContext;
+import org.apache.hudi.common.engine.ReaderContextFactory;
+import org.apache.hudi.common.model.FileSlice;
+import org.apache.hudi.common.model.HoodieFileFormat;
+import org.apache.hudi.common.table.view.HoodieTableFileSystemView;
+import org.apache.hudi.common.util.ValidationUtils;
+import org.apache.hudi.exception.HoodieIOException;
+import org.apache.hudi.io.storage.HoodieAvroFileReader;
+import org.apache.hudi.io.storage.HoodieIOFactory;
+import org.apache.hudi.metadata.index.model.DataPartitionAndRecords;
+import org.apache.hudi.metadata.model.FileSliceAndPartition;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.schema.HoodieSchema;
+import org.apache.hudi.common.schema.HoodieSchemaCache;
+import org.apache.hudi.common.schema.HoodieSchemaUtils;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.read.HoodieFileGroupReader;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.collection.CloseableMappingIterator;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.internal.schema.InternalSchema;
+import org.apache.hudi.internal.schema.utils.SerDeHelper;
+import org.apache.hudi.metadata.HoodieMetadataPayload;
+import org.apache.hudi.metadata.HoodieTableMetadataUtil;
+import org.apache.hudi.metadata.MetadataPartitionType;
+import org.apache.hudi.metadata.index.BaseIndexer;
+import org.apache.hudi.storage.HoodieStorage;
+import org.apache.hudi.storage.HoodieStorageUtils;
+import org.apache.hudi.storage.StorageConfiguration;
+import org.apache.hudi.storage.StoragePath;
+
+import lombok.extern.slf4j.Slf4j;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.function.Supplier;
+import java.util.stream.Collectors;
+
+import static 
org.apache.hudi.common.schema.HoodieSchemaUtils.getRecordKeySchema;
+
+/**
+ * Base implementation for record-index.
+ */
+@Slf4j
+public abstract class BaseRecordIndexer extends BaseIndexer {
+
+  private static final int RECORD_INDEX_AVERAGE_RECORD_SIZE = 48;
+
+  protected BaseRecordIndexer(HoodieEngineContext engineContext, 
HoodieWriteConfig dataTableWriteConfig, HoodieTableMetaClient 
dataTableMetaClient) {
+    super(engineContext, dataTableWriteConfig, dataTableMetaClient);
+  }
+
+  protected DataPartitionAndRecords initializeRecordIndexPartition(
+      List<FileSliceAndPartition> latestMergedPartitionFileSliceList,
+      int recordIndexMaxParallelism) {
+    return initializeRecordIndexPartition(null, 
latestMergedPartitionFileSliceList, recordIndexMaxParallelism);
+  }
+
+  protected DataPartitionAndRecords initializeRecordIndexPartition(
+      String dataPartition,
+      List<FileSliceAndPartition> latestMergedPartitionFileSliceList,
+      int recordIndexMaxParallelism) {
+    log.info("Initializing record index from {} file slices", 
latestMergedPartitionFileSliceList.size());
+    HoodieData<HoodieRecord> records = readRecordKeysFromFileSliceSnapshot(
+        engineContext,
+        latestMergedPartitionFileSliceList,
+        recordIndexMaxParallelism,
+        this.getClass().getSimpleName(),
+        dataTableMetaClient,
+        dataTableWriteConfig);
+
+    // Initialize the file groups
+    final int fileGroupCount = estimateFileGroupCount(records);
+    log.info("Initializing record index with {} file groups.", fileGroupCount);
+    return new DataPartitionAndRecords(fileGroupCount, 
Option.ofNullable(dataPartition), records);
+  }
+
+  @Override
+  public void postInitialization(HoodieTableMetaClient metadataMetaClient, 
HoodieData<HoodieRecord> records, int fileGroupCount, String 
relativePartitionPath) {
+    super.postInitialization(metadataMetaClient, records, fileGroupCount, 
relativePartitionPath);
+    // Validate record index after commit if validation is enabled

Review Comment:
   🤖 Worth noting that since `records` passed to `postInitialization` is the 
*union* of all per-partition records (built in `initializeMetadataPartition`), 
calling `records.unpersist()` on a Spark union RDD does not automatically 
unpersist the parent RDDs that were individually persisted inside 
`estimateFileGroupCount`. For non-partitioned RLI this is fine (single source), 
but for partitioned RLI, the individual per-partition records that were 
persisted may leak. The old code had the same pattern though, so it's not a 
regression.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to