yihua commented on code in PR #18421:
URL: https://github.com/apache/hudi/pull/18421#discussion_r3048271980


##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieTableServiceClient.java:
##########
@@ -172,11 +172,15 @@ protected void setTableServiceTimer(WriteOperationType operationType) {
    *
    * @param metadata commit metadata for which pre commit is being invoked.
    */
-  protected void preCommit(HoodieCommitMetadata metadata) {
+  protected void preCommit(HoodieCommitMetadata metadata, boolean executeConflictResolution) {
     // Create a Hoodie table after startTxn which encapsulated the commits and files visible.
     // Important to create this after the lock to ensure the latest commits show up in the timeline without need for reload
     HoodieTable table = createTable(config, storageConf);
-    resolveWriteConflict(table, metadata, this.pendingInflightAndRequestedInstants);
+    if (executeConflictResolution) {
+      resolveWriteConflict(table, metadata, this.pendingInflightAndRequestedInstants);
+    }
+    // Merge rolling metadata after conflict resolution, still within the lock
+    mergeRollingMetadata(table, metadata);
   }

Review Comment:
   _⚠️ Potential issue_ | _🟠 Major_
   
   **Compaction still bypasses the new rolling-metadata hook.**
   
   This wires `mergeRollingMetadata(...)` into log compaction and clustering 
via `preCommit(...)`, but `completeCompaction(...)` still commits without it. 
With `hoodie.write.rolling.metadata.timeline.lookback.commits=1`, a compaction 
instant becomes the newest completed commit without the carried keys, so the 
next write can drop metadata unexpectedly.
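   
   The failure mode can be reproduced in miniature. Below is a standalone, hypothetical sketch of the walkback semantics (not Hudi's actual implementation, and `carryForward` is an invented name): with a lookback of 1, a newest completed commit written without the carried keys masks the older values.
   
   ```java
   import java.util.*;
   
   // Hypothetical model of the rolling-metadata walkback: inspect at most
   // `lookback` commits, newest first, and take the first value seen per key.
   public class RollingMetadataWalkback {
   
     static Map<String, String> carryForward(List<Map<String, String>> commitsNewestFirst,
                                             Set<String> keys, int lookback) {
       Map<String, String> found = new HashMap<>();
       Set<String> remaining = new HashSet<>(keys);
       for (int i = 0; i < Math.min(lookback, commitsNewestFirst.size()) && !remaining.isEmpty(); i++) {
         for (String key : new HashSet<>(remaining)) {
           String value = commitsNewestFirst.get(i).get(key);
           if (value != null) {
             found.put(key, value);
             remaining.remove(key);
           }
         }
       }
       return found;
     }
   
     public static void main(String[] args) {
       Map<String, String> ingest = new HashMap<>();
       ingest.put("checkpoint.offset", "1000");
       Map<String, String> compaction = new HashMap<>();  // compaction commit carries no keys
   
       // Newest first: the compaction commit, then the ingest commit with the checkpoint.
       List<Map<String, String>> timeline = Arrays.asList(compaction, ingest);
       Set<String> keys = Collections.singleton("checkpoint.offset");
   
       // lookback = 1 only sees the compaction commit: the checkpoint is lost.
       System.out.println(carryForward(timeline, keys, 1).containsKey("checkpoint.offset")); // false
       // lookback = 2 reaches past the compaction commit and recovers it.
       System.out.println(carryForward(timeline, keys, 2).get("checkpoint.offset")); // 1000
     }
   }
   ```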
   
   
   
   <details>
   <summary>💡 Minimal fix</summary>
   
   ```diff
    protected void completeCompaction(HoodieCommitMetadata metadata, HoodieTable table, String compactionCommitTime, List<HoodieWriteStat> partialMetadataWriteStats) {
      try {
        this.context.setJobStatus(this.getClass().getSimpleName(), "Collect compaction write status and commit compaction: " + config.getTableName());
        List<HoodieWriteStat> writeStats = metadata.getWriteStats();
        handleWriteErrors(writeStats, TableServiceType.COMPACT);
        InstantGenerator instantGenerator = table.getMetaClient().getInstantGenerator();
        final HoodieInstant compactionInstant = instantGenerator.getCompactionInflightInstant(compactionCommitTime);
        try {
          this.txnManager.beginStateChange(Option.of(compactionInstant), Option.empty());
          finalizeWrite(table, compactionCommitTime, writeStats);
   +      mergeRollingMetadata(table, metadata);
          // commit to data table after committing to metadata table.
          writeToMetadataTable(table, compactionCommitTime, metadata, partialMetadataWriteStats);
          log.info("Committing Compaction {}", compactionCommitTime);
          CompactHelpers.getInstance().completeInflightCompaction(table, compactionCommitTime, metadata);
   ```
   </details>
   
   
   Also applies to: 472-472, 607-608
   
   <details>
   <summary>🤖 Prompt for AI Agents</summary>
   
   ```
   Verify each finding against the current code and only fix it if needed.
   
   In `@hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieTableServiceClient.java` around lines 175 - 184, Compaction (and other commit paths like completeCompaction and the commit paths around lines ~472 and ~607) bypass the new rolling-metadata merge, causing carried keys to be lost; fix by invoking mergeRollingMetadata(...) wherever commits are finalized just like in preCommit: after creating the HoodieTable (e.g., via createTable(config, storageConf)) and before the actual commit/commitWrite, construct or obtain the HoodieCommitMetadata for that operation and call mergeRollingMetadata(table, metadata) (or mergeRollingMetadata(table, compactionMetadata)) inside completeCompaction and the other commit-completing methods so rolling metadata is merged under the same lock/precedence as preCommit.
   ```
   
   </details>
   
   
   <!-- This is an auto-generated comment by CodeRabbit -->
   
   — *CodeRabbit* ([original](https://github.com/yihua/hudi/pull/25#discussion_r3048270708)) (source:comment#3048270708)



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java:
##########
@@ -3524,6 +3562,16 @@ public Builder withReleaseResourceEnabled(boolean enabled) {
      return this;
    }
 
+    public Builder withRollingMetadataKeys(String keys) {
+      writeConfig.setValue(ROLLING_METADATA_KEYS, keys);
+      return this;
+    }
+
+    public Builder withRollingMetadataTimelineLookbackCommits(int lookbackCommits) {
+      writeConfig.setValue(ROLLING_METADATA_TIMELINE_LOOKBACK_COMMITS, String.valueOf(lookbackCommits));
+      return this;
+    }

Review Comment:
   _⚠️ Potential issue_ | _🟠 Major_
   
   **Validate lookback commits as non-negative before accepting config.**
   
   Line 3570 currently accepts any integer. Negative lookback values are 
invalid for timeline traversal semantics and can cause downstream runtime 
issues during commit metadata merge.
   
   
   <details>
   <summary>🔧 Proposed fix</summary>
   
   ```diff
    public Builder withRollingMetadataTimelineLookbackCommits(int lookbackCommits) {
   +  checkArgument(lookbackCommits >= 0,
   +      "hoodie.write.rolling.metadata.timeline.lookback.commits must be >= 0");
      writeConfig.setValue(ROLLING_METADATA_TIMELINE_LOOKBACK_COMMITS, String.valueOf(lookbackCommits));
      return this;
    }
   ```
   </details>
   
   Also add a defensive check in `Builder.validate()` so invalid values from 
raw property files are rejected during `build()` as well.
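   
   A hedged sketch of what that defensive check could look like, extracted as a standalone helper; only the config key name comes from the PR, while `validateLookback` and its wiring into `Builder.validate()` are illustrative assumptions:
   
   ```java
   // Illustrative only: mirrors what Builder.validate() could do with the raw
   // string read from a properties file, rejecting non-numeric and negative
   // values at build() time rather than failing later during the metadata merge.
   public class LookbackValidation {
   
     static final String KEY = "hoodie.write.rolling.metadata.timeline.lookback.commits";
   
     static int validateLookback(String raw) {
       final int parsed;
       try {
         parsed = Integer.parseInt(raw);
       } catch (NumberFormatException e) {
         throw new IllegalArgumentException(KEY + " must be an integer, got: " + raw, e);
       }
       if (parsed < 0) {
         throw new IllegalArgumentException(KEY + " must be >= 0, got: " + parsed);
       }
       return parsed;
     }
   
     public static void main(String[] args) {
       System.out.println(validateLookback("3")); // 3
       try {
         validateLookback("-1");
       } catch (IllegalArgumentException e) {
         System.out.println("rejected: " + e.getMessage());
       }
     }
   }
   ```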
   
   <details>
   <summary>🤖 Prompt for AI Agents</summary>
   
   ```
   Verify each finding against the current code and only fix it if needed.
   
   In `@hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java` around lines 3570 - 3573, Add validation to reject negative lookback values: in Builder.withRollingMetadataTimelineLookbackCommits(int lookbackCommits) check lookbackCommits >= 0 and throw an IllegalArgumentException if negative instead of setting the property; keep using ROLLING_METADATA_TIMELINE_LOOKBACK_COMMITS as the config key. Also add a defensive check inside Builder.validate() to read the raw property ROLLING_METADATA_TIMELINE_LOOKBACK_COMMITS (parseInt the stored string), verify it is >= 0, and throw a clear ConfigValidationException/IllegalArgumentException during build() if the parsed value is invalid so values from property files are rejected early.
   ```
   
   </details>
   
   
   <!-- This is an auto-generated comment by CodeRabbit -->
   
   — *CodeRabbit* ([original](https://github.com/yihua/hudi/pull/25#discussion_r3048270713)) (source:comment#3048270713)



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieClient.java:
##########
@@ -311,4 +316,116 @@ protected boolean isStreamingWriteToMetadataEnabled(HoodieTable table) {
     return config.isMetadataTableEnabled()
         && config.isMetadataStreamingWritesEnabled(table.getMetaClient().getTableConfig().getTableVersion());
   }
+
+  /**
+   * Merges rolling metadata from recent completed commits into the current commit metadata.
+   * This method MUST be called within the transaction lock after conflict resolution.
+   *
+   * <p>Rolling metadata keys configured via {@link HoodieWriteConfig#ROLLING_METADATA_KEYS} will be
+   * automatically carried forward from recent commits. The system walks back up to
+   * {@link HoodieWriteConfig#ROLLING_METADATA_TIMELINE_LOOKBACK_COMMITS} commits to find the most
+   * recent value for each key. This ensures that important metadata like checkpoint information
+   * remains accessible without worrying about archival or missing keys in individual commits.
+   *
+   * @param table HoodieTable instance (may have refreshed timeline after conflict resolution)
+   * @param metadata Current commit metadata to be augmented with rolling metadata
+   */
+  protected void mergeRollingMetadata(HoodieTable table, HoodieCommitMetadata metadata) {
+    // Skip for metadata table - rolling metadata is only for data tables
+    if (table.isMetadataTable()) {
+      return;
+    }
+
+    Set<String> rollingKeys = config.getRollingMetadataKeys();
+    if (rollingKeys.isEmpty()) {
+      return;  // No rolling metadata configured
+    }
+
+    // IMPORTANT: We're inside the lock here. The timeline in 'table' is either:
+    // 1. Fresh from createTable() if no conflict resolution happened
+    // 2. Reloaded during resolveWriteConflict() if conflicts were checked
+    // In both cases, we have the latest view of the timeline.
+
+    HoodieTimeline commitsTimeline = table.getActiveTimeline().getCommitsTimeline().filterCompletedInstants();
+
+    if (commitsTimeline.empty()) {
+      log.info("No previous commits found. Rolling metadata will start with current commit.");
+      return;  // First commit - nothing to roll forward
+    }
+
+    try {
+      Map<String, String> existingExtraMetadata = metadata.getExtraMetadata();
+      Map<String, String> foundRollingMetadata = new HashMap<>();
+      Set<String> remainingKeys = new HashSet<>(rollingKeys);
+
+      // Remove keys that are already present in current commit (current values take precedence)
+      for (String key : rollingKeys) {
+        if (existingExtraMetadata.containsKey(key)) {
+          remainingKeys.remove(key);
+        }
+      }
+
+      if (remainingKeys.isEmpty()) {
+        log.debug("All rolling metadata keys are present in current commit. No walkback needed.");
+        return;
+      }
+
+      int lookbackLimit = config.getRollingMetadataTimelineLookbackCommits();
+      int commitsWalkedBack = 0;
+
+      // Walk back through the timeline in reverse order (most recent first) to find values for all remaining keys
+      List<HoodieInstant> recentCommits = commitsTimeline.getReverseOrderedInstantsByCompletionTime()
+          .limit(lookbackLimit)
+          .collect(Collectors.toList());
+
+      log.debug("Walking back up to {} commits to find rolling metadata for keys: {}",
+          lookbackLimit, remainingKeys);
+
+      for (HoodieInstant instant : recentCommits) {
+        if (remainingKeys.isEmpty()) {
+          break;  // Found all keys
+        }
+
+        commitsWalkedBack++;
+        HoodieCommitMetadata commitMetadata = table.getMetaClient().getActiveTimeline().readInstantContent(instant, HoodieCommitMetadata.class);
+
+        // Check for remaining keys in this commit
+        for (String key : new HashSet<>(remainingKeys)) {
+          String value = commitMetadata.getMetadata(key);
+          if (value != null) {
+            foundRollingMetadata.put(key, value);
+            remainingKeys.remove(key);
+            log.debug("Found rolling metadata key '{}' in commit {} with value: {}",
+                key, instant.requestedTime(), value);

Review Comment:
   _⚠️ Potential issue_ | _🟠 Major_
   
   **Avoid logging raw rolling-metadata values.**
   
   `value` is arbitrary extra metadata from user commits. Emitting it verbatim 
can leak checkpoints/secrets and can also bloat debug logs for large payloads. 
Log the key and instant only, or a redacted/hash form.
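   
   One possible shape for the redacted form, sketched as a standalone helper (the `redact` name and the 8-hex-char digest length are assumptions, not part of the PR): a short hash keeps the value out of the logs while still letting operators see when it changed between commits.
   
   ```java
   import java.nio.charset.StandardCharsets;
   import java.security.MessageDigest;
   import java.security.NoSuchAlgorithmException;
   
   // Illustrative redaction helper: log a short SHA-256 prefix of the value
   // instead of the value itself, so changes remain correlatable across commits.
   public class MetadataRedaction {
   
     static String redact(String value) {
       try {
         byte[] digest = MessageDigest.getInstance("SHA-256")
             .digest(value.getBytes(StandardCharsets.UTF_8));
         StringBuilder hex = new StringBuilder("sha256:");
         for (int i = 0; i < 4; i++) {  // first 4 bytes -> 8 hex chars, enough for a log line
           hex.append(String.format("%02x", digest[i]));
         }
         return hex.toString();
       } catch (NoSuchAlgorithmException e) {
         // SHA-256 is mandatory on every conforming JRE, so this is unreachable
         return "sha256:unavailable";
       }
     }
   
     public static void main(String[] args) {
       // The call site could then become (hypothetically):
       // log.debug("Found rolling metadata key '{}' in commit {} (value {})",
       //     key, instant.requestedTime(), redact(value));
       System.out.println(redact("s3://bucket/checkpoints/offset-1000"));
     }
   }
   ```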
   
   
   
   <details>
   <summary>🔒 Minimal redaction</summary>
   
   ```diff
   -            log.debug("Found rolling metadata key '{}' in commit {} with value: {}",
   -                key, instant.requestedTime(), value);
   +            log.debug("Found rolling metadata key '{}' in commit {}",
   +                key, instant.requestedTime());
   ```
   </details>
   
   <!-- suggestion_start -->
   
   <details>
   <summary>📝 Committable suggestion</summary>
   
   > ‼️ **IMPORTANT**
   > Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
   
   ```suggestion
               log.debug("Found rolling metadata key '{}' in commit {}",
                   key, instant.requestedTime());
   ```
   
   </details>
   
   <!-- suggestion_end -->
   
   <details>
   <summary>🤖 Prompt for AI Agents</summary>
   
   ```
   Verify each finding against the current code and only fix it if needed.
   
   In `@hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieClient.java` around lines 398 - 399, The debug log in BaseHoodieClient currently emits the raw rolling-metadata value (log.debug in the method around the commit handling), which can leak sensitive or large payloads; change the log to omit the raw value and instead log only the metadata key and instant.requestedTime(), or log a redacted representation (e.g., a short hash or fixed placeholder) of the value before passing to log.debug. Update the call site that uses "value" in the log message so it no longer prints the raw value but uses the key and instant.requestedTime() and, if desired, a computed short hash/redaction string for value.
   ```
   
   </details>
   
   
   <!-- This is an auto-generated comment by CodeRabbit -->
   
   — *CodeRabbit* ([original](https://github.com/yihua/hudi/pull/25#discussion_r3048270702)) (source:comment#3048270702)



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieClient.java:
##########
@@ -311,4 +316,116 @@ protected boolean isStreamingWriteToMetadataEnabled(HoodieTable table) {
     return config.isMetadataTableEnabled()
         && config.isMetadataStreamingWritesEnabled(table.getMetaClient().getTableConfig().getTableVersion());
   }
+
+  /**
+   * Merges rolling metadata from recent completed commits into the current commit metadata.
+   * This method MUST be called within the transaction lock after conflict resolution.
+   *
+   * <p>Rolling metadata keys configured via {@link HoodieWriteConfig#ROLLING_METADATA_KEYS} will be
+   * automatically carried forward from recent commits. The system walks back up to
+   * {@link HoodieWriteConfig#ROLLING_METADATA_TIMELINE_LOOKBACK_COMMITS} commits to find the most
+   * recent value for each key. This ensures that important metadata like checkpoint information
+   * remains accessible without worrying about archival or missing keys in individual commits.
+   *
+   * @param table HoodieTable instance (may have refreshed timeline after conflict resolution)
+   * @param metadata Current commit metadata to be augmented with rolling metadata
+   */
+  protected void mergeRollingMetadata(HoodieTable table, HoodieCommitMetadata metadata) {
+    // Skip for metadata table - rolling metadata is only for data tables
+    if (table.isMetadataTable()) {
+      return;
+    }
+
+    Set<String> rollingKeys = config.getRollingMetadataKeys();
+    if (rollingKeys.isEmpty()) {
+      return;  // No rolling metadata configured
+    }
+
+    // IMPORTANT: We're inside the lock here. The timeline in 'table' is either:
+    // 1. Fresh from createTable() if no conflict resolution happened
+    // 2. Reloaded during resolveWriteConflict() if conflicts were checked
+    // In both cases, we have the latest view of the timeline.
+
+    HoodieTimeline commitsTimeline = table.getActiveTimeline().getCommitsTimeline().filterCompletedInstants();
+
+    if (commitsTimeline.empty()) {
+      log.info("No previous commits found. Rolling metadata will start with current commit.");
+      return;  // First commit - nothing to roll forward
+    }
+
+    try {
+      Map<String, String> existingExtraMetadata = metadata.getExtraMetadata();
+      Map<String, String> foundRollingMetadata = new HashMap<>();
+      Set<String> remainingKeys = new HashSet<>(rollingKeys);
+
+      // Remove keys that are already present in current commit (current values take precedence)
+      for (String key : rollingKeys) {
+        if (existingExtraMetadata.containsKey(key)) {
+          remainingKeys.remove(key);
+        }
+      }
+
+      if (remainingKeys.isEmpty()) {
+        log.debug("All rolling metadata keys are present in current commit. No walkback needed.");
+        return;
+      }
+
+      int lookbackLimit = config.getRollingMetadataTimelineLookbackCommits();
+      int commitsWalkedBack = 0;
+
+      // Walk back through the timeline in reverse order (most recent first) to find values for all remaining keys
+      List<HoodieInstant> recentCommits = commitsTimeline.getReverseOrderedInstantsByCompletionTime()
+          .limit(lookbackLimit)
+          .collect(Collectors.toList());
+
+      log.debug("Walking back up to {} commits to find rolling metadata for keys: {}",
+          lookbackLimit, remainingKeys);
+
+      for (HoodieInstant instant : recentCommits) {
+        if (remainingKeys.isEmpty()) {
+          break;  // Found all keys
+        }
+
+        commitsWalkedBack++;
+        HoodieCommitMetadata commitMetadata = table.getMetaClient().getActiveTimeline().readInstantContent(instant, HoodieCommitMetadata.class);
+
+        // Check for remaining keys in this commit
+        for (String key : new HashSet<>(remainingKeys)) {
+          String value = commitMetadata.getMetadata(key);
+          if (value != null) {
+            foundRollingMetadata.put(key, value);
+            remainingKeys.remove(key);
+            log.debug("Found rolling metadata key '{}' in commit {} with value: {}",
+                key, instant.requestedTime(), value);
+          }
+        }
+      }
+
+      // Add found rolling metadata to current commit
+      int rolledForwardCount = 0;
+      for (Map.Entry<String, String> entry : foundRollingMetadata.entrySet()) {
+        metadata.addMetadata(entry.getKey(), entry.getValue());
+        rolledForwardCount++;
+      }
+
+      int updatedCount = rollingKeys.size() - remainingKeys.size() - rolledForwardCount;
+
+      if (rolledForwardCount > 0 || updatedCount > 0 || !remainingKeys.isEmpty()) {
+        log.info("Rolling metadata merge completed. Walked back {} commits. "
+                + "Rolled forward: {}, Updated in current: {}, Not found: {}, Total rolling keys: {}",
+            commitsWalkedBack, rolledForwardCount, updatedCount, remainingKeys.size(), rollingKeys.size());

Review Comment:
   <a href="#"><img alt="P2" src="https://greptile-static-assets.s3.amazonaws.com/badges/p2.svg?v=7" align="top"></a> **`rolledForwardCount` can be simplified to `foundRollingMetadata.size()`**
   
   `rolledForwardCount` is incremented once per entry inside a `for-each` over 
`foundRollingMetadata`, making it always equal to 
`foundRollingMetadata.size()`. The manual counter can be removed:
   
   ```suggestion
         int rolledForwardCount = foundRollingMetadata.size();
   
         int updatedCount = rollingKeys.size() - remainingKeys.size() - rolledForwardCount;
   
         if (rolledForwardCount > 0 || updatedCount > 0 || !remainingKeys.isEmpty()) {
           log.info("Rolling metadata merge completed. Walked back {} commits. "
                   + "Rolled forward: {}, Updated in current: {}, Not found: {}, Total rolling keys: {}",
               commitsWalkedBack, rolledForwardCount, updatedCount, remainingKeys.size(), rollingKeys.size());
         }
   ```
   
   — *Greptile* ([original](https://github.com/yihua/hudi/pull/25#discussion_r3048268992)) (source:comment#3048268992)



##########
hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestRollingMetadata.java:
##########
@@ -0,0 +1,577 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.client;
+
+import org.apache.hudi.common.model.HoodieCommitMetadata;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieTableType;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.table.timeline.TimelineUtils;
+import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.testutils.SparkClientFunctionalTestHarness;
+
+import org.apache.spark.api.java.JavaRDD;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
+
+import java.io.IOException;
+import java.net.URI;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.stream.Collectors;
+
+
+import static org.apache.hudi.testutils.Assertions.assertNoWriteErrors;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+/**
+ * Tests for rolling metadata functionality.
+ */
+class TestRollingMetadata extends SparkClientFunctionalTestHarness {
+
+  /**
+   * Test that rolling metadata keys are carried forward to subsequent commits.
+   */
+  @Test
+  public void testRollingMetadataCarriedForward() throws IOException {
+    // Given: A table with rolling metadata keys configured
+    HoodieTableMetaClient metaClient = getHoodieMetaClient(storageConf(), 
URI.create(basePath()).getPath(), new Properties());
+    HoodieWriteConfig config = getConfigBuilder(true)
+        .withPath(metaClient.getBasePath())
+        .withRollingMetadataKeys("checkpoint.offset,checkpoint.partition")
+        .build();
+
+    HoodieTestDataGenerator dataGen = new HoodieTestDataGenerator();
+    SparkRDDWriteClient client = getHoodieWriteClient(config);
+
+    // When: First commit with rolling metadata keys
+    String instant1 = client.createNewInstantTime(false);
+    List<HoodieRecord> records1 = dataGen.generateInserts(instant1, 10);
+    JavaRDD<HoodieRecord> writeRecords1 = jsc().parallelize(records1, 2);
+
+    WriteClientTestUtils.startCommitWithTime(client, instant1);
+    List<WriteStatus> writeStatuses1 = client.insert(writeRecords1, 
instant1).collect();
+    assertNoWriteErrors(writeStatuses1);
+
+    // Add rolling metadata to first commit
+    Map<String, String> extraMetadata1 = new HashMap<>();
+    extraMetadata1.put("checkpoint.offset", "1000");
+    extraMetadata1.put("checkpoint.partition", "partition-0");
+    client.commitStats(instant1, 
writeStatuses1.stream().map(WriteStatus::getStat).collect(Collectors.toList()),
+        Option.of(extraMetadata1), metaClient.getCommitActionType());
+
+    // Then: Verify first commit has the rolling metadata
+    metaClient = HoodieTableMetaClient.reload(metaClient);
+    HoodieInstant commit1 = 
metaClient.getActiveTimeline().getCommitsTimeline().filterCompletedInstants().lastInstant().get();
+    HoodieCommitMetadata metadata1 = TimelineUtils.getCommitMetadata(commit1, 
metaClient.getActiveTimeline());
+    assertEquals("1000", metadata1.getMetadata("checkpoint.offset"));
+    assertEquals("partition-0", metadata1.getMetadata("checkpoint.partition"));
+
+    // When: Second commit updates one rolling metadata key
+    String instant2 = client.createNewInstantTime(false);
+    List<HoodieRecord> records2 = dataGen.generateInserts(instant2, 10);
+    JavaRDD<HoodieRecord> writeRecords2 = jsc().parallelize(records2, 2);
+
+    WriteClientTestUtils.startCommitWithTime(client, instant2);
+    List<WriteStatus> writeStatuses2 = client.insert(writeRecords2, 
instant2).collect();
+    assertNoWriteErrors(writeStatuses2);
+
+    // Only update checkpoint.offset, not checkpoint.partition
+    Map<String, String> extraMetadata2 = new HashMap<>();
+    extraMetadata2.put("checkpoint.offset", "2000");
+    client.commitStats(instant2, 
writeStatuses2.stream().map(WriteStatus::getStat).collect(Collectors.toList()),
+        Option.of(extraMetadata2), metaClient.getCommitActionType());
+
+    // Then: Verify second commit has both rolling metadata keys (one carried 
forward, one updated)
+    metaClient = HoodieTableMetaClient.reload(metaClient);
+    HoodieInstant commit2 = 
metaClient.getActiveTimeline().getCommitsTimeline().filterCompletedInstants().lastInstant().get();
+    HoodieCommitMetadata metadata2 = TimelineUtils.getCommitMetadata(commit2, 
metaClient.getActiveTimeline());
+    assertEquals("2000", metadata2.getMetadata("checkpoint.offset")); // 
Updated value
+    assertEquals("partition-0", 
metadata2.getMetadata("checkpoint.partition")); // Carried forward from commit1
+
+    // When: Third commit with no rolling metadata keys
+    String instant3 = client.createNewInstantTime(false);
+    List<HoodieRecord> records3 = dataGen.generateInserts(instant3, 10);
+    JavaRDD<HoodieRecord> writeRecords3 = jsc().parallelize(records3, 2);
+
+    client.close();
+    client = getHoodieWriteClient(config);
+
+    WriteClientTestUtils.startCommitWithTime(client, instant3);
+    List<WriteStatus> writeStatuses3 = client.insert(writeRecords3, 
instant3).collect();
+    assertNoWriteErrors(writeStatuses3);
+    client.commitStats(instant3, 
writeStatuses3.stream().map(WriteStatus::getStat).collect(Collectors.toList()),
+        Option.empty(), metaClient.getCommitActionType());
+
+    // Then: Verify third commit has all rolling metadata keys carried forward
+    metaClient = HoodieTableMetaClient.reload(metaClient);
+    HoodieInstant commit3 = 
metaClient.getActiveTimeline().getCommitsTimeline().filterCompletedInstants().lastInstant().get();
+    HoodieCommitMetadata metadata3 = TimelineUtils.getCommitMetadata(commit3, 
metaClient.getActiveTimeline());
+    assertEquals("2000", metadata3.getMetadata("checkpoint.offset")); // 
Carried forward from commit2
+    assertEquals("partition-0", 
metadata3.getMetadata("checkpoint.partition")); // Carried forward from commit1
+
+    client.close();
+  }
+
+  /**
+   * Test that rolling metadata works with no configured keys (default 
behavior).
+   */
+  @Test
+  public void testRollingMetadataWithNoConfiguredKeys() throws IOException {
+    // Given: A table with no rolling metadata keys configured
+    HoodieTableMetaClient metaClient = getHoodieMetaClient(storageConf(), 
URI.create(basePath()).getPath(), new Properties());
+    HoodieWriteConfig config = getConfigBuilder(true)
+        .withPath(metaClient.getBasePath())
+        .build();
+
+    HoodieTestDataGenerator dataGen = new HoodieTestDataGenerator();
+    SparkRDDWriteClient client = getHoodieWriteClient(config);
+
+    // When: Commit with extra metadata
+    String instant1 = client.createNewInstantTime(false);
+    List<HoodieRecord> records1 = dataGen.generateInserts(instant1, 10);
+    JavaRDD<HoodieRecord> writeRecords1 = jsc().parallelize(records1, 2);
+
+    WriteClientTestUtils.startCommitWithTime(client, instant1);
+    List<WriteStatus> writeStatuses1 = client.insert(writeRecords1, 
instant1).collect();
+    assertNoWriteErrors(writeStatuses1);
+
+    Map<String, String> extraMetadata1 = new HashMap<>();
+    extraMetadata1.put("some.key", "value1");
+    client.commitStats(instant1, 
writeStatuses1.stream().map(WriteStatus::getStat).collect(Collectors.toList()),
+        Option.of(extraMetadata1), metaClient.getCommitActionType());
+
+    // Then: Verify metadata exists in first commit
+    metaClient = HoodieTableMetaClient.reload(metaClient);
+    HoodieInstant commit1 = 
metaClient.getActiveTimeline().getCommitsTimeline().filterCompletedInstants().lastInstant().get();
+    HoodieCommitMetadata metadata1 = TimelineUtils.getCommitMetadata(commit1, 
metaClient.getActiveTimeline());
+    assertEquals("value1", metadata1.getMetadata("some.key"));
+
+    client.close();
+    client = getHoodieWriteClient(config);
+    // When: Second commit without the key
+    String instant2 = client.createNewInstantTime(false);
+    List<HoodieRecord> records2 = dataGen.generateInserts(instant2, 10);
+    JavaRDD<HoodieRecord> writeRecords2 = jsc().parallelize(records2, 2);
+
+    WriteClientTestUtils.startCommitWithTime(client, instant2);
+    List<WriteStatus> writeStatuses2 = client.insert(writeRecords2, 
instant2).collect();
+    assertNoWriteErrors(writeStatuses2);
+    client.commitStats(instant2, 
writeStatuses2.stream().map(WriteStatus::getStat).collect(Collectors.toList()),
+        Option.empty(), metaClient.getCommitActionType());
+
+    // Then: Verify key is NOT carried forward (no rolling metadata configured)
+    metaClient = HoodieTableMetaClient.reload(metaClient);
+    HoodieInstant commit2 = 
metaClient.getActiveTimeline().getCommitsTimeline().filterCompletedInstants().lastInstant().get();
+    HoodieCommitMetadata metadata2 = TimelineUtils.getCommitMetadata(commit2, 
metaClient.getActiveTimeline());
+    assertNull(metadata2.getMetadata("some.key"));
+
+    client.close();
+  }
+
+  /**
+   * Test that timeline walkback finds rolling metadata keys from older 
commits.
+   */
+  @Test
+  public void testRollingMetadataWithTimelineWalkback() throws IOException {
+    // Given: A table with rolling metadata keys and walkback configured
+    HoodieTableMetaClient metaClient = getHoodieMetaClient(storageConf(), 
URI.create(basePath()).getPath(), new Properties());
+    HoodieWriteConfig config = getConfigBuilder(true)
+        .withPath(metaClient.getBasePath())
+        .withRollingMetadataKeys("key1,key2,key3")
+        .withRollingMetadataTimelineLookbackCommits(5)
+        .build();
+
+    HoodieTestDataGenerator dataGen = new HoodieTestDataGenerator();
+    SparkRDDWriteClient client = getHoodieWriteClient(config);
+
+    // When: First commit with key1 and key2
+    String instant1 = client.createNewInstantTime(false);
+    List<HoodieRecord> records1 = dataGen.generateInserts(instant1, 10);
+    JavaRDD<HoodieRecord> writeRecords1 = jsc().parallelize(records1, 2);
+
+    WriteClientTestUtils.startCommitWithTime(client, instant1);
+    List<WriteStatus> writeStatuses1 = client.insert(writeRecords1, 
instant1).collect();
+    assertNoWriteErrors(writeStatuses1);
+
+    Map<String, String> extraMetadata1 = new HashMap<>();
+    extraMetadata1.put("key1", "value1");
+    extraMetadata1.put("key2", "value2");
+    client.commitStats(instant1, 
writeStatuses1.stream().map(WriteStatus::getStat).collect(Collectors.toList()),
+        Option.of(extraMetadata1), metaClient.getCommitActionType());
+
+    // When: Second commit with key3
+    String instant2 = client.createNewInstantTime(false);
+    List<HoodieRecord> records2 = dataGen.generateInserts(instant2, 10);
+    JavaRDD<HoodieRecord> writeRecords2 = jsc().parallelize(records2, 2);
+
+    WriteClientTestUtils.startCommitWithTime(client, instant2);
+    List<WriteStatus> writeStatuses2 = client.insert(writeRecords2, 
instant2).collect();
+    assertNoWriteErrors(writeStatuses2);
+
+    Map<String, String> extraMetadata2 = new HashMap<>();
+    extraMetadata2.put("key3", "value3");
+    client.commitStats(instant2, 
writeStatuses2.stream().map(WriteStatus::getStat).collect(Collectors.toList()),
+        Option.of(extraMetadata2), metaClient.getCommitActionType());
+
+    // Then: Verify second commit has all three keys (key3 from current, key1 and key2 from walkback)
+    metaClient = HoodieTableMetaClient.reload(metaClient);
+    HoodieInstant commit2 = 
metaClient.getActiveTimeline().getCommitsTimeline().filterCompletedInstants().lastInstant().get();
+    HoodieCommitMetadata metadata2 = TimelineUtils.getCommitMetadata(commit2, 
metaClient.getActiveTimeline());
+    assertEquals("value1", metadata2.getMetadata("key1")); // From commit1 via walkback
+    assertEquals("value2", metadata2.getMetadata("key2")); // From commit1 via walkback
+    assertEquals("value3", metadata2.getMetadata("key3")); // From current commit
+
+    // When: Third commit with no keys (should walk back to find all three)
+    String instant3 = client.createNewInstantTime(false);
+    List<HoodieRecord> records3 = dataGen.generateInserts(instant3, 10);
+    JavaRDD<HoodieRecord> writeRecords3 = jsc().parallelize(records3, 2);
+
+    WriteClientTestUtils.startCommitWithTime(client, instant3);
+    List<WriteStatus> writeStatuses3 = client.insert(writeRecords3, 
instant3).collect();
+    assertNoWriteErrors(writeStatuses3);
+    client.commitStats(instant3, 
writeStatuses3.stream().map(WriteStatus::getStat).collect(Collectors.toList()),
+        Option.empty(), metaClient.getCommitActionType());
+
+    // Then: Verify third commit has all three keys from walkback
+    metaClient = HoodieTableMetaClient.reload(metaClient);
+    HoodieInstant commit3 = 
metaClient.getActiveTimeline().getCommitsTimeline().filterCompletedInstants().lastInstant().get();
+    HoodieCommitMetadata metadata3 = TimelineUtils.getCommitMetadata(commit3, 
metaClient.getActiveTimeline());
+    assertEquals("value1", metadata3.getMetadata("key1")); // From commit1 via walkback
+    assertEquals("value2", metadata3.getMetadata("key2")); // From commit1 via walkback
+    assertEquals("value3", metadata3.getMetadata("key3")); // From commit2 via walkback
+
+    client.close();
+  }
+
+  /**
+   * Test that rolling metadata respects the walkback limit.
+   */
+  @Test
+  public void testRollingMetadataWalkbackLimit() throws IOException {
+    // Given: A table with rolling metadata keys and small walkback limit
+    HoodieTableMetaClient metaClient = getHoodieMetaClient(storageConf(), 
URI.create(basePath()).getPath(), new Properties());
+    HoodieWriteConfig config = getConfigBuilder(true)
+        .withPath(metaClient.getBasePath())
+        .withRollingMetadataKeys("checkpoint.offset")
+        .withRollingMetadataTimelineLookbackCommits(2) // Only look back 2 commits
+        .build();
+
+    HoodieWriteConfig configNoRollingMetadata = getConfigBuilder(true)
+        .withPath(metaClient.getBasePath())
+        .build();
+
+    HoodieTestDataGenerator dataGen = new HoodieTestDataGenerator();
+    SparkRDDWriteClient client = getHoodieWriteClient(config);
+
+    String instant = client.createNewInstantTime(false);
+    List<HoodieRecord> records = dataGen.generateInserts(instant, 10);
+    JavaRDD<HoodieRecord> writeRecords = jsc().parallelize(records, 2);
+
+    WriteClientTestUtils.startCommitWithTime(client, instant);
+    List<WriteStatus> writeStatuses = client.insert(writeRecords, 
instant).collect();
+    assertNoWriteErrors(writeStatuses);
+    // Only first commit has the rolling metadata key
+    Map<String, String> extraMetadata = new HashMap<>();
+    extraMetadata.put("checkpoint.offset", "original-value");
+    client.commitStats(instant, 
writeStatuses.stream().map(WriteStatus::getStat).collect(Collectors.toList()),
+        Option.of(extraMetadata), metaClient.getCommitActionType());
+
+    // add 4 more commits w/o any rolling metadata.
+    client.close();
+    client = getHoodieWriteClient(configNoRollingMetadata);
+    for (int i = 2; i <= 3; i++) {
+      instant = client.createNewInstantTime(false);
+      records = dataGen.generateInserts(instant, 10);
+      writeRecords = jsc().parallelize(records, 2);
+
+      WriteClientTestUtils.startCommitWithTime(client, instant);
+      writeStatuses = client.insert(writeRecords, instant).collect();
+      assertNoWriteErrors(writeStatuses);
+      client.commitStats(instant, 
writeStatuses.stream().map(WriteStatus::getStat).collect(Collectors.toList()),
+          Option.empty(), metaClient.getCommitActionType());
+    }
+
+    client.close();
+    client = getHoodieWriteClient(config);
+    for (int i = 4; i <= 5; i++) {
+      instant = client.createNewInstantTime(false);
+      records = dataGen.generateInserts(instant, 10);
+      writeRecords = jsc().parallelize(records, 2);
+
+      WriteClientTestUtils.startCommitWithTime(client, instant);
+      writeStatuses = client.insert(writeRecords, instant).collect();
+      assertNoWriteErrors(writeStatuses);
+      client.commitStats(instant, 
writeStatuses.stream().map(WriteStatus::getStat).collect(Collectors.toList()),
+          Option.empty(), metaClient.getCommitActionType());
+    }
+
+    // Then: Verify that commit 2 and 3 have the rolling metadata (within walkback limit)
+    metaClient = HoodieTableMetaClient.reload(metaClient);
+    List<HoodieInstant> commits = 
metaClient.getActiveTimeline().getCommitsTimeline().filterCompletedInstants().getInstantsAsStream()
+        .collect(Collectors.toList());
+
+    HoodieCommitMetadata metadata2 = 
TimelineUtils.getCommitMetadata(commits.get(1), metaClient.getActiveTimeline());
+    assertFalse(metadata2.getExtraMetadata().containsKey("checkpoint.offset"));
+
+    HoodieCommitMetadata metadata3 = 
TimelineUtils.getCommitMetadata(commits.get(2), metaClient.getActiveTimeline());
+    assertFalse(metadata3.getExtraMetadata().containsKey("checkpoint.offset"));

Review Comment:
   <a href="#"><img alt="P2" 
src="https://greptile-static-assets.s3.amazonaws.com/badges/p2.svg?v=7" 
align="top"></a> **Test comment contradicts its assertions**
   
   The comment on line 336 says _"Verify that commit 2 and 3 have the rolling 
metadata (within walkback limit)"_, but both assertions immediately below use 
`assertFalse` — meaning the metadata is **not** present. Commits 2 and 3 were 
intentionally written with `configNoRollingMetadata` (no rolling keys 
configured), so rolling metadata is never merged for them. The comment should 
reflect that:
   
   ```suggestion
       // Then: Verify that commit 2 and 3 do NOT have the rolling metadata
    // (they were made without rolling metadata config, so no carry-forward happens)
   ```
   
   As written, anyone reading this test will be confused about whether the 
feature is supposed to work for commits 2 and 3 or not.
   
   — *Greptile* 
([original](https://github.com/yihua/hudi/pull/25#discussion_r3048268906)) 
(source:comment#3048268906)
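
For readers following the thread, the lookback semantics being tested above can be sketched roughly as follows. This is a hedged illustration with hypothetical helper names, not Hudi's actual implementation (the real merge happens inside the write client's commit path): for each configured rolling key missing from the current commit's metadata, scan at most N previous completed commits, newest first, and carry forward the first value found.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hedged sketch of rolling-metadata walkback; names and shapes are hypothetical.
public class RollingMetadataWalkbackSketch {

  static Map<String, String> mergeRolling(
      List<Map<String, String>> commitHistory,  // completed commits' extra metadata, oldest first
      Map<String, String> current,              // metadata of the commit being finished
      Set<String> rollingKeys,                  // configured rolling metadata keys
      int lookbackCommits) {                    // walkback limit
    Map<String, String> merged = new HashMap<>(current);
    for (String key : rollingKeys) {
      if (merged.containsKey(key)) {
        continue; // current commit already carries this key
      }
      // Walk back newest-first, scanning at most `lookbackCommits` commits.
      int scanned = 0;
      for (int i = commitHistory.size() - 1; i >= 0 && scanned < lookbackCommits; i--, scanned++) {
        String found = commitHistory.get(i).get(key);
        if (found != null) {
          merged.put(key, found);
          break;
        }
      }
    }
    return merged;
  }

  public static void main(String[] args) {
    // Mirrors the walkback-limit test: only commit1 carries the key.
    List<Map<String, String>> history = new ArrayList<>();
    history.add(Map.of("checkpoint.offset", "original-value")); // commit1
    history.add(Map.of());                                      // commit2
    history.add(Map.of());                                      // commit3

    Set<String> keys = Set.of("checkpoint.offset");

    // Lookback 5: the next commit walks back far enough to reach commit1's value.
    Map<String, String> big = mergeRolling(history, new HashMap<>(), keys, 5);
    System.out.println(big.get("checkpoint.offset")); // original-value

    // Lookback 2: only commit3 and commit2 are scanned, so the key is dropped.
    Map<String, String> small = mergeRolling(history, new HashMap<>(), keys, 2);
    System.out.println(small.containsKey("checkpoint.offset")); // false
  }
}
```

This also makes the reviewers' point concrete: a compaction or clustering commit that lands without the carried keys behaves like the empty maps above, so a small lookback limit can silently lose the metadata.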



##########
hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/client/TestRollingMetadataSpark.java:
##########
@@ -0,0 +1,221 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.client;
+
+import org.apache.hudi.common.model.HoodieCommitMetadata;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieTableType;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.table.timeline.TimelineUtils;
+import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.config.HoodieClusteringConfig;
+import org.apache.hudi.config.HoodieCompactionConfig;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.testutils.SparkClientFunctionalTestHarness;
+
+import org.apache.spark.api.java.JavaRDD;
+import org.junit.jupiter.api.Test;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+import static org.apache.hudi.testutils.Assertions.assertNoWriteErrors;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+public class TestRollingMetadataSpark extends SparkClientFunctionalTestHarness {
+
+  /**
+   * Test that rolling metadata is carried forward across inline compaction commits on a MOR table.
+   * Compaction does not call preCommit, so its commit won't have rolling metadata. But subsequent
+   * regular writes should walk back past the compaction commit to find rolling metadata from earlier commits.
+   */
+  @Test
+  public void testRollingMetadataWithInlineCompaction() throws IOException {
+    // Given: A MOR table with rolling metadata and inline compaction after 2 delta commits
+    HoodieTableMetaClient metaClient = 
getHoodieMetaClient(HoodieTableType.MERGE_ON_READ);
+    HoodieWriteConfig config = getConfigBuilder(true)
+        .withPath(metaClient.getBasePath())
+        .withRollingMetadataKeys("checkpoint.offset")
+        .withCompactionConfig(HoodieCompactionConfig.newBuilder()
+            .withInlineCompaction(true)
+            .withMaxNumDeltaCommitsBeforeCompaction(2)
+            .build())
+        .build();
+
+    HoodieTestDataGenerator dataGen = new HoodieTestDataGenerator();
+
+    // When: First delta commit with rolling metadata
+    SparkRDDWriteClient client = getHoodieWriteClient(config);
+    String instant1 = client.createNewInstantTime(false);
+    List<HoodieRecord> records1 = dataGen.generateInserts(instant1, 10);
+    JavaRDD<HoodieRecord> writeRecords1 = jsc().parallelize(records1, 2);
+
+    WriteClientTestUtils.startCommitWithTime(client, instant1);
+    List<WriteStatus> writeStatuses1 = client.insert(writeRecords1, 
instant1).collect();
+    assertNoWriteErrors(writeStatuses1);
+
+    Map<String, String> extraMetadata1 = new HashMap<>();
+    extraMetadata1.put("checkpoint.offset", "1000");
+    client.commitStats(instant1, 
writeStatuses1.stream().map(WriteStatus::getStat).collect(Collectors.toList()),
+        Option.of(extraMetadata1), metaClient.getCommitActionType());
+
+    // When: Second delta commit (should trigger inline compaction after this)
+    String instant2 = client.createNewInstantTime(false);
+    List<HoodieRecord> records2 = dataGen.generateUpdates(instant2, 10);
+    JavaRDD<HoodieRecord> writeRecords2 = jsc().parallelize(records2, 2);
+
+    WriteClientTestUtils.startCommitWithTime(client, instant2);
+    List<WriteStatus> writeStatuses2 = client.upsert(writeRecords2, 
instant2).collect();
+    assertNoWriteErrors(writeStatuses2);
+    client.commitStats(instant2, 
writeStatuses2.stream().map(WriteStatus::getStat).collect(Collectors.toList()),
+        Option.empty(), metaClient.getCommitActionType());
+
+    // Then: Verify compaction was triggered (should see a commit action on the timeline)
+    metaClient = HoodieTableMetaClient.reload(metaClient);
+    HoodieTimeline writeTimeline = 
metaClient.getActiveTimeline().getWriteTimeline().filterCompletedInstants();
+    boolean hasCompaction = writeTimeline.getInstantsAsStream()
+        .anyMatch(i -> i.getAction().equals(HoodieTimeline.COMMIT_ACTION));
+    assertTrue(hasCompaction, "Inline compaction should have been triggered");
+
+    // When: Third delta commit after compaction, no rolling metadata provided
+    String instant3 = client.createNewInstantTime(false);
+    List<HoodieRecord> records3 = dataGen.generateInserts(instant3, 10);
+    JavaRDD<HoodieRecord> writeRecords3 = jsc().parallelize(records3, 2);
+
+    WriteClientTestUtils.startCommitWithTime(client, instant3);
+    List<WriteStatus> writeStatuses3 = client.insert(writeRecords3, 
instant3).collect();
+    assertNoWriteErrors(writeStatuses3);
+    client.commitStats(instant3, 
writeStatuses3.stream().map(WriteStatus::getStat).collect(Collectors.toList()),
+        Option.empty(), metaClient.getCommitActionType());
+
+    // Then: Rolling metadata should still be found by walking back past the compaction commit
+    metaClient = HoodieTableMetaClient.reload(metaClient);
+    HoodieInstant lastDeltaCommit = 
metaClient.getActiveTimeline().getDeltaCommitTimeline()
+        .filterCompletedInstants().lastInstant().get();
+    HoodieCommitMetadata metadata3 = 
TimelineUtils.getCommitMetadata(lastDeltaCommit, 
metaClient.getActiveTimeline());
+    assertEquals("1000", metadata3.getMetadata("checkpoint.offset"),
+        "Rolling metadata should be carried forward past compaction commit");
+
+    client.close();
+  }
+
+
+  /**
+   * Test that rolling metadata is carried forward into clustering commits.
+   * With OCC enabled, clustering calls preCommit which merges rolling metadata.
+   */
+  @Test
+  public void testRollingMetadataWithInlineClustering() throws IOException {
+    // Given: A CoW table with rolling metadata and inline clustering after 1 commit
+    HoodieTableMetaClient metaClient = 
getHoodieMetaClient(HoodieTableType.COPY_ON_WRITE);
+    HoodieWriteConfig config = getConfigBuilder(true)
+        .withPath(metaClient.getBasePath())
+        .withRollingMetadataKeys("checkpoint.offset,checkpoint.partition")
+        .withClusteringConfig(HoodieClusteringConfig.newBuilder()
+            .withClusteringMaxNumGroups(10)
+            .withClusteringTargetPartitions(0)
+            .withInlineClustering(true)
+            .withInlineClusteringNumCommits(1)
+            .build())
+        .build();
+
+    HoodieTestDataGenerator dataGen = new HoodieTestDataGenerator();
+
+    // When: First commit with rolling metadata
+    SparkRDDWriteClient client = getHoodieWriteClient(config);
+    String instant1 = client.createNewInstantTime(false);
+    List<HoodieRecord> records1 = dataGen.generateInserts(instant1, 100);
+    JavaRDD<HoodieRecord> writeRecords1 = jsc().parallelize(records1, 2);
+
+    WriteClientTestUtils.startCommitWithTime(client, instant1);
+    List<WriteStatus> writeStatuses1 = client.insert(writeRecords1, 
instant1).collect();
+    assertNoWriteErrors(writeStatuses1);
+
+    Map<String, String> extraMetadata1 = new HashMap<>();
+    extraMetadata1.put("checkpoint.offset", "5000");
+    extraMetadata1.put("checkpoint.partition", "p1");
+    client.commitStats(instant1, 
writeStatuses1.stream().map(WriteStatus::getStat).collect(Collectors.toList()),
+        Option.of(extraMetadata1), metaClient.getCommitActionType());
+
+    // When: Second commit with more inserts (should trigger inline clustering after commit)
+    String instant2 = client.createNewInstantTime(false);
+    List<HoodieRecord> records2 = dataGen.generateInserts(instant2, 100);
+    JavaRDD<HoodieRecord> writeRecords2 = jsc().parallelize(records2, 2);
+
+    WriteClientTestUtils.startCommitWithTime(client, instant2);
+    List<WriteStatus> writeStatuses2 = client.insert(writeRecords2, 
instant2).collect();
+    assertNoWriteErrors(writeStatuses2);
+
+    Map<String, String> extraMetadata2 = new HashMap<>();
+    extraMetadata2.put("checkpoint.offset", "6000");
+    client.commitStats(instant2, 
writeStatuses2.stream().map(WriteStatus::getStat).collect(Collectors.toList()),
+        Option.of(extraMetadata2), metaClient.getCommitActionType());
+
+    // Then: Verify clustering was triggered
+    metaClient = HoodieTableMetaClient.reload(metaClient);
+    HoodieTimeline writeTimeline = 
metaClient.getActiveTimeline().getWriteTimeline().filterCompletedInstants();
+    boolean hasClustering = writeTimeline.getInstantsAsStream()
+        .anyMatch(i -> 
i.getAction().equals(HoodieTimeline.REPLACE_COMMIT_ACTION)
+            || i.getAction().equals(HoodieTimeline.CLUSTERING_ACTION));
+
+    if (hasClustering) {
+      // Find the clustering commit and verify it has rolling metadata
+      HoodieInstant clusteringInstant = writeTimeline.getInstantsAsStream()
+          .filter(i -> 
i.getAction().equals(HoodieTimeline.REPLACE_COMMIT_ACTION)
+              || i.getAction().equals(HoodieTimeline.CLUSTERING_ACTION))
+          .reduce((a, b) -> b).get();
+      HoodieCommitMetadata clusteringMetadata = 
TimelineUtils.getCommitMetadata(clusteringInstant, 
metaClient.getActiveTimeline());
+      assertEquals("6000", clusteringMetadata.getMetadata("checkpoint.offset"),
+          "Clustering commit should carry forward latest rolling metadata");
+      assertEquals("p1", 
clusteringMetadata.getMetadata("checkpoint.partition"),
+          "Clustering commit should carry forward rolling metadata from 
earlier commits");
+    }

Review Comment:
   <a href="#"><img alt="P2" 
src="https://greptile-static-assets.s3.amazonaws.com/badges/p2.svg?v=7" 
align="top"></a> **Key clustering assertions may be silently skipped**
   
   The assertions for the clustering commit's rolling metadata are wrapped 
inside `if (hasClustering)`. If clustering is not triggered for any reason 
(e.g., not enough data, config change), the block is skipped and the test 
passes without verifying the behaviour it is supposed to test. Consider using 
`assertTrue(hasClustering, "Inline clustering should have been triggered")` 
before the assertions so that a missing clustering commit fails the test rather 
than silently succeeding:
   
   ```suggestion
       assertTrue(hasClustering, "Inline clustering should have been 
triggered");
       // Find the clustering commit and verify it has rolling metadata
       HoodieInstant clusteringInstant = writeTimeline.getInstantsAsStream()
           .filter(i -> 
i.getAction().equals(HoodieTimeline.REPLACE_COMMIT_ACTION)
               || i.getAction().equals(HoodieTimeline.CLUSTERING_ACTION))
           .reduce((a, b) -> b).get();
       HoodieCommitMetadata clusteringMetadata = 
TimelineUtils.getCommitMetadata(clusteringInstant, 
metaClient.getActiveTimeline());
       assertEquals("6000", clusteringMetadata.getMetadata("checkpoint.offset"),
           "Clustering commit should carry forward latest rolling metadata");
       assertEquals("p1", 
clusteringMetadata.getMetadata("checkpoint.partition"),
           "Clustering commit should carry forward rolling metadata from 
earlier commits");
   ```
   
   — *Greptile* 
([original](https://github.com/yihua/hudi/pull/25#discussion_r3048268952)) 
(source:comment#3048268952)



##########
hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/client/TestRollingMetadataSpark.java:
##########
@@ -0,0 +1,221 @@
+
+  /**
+   * Test that rolling metadata is carried forward into clustering commits.
+   * With OCC enabled, clustering calls preCommit which merges rolling metadata.
+   */
+  @Test
+  public void testRollingMetadataWithInlineClustering() throws IOException {
+    // Given: A CoW table with rolling metadata and inline clustering after 1 commit
+    HoodieTableMetaClient metaClient = 
getHoodieMetaClient(HoodieTableType.COPY_ON_WRITE);
+    HoodieWriteConfig config = getConfigBuilder(true)
+        .withPath(metaClient.getBasePath())
+        .withRollingMetadataKeys("checkpoint.offset,checkpoint.partition")
+        .withClusteringConfig(HoodieClusteringConfig.newBuilder()
+            .withClusteringMaxNumGroups(10)
+            .withClusteringTargetPartitions(0)
+            .withInlineClustering(true)
+            .withInlineClusteringNumCommits(1)
+            .build())

Review Comment:
   _⚠️ Potential issue_ | _🟡 Minor_
   
   **Make the clustering path deterministic in this regression test.**
   
   With `withInlineClusteringNumCommits(1)` and the `if (hasClustering)` guard, 
this test can still pass without ever hitting the new clustering 
`preCommit(...)` path. Prefer a setup that clusters after the second write and 
assert `hasClustering` unconditionally.
   
   
   
   <details>
   <summary>🧪 One way to tighten the test</summary>
   
   ```diff
         .withClusteringConfig(HoodieClusteringConfig.newBuilder()
             .withClusteringMaxNumGroups(10)
             .withClusteringTargetPartitions(0)
             .withInlineClustering(true)
   -            .withInlineClusteringNumCommits(1)
   +            .withInlineClusteringNumCommits(2)
             .build())
         .build();
   @@
   -    if (hasClustering) {
   -      // Find the clustering commit and verify it has rolling metadata
   -      HoodieInstant clusteringInstant = writeTimeline.getInstantsAsStream()
   -          .filter(i -> i.getAction().equals(HoodieTimeline.REPLACE_COMMIT_ACTION)
   -              || i.getAction().equals(HoodieTimeline.CLUSTERING_ACTION))
   -          .reduce((a, b) -> b).get();
   -      HoodieCommitMetadata clusteringMetadata = TimelineUtils.getCommitMetadata(clusteringInstant, metaClient.getActiveTimeline());
   -      assertEquals("6000", clusteringMetadata.getMetadata("checkpoint.offset"),
   -          "Clustering commit should carry forward latest rolling metadata");
   -      assertEquals("p1", clusteringMetadata.getMetadata("checkpoint.partition"),
   -          "Clustering commit should carry forward rolling metadata from earlier commits");
   -    }
   +    assertTrue(hasClustering, "Inline clustering should have been triggered");
   +    HoodieInstant clusteringInstant = writeTimeline.getInstantsAsStream()
   +        .filter(i -> i.getAction().equals(HoodieTimeline.REPLACE_COMMIT_ACTION)
   +            || i.getAction().equals(HoodieTimeline.CLUSTERING_ACTION))
   +        .reduce((a, b) -> b).get();
   +    HoodieCommitMetadata clusteringMetadata = TimelineUtils.getCommitMetadata(clusteringInstant, metaClient.getActiveTimeline());
   +    assertEquals("6000", clusteringMetadata.getMetadata("checkpoint.offset"),
   +        "Clustering commit should carry forward latest rolling metadata");
   +    assertEquals("p1", clusteringMetadata.getMetadata("checkpoint.partition"),
   +        "Clustering commit should carry forward rolling metadata from earlier commits");
   ```
   </details>
   
   
   Also applies to: 178-196
   
   <details>
   <summary>🤖 Prompt for AI Agents</summary>
   
   ```
   Verify each finding against the current code and only fix it if needed.
   
   In
   
`@hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/client/TestRollingMetadataSpark.java`
   around lines 138 - 143, The test uses withInlineClusteringNumCommits(1) and a
   conditional guard if (hasClustering) that allows the test to pass without
   exercising the clustering preCommit path; change the test setup in
   TestRollingMetadataSpark to make clustering deterministic by configuring
   withInlineClusteringNumCommits(2) (or otherwise ensure clustering runs after 
the
   second write) and remove the conditional hasClustering guard so the test 
always
   asserts clustering behavior; update assertions to unconditionally verify
   clustering ran (e.g., that preCommit path effects are visible) and apply the
   same change to the other occurrence around lines 178-196 where
   withClusteringConfig/withInlineClusteringNumCommits and hasClustering are 
used.
   ```
   
   </details>
   
   
   <!-- This is an auto-generated comment by CodeRabbit -->
   
   — *CodeRabbit* 
([original](https://github.com/yihua/hudi/pull/25#discussion_r3048270716)) 
(source:comment#3048270716)



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
