yihua commented on code in PR #18421:
URL: https://github.com/apache/hudi/pull/18421#discussion_r3066934989


##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieClient.java:
##########
@@ -311,4 +316,113 @@ protected boolean isStreamingWriteToMetadataEnabled(HoodieTable table) {
     return config.isMetadataTableEnabled()
         && config.isMetadataStreamingWritesEnabled(table.getMetaClient().getTableConfig().getTableVersion());
   }
+
+  /**
+   * Merges rolling metadata from recent completed commits into the current commit metadata.
+   * This method MUST be called within the transaction lock after conflict resolution.
+   *
+   * <p>Rolling metadata keys configured via {@link HoodieWriteConfig#ROLLING_METADATA_KEYS} will be
+   * automatically carried forward from recent commits. The system walks back up to
+   * {@link HoodieWriteConfig#ROLLING_METADATA_TIMELINE_LOOKBACK_COMMITS} commits to find the most
+   * recent value for each key. This ensures that important metadata like checkpoint information
+   * remains accessible without worrying about archival or missing keys in individual commits.
+   *
+   * @param table HoodieTable instance (may have refreshed timeline after conflict resolution)
+   * @param metadata Current commit metadata to be augmented with rolling metadata
+   */
+  protected void mergeRollingMetadata(HoodieTable table, HoodieCommitMetadata metadata) {
+    // Skip for metadata table - rolling metadata is only for data tables
+    if (table.isMetadataTable()) {
+      return;
+    }
+
+    Set<String> rollingKeys = config.getRollingMetadataKeys();
+    if (rollingKeys.isEmpty()) {
+      return;  // No rolling metadata configured
+    }
+
+    // IMPORTANT: We're inside the lock here. The timeline in 'table' is either:
+    // 1. Fresh from createTable() if no conflict resolution happened
+    // 2. Reloaded during resolveWriteConflict() if conflicts were checked
+    // In both cases, we have the latest view of the timeline.
+
+    HoodieTimeline commitsTimeline = table.getActiveTimeline().getCommitsTimeline().filterCompletedInstants();
+
+    if (commitsTimeline.empty()) {
+      log.info("No previous commits found. Rolling metadata will start with current commit.");
+      return;  // First commit - nothing to roll forward
+    }
+
+    try {
+      Map<String, String> existingExtraMetadata = metadata.getExtraMetadata();
+      Map<String, String> foundRollingMetadata = new HashMap<>();
+      Set<String> remainingKeys = new HashSet<>(rollingKeys);
+
+      // Remove keys that are already present in current commit (current values take precedence)
+      for (String key : rollingKeys) {
+        if (existingExtraMetadata.containsKey(key)) {
+          remainingKeys.remove(key);
+        }
+      }
+
+      if (remainingKeys.isEmpty()) {
+        log.debug("All rolling metadata keys are present in current commit. No walkback needed.");
+        return;
+      }
+
+      int lookbackLimit = config.getRollingMetadataTimelineLookbackCommits();
+      int commitsWalkedBack = 0;
+
+      // Walk back through the timeline in reverse order (most recent first) to find values for all remaining keys
+      List<HoodieInstant> recentCommits = commitsTimeline.getReverseOrderedInstantsByCompletionTime()
+          .limit(lookbackLimit)
+          .collect(Collectors.toList());
+
+      log.debug("Walking back up to {} commits to find rolling metadata for keys: {}",
+          lookbackLimit, remainingKeys);
+
+      for (HoodieInstant instant : recentCommits) {
+        if (remainingKeys.isEmpty()) {
+          break;  // Found all keys
+        }
+
+        commitsWalkedBack++;
+        HoodieCommitMetadata commitMetadata = table.getMetaClient().getActiveTimeline().readInstantContent(instant, HoodieCommitMetadata.class);
+
+        // Check for remaining keys in this commit
+        for (String key : new HashSet<>(remainingKeys)) {
+          String value = commitMetadata.getMetadata(key);
+          if (value != null) {
+            foundRollingMetadata.put(key, value);
+            remainingKeys.remove(key);
+            log.debug("Found rolling metadata key '{}' in commit {} with value: {}",
+                key, instant.requestedTime(), value);
+          }
+        }
+      }
+
+      // Add found rolling metadata to current commit
+      for (Map.Entry<String, String> entry : foundRollingMetadata.entrySet()) {
+        metadata.addMetadata(entry.getKey(), entry.getValue());
+      }
+
+      int rolledForwardCount = foundRollingMetadata.size();
+      int updatedCount = rollingKeys.size() - remainingKeys.size() - rolledForwardCount;
+
+      if (rolledForwardCount > 0 || updatedCount > 0 || !remainingKeys.isEmpty()) {
+        log.info("Rolling metadata merge completed. Walked back {} commits. "
+                + "Rolled forward: {}, Updated in current: {}, Not found: {}, Total rolling keys: {}",
+            commitsWalkedBack, rolledForwardCount, updatedCount, remainingKeys.size(), rollingKeys.size());
+      }
+
+      if (!remainingKeys.isEmpty()) {
+        log.warn("Rolling metadata keys not found in last {} commits: {}. "
+            + "These keys will not be included in the current commit.", lookbackLimit, remainingKeys);
+      }
+
+    } catch (IOException e) {
+      log.error("Failed to read previous commit metadata for rolling metadata keys: {}.", rollingKeys, e);
+      throw new HoodieIOException("Failed to read previous commit metadata for rolling metadata keys: " + rollingKeys, e);

Review Comment:
   🤖 This change flips from catch-and-log to throwing `HoodieIOException`, but 
@nsivabalan's earlier comment on this same block said: "Rolling metadata is an 
optional feature. why would we fail the writes if we hit issues in reading the 
prev commit metadata." Was there an offline agreement to change this to a hard 
failure? If not, the previous catch-and-log behavior seems safer: a transient 
failure to read a previous commit's metadata shouldn't fail an otherwise 
healthy write.
   
   <sub><i>- Generated by an AI agent and may contain mistakes. Please verify 
any suggestions before applying.</i></sub>
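
To make the trade-off in the comment above concrete, here is a minimal, self-contained sketch of the walkback with both failure policies behind one flag. It deliberately uses plain Java collections instead of Hudi types: `CommitReader`, `RollingMetadataSketch`, `collect`, and the `failOnReadError` flag are all hypothetical names invented for illustration and are not part of the PR or of Hudi.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical stand-in for reading one commit's extra metadata; not a Hudi API.
interface CommitReader {
  Map<String, String> read(String instant) throws IOException;
}

final class RollingMetadataSketch {

  /**
   * Walks commits newest-first and returns values for the rolling keys that are
   * missing from the current commit. With failOnReadError=false (an assumed,
   * illustrative flag) a read failure is logged and the walkback stops, returning
   * whatever was found before the failure; with true it is rethrown unchecked.
   */
  static Map<String, String> collect(Set<String> rollingKeys,
                                     Map<String, String> current,
                                     List<String> newestFirst,
                                     CommitReader reader,
                                     boolean failOnReadError) {
    // Keys already set on the current commit take precedence and are not looked up.
    Set<String> remaining = new HashSet<>(rollingKeys);
    remaining.removeAll(current.keySet());

    Map<String, String> found = new HashMap<>();
    try {
      for (String instant : newestFirst) {
        if (remaining.isEmpty()) {
          break; // every key resolved
        }
        Map<String, String> extra = reader.read(instant);
        // Iterate over a copy so keys can be removed from 'remaining' safely.
        for (String key : new HashSet<>(remaining)) {
          String value = extra.get(key);
          if (value != null) {
            found.put(key, value);
            remaining.remove(key);
          }
        }
      }
    } catch (IOException e) {
      if (failOnReadError) {
        throw new UncheckedIOException("Failed to read previous commit metadata", e);
      }
      System.err.println("Rolling metadata walkback stopped after read failure: " + e.getMessage());
    }
    return found;
  }
}
```

One consequence worth weighing under the lenient policy: a key that could not be read is simply absent from the current commit, so a later commit can only recover it if an older commit carrying the key is still within the lookback window.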



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieTableServiceClient.java:
##########
@@ -394,6 +398,7 @@ protected void completeCompaction(HoodieCommitMetadata metadata, HoodieTable tab
       final HoodieInstant compactionInstant = instantGenerator.getCompactionInflightInstant(compactionCommitTime);
       try {
         this.txnManager.beginStateChange(Option.of(compactionInstant), Option.empty());
+        preCommit(metadata, false);

Review Comment:
   🤖 Could you confirm this compiles as-is? The existing `preCommit` in 
`BaseHoodieTableServiceClient` only takes `(HoodieCommitMetadata)`. I assume 
the two-arg overload `preCommit(HoodieCommitMetadata, boolean)` was introduced 
in earlier commits of this PR. I just want to make sure that passing `false` 
skips conflict resolution while still running the rolling metadata merge.
   
   <sub><i>- Generated by an AI agent and may contain mistakes. Please verify 
any suggestions before applying.</i></sub>
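
For reference, the overload shape the question above assumes could look like the sketch below. This is not the PR's code: `PreCommitSketch`, the `String` stand-in for the commit metadata, the `resolveConflicts` parameter name, and the recorded step names are all hypothetical, and whether the real boolean has this meaning is exactly what the comment asks the author to confirm.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for the table service client; not the actual Hudi class.
class PreCommitSketch {
  // Records which steps ran, so the effect of the flag is observable.
  final List<String> steps = new ArrayList<>();

  // Single-argument entry point keeps its existing behavior by delegating.
  void preCommit(String metadata) {
    preCommit(metadata, true);
  }

  // Assumed two-argument overload: the flag gates conflict resolution only,
  // while the rolling metadata merge runs unconditionally.
  void preCommit(String metadata, boolean resolveConflicts) {
    if (resolveConflicts) {
      steps.add("resolveWriteConflict");
    }
    steps.add("mergeRollingMetadata");
  }
}
```

Under this assumed shape, `preCommit(metadata, false)` in `completeCompaction` would run only the merge, and existing single-argument callers would be unaffected.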


