yihua commented on code in PR #18448:
URL: https://github.com/apache/hudi/pull/18448#discussion_r3095294402
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieTableServiceClient.java:
##########
@@ -1245,25 +1245,57 @@ public boolean rollback(final String commitInstantTime,
Option<HoodiePendingRoll
final Timer.Context timerContext = this.metrics.getRollbackCtx();
try {
HoodieTable table = createTable(config, storageConf, skipVersionCheck);
+
Option<HoodieInstant> commitInstantOpt =
Option.fromJavaOptional(table.getActiveTimeline().getCommitsTimeline().getInstantsAsStream()
.filter(instant -> EQUALS.test(instant.requestedTime(),
commitInstantTime))
.findFirst());
- Option<HoodieRollbackPlan> rollbackPlanOption;
- String rollbackInstantTime;
- if (pendingRollbackInfo.isPresent()) {
+ Option<HoodieRollbackPlan> rollbackPlanOption = Option.empty();
+ Option<String> rollbackInstantTimeOpt;
+ if (!config.isExclusiveRollbackEnabled() &&
pendingRollbackInfo.isPresent()) {
+ // Only case when lock can be skipped is if exclusive rollback is
disabled and
+ // there is a pending rollback info available
rollbackPlanOption =
Option.of(pendingRollbackInfo.get().getRollbackPlan());
- rollbackInstantTime =
pendingRollbackInfo.get().getRollbackInstant().requestedTime();
+ rollbackInstantTimeOpt =
Option.of(pendingRollbackInfo.get().getRollbackInstant().requestedTime());
} else {
- if (commitInstantOpt.isEmpty()) {
- log.error("Cannot find instant {} in the timeline of table {} for
rollback", commitInstantTime, config.getBasePath());
- return false;
- }
if (!skipLocking) {
txnManager.beginStateChange(Option.empty(), Option.empty());
}
try {
- rollbackInstantTime = suppliedRollbackInstantTime.orElseGet(() ->
createNewInstantTime(false));
- rollbackPlanOption = table.scheduleRollback(context,
rollbackInstantTime, commitInstantOpt.get(), false,
config.shouldRollbackUsingMarkers(), false);
+ if (config.isExclusiveRollbackEnabled()) {
+ // Reload meta client within the lock so that the timeline is
latest while executing pending rollback
+ table.getMetaClient().reloadActiveTimeline();
+ Option<HoodiePendingRollbackInfo> pendingRollbackOpt =
getPendingRollbackInfo(table.getMetaClient(), commitInstantTime);
+ rollbackInstantTimeOpt = pendingRollbackOpt.map(info ->
info.getRollbackInstant().requestedTime());
+ if (pendingRollbackOpt.isPresent()) {
+ // If pending rollback and heartbeat is expired, writer should
start heartbeat and execute rollback
+ if
(heartbeatClient.isHeartbeatExpired(rollbackInstantTimeOpt.get())) {
+ LOG.info("Heartbeat expired for rollback instant {}, executing
rollback now", rollbackInstantTimeOpt);
+ HeartbeatUtils.deleteHeartbeatFile(storage, basePath,
rollbackInstantTimeOpt.get(), config);
+ heartbeatClient.start(rollbackInstantTimeOpt.get());
+ rollbackPlanOption =
pendingRollbackOpt.map(HoodiePendingRollbackInfo::getRollbackPlan);
+ } else {
+ // Heartbeat is still active for another writer, ignore
rollback for now
+ // TODO: ABCDEFGHI revisit return value
+ return false;
+ }
+ } else if
(Option.fromJavaOptional(table.getActiveTimeline().getCommitsTimeline().getInstantsAsStream()
+ .filter(instant -> EQUALS.test(instant.requestedTime(),
commitInstantTime))
+ .findFirst()).isEmpty()) {
+ // Assume rollback is already executed since the commit is no
longer present in the timeline
+ return false;
+ }
+ } else {
+ // Case where no pending rollback is present,
+ if (commitInstantOpt.isEmpty()) {
+ log.error("Cannot find instant {} in the timeline of table {}
for rollback", commitInstantTime, config.getBasePath());
+ return false;
+ }
+ rollbackInstantTimeOpt = suppliedRollbackInstantTime.or(() ->
Option.of(createNewInstantTime(false)));
+ if (config.isExclusiveRollbackEnabled()) {
+ heartbeatClient.start(rollbackInstantTimeOpt.get());
+ }
+ rollbackPlanOption = table.scheduleRollback(context,
rollbackInstantTimeOpt.get(), commitInstantOpt.get(), false,
config.shouldRollbackUsingMarkers(), false);
+ }
} finally {
if (!skipLocking) {
txnManager.endStateChange(Option.empty());
Review Comment:
🤖 [Line 1306] nit: the comment 'Case where no pending rollback is present,'
is incomplete (missing period and context); consider clarifying that exclusive
rollback is also disabled here.
<sub><i>- Generated by an AI agent and may contain mistakes. Please verify
any suggestions before applying.</i></sub>
##########
hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestClientRollback.java:
##########
@@ -825,6 +827,202 @@ public void testRollbackWithRequestedRollbackPlan(boolean
enableMetadataTable, b
}
}
+ /**
+ * Test exclusive rollback with multi-writer: when a pending rollback exists
with an expired heartbeat
+ * (no heartbeat file present → returns 0L → always expired), the current
writer should take ownership
+ * and execute the rollback.
+ */
+ @Test
+ public void testExclusiveRollbackPendingRollbackHeartbeatExpired() throws
Exception {
+ final String p1 = "2016/05/01";
+ final String p2 = "2016/05/02";
+ final String commitTime1 = "20160501010101";
+ final String commitTime2 = "20160502020601";
+ final String commitTime3 = "20160506030611";
Review Comment:
🤖 nit: the double-brace initialization (anonymous class with initializer
block) for creating maps is verbose; if Java 9+ is available, consider using
`Map.of(p1, "id11", p2, "id12")` instead.
<sub><i>- Generated by an AI agent and may contain mistakes. Please verify
any suggestions before applying.</i></sub>
##########
hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestClientRollback.java:
##########
@@ -825,6 +827,202 @@ public void testRollbackWithRequestedRollbackPlan(boolean
enableMetadataTable, b
}
}
+ /**
+ * Test exclusive rollback with multi-writer: when a pending rollback exists
with an expired heartbeat
+ * (no heartbeat file present → returns 0L → always expired), the current
writer should take ownership
+ * and execute the rollback.
+ */
+ @Test
Review Comment:
🤖 nit: test variable names like `p1`, `p2` and `id11`, `id12` are generic;
consider more descriptive names like `partition1`, `partition2`, `fileId1_1` to
improve test clarity.
<sub><i>- Generated by an AI agent and may contain mistakes. Please verify
any suggestions before applying.</i></sub>
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieTableServiceClient.java:
##########
@@ -1245,25 +1245,57 @@ public boolean rollback(final String commitInstantTime,
Option<HoodiePendingRoll
final Timer.Context timerContext = this.metrics.getRollbackCtx();
try {
HoodieTable table = createTable(config, storageConf, skipVersionCheck);
+
Option<HoodieInstant> commitInstantOpt =
Option.fromJavaOptional(table.getActiveTimeline().getCommitsTimeline().getInstantsAsStream()
.filter(instant -> EQUALS.test(instant.requestedTime(),
commitInstantTime))
.findFirst());
- Option<HoodieRollbackPlan> rollbackPlanOption;
- String rollbackInstantTime;
- if (pendingRollbackInfo.isPresent()) {
+ Option<HoodieRollbackPlan> rollbackPlanOption = Option.empty();
+ Option<String> rollbackInstantTimeOpt;
+ if (!config.isExclusiveRollbackEnabled() &&
pendingRollbackInfo.isPresent()) {
+ // Only case when lock can be skipped is if exclusive rollback is
disabled and
+ // there is a pending rollback info available
rollbackPlanOption =
Option.of(pendingRollbackInfo.get().getRollbackPlan());
- rollbackInstantTime =
pendingRollbackInfo.get().getRollbackInstant().requestedTime();
+ rollbackInstantTimeOpt =
Option.of(pendingRollbackInfo.get().getRollbackInstant().requestedTime());
} else {
- if (commitInstantOpt.isEmpty()) {
- log.error("Cannot find instant {} in the timeline of table {} for
rollback", commitInstantTime, config.getBasePath());
- return false;
- }
if (!skipLocking) {
txnManager.beginStateChange(Option.empty(), Option.empty());
}
try {
- rollbackInstantTime = suppliedRollbackInstantTime.orElseGet(() ->
createNewInstantTime(false));
- rollbackPlanOption = table.scheduleRollback(context,
rollbackInstantTime, commitInstantOpt.get(), false,
config.shouldRollbackUsingMarkers(), false);
+ if (config.isExclusiveRollbackEnabled()) {
+ // Reload meta client within the lock so that the timeline is
latest while executing pending rollback
+ table.getMetaClient().reloadActiveTimeline();
+ Option<HoodiePendingRollbackInfo> pendingRollbackOpt =
getPendingRollbackInfo(table.getMetaClient(), commitInstantTime);
+ rollbackInstantTimeOpt = pendingRollbackOpt.map(info ->
info.getRollbackInstant().requestedTime());
+ if (pendingRollbackOpt.isPresent()) {
+ // If pending rollback and heartbeat is expired, writer should
start heartbeat and execute rollback
+ if
(heartbeatClient.isHeartbeatExpired(rollbackInstantTimeOpt.get())) {
+ LOG.info("Heartbeat expired for rollback instant {}, executing
rollback now", rollbackInstantTimeOpt);
+ HeartbeatUtils.deleteHeartbeatFile(storage, basePath,
rollbackInstantTimeOpt.get(), config);
+ heartbeatClient.start(rollbackInstantTimeOpt.get());
+ rollbackPlanOption =
pendingRollbackOpt.map(HoodiePendingRollbackInfo::getRollbackPlan);
+ } else {
+ // Heartbeat is still active for another writer, ignore
rollback for now
+ // TODO: ABCDEFGHI revisit return value
+ return false;
+ }
+ } else if
(Option.fromJavaOptional(table.getActiveTimeline().getCommitsTimeline().getInstantsAsStream()
+ .filter(instant -> EQUALS.test(instant.requestedTime(),
commitInstantTime))
+ .findFirst()).isEmpty()) {
Review Comment:
🤖 nit: passing `rollbackInstantTimeOpt` (an Option) to LOG.info will print
'Option[...]' instead of the value; use `.get()` or extract the string first
for cleaner logs.
<sub><i>- Generated by an AI agent and may contain mistakes. Please verify
any suggestions before applying.</i></sub>
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieTableServiceClient.java:
##########
@@ -1245,25 +1245,57 @@ public boolean rollback(final String commitInstantTime,
Option<HoodiePendingRoll
final Timer.Context timerContext = this.metrics.getRollbackCtx();
try {
HoodieTable table = createTable(config, storageConf, skipVersionCheck);
+
Option<HoodieInstant> commitInstantOpt =
Option.fromJavaOptional(table.getActiveTimeline().getCommitsTimeline().getInstantsAsStream()
.filter(instant -> EQUALS.test(instant.requestedTime(),
commitInstantTime))
.findFirst());
- Option<HoodieRollbackPlan> rollbackPlanOption;
- String rollbackInstantTime;
- if (pendingRollbackInfo.isPresent()) {
+ Option<HoodieRollbackPlan> rollbackPlanOption = Option.empty();
+ Option<String> rollbackInstantTimeOpt;
+ if (!config.isExclusiveRollbackEnabled() &&
pendingRollbackInfo.isPresent()) {
+ // Only case when lock can be skipped is if exclusive rollback is
disabled and
+ // there is a pending rollback info available
rollbackPlanOption =
Option.of(pendingRollbackInfo.get().getRollbackPlan());
- rollbackInstantTime =
pendingRollbackInfo.get().getRollbackInstant().requestedTime();
+ rollbackInstantTimeOpt =
Option.of(pendingRollbackInfo.get().getRollbackInstant().requestedTime());
} else {
- if (commitInstantOpt.isEmpty()) {
- log.error("Cannot find instant {} in the timeline of table {} for
rollback", commitInstantTime, config.getBasePath());
- return false;
- }
if (!skipLocking) {
txnManager.beginStateChange(Option.empty(), Option.empty());
}
try {
- rollbackInstantTime = suppliedRollbackInstantTime.orElseGet(() ->
createNewInstantTime(false));
- rollbackPlanOption = table.scheduleRollback(context,
rollbackInstantTime, commitInstantOpt.get(), false,
config.shouldRollbackUsingMarkers(), false);
+ if (config.isExclusiveRollbackEnabled()) {
+ // Reload meta client within the lock so that the timeline is
latest while executing pending rollback
+ table.getMetaClient().reloadActiveTimeline();
+ Option<HoodiePendingRollbackInfo> pendingRollbackOpt =
getPendingRollbackInfo(table.getMetaClient(), commitInstantTime);
+ rollbackInstantTimeOpt = pendingRollbackOpt.map(info ->
info.getRollbackInstant().requestedTime());
+ if (pendingRollbackOpt.isPresent()) {
+ // If pending rollback and heartbeat is expired, writer should
start heartbeat and execute rollback
+ if
(heartbeatClient.isHeartbeatExpired(rollbackInstantTimeOpt.get())) {
+ LOG.info("Heartbeat expired for rollback instant {}, executing
rollback now", rollbackInstantTimeOpt);
+ HeartbeatUtils.deleteHeartbeatFile(storage, basePath,
rollbackInstantTimeOpt.get(), config);
+ heartbeatClient.start(rollbackInstantTimeOpt.get());
+ rollbackPlanOption =
pendingRollbackOpt.map(HoodiePendingRollbackInfo::getRollbackPlan);
+ } else {
+ // Heartbeat is still active for another writer, ignore
rollback for now
+ // TODO: ABCDEFGHI revisit return value
+ return false;
+ }
+ } else if
(Option.fromJavaOptional(table.getActiveTimeline().getCommitsTimeline().getInstantsAsStream()
+ .filter(instant -> EQUALS.test(instant.requestedTime(),
commitInstantTime))
+ .findFirst()).isEmpty()) {
+ // Assume rollback is already executed since the commit is no
longer present in the timeline
+ return false;
+ }
+ } else {
+ // Case where no pending rollback is present,
+ if (commitInstantOpt.isEmpty()) {
+ log.error("Cannot find instant {} in the timeline of table {}
for rollback", commitInstantTime, config.getBasePath());
+ return false;
+ }
+ rollbackInstantTimeOpt = suppliedRollbackInstantTime.or(() ->
Option.of(createNewInstantTime(false)));
+ if (config.isExclusiveRollbackEnabled()) {
+ heartbeatClient.start(rollbackInstantTimeOpt.get());
+ }
Review Comment:
🤖 nit: the TODO comment contains 'ABCDEFGHI' which looks like a placeholder;
consider replacing with a real issue reference (e.g., HUDI-#### or a meaningful
comment).
<sub><i>- Generated by an AI agent and may contain mistakes. Please verify
any suggestions before applying.</i></sub>
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieTableServiceClient.java:
##########
@@ -1245,25 +1245,57 @@ public boolean rollback(final String commitInstantTime,
Option<HoodiePendingRoll
final Timer.Context timerContext = this.metrics.getRollbackCtx();
try {
HoodieTable table = createTable(config, storageConf, skipVersionCheck);
+
Option<HoodieInstant> commitInstantOpt =
Option.fromJavaOptional(table.getActiveTimeline().getCommitsTimeline().getInstantsAsStream()
.filter(instant -> EQUALS.test(instant.requestedTime(),
commitInstantTime))
.findFirst());
- Option<HoodieRollbackPlan> rollbackPlanOption;
- String rollbackInstantTime;
- if (pendingRollbackInfo.isPresent()) {
+ Option<HoodieRollbackPlan> rollbackPlanOption = Option.empty();
+ Option<String> rollbackInstantTimeOpt;
+ if (!config.isExclusiveRollbackEnabled() &&
pendingRollbackInfo.isPresent()) {
+ // Only case when lock can be skipped is if exclusive rollback is
disabled and
+ // there is a pending rollback info available
rollbackPlanOption =
Option.of(pendingRollbackInfo.get().getRollbackPlan());
- rollbackInstantTime =
pendingRollbackInfo.get().getRollbackInstant().requestedTime();
+ rollbackInstantTimeOpt =
Option.of(pendingRollbackInfo.get().getRollbackInstant().requestedTime());
} else {
- if (commitInstantOpt.isEmpty()) {
- log.error("Cannot find instant {} in the timeline of table {} for
rollback", commitInstantTime, config.getBasePath());
- return false;
- }
if (!skipLocking) {
txnManager.beginStateChange(Option.empty(), Option.empty());
}
try {
- rollbackInstantTime = suppliedRollbackInstantTime.orElseGet(() ->
createNewInstantTime(false));
- rollbackPlanOption = table.scheduleRollback(context,
rollbackInstantTime, commitInstantOpt.get(), false,
config.shouldRollbackUsingMarkers(), false);
+ if (config.isExclusiveRollbackEnabled()) {
+ // Reload meta client within the lock so that the timeline is
latest while executing pending rollback
+ table.getMetaClient().reloadActiveTimeline();
+ Option<HoodiePendingRollbackInfo> pendingRollbackOpt =
getPendingRollbackInfo(table.getMetaClient(), commitInstantTime);
+ rollbackInstantTimeOpt = pendingRollbackOpt.map(info ->
info.getRollbackInstant().requestedTime());
+ if (pendingRollbackOpt.isPresent()) {
+ // If pending rollback and heartbeat is expired, writer should
start heartbeat and execute rollback
+ if
(heartbeatClient.isHeartbeatExpired(rollbackInstantTimeOpt.get())) {
+ LOG.info("Heartbeat expired for rollback instant {}, executing
rollback now", rollbackInstantTimeOpt);
+ HeartbeatUtils.deleteHeartbeatFile(storage, basePath,
rollbackInstantTimeOpt.get(), config);
+ heartbeatClient.start(rollbackInstantTimeOpt.get());
+ rollbackPlanOption =
pendingRollbackOpt.map(HoodiePendingRollbackInfo::getRollbackPlan);
+ } else {
+ // Heartbeat is still active for another writer, ignore
rollback for now
+ // TODO: ABCDEFGHI revisit return value
+ return false;
+ }
+ } else if
(Option.fromJavaOptional(table.getActiveTimeline().getCommitsTimeline().getInstantsAsStream()
+ .filter(instant -> EQUALS.test(instant.requestedTime(),
commitInstantTime))
+ .findFirst()).isEmpty()) {
+ // Assume rollback is already executed since the commit is no
longer present in the timeline
+ return false;
+ }
+ } else {
+ // Case where no pending rollback is present,
+ if (commitInstantOpt.isEmpty()) {
+ log.error("Cannot find instant {} in the timeline of table {}
for rollback", commitInstantTime, config.getBasePath());
+ return false;
+ }
+ rollbackInstantTimeOpt = suppliedRollbackInstantTime.or(() ->
Option.of(createNewInstantTime(false)));
+ if (config.isExclusiveRollbackEnabled()) {
+ heartbeatClient.start(rollbackInstantTimeOpt.get());
+ }
+ rollbackPlanOption = table.scheduleRollback(context,
rollbackInstantTimeOpt.get(), commitInstantOpt.get(), false,
config.shouldRollbackUsingMarkers(), false);
+ }
} finally {
Review Comment:
🤖 nit: this code duplicates the commit-in-timeline check from lines
1254-1257; consider extracting into a helper method or reusing the earlier
result.
<sub><i>- Generated by an AI agent and may contain mistakes. Please verify
any suggestions before applying.</i></sub>
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]