yihua commented on code in PR #18421:
URL: https://github.com/apache/hudi/pull/18421#discussion_r3048271980
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieTableServiceClient.java:
##########
@@ -172,11 +172,15 @@ protected void setTableServiceTimer(WriteOperationType operationType) {
    *
    * @param metadata commit metadata for which pre commit is being invoked.
    */
-  protected void preCommit(HoodieCommitMetadata metadata) {
+  protected void preCommit(HoodieCommitMetadata metadata, boolean executeConflictResolution) {
     // Create a Hoodie table after startTxn which encapsulated the commits and files visible.
     // Important to create this after the lock to ensure the latest commits show up in the timeline without need for reload
     HoodieTable table = createTable(config, storageConf);
-    resolveWriteConflict(table, metadata, this.pendingInflightAndRequestedInstants);
+    if (executeConflictResolution) {
+      resolveWriteConflict(table, metadata, this.pendingInflightAndRequestedInstants);
+    }
+    // Merge rolling metadata after conflict resolution, still within the lock
+    mergeRollingMetadata(table, metadata);
   }
Review Comment:
_⚠️ Potential issue_ | _🟠 Major_
**Compaction still bypasses the new rolling-metadata hook.**
This wires `mergeRollingMetadata(...)` into log compaction and clustering
via `preCommit(...)`, but `completeCompaction(...)` still commits without it.
With `hoodie.write.rolling.metadata.timeline.lookback.commits=1`, a compaction
instant becomes the newest completed commit without the carried keys, so the
next write can drop metadata unexpectedly.
<details>
<summary>💡 Minimal fix</summary>
```diff
 protected void completeCompaction(HoodieCommitMetadata metadata, HoodieTable table, String compactionCommitTime, List<HoodieWriteStat> partialMetadataWriteStats) {
   try {
     this.context.setJobStatus(this.getClass().getSimpleName(), "Collect compaction write status and commit compaction: " + config.getTableName());
     List<HoodieWriteStat> writeStats = metadata.getWriteStats();
     handleWriteErrors(writeStats, TableServiceType.COMPACT);
     InstantGenerator instantGenerator = table.getMetaClient().getInstantGenerator();
     final HoodieInstant compactionInstant = instantGenerator.getCompactionInflightInstant(compactionCommitTime);
     try {
       this.txnManager.beginStateChange(Option.of(compactionInstant), Option.empty());
       finalizeWrite(table, compactionCommitTime, writeStats);
+      mergeRollingMetadata(table, metadata);
       // commit to data table after committing to metadata table.
       writeToMetadataTable(table, compactionCommitTime, metadata, partialMetadataWriteStats);
       log.info("Committing Compaction {}", compactionCommitTime);
       CompactHelpers.getInstance().completeInflightCompaction(table, compactionCommitTime, metadata);
```
</details>
Also applies to: 472-472, 607-608
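To make the failure mode concrete, here is a minimal, self-contained sketch (plain Java collections, not Hudi APIs; `LookbackLossDemo`, `merge`, and the timeline-as-list-of-maps model are all illustrative stand-ins for `mergeRollingMetadata` and the commits timeline): with a lookback of 1, a newest completed instant that skipped the merge hook hides the carried key from the next write.

```java
import java.util.*;

// Illustrative sketch (not Hudi API): the timeline is a list of extra-metadata
// maps, newest last; merge() mimics the walkback in mergeRollingMetadata.
public class LookbackLossDemo {
  // Carry configured keys forward from up to 'lookback' most recent commits;
  // values already in 'current' take precedence and are not walked back.
  static Map<String, String> merge(List<Map<String, String>> timeline,
                                   Map<String, String> current,
                                   Set<String> rollingKeys, int lookback) {
    Map<String, String> merged = new HashMap<>(current);
    Set<String> remaining = new HashSet<>(rollingKeys);
    remaining.removeAll(current.keySet());
    int walked = 0;
    for (int i = timeline.size() - 1;
         i >= 0 && walked < lookback && !remaining.isEmpty(); i--, walked++) {
      for (Iterator<String> it = remaining.iterator(); it.hasNext(); ) {
        String key = it.next();
        String value = timeline.get(i).get(key);
        if (value != null) {
          merged.put(key, value);
          it.remove();
        }
      }
    }
    return merged;
  }

  public static void main(String[] args) {
    Set<String> keys = Collections.singleton("checkpoint.offset");
    List<Map<String, String>> timeline = new ArrayList<>();
    timeline.add(Collections.singletonMap("checkpoint.offset", "1000")); // ingestion commit

    // A compaction that commits through a path skipping the merge hook
    // lands as the newest completed instant with no carried keys.
    timeline.add(Collections.emptyMap());

    // Next write with lookback=1 only inspects the compaction instant.
    Map<String, String> next = merge(timeline, new HashMap<>(), keys, 1);
    System.out.println(next.containsKey("checkpoint.offset")); // false: key dropped

    // Had compaction itself run the merge before committing, the key survives.
    timeline.set(1, merge(timeline.subList(0, 1), new HashMap<>(), keys, 1));
    Map<String, String> fixed = merge(timeline, new HashMap<>(), keys, 1);
    System.out.println(fixed.get("checkpoint.offset")); // 1000
  }
}
```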
<details>
<summary>🤖 Prompt for AI Agents</summary>
```
Verify each finding against the current code and only fix it if needed.
In `@hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieTableServiceClient.java` around lines 175 - 184, Compaction (and other commit paths like completeCompaction and the commit paths around lines ~472 and ~607) bypass the new rolling-metadata merge, causing carried keys to be lost; fix by invoking mergeRollingMetadata(...) wherever commits are finalized just like in preCommit: after creating the HoodieTable (e.g., via createTable(config, storageConf)) and before the actual commit/commitWrite, construct or obtain the HoodieCommitMetadata for that operation and call mergeRollingMetadata(table, metadata) (or mergeRollingMetadata(table, compactionMetadata)) inside completeCompaction and the other commit-completing methods so rolling metadata is merged under the same lock/precedence as preCommit.
```
</details>
<!--
fingerprinting:phantom:medusa:grasshopper:15c30537-b1db-44ce-8f21-20ab6360bab0
-->
<!-- This is an auto-generated comment by CodeRabbit -->
*CodeRabbit*
([original](https://github.com/yihua/hudi/pull/25#discussion_r3048270708))
(source:comment#3048270708)
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java:
##########
@@ -3524,6 +3562,16 @@ public Builder withReleaseResourceEnabled(boolean enabled) {
       return this;
     }
+    public Builder withRollingMetadataKeys(String keys) {
+      writeConfig.setValue(ROLLING_METADATA_KEYS, keys);
+      return this;
+    }
+
+    public Builder withRollingMetadataTimelineLookbackCommits(int lookbackCommits) {
+      writeConfig.setValue(ROLLING_METADATA_TIMELINE_LOOKBACK_COMMITS, String.valueOf(lookbackCommits));
+      return this;
+    }
Review Comment:
_⚠️ Potential issue_ | _🟠 Major_
**Validate lookback commits as non-negative before accepting config.**
Line 3570 currently accepts any integer. Negative lookback values are
invalid for timeline traversal semantics and can cause downstream runtime
issues during commit metadata merge.
<details>
<summary>🔧 Proposed fix</summary>
```diff
 public Builder withRollingMetadataTimelineLookbackCommits(int lookbackCommits) {
+  checkArgument(lookbackCommits >= 0,
+      "hoodie.write.rolling.metadata.timeline.lookback.commits must be >= 0");
   writeConfig.setValue(ROLLING_METADATA_TIMELINE_LOOKBACK_COMMITS, String.valueOf(lookbackCommits));
   return this;
 }
```
</details>
Also add a defensive check in `Builder.validate()` so invalid values from
raw property files are rejected during `build()` as well.
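The two checks compose like this; a self-contained sketch under stated assumptions (the real `Builder` writes into a typed config object and Hudi ships its own `checkArgument` in `ValidationUtils` — here a plain `Properties`-backed stub with names mirroring the review stands in):

```java
import java.util.Properties;

// Illustrative stub of the proposed validation, not the actual Hudi Builder.
public class LookbackValidation {
  static final String KEY = "hoodie.write.rolling.metadata.timeline.lookback.commits";
  private final Properties props = new Properties();

  // Minimal stand-in for org.apache.hudi.common.util.ValidationUtils.checkArgument.
  static void checkArgument(boolean condition, String message) {
    if (!condition) {
      throw new IllegalArgumentException(message);
    }
  }

  // Typed setter: rejects negative values at call time.
  public LookbackValidation withLookbackCommits(int lookbackCommits) {
    checkArgument(lookbackCommits >= 0, KEY + " must be >= 0");
    props.setProperty(KEY, String.valueOf(lookbackCommits));
    return this;
  }

  // Defensive check for values loaded from raw property files, run during build().
  public void validate() {
    String raw = props.getProperty(KEY);
    if (raw != null) {
      int parsed;
      try {
        parsed = Integer.parseInt(raw.trim());
      } catch (NumberFormatException e) {
        throw new IllegalArgumentException(KEY + " must be an integer, got: " + raw, e);
      }
      checkArgument(parsed >= 0, KEY + " must be >= 0, got: " + parsed);
    }
  }

  public static void main(String[] args) {
    new LookbackValidation().withLookbackCommits(3).validate(); // accepted
    try {
      new LookbackValidation().withLookbackCommits(-1); // rejected at call time
    } catch (IllegalArgumentException e) {
      System.out.println("rejected: " + e.getMessage());
    }
  }
}
```

The setter catches programmatic misuse early, while `validate()` catches the same bad value when it arrives as a raw string from a property file, so both entry points fail during `build()` rather than mid-commit.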
<details>
<summary>🤖 Prompt for AI Agents</summary>
```
Verify each finding against the current code and only fix it if needed.
In `@hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java` around lines 3570 - 3573, Add validation to reject negative lookback values: in Builder.withRollingMetadataTimelineLookbackCommits(int lookbackCommits) check lookbackCommits >= 0 and throw an IllegalArgumentException if negative instead of setting the property; keep using ROLLING_METADATA_TIMELINE_LOOKBACK_COMMITS as the config key. Also add a defensive check inside Builder.validate() to read the raw property ROLLING_METADATA_TIMELINE_LOOKBACK_COMMITS (parseInt the stored string), verify it is >= 0, and throw a clear ConfigValidationException/IllegalArgumentException during build() if the parsed value is invalid so values from property files are rejected early.
```
</details>
<!--
fingerprinting:phantom:poseidon:hawk:eb49d22d-5850-456a-af4c-9c65ea289870 -->
<!-- This is an auto-generated comment by CodeRabbit -->
*CodeRabbit*
([original](https://github.com/yihua/hudi/pull/25#discussion_r3048270713))
(source:comment#3048270713)
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieClient.java:
##########
@@ -311,4 +316,116 @@ protected boolean isStreamingWriteToMetadataEnabled(HoodieTable table) {
     return config.isMetadataTableEnabled()
         && config.isMetadataStreamingWritesEnabled(table.getMetaClient().getTableConfig().getTableVersion());
   }
+
+  /**
+   * Merges rolling metadata from recent completed commits into the current commit metadata.
+   * This method MUST be called within the transaction lock after conflict resolution.
+   *
+   * <p>Rolling metadata keys configured via {@link HoodieWriteConfig#ROLLING_METADATA_KEYS} will be
+   * automatically carried forward from recent commits. The system walks back up to
+   * {@link HoodieWriteConfig#ROLLING_METADATA_TIMELINE_LOOKBACK_COMMITS} commits to find the most
+   * recent value for each key. This ensures that important metadata like checkpoint information
+   * remains accessible without worrying about archival or missing keys in individual commits.
+   *
+   * @param table    HoodieTable instance (may have refreshed timeline after conflict resolution)
+   * @param metadata Current commit metadata to be augmented with rolling metadata
+   */
+  protected void mergeRollingMetadata(HoodieTable table, HoodieCommitMetadata metadata) {
+    // Skip for metadata table - rolling metadata is only for data tables
+    if (table.isMetadataTable()) {
+      return;
+    }
+
+    Set<String> rollingKeys = config.getRollingMetadataKeys();
+    if (rollingKeys.isEmpty()) {
+      return; // No rolling metadata configured
+    }
+
+    // IMPORTANT: We're inside the lock here. The timeline in 'table' is either:
+    // 1. Fresh from createTable() if no conflict resolution happened
+    // 2. Reloaded during resolveWriteConflict() if conflicts were checked
+    // In both cases, we have the latest view of the timeline.
+
+    HoodieTimeline commitsTimeline = table.getActiveTimeline().getCommitsTimeline().filterCompletedInstants();
+
+    if (commitsTimeline.empty()) {
+      log.info("No previous commits found. Rolling metadata will start with current commit.");
+      return; // First commit - nothing to roll forward
+    }
+
+    try {
+      Map<String, String> existingExtraMetadata = metadata.getExtraMetadata();
+      Map<String, String> foundRollingMetadata = new HashMap<>();
+      Set<String> remainingKeys = new HashSet<>(rollingKeys);
+
+      // Remove keys that are already present in current commit (current values take precedence)
+      for (String key : rollingKeys) {
+        if (existingExtraMetadata.containsKey(key)) {
+          remainingKeys.remove(key);
+        }
+      }
+
+      if (remainingKeys.isEmpty()) {
+        log.debug("All rolling metadata keys are present in current commit. No walkback needed.");
+        return;
+      }
+
+      int lookbackLimit = config.getRollingMetadataTimelineLookbackCommits();
+      int commitsWalkedBack = 0;
+
+      // Walk back through the timeline in reverse order (most recent first) to find values for all remaining keys
+      List<HoodieInstant> recentCommits = commitsTimeline.getReverseOrderedInstantsByCompletionTime()
+          .limit(lookbackLimit)
+          .collect(Collectors.toList());
+
+      log.debug("Walking back up to {} commits to find rolling metadata for keys: {}", lookbackLimit, remainingKeys);
+
+      for (HoodieInstant instant : recentCommits) {
+        if (remainingKeys.isEmpty()) {
+          break; // Found all keys
+        }
+
+        commitsWalkedBack++;
+        HoodieCommitMetadata commitMetadata = table.getMetaClient().getActiveTimeline().readInstantContent(instant, HoodieCommitMetadata.class);
+
+        // Check for remaining keys in this commit
+        for (String key : new HashSet<>(remainingKeys)) {
+          String value = commitMetadata.getMetadata(key);
+          if (value != null) {
+            foundRollingMetadata.put(key, value);
+            remainingKeys.remove(key);
+            log.debug("Found rolling metadata key '{}' in commit {} with value: {}",
+                key, instant.requestedTime(), value);
Review Comment:
_⚠️ Potential issue_ | _🟠 Major_
**Avoid logging raw rolling-metadata values.**
`value` is arbitrary extra metadata from user commits. Emitting it verbatim
can leak checkpoints/secrets and can also bloat debug logs for large payloads.
Log the key and instant only, or a redacted/hash form.
<details>
<summary>🔒 Minimal redaction</summary>
```diff
-            log.debug("Found rolling metadata key '{}' in commit {} with value: {}",
-                key, instant.requestedTime(), value);
+            log.debug("Found rolling metadata key '{}' in commit {}",
+                key, instant.requestedTime());
```
</details>
<!-- suggestion_start -->
<details>
<summary>📝 Committable suggestion</summary>
> ‼️ **IMPORTANT**
> Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
```suggestion
            log.debug("Found rolling metadata key '{}' in commit {}",
                key, instant.requestedTime());
```
</details>
<!-- suggestion_end -->
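For the hashed-redaction variant the prompt below mentions, here is a self-contained sketch (the `redact` helper and `MetadataRedaction` class are hypothetical, not part of Hudi): log a short, stable digest plus the value's length so operators can still correlate log lines without exposing the payload.

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;

// Hypothetical helper: replaces a raw metadata value with a short SHA-256
// prefix and the original length, safe to pass to log.debug.
public class MetadataRedaction {
  static String redact(String value) {
    if (value == null) {
      return "null";
    }
    try {
      byte[] digest = MessageDigest.getInstance("SHA-256")
          .digest(value.getBytes(StandardCharsets.UTF_8));
      StringBuilder hex = new StringBuilder();
      for (int i = 0; i < 4; i++) { // 8 hex chars is enough to correlate log lines
        hex.append(String.format("%02x", digest[i]));
      }
      return "sha256:" + hex + " (" + value.length() + " chars)";
    } catch (NoSuchAlgorithmException e) {
      return "<redacted>"; // SHA-256 is mandated by the JDK, so this is unreachable in practice
    }
  }

  public static void main(String[] args) {
    // At the call site this would read, e.g.:
    //   log.debug("Found rolling metadata key '{}' in commit {} ({})",
    //       key, instant.requestedTime(), redact(value));
    System.out.println(redact("s3://bucket/checkpoints/offset-1000"));
  }
}
```

Truncating the digest to 8 hex characters keeps lines compact; it is plenty for visual correlation, though not collision-proof, which is acceptable for debug logging.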
<details>
<summary>🤖 Prompt for AI Agents</summary>
```
Verify each finding against the current code and only fix it if needed.
In `@hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieClient.java` around lines 398 - 399, The debug log in BaseHoodieClient currently emits the raw rolling-metadata value (log.debug in the method around the commit handling), which can leak sensitive or large payloads; change the log to omit the raw value and instead log only the metadata key and instant.requestedTime(), or log a redacted representation (e.g., a short hash or fixed placeholder) of the value before passing to log.debug. Update the call site that uses "value" in the log message so it no longer prints the raw value but uses the key and instant.requestedTime() and, if desired, a computed short hash/redaction string for value.
```
</details>
<!--
fingerprinting:phantom:medusa:grasshopper:15c30537-b1db-44ce-8f21-20ab6360bab0
-->
<!-- This is an auto-generated comment by CodeRabbit -->
*CodeRabbit*
([original](https://github.com/yihua/hudi/pull/25#discussion_r3048270702))
(source:comment#3048270702)
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieClient.java:
##########
@@ -311,4 +316,116 @@ protected boolean isStreamingWriteToMetadataEnabled(HoodieTable table) {
     return config.isMetadataTableEnabled()
         && config.isMetadataStreamingWritesEnabled(table.getMetaClient().getTableConfig().getTableVersion());
   }
+
+  /**
+   * Merges rolling metadata from recent completed commits into the current commit metadata.
+   * This method MUST be called within the transaction lock after conflict resolution.
+   *
+   * <p>Rolling metadata keys configured via {@link HoodieWriteConfig#ROLLING_METADATA_KEYS} will be
+   * automatically carried forward from recent commits. The system walks back up to
+   * {@link HoodieWriteConfig#ROLLING_METADATA_TIMELINE_LOOKBACK_COMMITS} commits to find the most
+   * recent value for each key. This ensures that important metadata like checkpoint information
+   * remains accessible without worrying about archival or missing keys in individual commits.
+   *
+   * @param table    HoodieTable instance (may have refreshed timeline after conflict resolution)
+   * @param metadata Current commit metadata to be augmented with rolling metadata
+   */
+  protected void mergeRollingMetadata(HoodieTable table, HoodieCommitMetadata metadata) {
+    // Skip for metadata table - rolling metadata is only for data tables
+    if (table.isMetadataTable()) {
+      return;
+    }
+
+    Set<String> rollingKeys = config.getRollingMetadataKeys();
+    if (rollingKeys.isEmpty()) {
+      return; // No rolling metadata configured
+    }
+
+    // IMPORTANT: We're inside the lock here. The timeline in 'table' is either:
+    // 1. Fresh from createTable() if no conflict resolution happened
+    // 2. Reloaded during resolveWriteConflict() if conflicts were checked
+    // In both cases, we have the latest view of the timeline.
+
+    HoodieTimeline commitsTimeline = table.getActiveTimeline().getCommitsTimeline().filterCompletedInstants();
+
+    if (commitsTimeline.empty()) {
+      log.info("No previous commits found. Rolling metadata will start with current commit.");
+      return; // First commit - nothing to roll forward
+    }
+
+    try {
+      Map<String, String> existingExtraMetadata = metadata.getExtraMetadata();
+      Map<String, String> foundRollingMetadata = new HashMap<>();
+      Set<String> remainingKeys = new HashSet<>(rollingKeys);
+
+      // Remove keys that are already present in current commit (current values take precedence)
+      for (String key : rollingKeys) {
+        if (existingExtraMetadata.containsKey(key)) {
+          remainingKeys.remove(key);
+        }
+      }
+
+      if (remainingKeys.isEmpty()) {
+        log.debug("All rolling metadata keys are present in current commit. No walkback needed.");
+        return;
+      }
+
+      int lookbackLimit = config.getRollingMetadataTimelineLookbackCommits();
+      int commitsWalkedBack = 0;
+
+      // Walk back through the timeline in reverse order (most recent first) to find values for all remaining keys
+      List<HoodieInstant> recentCommits = commitsTimeline.getReverseOrderedInstantsByCompletionTime()
+          .limit(lookbackLimit)
+          .collect(Collectors.toList());
+
+      log.debug("Walking back up to {} commits to find rolling metadata for keys: {}", lookbackLimit, remainingKeys);
+
+      for (HoodieInstant instant : recentCommits) {
+        if (remainingKeys.isEmpty()) {
+          break; // Found all keys
+        }
+
+        commitsWalkedBack++;
+        HoodieCommitMetadata commitMetadata = table.getMetaClient().getActiveTimeline().readInstantContent(instant, HoodieCommitMetadata.class);
+
+        // Check for remaining keys in this commit
+        for (String key : new HashSet<>(remainingKeys)) {
+          String value = commitMetadata.getMetadata(key);
+          if (value != null) {
+            foundRollingMetadata.put(key, value);
+            remainingKeys.remove(key);
+            log.debug("Found rolling metadata key '{}' in commit {} with value: {}",
+                key, instant.requestedTime(), value);
+          }
+        }
+      }
+
+      // Add found rolling metadata to current commit
+      int rolledForwardCount = 0;
+      for (Map.Entry<String, String> entry : foundRollingMetadata.entrySet()) {
+        metadata.addMetadata(entry.getKey(), entry.getValue());
+        rolledForwardCount++;
+      }
+
+      int updatedCount = rollingKeys.size() - remainingKeys.size() - rolledForwardCount;
+
+      if (rolledForwardCount > 0 || updatedCount > 0 || !remainingKeys.isEmpty()) {
+        log.info("Rolling metadata merge completed. Walked back {} commits. "
+            + "Rolled forward: {}, Updated in current: {}, Not found: {}, Total rolling keys: {}",
+            commitsWalkedBack, rolledForwardCount, updatedCount, remainingKeys.size(), rollingKeys.size());
Review Comment:
<a href="#"><img alt="P2"
src="https://greptile-static-assets.s3.amazonaws.com/badges/p2.svg?v=7"
align="top"></a> **`rolledForwardCount` can be simplified to
`foundRollingMetadata.size()`**
`rolledForwardCount` is incremented once per entry inside a `for-each` over
`foundRollingMetadata`, making it always equal to
`foundRollingMetadata.size()`. The manual counter can be removed:
```suggestion
      int rolledForwardCount = foundRollingMetadata.size();
      for (Map.Entry<String, String> entry : foundRollingMetadata.entrySet()) {
        metadata.addMetadata(entry.getKey(), entry.getValue());
      }

      int updatedCount = rollingKeys.size() - remainingKeys.size() - rolledForwardCount;

      if (rolledForwardCount > 0 || updatedCount > 0 || !remainingKeys.isEmpty()) {
        log.info("Rolling metadata merge completed. Walked back {} commits. "
            + "Rolled forward: {}, Updated in current: {}, Not found: {}, Total rolling keys: {}",
            commitsWalkedBack, rolledForwardCount, updatedCount, remainingKeys.size(), rollingKeys.size());
      }
```
*Greptile*
([original](https://github.com/yihua/hudi/pull/25#discussion_r3048268992))
(source:comment#3048268992)
##########
hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestRollingMetadata.java:
##########
@@ -0,0 +1,577 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.client;
+
+import org.apache.hudi.common.model.HoodieCommitMetadata;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieTableType;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.table.timeline.TimelineUtils;
+import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.testutils.SparkClientFunctionalTestHarness;
+
+import org.apache.spark.api.java.JavaRDD;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
+
+import java.io.IOException;
+import java.net.URI;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.stream.Collectors;
+
+
+import static org.apache.hudi.testutils.Assertions.assertNoWriteErrors;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+/**
+ * Tests for rolling metadata functionality.
+ */
+class TestRollingMetadata extends SparkClientFunctionalTestHarness {
+
+  /**
+   * Test that rolling metadata keys are carried forward to subsequent commits.
+   */
+  @Test
+  public void testRollingMetadataCarriedForward() throws IOException {
+    // Given: A table with rolling metadata keys configured
+    HoodieTableMetaClient metaClient = getHoodieMetaClient(storageConf(), URI.create(basePath()).getPath(), new Properties());
+    HoodieWriteConfig config = getConfigBuilder(true)
+        .withPath(metaClient.getBasePath())
+        .withRollingMetadataKeys("checkpoint.offset,checkpoint.partition")
+        .build();
+
+    HoodieTestDataGenerator dataGen = new HoodieTestDataGenerator();
+    SparkRDDWriteClient client = getHoodieWriteClient(config);
+
+    // When: First commit with rolling metadata keys
+    String instant1 = client.createNewInstantTime(false);
+    List<HoodieRecord> records1 = dataGen.generateInserts(instant1, 10);
+    JavaRDD<HoodieRecord> writeRecords1 = jsc().parallelize(records1, 2);
+
+    WriteClientTestUtils.startCommitWithTime(client, instant1);
+    List<WriteStatus> writeStatuses1 = client.insert(writeRecords1, instant1).collect();
+    assertNoWriteErrors(writeStatuses1);
+
+    // Add rolling metadata to first commit
+    Map<String, String> extraMetadata1 = new HashMap<>();
+    extraMetadata1.put("checkpoint.offset", "1000");
+    extraMetadata1.put("checkpoint.partition", "partition-0");
+    client.commitStats(instant1, writeStatuses1.stream().map(WriteStatus::getStat).collect(Collectors.toList()),
+        Option.of(extraMetadata1), metaClient.getCommitActionType());
+
+    // Then: Verify first commit has the rolling metadata
+    metaClient = HoodieTableMetaClient.reload(metaClient);
+    HoodieInstant commit1 = metaClient.getActiveTimeline().getCommitsTimeline().filterCompletedInstants().lastInstant().get();
+    HoodieCommitMetadata metadata1 = TimelineUtils.getCommitMetadata(commit1, metaClient.getActiveTimeline());
+    assertEquals("1000", metadata1.getMetadata("checkpoint.offset"));
+    assertEquals("partition-0", metadata1.getMetadata("checkpoint.partition"));
+
+    // When: Second commit updates one rolling metadata key
+    String instant2 = client.createNewInstantTime(false);
+    List<HoodieRecord> records2 = dataGen.generateInserts(instant2, 10);
+    JavaRDD<HoodieRecord> writeRecords2 = jsc().parallelize(records2, 2);
+
+    WriteClientTestUtils.startCommitWithTime(client, instant2);
+    List<WriteStatus> writeStatuses2 = client.insert(writeRecords2, instant2).collect();
+    assertNoWriteErrors(writeStatuses2);
+
+    // Only update checkpoint.offset, not checkpoint.partition
+    Map<String, String> extraMetadata2 = new HashMap<>();
+    extraMetadata2.put("checkpoint.offset", "2000");
+    client.commitStats(instant2, writeStatuses2.stream().map(WriteStatus::getStat).collect(Collectors.toList()),
+        Option.of(extraMetadata2), metaClient.getCommitActionType());
+
+    // Then: Verify second commit has both rolling metadata keys (one carried forward, one updated)
+    metaClient = HoodieTableMetaClient.reload(metaClient);
+    HoodieInstant commit2 = metaClient.getActiveTimeline().getCommitsTimeline().filterCompletedInstants().lastInstant().get();
+    HoodieCommitMetadata metadata2 = TimelineUtils.getCommitMetadata(commit2, metaClient.getActiveTimeline());
+    assertEquals("2000", metadata2.getMetadata("checkpoint.offset")); // Updated value
+    assertEquals("partition-0", metadata2.getMetadata("checkpoint.partition")); // Carried forward from commit1
+
+    // When: Third commit with no rolling metadata keys
+    String instant3 = client.createNewInstantTime(false);
+    List<HoodieRecord> records3 = dataGen.generateInserts(instant3, 10);
+    JavaRDD<HoodieRecord> writeRecords3 = jsc().parallelize(records3, 2);
+
+    client.close();
+    client = getHoodieWriteClient(config);
+
+    WriteClientTestUtils.startCommitWithTime(client, instant3);
+    List<WriteStatus> writeStatuses3 = client.insert(writeRecords3, instant3).collect();
+    assertNoWriteErrors(writeStatuses3);
+    client.commitStats(instant3, writeStatuses3.stream().map(WriteStatus::getStat).collect(Collectors.toList()),
+        Option.empty(), metaClient.getCommitActionType());
+
+    // Then: Verify third commit has all rolling metadata keys carried forward
+    metaClient = HoodieTableMetaClient.reload(metaClient);
+    HoodieInstant commit3 = metaClient.getActiveTimeline().getCommitsTimeline().filterCompletedInstants().lastInstant().get();
+    HoodieCommitMetadata metadata3 = TimelineUtils.getCommitMetadata(commit3, metaClient.getActiveTimeline());
+    assertEquals("2000", metadata3.getMetadata("checkpoint.offset")); // Carried forward from commit2
+    assertEquals("partition-0", metadata3.getMetadata("checkpoint.partition")); // Carried forward from commit1
+
+    client.close();
+  }
+
+  /**
+   * Test that rolling metadata works with no configured keys (default behavior).
+   */
+  @Test
+  public void testRollingMetadataWithNoConfiguredKeys() throws IOException {
+    // Given: A table with no rolling metadata keys configured
+    HoodieTableMetaClient metaClient = getHoodieMetaClient(storageConf(), URI.create(basePath()).getPath(), new Properties());
+    HoodieWriteConfig config = getConfigBuilder(true)
+        .withPath(metaClient.getBasePath())
+        .build();
+
+    HoodieTestDataGenerator dataGen = new HoodieTestDataGenerator();
+    SparkRDDWriteClient client = getHoodieWriteClient(config);
+
+    // When: Commit with extra metadata
+    String instant1 = client.createNewInstantTime(false);
+    List<HoodieRecord> records1 = dataGen.generateInserts(instant1, 10);
+    JavaRDD<HoodieRecord> writeRecords1 = jsc().parallelize(records1, 2);
+
+    WriteClientTestUtils.startCommitWithTime(client, instant1);
+    List<WriteStatus> writeStatuses1 = client.insert(writeRecords1, instant1).collect();
+    assertNoWriteErrors(writeStatuses1);
+
+    Map<String, String> extraMetadata1 = new HashMap<>();
+    extraMetadata1.put("some.key", "value1");
+    client.commitStats(instant1, writeStatuses1.stream().map(WriteStatus::getStat).collect(Collectors.toList()),
+        Option.of(extraMetadata1), metaClient.getCommitActionType());
+
+    // Then: Verify metadata exists in first commit
+    metaClient = HoodieTableMetaClient.reload(metaClient);
+    HoodieInstant commit1 = metaClient.getActiveTimeline().getCommitsTimeline().filterCompletedInstants().lastInstant().get();
+    HoodieCommitMetadata metadata1 = TimelineUtils.getCommitMetadata(commit1, metaClient.getActiveTimeline());
+    assertEquals("value1", metadata1.getMetadata("some.key"));
+
+    client.close();
+    client = getHoodieWriteClient(config);
+    // When: Second commit without the key
+    String instant2 = client.createNewInstantTime(false);
+    List<HoodieRecord> records2 = dataGen.generateInserts(instant2, 10);
+    JavaRDD<HoodieRecord> writeRecords2 = jsc().parallelize(records2, 2);
+
+    WriteClientTestUtils.startCommitWithTime(client, instant2);
+    List<WriteStatus> writeStatuses2 = client.insert(writeRecords2, instant2).collect();
+    assertNoWriteErrors(writeStatuses2);
+    client.commitStats(instant2, writeStatuses2.stream().map(WriteStatus::getStat).collect(Collectors.toList()),
+        Option.empty(), metaClient.getCommitActionType());
+
+    // Then: Verify key is NOT carried forward (no rolling metadata configured)
+    metaClient = HoodieTableMetaClient.reload(metaClient);
+    HoodieInstant commit2 = metaClient.getActiveTimeline().getCommitsTimeline().filterCompletedInstants().lastInstant().get();
+    HoodieCommitMetadata metadata2 = TimelineUtils.getCommitMetadata(commit2, metaClient.getActiveTimeline());
+    assertNull(metadata2.getMetadata("some.key"));
+
+    client.close();
+  }
+
+  /**
+   * Test that timeline walkback finds rolling metadata keys from older commits.
+   */
+  @Test
+  public void testRollingMetadataWithTimelineWalkback() throws IOException {
+    // Given: A table with rolling metadata keys and walkback configured
+    HoodieTableMetaClient metaClient = getHoodieMetaClient(storageConf(), URI.create(basePath()).getPath(), new Properties());
+    HoodieWriteConfig config = getConfigBuilder(true)
+        .withPath(metaClient.getBasePath())
+        .withRollingMetadataKeys("key1,key2,key3")
+        .withRollingMetadataTimelineLookbackCommits(5)
+        .build();
+
+    HoodieTestDataGenerator dataGen = new HoodieTestDataGenerator();
+    SparkRDDWriteClient client = getHoodieWriteClient(config);
+
+    // When: First commit with key1 and key2
+    String instant1 = client.createNewInstantTime(false);
+    List<HoodieRecord> records1 = dataGen.generateInserts(instant1, 10);
+    JavaRDD<HoodieRecord> writeRecords1 = jsc().parallelize(records1, 2);
+
+    WriteClientTestUtils.startCommitWithTime(client, instant1);
+    List<WriteStatus> writeStatuses1 = client.insert(writeRecords1, instant1).collect();
+    assertNoWriteErrors(writeStatuses1);
+
+    Map<String, String> extraMetadata1 = new HashMap<>();
+    extraMetadata1.put("key1", "value1");
+    extraMetadata1.put("key2", "value2");
+    client.commitStats(instant1, writeStatuses1.stream().map(WriteStatus::getStat).collect(Collectors.toList()),
+        Option.of(extraMetadata1), metaClient.getCommitActionType());
+
+    // When: Second commit with key3
+    String instant2 = client.createNewInstantTime(false);
+    List<HoodieRecord> records2 = dataGen.generateInserts(instant2, 10);
+    JavaRDD<HoodieRecord> writeRecords2 = jsc().parallelize(records2, 2);
+
+    WriteClientTestUtils.startCommitWithTime(client, instant2);
+    List<WriteStatus> writeStatuses2 = client.insert(writeRecords2, instant2).collect();
+    assertNoWriteErrors(writeStatuses2);
+
+    Map<String, String> extraMetadata2 = new HashMap<>();
+    extraMetadata2.put("key3", "value3");
+    client.commitStats(instant2, writeStatuses2.stream().map(WriteStatus::getStat).collect(Collectors.toList()),
+        Option.of(extraMetadata2), metaClient.getCommitActionType());
+
+    // Then: Verify second commit has all three keys (key3 from current, key1 and key2 from walkback)
+    metaClient = HoodieTableMetaClient.reload(metaClient);
+    HoodieInstant commit2 = metaClient.getActiveTimeline().getCommitsTimeline().filterCompletedInstants().lastInstant().get();
+    HoodieCommitMetadata metadata2 = TimelineUtils.getCommitMetadata(commit2, metaClient.getActiveTimeline());
+    assertEquals("value1", metadata2.getMetadata("key1")); // From commit1 via walkback
+    assertEquals("value2", metadata2.getMetadata("key2")); // From commit1 via walkback
+    assertEquals("value3", metadata2.getMetadata("key3")); // From current commit
+
+    // When: Third commit with no keys (should walk back to find all three)
+    String instant3 = client.createNewInstantTime(false);
+    List<HoodieRecord> records3 = dataGen.generateInserts(instant3, 10);
+    JavaRDD<HoodieRecord> writeRecords3 = jsc().parallelize(records3, 2);
+
+    WriteClientTestUtils.startCommitWithTime(client, instant3);
+    List<WriteStatus> writeStatuses3 = client.insert(writeRecords3, instant3).collect();
+    assertNoWriteErrors(writeStatuses3);
+    client.commitStats(instant3, writeStatuses3.stream().map(WriteStatus::getStat).collect(Collectors.toList()),
+        Option.empty(), metaClient.getCommitActionType());
+
+    // Then: Verify third commit has all three keys from walkback
+    metaClient = HoodieTableMetaClient.reload(metaClient);
+    HoodieInstant commit3 = metaClient.getActiveTimeline().getCommitsTimeline().filterCompletedInstants().lastInstant().get();
+    HoodieCommitMetadata metadata3 = TimelineUtils.getCommitMetadata(commit3, metaClient.getActiveTimeline());
+    assertEquals("value1", metadata3.getMetadata("key1")); // From commit1 via walkback
+    assertEquals("value2", metadata3.getMetadata("key2")); // From commit1 via walkback
+    assertEquals("value3", metadata3.getMetadata("key3")); // From commit2 via walkback
+
+    client.close();
+  }
+
+ /**
+ * Test that rolling metadata respects the walkback limit.
+ */
+ @Test
+ public void testRollingMetadataWalkbackLimit() throws IOException {
+ // Given: A table with rolling metadata keys and small walkback limit
+ HoodieTableMetaClient metaClient = getHoodieMetaClient(storageConf(), URI.create(basePath()).getPath(), new Properties());
+ HoodieWriteConfig config = getConfigBuilder(true)
+ .withPath(metaClient.getBasePath())
+ .withRollingMetadataKeys("checkpoint.offset")
+ .withRollingMetadataTimelineLookbackCommits(2) // Only look back 2 commits
+ .build();
+
+ HoodieWriteConfig configNoRollingMetadata = getConfigBuilder(true)
+ .withPath(metaClient.getBasePath())
+ .build();
+
+ HoodieTestDataGenerator dataGen = new HoodieTestDataGenerator();
+ SparkRDDWriteClient client = getHoodieWriteClient(config);
+
+ String instant = client.createNewInstantTime(false);
+ List<HoodieRecord> records = dataGen.generateInserts(instant, 10);
+ JavaRDD<HoodieRecord> writeRecords = jsc().parallelize(records, 2);
+
+ WriteClientTestUtils.startCommitWithTime(client, instant);
+ List<WriteStatus> writeStatuses = client.insert(writeRecords, instant).collect();
+ assertNoWriteErrors(writeStatuses);
+ // Only first commit has the rolling metadata key
+ Map<String, String> extraMetadata = new HashMap<>();
+ extraMetadata.put("checkpoint.offset", "original-value");
+ client.commitStats(instant, writeStatuses.stream().map(WriteStatus::getStat).collect(Collectors.toList()),
+ Option.of(extraMetadata), metaClient.getCommitActionType());
+
+ // add 2 more commits w/o any rolling metadata config.
+ client.close();
+ client = getHoodieWriteClient(configNoRollingMetadata);
+ for (int i = 2; i <= 3; i++) {
+ instant = client.createNewInstantTime(false);
+ records = dataGen.generateInserts(instant, 10);
+ writeRecords = jsc().parallelize(records, 2);
+
+ WriteClientTestUtils.startCommitWithTime(client, instant);
+ writeStatuses = client.insert(writeRecords, instant).collect();
+ assertNoWriteErrors(writeStatuses);
+ client.commitStats(instant, writeStatuses.stream().map(WriteStatus::getStat).collect(Collectors.toList()),
+ Option.empty(), metaClient.getCommitActionType());
+ }
+
+ client.close();
+ client = getHoodieWriteClient(config);
+ for (int i = 4; i <= 5; i++) {
+ instant = client.createNewInstantTime(false);
+ records = dataGen.generateInserts(instant, 10);
+ writeRecords = jsc().parallelize(records, 2);
+
+ WriteClientTestUtils.startCommitWithTime(client, instant);
+ writeStatuses = client.insert(writeRecords, instant).collect();
+ assertNoWriteErrors(writeStatuses);
+ client.commitStats(instant, writeStatuses.stream().map(WriteStatus::getStat).collect(Collectors.toList()),
+ Option.empty(), metaClient.getCommitActionType());
+ }
+
+ // Then: Verify that commit 2 and 3 have the rolling metadata (within walkback limit)
+ metaClient = HoodieTableMetaClient.reload(metaClient);
+ List<HoodieInstant> commits = metaClient.getActiveTimeline().getCommitsTimeline().filterCompletedInstants().getInstantsAsStream()
+ .collect(Collectors.toList());
+
+ HoodieCommitMetadata metadata2 = TimelineUtils.getCommitMetadata(commits.get(1), metaClient.getActiveTimeline());
+ assertFalse(metadata2.getExtraMetadata().containsKey("checkpoint.offset"));
+
+ HoodieCommitMetadata metadata3 = TimelineUtils.getCommitMetadata(commits.get(2), metaClient.getActiveTimeline());
+ assertFalse(metadata3.getExtraMetadata().containsKey("checkpoint.offset"));
Review Comment:
**[P2] Test comment contradicts its assertions**

The comment on line 336 says _"Verify that commit 2 and 3 have the rolling metadata (within walkback limit)"_, but both assertions immediately below use `assertFalse`, meaning the metadata is **not** present. Commits 2 and 3 were intentionally written with `configNoRollingMetadata` (no rolling keys configured), so rolling metadata is never merged for them. The comment should reflect that:
```suggestion
// Then: Verify that commit 2 and 3 do NOT have the rolling metadata
// (they were made without rolling metadata config, so no carry-forward happens)
```
As written, anyone reading this test will be confused about whether the feature is supposed to work for commits 2 and 3 or not.
*Greptile* ([original](https://github.com/yihua/hudi/pull/25#discussion_r3048268906)) (source:comment#3048268906)
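For readers following the thread, the walkback behaviour the reviews keep referring to can be modelled in a small standalone sketch. Everything below is illustrative: `mergeRollingMetadata`, the parameter names, and the plain `Map` types are hypothetical stand-ins, not the Hudi API (the real merge operates on `HoodieCommitMetadata` inside `preCommit(...)`).

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical model of the rolling-metadata walkback: for each configured
// rolling key missing from the current commit, scan previous completed
// commits newest-first (bounded by the lookback limit) and carry forward the
// value from the nearest commit that has it.
public class RollingMetadataWalkbackSketch {

  static Map<String, String> mergeRollingMetadata(Map<String, String> current,
                                                  List<Map<String, String>> previousCommitsNewestFirst,
                                                  Set<String> rollingKeys,
                                                  int lookbackCommits) {
    Map<String, String> merged = new HashMap<>(current);
    int depth = Math.min(lookbackCommits, previousCommitsNewestFirst.size());
    for (String key : rollingKeys) {
      if (merged.containsKey(key)) {
        continue; // the current commit already supplies this key
      }
      for (int i = 0; i < depth; i++) {
        String carried = previousCommitsNewestFirst.get(i).get(key);
        if (carried != null) {
          merged.put(key, carried); // nearest commit wins
          break;
        }
      }
    }
    return merged;
  }

  public static void main(String[] args) {
    // History newest-first: the latest commit carried only the offset,
    // the one before it set both keys.
    List<Map<String, String>> history = List.of(
        Map.of("checkpoint.offset", "6000"),
        Map.of("checkpoint.offset", "5000", "checkpoint.partition", "p1"));

    Map<String, String> merged = mergeRollingMetadata(
        new HashMap<>(), history,
        Set.of("checkpoint.offset", "checkpoint.partition"), 2);
    System.out.println(merged.get("checkpoint.offset"));    // taken from the newest commit
    System.out.println(merged.get("checkpoint.partition")); // walked back one commit further

    // With a lookback of 1, the partition key falls outside the window and
    // is dropped, which is the hazard raised about compaction commits that
    // skip the merge and become a "hole" in the history.
    Map<String, String> limited = mergeRollingMetadata(
        new HashMap<>(), history,
        Set.of("checkpoint.offset", "checkpoint.partition"), 1);
    System.out.println(limited.containsKey("checkpoint.partition"));
  }
}
```

Under this model the earlier concern about `completeCompaction(...)` is easy to see: any commit that skips the merge breaks the chain, and with `lookback.commits=1` the next write can never see past it.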
##########
hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/client/TestRollingMetadataSpark.java:
##########
@@ -0,0 +1,221 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.client;
+
+import org.apache.hudi.common.model.HoodieCommitMetadata;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieTableType;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.table.timeline.TimelineUtils;
+import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.config.HoodieClusteringConfig;
+import org.apache.hudi.config.HoodieCompactionConfig;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.testutils.SparkClientFunctionalTestHarness;
+
+import org.apache.spark.api.java.JavaRDD;
+import org.junit.jupiter.api.Test;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+import static org.apache.hudi.testutils.Assertions.assertNoWriteErrors;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+public class TestRollingMetadataSpark extends SparkClientFunctionalTestHarness {
+
+ /**
+ * Test that rolling metadata is carried forward across inline compaction commits on a MOR table.
+ * Compaction does not call preCommit, so its commit won't have rolling metadata. But subsequent
+ * regular writes should walk back past the compaction commit to find rolling metadata from earlier commits.
+ */
+ @Test
+ public void testRollingMetadataWithInlineCompaction() throws IOException {
+ // Given: A MOR table with rolling metadata and inline compaction after 2 delta commits
+ HoodieTableMetaClient metaClient = getHoodieMetaClient(HoodieTableType.MERGE_ON_READ);
+ HoodieWriteConfig config = getConfigBuilder(true)
+ .withPath(metaClient.getBasePath())
+ .withRollingMetadataKeys("checkpoint.offset")
+ .withCompactionConfig(HoodieCompactionConfig.newBuilder()
+ .withInlineCompaction(true)
+ .withMaxNumDeltaCommitsBeforeCompaction(2)
+ .build())
+ .build();
+
+ HoodieTestDataGenerator dataGen = new HoodieTestDataGenerator();
+
+ // When: First delta commit with rolling metadata
+ SparkRDDWriteClient client = getHoodieWriteClient(config);
+ String instant1 = client.createNewInstantTime(false);
+ List<HoodieRecord> records1 = dataGen.generateInserts(instant1, 10);
+ JavaRDD<HoodieRecord> writeRecords1 = jsc().parallelize(records1, 2);
+
+ WriteClientTestUtils.startCommitWithTime(client, instant1);
+ List<WriteStatus> writeStatuses1 = client.insert(writeRecords1, instant1).collect();
+ assertNoWriteErrors(writeStatuses1);
+
+ Map<String, String> extraMetadata1 = new HashMap<>();
+ extraMetadata1.put("checkpoint.offset", "1000");
+ client.commitStats(instant1, writeStatuses1.stream().map(WriteStatus::getStat).collect(Collectors.toList()),
+ Option.of(extraMetadata1), metaClient.getCommitActionType());
+
+ // When: Second delta commit (should trigger inline compaction after this)
+ String instant2 = client.createNewInstantTime(false);
+ List<HoodieRecord> records2 = dataGen.generateUpdates(instant2, 10);
+ JavaRDD<HoodieRecord> writeRecords2 = jsc().parallelize(records2, 2);
+
+ WriteClientTestUtils.startCommitWithTime(client, instant2);
+ List<WriteStatus> writeStatuses2 = client.upsert(writeRecords2, instant2).collect();
+ assertNoWriteErrors(writeStatuses2);
+ client.commitStats(instant2, writeStatuses2.stream().map(WriteStatus::getStat).collect(Collectors.toList()),
+ Option.empty(), metaClient.getCommitActionType());
+
+ // Then: Verify compaction was triggered (should see a commit action on the timeline)
+ metaClient = HoodieTableMetaClient.reload(metaClient);
+ HoodieTimeline writeTimeline = metaClient.getActiveTimeline().getWriteTimeline().filterCompletedInstants();
+ boolean hasCompaction = writeTimeline.getInstantsAsStream()
+ .anyMatch(i -> i.getAction().equals(HoodieTimeline.COMMIT_ACTION));
+ assertTrue(hasCompaction, "Inline compaction should have been triggered");
+
+ // When: Third delta commit after compaction, no rolling metadata provided
+ String instant3 = client.createNewInstantTime(false);
+ List<HoodieRecord> records3 = dataGen.generateInserts(instant3, 10);
+ JavaRDD<HoodieRecord> writeRecords3 = jsc().parallelize(records3, 2);
+
+ WriteClientTestUtils.startCommitWithTime(client, instant3);
+ List<WriteStatus> writeStatuses3 = client.insert(writeRecords3, instant3).collect();
+ assertNoWriteErrors(writeStatuses3);
+ client.commitStats(instant3, writeStatuses3.stream().map(WriteStatus::getStat).collect(Collectors.toList()),
+ Option.empty(), metaClient.getCommitActionType());
+
+ // Then: Rolling metadata should still be found by walking back past the compaction commit
+ metaClient = HoodieTableMetaClient.reload(metaClient);
+ HoodieInstant lastDeltaCommit = metaClient.getActiveTimeline().getDeltaCommitTimeline()
+ .filterCompletedInstants().lastInstant().get();
+ HoodieCommitMetadata metadata3 = TimelineUtils.getCommitMetadata(lastDeltaCommit, metaClient.getActiveTimeline());
+ assertEquals("1000", metadata3.getMetadata("checkpoint.offset"),
+ "Rolling metadata should be carried forward past compaction commit");
+
+ client.close();
+ }
+
+
+ /**
+ * Test that rolling metadata is carried forward into clustering commits.
+ * With OCC enabled, clustering calls preCommit which merges rolling metadata.
+ */
+ @Test
+ public void testRollingMetadataWithInlineClustering() throws IOException {
+ // Given: A CoW table with rolling metadata and inline clustering after 1 commit
+ HoodieTableMetaClient metaClient = getHoodieMetaClient(HoodieTableType.COPY_ON_WRITE);
+ HoodieWriteConfig config = getConfigBuilder(true)
+ .withPath(metaClient.getBasePath())
+ .withRollingMetadataKeys("checkpoint.offset,checkpoint.partition")
+ .withClusteringConfig(HoodieClusteringConfig.newBuilder()
+ .withClusteringMaxNumGroups(10)
+ .withClusteringTargetPartitions(0)
+ .withInlineClustering(true)
+ .withInlineClusteringNumCommits(1)
+ .build())
+ .build();
+
+ HoodieTestDataGenerator dataGen = new HoodieTestDataGenerator();
+
+ // When: First commit with rolling metadata
+ SparkRDDWriteClient client = getHoodieWriteClient(config);
+ String instant1 = client.createNewInstantTime(false);
+ List<HoodieRecord> records1 = dataGen.generateInserts(instant1, 100);
+ JavaRDD<HoodieRecord> writeRecords1 = jsc().parallelize(records1, 2);
+
+ WriteClientTestUtils.startCommitWithTime(client, instant1);
+ List<WriteStatus> writeStatuses1 = client.insert(writeRecords1, instant1).collect();
+ assertNoWriteErrors(writeStatuses1);
+
+ Map<String, String> extraMetadata1 = new HashMap<>();
+ extraMetadata1.put("checkpoint.offset", "5000");
+ extraMetadata1.put("checkpoint.partition", "p1");
+ client.commitStats(instant1, writeStatuses1.stream().map(WriteStatus::getStat).collect(Collectors.toList()),
+ Option.of(extraMetadata1), metaClient.getCommitActionType());
+
+ // When: Second commit with more inserts (should trigger inline clustering after commit)
+ String instant2 = client.createNewInstantTime(false);
+ List<HoodieRecord> records2 = dataGen.generateInserts(instant2, 100);
+ JavaRDD<HoodieRecord> writeRecords2 = jsc().parallelize(records2, 2);
+
+ WriteClientTestUtils.startCommitWithTime(client, instant2);
+ List<WriteStatus> writeStatuses2 = client.insert(writeRecords2, instant2).collect();
+ assertNoWriteErrors(writeStatuses2);
+
+ Map<String, String> extraMetadata2 = new HashMap<>();
+ extraMetadata2.put("checkpoint.offset", "6000");
+ client.commitStats(instant2, writeStatuses2.stream().map(WriteStatus::getStat).collect(Collectors.toList()),
+ Option.of(extraMetadata2), metaClient.getCommitActionType());
+
+ // Then: Verify clustering was triggered
+ metaClient = HoodieTableMetaClient.reload(metaClient);
+ HoodieTimeline writeTimeline = metaClient.getActiveTimeline().getWriteTimeline().filterCompletedInstants();
+ boolean hasClustering = writeTimeline.getInstantsAsStream()
+ .anyMatch(i -> i.getAction().equals(HoodieTimeline.REPLACE_COMMIT_ACTION)
+ || i.getAction().equals(HoodieTimeline.CLUSTERING_ACTION));
+
+ if (hasClustering) {
+ // Find the clustering commit and verify it has rolling metadata
+ HoodieInstant clusteringInstant = writeTimeline.getInstantsAsStream()
+ .filter(i -> i.getAction().equals(HoodieTimeline.REPLACE_COMMIT_ACTION)
+ || i.getAction().equals(HoodieTimeline.CLUSTERING_ACTION))
+ .reduce((a, b) -> b).get();
+ HoodieCommitMetadata clusteringMetadata = TimelineUtils.getCommitMetadata(clusteringInstant, metaClient.getActiveTimeline());
+ assertEquals("6000", clusteringMetadata.getMetadata("checkpoint.offset"),
+ "Clustering commit should carry forward latest rolling metadata");
+ assertEquals("p1", clusteringMetadata.getMetadata("checkpoint.partition"),
+ "Clustering commit should carry forward rolling metadata from earlier commits");
+ }
Review Comment:
**[P2] Key clustering assertions may be silently skipped**

The assertions for the clustering commit's rolling metadata are wrapped inside `if (hasClustering)`. If clustering is not triggered for any reason (e.g., not enough data, config change), the block is skipped and the test passes without verifying the behaviour it is supposed to test. Consider using `assertTrue(hasClustering, "Inline clustering should have been triggered")` before the assertions so that a missing clustering commit fails the test rather than silently succeeding:
```suggestion
      assertTrue(hasClustering, "Inline clustering should have been triggered");
      // Find the clustering commit and verify it has rolling metadata
      HoodieInstant clusteringInstant = writeTimeline.getInstantsAsStream()
          .filter(i -> i.getAction().equals(HoodieTimeline.REPLACE_COMMIT_ACTION)
              || i.getAction().equals(HoodieTimeline.CLUSTERING_ACTION))
          .reduce((a, b) -> b).get();
      HoodieCommitMetadata clusteringMetadata = TimelineUtils.getCommitMetadata(clusteringInstant, metaClient.getActiveTimeline());
      assertEquals("6000", clusteringMetadata.getMetadata("checkpoint.offset"),
          "Clustering commit should carry forward latest rolling metadata");
      assertEquals("p1", clusteringMetadata.getMetadata("checkpoint.partition"),
          "Clustering commit should carry forward rolling metadata from earlier commits");
```
*Greptile* ([original](https://github.com/yihua/hudi/pull/25#discussion_r3048268952)) (source:comment#3048268952)
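The guarded-assertion problem called out above is a general testing anti-pattern. The sketch below uses hypothetical helper names (not the actual test code) to contrast the two styles: a precondition checked with `if` lets the test pass vacuously, while asserting it first surfaces the missing behaviour as a failure.

```java
// Hypothetical contrast between the two assertion styles discussed in the review.
public class GuardedAssertionSketch {

  // Guarded style: when the precondition is false, the check is silently
  // skipped and the "test" passes without verifying anything.
  static boolean verifyGuarded(boolean hasClustering, String offset) {
    if (hasClustering) {
      return "6000".equals(offset);
    }
    return true; // vacuous pass
  }

  // Fail-fast style: a missing clustering commit is itself a failure.
  static boolean verifyFailFast(boolean hasClustering, String offset) {
    if (!hasClustering) {
      throw new AssertionError("Inline clustering should have been triggered");
    }
    return "6000".equals(offset);
  }

  public static void main(String[] args) {
    System.out.println(verifyGuarded(false, null)); // prints true: the regression stays hidden
    try {
      verifyFailFast(false, null);
    } catch (AssertionError e) {
      System.out.println("fail-fast surfaced: " + e.getMessage());
    }
  }
}
```

The same trade-off is what the suggested `assertTrue(hasClustering, ...)` change encodes: it converts a silently skipped verification into a visible test failure.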
##########
hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/client/TestRollingMetadataSpark.java:
##########
@@ -0,0 +1,221 @@
+ /**
+ * Test that rolling metadata is carried forward into clustering commits.
+ * With OCC enabled, clustering calls preCommit which merges rolling metadata.
+ */
+ @Test
+ public void testRollingMetadataWithInlineClustering() throws IOException {
+ // Given: A CoW table with rolling metadata and inline clustering after 1 commit
+ HoodieTableMetaClient metaClient = getHoodieMetaClient(HoodieTableType.COPY_ON_WRITE);
+ HoodieWriteConfig config = getConfigBuilder(true)
+ .withPath(metaClient.getBasePath())
+ .withRollingMetadataKeys("checkpoint.offset,checkpoint.partition")
+ .withClusteringConfig(HoodieClusteringConfig.newBuilder()
+ .withClusteringMaxNumGroups(10)
+ .withClusteringTargetPartitions(0)
+ .withInlineClustering(true)
+ .withInlineClusteringNumCommits(1)
+ .build())
Review Comment:
_Potential issue_ | _Minor_

**Make the clustering path deterministic in this regression test.**

With `withInlineClusteringNumCommits(1)` and the `if (hasClustering)` guard, this test can still pass without ever hitting the new clustering `preCommit(...)` path. Prefer a setup that clusters after the second write and assert `hasClustering` unconditionally.
<details>
<summary>One way to tighten the test</summary>
```diff
       .withClusteringConfig(HoodieClusteringConfig.newBuilder()
           .withClusteringMaxNumGroups(10)
           .withClusteringTargetPartitions(0)
           .withInlineClustering(true)
-          .withInlineClusteringNumCommits(1)
+          .withInlineClusteringNumCommits(2)
           .build())
       .build();
@@
-    if (hasClustering) {
-      // Find the clustering commit and verify it has rolling metadata
-      HoodieInstant clusteringInstant = writeTimeline.getInstantsAsStream()
-          .filter(i -> i.getAction().equals(HoodieTimeline.REPLACE_COMMIT_ACTION)
-              || i.getAction().equals(HoodieTimeline.CLUSTERING_ACTION))
-          .reduce((a, b) -> b).get();
-      HoodieCommitMetadata clusteringMetadata = TimelineUtils.getCommitMetadata(clusteringInstant, metaClient.getActiveTimeline());
-      assertEquals("6000", clusteringMetadata.getMetadata("checkpoint.offset"),
-          "Clustering commit should carry forward latest rolling metadata");
-      assertEquals("p1", clusteringMetadata.getMetadata("checkpoint.partition"),
-          "Clustering commit should carry forward rolling metadata from earlier commits");
-    }
+    assertTrue(hasClustering, "Inline clustering should have been triggered");
+    HoodieInstant clusteringInstant = writeTimeline.getInstantsAsStream()
+        .filter(i -> i.getAction().equals(HoodieTimeline.REPLACE_COMMIT_ACTION)
+            || i.getAction().equals(HoodieTimeline.CLUSTERING_ACTION))
+        .reduce((a, b) -> b).get();
+    HoodieCommitMetadata clusteringMetadata = TimelineUtils.getCommitMetadata(clusteringInstant, metaClient.getActiveTimeline());
+    assertEquals("6000", clusteringMetadata.getMetadata("checkpoint.offset"),
+        "Clustering commit should carry forward latest rolling metadata");
+    assertEquals("p1", clusteringMetadata.getMetadata("checkpoint.partition"),
+        "Clustering commit should carry forward rolling metadata from earlier commits");
```
</details>
Also applies to: 178-196
<details>
<summary>Prompt for AI Agents</summary>

```
Verify each finding against the current code and only fix it if needed.
In `@hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/client/TestRollingMetadataSpark.java` around lines 138-143: the test uses withInlineClusteringNumCommits(1) and a conditional guard if (hasClustering) that allows the test to pass without exercising the clustering preCommit path. Change the test setup in TestRollingMetadataSpark to make clustering deterministic by configuring withInlineClusteringNumCommits(2) (or otherwise ensure clustering runs after the second write), and remove the conditional hasClustering guard so the test always asserts clustering behavior. Update assertions to unconditionally verify clustering ran (e.g., that preCommit path effects are visible), and apply the same change to the other occurrence around lines 178-196 where withClusteringConfig/withInlineClusteringNumCommits and hasClustering are used.
```
</details>
*CodeRabbit* ([original](https://github.com/yihua/hudi/pull/25#discussion_r3048270716)) (source:comment#3048270716)
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]