yihua commented on code in PR #18448:
URL: https://github.com/apache/hudi/pull/18448#discussion_r3095366301
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieTableServiceClient.java:
##########
@@ -1245,25 +1245,57 @@ public boolean rollback(final String commitInstantTime, Option<HoodiePendingRoll
final Timer.Context timerContext = this.metrics.getRollbackCtx();
try {
HoodieTable table = createTable(config, storageConf, skipVersionCheck);
+
Option<HoodieInstant> commitInstantOpt = Option.fromJavaOptional(table.getActiveTimeline().getCommitsTimeline().getInstantsAsStream()
.filter(instant -> EQUALS.test(instant.requestedTime(), commitInstantTime))
.findFirst());
- Option<HoodieRollbackPlan> rollbackPlanOption;
- String rollbackInstantTime;
- if (pendingRollbackInfo.isPresent()) {
+ Option<HoodieRollbackPlan> rollbackPlanOption = Option.empty();
+ Option<String> rollbackInstantTimeOpt;
+ if (!config.isExclusiveRollbackEnabled() && pendingRollbackInfo.isPresent()) {
+ // Only case when lock can be skipped is if exclusive rollback is disabled and
+ // there is a pending rollback info available
rollbackPlanOption = Option.of(pendingRollbackInfo.get().getRollbackPlan());
- rollbackInstantTime = pendingRollbackInfo.get().getRollbackInstant().requestedTime();
+ rollbackInstantTimeOpt = Option.of(pendingRollbackInfo.get().getRollbackInstant().requestedTime());
} else {
- if (commitInstantOpt.isEmpty()) {
- log.error("Cannot find instant {} in the timeline of table {} for rollback", commitInstantTime, config.getBasePath());
- return false;
- }
if (!skipLocking) {
txnManager.beginStateChange(Option.empty(), Option.empty());
}
try {
- rollbackInstantTime = suppliedRollbackInstantTime.orElseGet(() -> createNewInstantTime(false));
- rollbackPlanOption = table.scheduleRollback(context, rollbackInstantTime, commitInstantOpt.get(), false, config.shouldRollbackUsingMarkers(), false);
+ if (config.isExclusiveRollbackEnabled()) {
+ // Reload meta client within the lock so that the timeline is latest while executing pending rollback
+ table.getMetaClient().reloadActiveTimeline();
+ Option<HoodiePendingRollbackInfo> pendingRollbackOpt = getPendingRollbackInfo(table.getMetaClient(), commitInstantTime);
+ rollbackInstantTimeOpt = pendingRollbackOpt.map(info -> info.getRollbackInstant().requestedTime());
+ if (pendingRollbackOpt.isPresent()) {
+ // If pending rollback and heartbeat is expired, writer should start heartbeat and execute rollback
+ if (heartbeatClient.isHeartbeatExpired(rollbackInstantTimeOpt.get())) {
Review Comment:
🤖 When `isExclusiveRollbackEnabled()` is true and no pending rollback exists
but the commit IS in the timeline, this block falls through with
`rollbackPlanOption` still `Option.empty()` and `rollbackInstantTimeOpt` also
empty. No rollback plan is ever scheduled, so the code hits the `else` branch
at line ~1297 and throws `HoodieRollbackException`. This is the "first writer"
scenario: the writer that finds no pending rollback has to schedule the rollback
plan itself, and no branch does so.
The `else` branch below (line ~1288) has a dead
`isExclusiveRollbackEnabled()` check that suggests the scheduling logic was
intended to be shared, but due to the if/else structure it's unreachable when
exclusive rollback is enabled.
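A minimal sketch of the branch decision as this comment describes it, assuming the intended behaviour is that a writer which finds no pending rollback schedules one itself. `RollbackDecisionSketch`, `Decision`, and `decide` are hypothetical names, not Hudi APIs; the booleans stand in for `config.isExclusiveRollbackEnabled()`, the pending-rollback lookup, `heartbeatClient.isHeartbeatExpired(...)`, and the commits-timeline lookup.

```java
/** Hypothetical model of the branches in rollback(); not Hudi code. */
final class RollbackDecisionSketch {

  enum Decision {
    REUSE_SUPPLIED_PLAN,  // non-exclusive fast path: use the caller's pending rollback plan
    RESUME_PENDING,       // exclusive: pending plan found and its heartbeat expired, take it over
    SKIP_OWNED_ELSEWHERE, // exclusive: another writer's heartbeat is still alive
    SKIP_NOTHING_TO_DO,   // no pending plan and the commit is gone from the timeline
    SCHEDULE_NEW          // no pending plan but the commit exists: schedule a plan now
  }

  /**
   * pendingPresent means the caller-supplied info in non-exclusive mode, or the
   * result of the timeline reload under the lock in exclusive mode.
   */
  static Decision decide(boolean exclusive, boolean pendingPresent,
                         boolean heartbeatExpired, boolean commitInTimeline) {
    if (!exclusive && pendingPresent) {
      return Decision.REUSE_SUPPLIED_PLAN;
    }
    if (exclusive && pendingPresent) {
      return heartbeatExpired ? Decision.RESUME_PENDING : Decision.SKIP_OWNED_ELSEWHERE;
    }
    // No pending rollback in either mode: the commit must still be in the timeline.
    if (!commitInTimeline) {
      return Decision.SKIP_NOTHING_TO_DO;
    }
    // The "first writer" case; the quoted diff never reaches it when exclusive is true.
    return Decision.SCHEDULE_NEW;
  }

  private RollbackDecisionSketch() {}
}
```

The comment's point is the `SCHEDULE_NEW` outcome with `exclusive == true`: in the quoted diff, no branch produces it.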
- Generated by an AI agent and may contain mistakes. Please verify any suggestions before applying.
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieTableServiceClient.java:
##########
@@ -1245,25 +1245,57 @@ public boolean rollback(final String commitInstantTime, Option<HoodiePendingRoll
final Timer.Context timerContext = this.metrics.getRollbackCtx();
try {
HoodieTable table = createTable(config, storageConf, skipVersionCheck);
+
Option<HoodieInstant> commitInstantOpt = Option.fromJavaOptional(table.getActiveTimeline().getCommitsTimeline().getInstantsAsStream()
.filter(instant -> EQUALS.test(instant.requestedTime(), commitInstantTime))
.findFirst());
- Option<HoodieRollbackPlan> rollbackPlanOption;
- String rollbackInstantTime;
- if (pendingRollbackInfo.isPresent()) {
+ Option<HoodieRollbackPlan> rollbackPlanOption = Option.empty();
+ Option<String> rollbackInstantTimeOpt;
+ if (!config.isExclusiveRollbackEnabled() && pendingRollbackInfo.isPresent()) {
+ // Only case when lock can be skipped is if exclusive rollback is disabled and
+ // there is a pending rollback info available
rollbackPlanOption = Option.of(pendingRollbackInfo.get().getRollbackPlan());
- rollbackInstantTime = pendingRollbackInfo.get().getRollbackInstant().requestedTime();
+ rollbackInstantTimeOpt = Option.of(pendingRollbackInfo.get().getRollbackInstant().requestedTime());
} else {
- if (commitInstantOpt.isEmpty()) {
- log.error("Cannot find instant {} in the timeline of table {} for rollback", commitInstantTime, config.getBasePath());
- return false;
- }
if (!skipLocking) {
txnManager.beginStateChange(Option.empty(), Option.empty());
}
try {
- rollbackInstantTime = suppliedRollbackInstantTime.orElseGet(() -> createNewInstantTime(false));
- rollbackPlanOption = table.scheduleRollback(context, rollbackInstantTime, commitInstantOpt.get(), false, config.shouldRollbackUsingMarkers(), false);
+ if (config.isExclusiveRollbackEnabled()) {
+ // Reload meta client within the lock so that the timeline is latest while executing pending rollback
+ table.getMetaClient().reloadActiveTimeline();
+ Option<HoodiePendingRollbackInfo> pendingRollbackOpt = getPendingRollbackInfo(table.getMetaClient(), commitInstantTime);
+ rollbackInstantTimeOpt = pendingRollbackOpt.map(info -> info.getRollbackInstant().requestedTime());
+ if (pendingRollbackOpt.isPresent()) {
+ // If pending rollback and heartbeat is expired, writer should start heartbeat and execute rollback
+ if (heartbeatClient.isHeartbeatExpired(rollbackInstantTimeOpt.get())) {
+ LOG.info("Heartbeat expired for rollback instant {}, executing rollback now", rollbackInstantTimeOpt);
+ HeartbeatUtils.deleteHeartbeatFile(storage, basePath, rollbackInstantTimeOpt.get(), config);
+ heartbeatClient.start(rollbackInstantTimeOpt.get());
+ rollbackPlanOption = pendingRollbackOpt.map(HoodiePendingRollbackInfo::getRollbackPlan);
+ } else {
+ // Heartbeat is still active for another writer, ignore rollback for now
+ // TODO: ABCDEFGHI revisit return value
+ return false;
+ }
+ } else if (Option.fromJavaOptional(table.getActiveTimeline().getCommitsTimeline().getInstantsAsStream()
+ .filter(instant -> EQUALS.test(instant.requestedTime(), commitInstantTime))
+ .findFirst()).isEmpty()) {
Review Comment:
🤖 This TODO (`// TODO: ABCDEFGHI revisit return value`) looks like a
development placeholder that shouldn't be merged. Could you either resolve it
or replace with a proper tracking reference (e.g., a JIRA link)?
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieTableServiceClient.java:
##########
@@ -1279,14 +1311,17 @@ public boolean rollback(final String commitInstantTime, Option<HoodiePendingRoll
// is set to false since they are already deleted.
// Execute rollback
HoodieRollbackMetadata rollbackMetadata = commitInstantOpt.isPresent()
- ? table.rollback(context, rollbackInstantTime, commitInstantOpt.get(), true, skipLocking)
- : table.rollback(context, rollbackInstantTime, table.getMetaClient().createNewInstant(
+ ? table.rollback(context, rollbackInstantTimeOpt.get(), commitInstantOpt.get(), true, skipLocking)
+ : table.rollback(context, rollbackInstantTimeOpt.get(), table.getMetaClient().createNewInstant(
HoodieInstant.State.INFLIGHT, rollbackPlanOption.get().getInstantToRollback().getAction(), commitInstantTime),
false, skipLocking);
if (timerContext != null) {
long durationInMs = metrics.getDurationInMs(timerContext.stop());
metrics.updateRollbackMetrics(durationInMs, rollbackMetadata.getTotalFilesDeleted());
}
+ if (config.isExclusiveRollbackEnabled()) {
+ heartbeatClient.stop(rollbackInstantTimeOpt.get());
Review Comment:
🤖 If `table.rollback()` throws an exception, `heartbeatClient.stop()` is
never called. The heartbeat timer thread keeps running and the heartbeat file
remains on storage, so other writers cannot take ownership of this rollback
while the timer keeps refreshing it, and afterwards only once the heartbeat
expires. Could you wrap the rollback execution and the heartbeat stop in a
try/finally so the heartbeat is always cleaned up?
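A minimal sketch of the suggested try/finally shape. `HeartbeatGuardSketch`, `Heartbeat`, and `runWithHeartbeat` are hypothetical names; `Heartbeat` stands in for the `start`/`stop` calls the diff makes on `heartbeatClient`, and the `Supplier` stands in for the `table.rollback(...)` call. It shows the control flow only, not Hudi's actual signatures.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Supplier;

/** Hypothetical sketch: stop the rollback heartbeat even when rollback execution throws. */
final class HeartbeatGuardSketch {

  /** Hypothetical stand-in for the start/stop calls the diff makes on heartbeatClient. */
  interface Heartbeat {
    void start(String instantTime);

    void stop(String instantTime);
  }

  /**
   * Runs the rollback and, in exclusive mode, always stops the heartbeat afterwards.
   * The heartbeat is assumed to have been started earlier, under the lock, as in the diff.
   */
  static <T> T runWithHeartbeat(boolean exclusive, Heartbeat heartbeat,
                                String rollbackInstantTime, Supplier<T> rollback) {
    try {
      return rollback.get();
    } finally {
      if (exclusive) {
        // Runs on the success path and when rollback.get() throws.
        heartbeat.stop(rollbackInstantTime);
      }
    }
  }

  /** Records calls so the behaviour can be checked without storage or timers. */
  static final class RecordingHeartbeat implements Heartbeat {
    final List<String> events = new ArrayList<>();

    @Override
    public void start(String instantTime) {
      events.add("start:" + instantTime);
    }

    @Override
    public void stop(String instantTime) {
      events.add("stop:" + instantTime);
    }
  }

  private HeartbeatGuardSketch() {}
}
```

The exception still propagates to the caller; the `finally` only guarantees that the heartbeat stops refreshing, so another writer can take over once it expires.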
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieTableServiceClient.java:
##########
@@ -1245,25 +1245,57 @@ public boolean rollback(final String commitInstantTime, Option<HoodiePendingRoll
final Timer.Context timerContext = this.metrics.getRollbackCtx();
try {
HoodieTable table = createTable(config, storageConf, skipVersionCheck);
+
Option<HoodieInstant> commitInstantOpt = Option.fromJavaOptional(table.getActiveTimeline().getCommitsTimeline().getInstantsAsStream()
.filter(instant -> EQUALS.test(instant.requestedTime(), commitInstantTime))
.findFirst());
- Option<HoodieRollbackPlan> rollbackPlanOption;
- String rollbackInstantTime;
- if (pendingRollbackInfo.isPresent()) {
+ Option<HoodieRollbackPlan> rollbackPlanOption = Option.empty();
+ Option<String> rollbackInstantTimeOpt;
+ if (!config.isExclusiveRollbackEnabled() && pendingRollbackInfo.isPresent()) {
+ // Only case when lock can be skipped is if exclusive rollback is disabled and
+ // there is a pending rollback info available
rollbackPlanOption = Option.of(pendingRollbackInfo.get().getRollbackPlan());
- rollbackInstantTime = pendingRollbackInfo.get().getRollbackInstant().requestedTime();
+ rollbackInstantTimeOpt = Option.of(pendingRollbackInfo.get().getRollbackInstant().requestedTime());
} else {
- if (commitInstantOpt.isEmpty()) {
- log.error("Cannot find instant {} in the timeline of table {} for rollback", commitInstantTime, config.getBasePath());
- return false;
- }
if (!skipLocking) {
txnManager.beginStateChange(Option.empty(), Option.empty());
}
try {
- rollbackInstantTime = suppliedRollbackInstantTime.orElseGet(() -> createNewInstantTime(false));
- rollbackPlanOption = table.scheduleRollback(context, rollbackInstantTime, commitInstantOpt.get(), false, config.shouldRollbackUsingMarkers(), false);
+ if (config.isExclusiveRollbackEnabled()) {
+ // Reload meta client within the lock so that the timeline is latest while executing pending rollback
+ table.getMetaClient().reloadActiveTimeline();
+ Option<HoodiePendingRollbackInfo> pendingRollbackOpt = getPendingRollbackInfo(table.getMetaClient(), commitInstantTime);
+ rollbackInstantTimeOpt = pendingRollbackOpt.map(info -> info.getRollbackInstant().requestedTime());
+ if (pendingRollbackOpt.isPresent()) {
+ // If pending rollback and heartbeat is expired, writer should start heartbeat and execute rollback
+ if (heartbeatClient.isHeartbeatExpired(rollbackInstantTimeOpt.get())) {
+ LOG.info("Heartbeat expired for rollback instant {}, executing rollback now", rollbackInstantTimeOpt);
+ HeartbeatUtils.deleteHeartbeatFile(storage, basePath, rollbackInstantTimeOpt.get(), config);
+ heartbeatClient.start(rollbackInstantTimeOpt.get());
+ rollbackPlanOption = pendingRollbackOpt.map(HoodiePendingRollbackInfo::getRollbackPlan);
+ } else {
+ // Heartbeat is still active for another writer, ignore rollback for now
+ // TODO: ABCDEFGHI revisit return value
+ return false;
+ }
+ } else if (Option.fromJavaOptional(table.getActiveTimeline().getCommitsTimeline().getInstantsAsStream()
+ .filter(instant -> EQUALS.test(instant.requestedTime(), commitInstantTime))
+ .findFirst()).isEmpty()) {
+ // Assume rollback is already executed since the commit is no longer present in the timeline
+ return false;
+ }
+ } else {
+ // Case where no pending rollback is present,
+ if (commitInstantOpt.isEmpty()) {
+ log.error("Cannot find instant {} in the timeline of table {} for rollback", commitInstantTime, config.getBasePath());
+ return false;
+ }
+ rollbackInstantTimeOpt = suppliedRollbackInstantTime.or(() -> Option.of(createNewInstantTime(false)));
+ if (config.isExclusiveRollbackEnabled()) {
+ heartbeatClient.start(rollbackInstantTimeOpt.get());
Review Comment:
🤖 This `isExclusiveRollbackEnabled()` check is unreachable: it sits inside the
`else` branch of `if (config.isExclusiveRollbackEnabled())`, so it can never be
true. It looks like a copy-paste issue, and it is tied to the missing
rollback-scheduling path for the exclusive-rollback case when no pending
rollback exists.
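One way to make that inner check reachable, sketched under assumptions: hoist the "no pending rollback, commit still present" step out of the non-exclusive `else` so both modes run it, and start the heartbeat there only in exclusive mode. `SharedScheduleSketch` and `resolve` are hypothetical, `java.util.Optional` stands in for Hudi's `Option`, and the heartbeat-expiry check on the resume path is left out to keep the focus on the branch structure.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;

/** Hypothetical sketch: one scheduling step that both exclusive and non-exclusive modes reach. */
final class SharedScheduleSketch {

  /** Steps taken, in order, so the flow can be inspected. */
  final List<String> steps = new ArrayList<>();

  /**
   * pendingInstant: pending rollback found by the reload under the lock (exclusive mode only;
   * the non-exclusive fast path happens before this point in the diff).
   * Returns the rollback instant to execute, or empty when there is nothing to roll back.
   */
  Optional<String> resolve(boolean exclusive, Optional<String> pendingInstant,
                           boolean commitInTimeline, String newInstantTime) {
    if (exclusive && pendingInstant.isPresent()) {
      // Heartbeat-expiry check omitted in this sketch.
      steps.add("resume:" + pendingInstant.get());
      return pendingInstant;
    }
    // Shared path: reached in both modes when no rollback plan is pending.
    if (!commitInTimeline) {
      steps.add("skip");
      return Optional.empty();
    }
    steps.add("schedule:" + newInstantTime);
    if (exclusive) {
      // Reachable now, because this check no longer sits under the non-exclusive branch.
      steps.add("startHeartbeat:" + newInstantTime);
    }
    return Optional.of(newInstantTime);
  }
}
```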
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieTableServiceClient.java:
##########
@@ -1245,25 +1245,57 @@ public boolean rollback(final String commitInstantTime, Option<HoodiePendingRoll
final Timer.Context timerContext = this.metrics.getRollbackCtx();
try {
HoodieTable table = createTable(config, storageConf, skipVersionCheck);
+
Option<HoodieInstant> commitInstantOpt = Option.fromJavaOptional(table.getActiveTimeline().getCommitsTimeline().getInstantsAsStream()
.filter(instant -> EQUALS.test(instant.requestedTime(), commitInstantTime))
.findFirst());
- Option<HoodieRollbackPlan> rollbackPlanOption;
- String rollbackInstantTime;
- if (pendingRollbackInfo.isPresent()) {
+ Option<HoodieRollbackPlan> rollbackPlanOption = Option.empty();
+ Option<String> rollbackInstantTimeOpt;
+ if (!config.isExclusiveRollbackEnabled() && pendingRollbackInfo.isPresent()) {
+ // Only case when lock can be skipped is if exclusive rollback is disabled and
+ // there is a pending rollback info available
rollbackPlanOption = Option.of(pendingRollbackInfo.get().getRollbackPlan());
- rollbackInstantTime = pendingRollbackInfo.get().getRollbackInstant().requestedTime();
+ rollbackInstantTimeOpt = Option.of(pendingRollbackInfo.get().getRollbackInstant().requestedTime());
Review Comment:
🤖 When exclusive rollback is enabled, the old fast path that skips locking
(when `pendingRollbackInfo` is present) is now bypassed: the code always takes
the lock (unless `skipLocking` is set) and re-fetches pending rollback info
from the timeline. Is this intentional? It means callers like
`rollbackFailedWrites()` that already looked up `pendingRollbackInfo` will have
that value ignored, and the timeline reload plus the `getPendingRollbackInfo()`
call inside the lock becomes the source of truth. This behavioral change is
worth documenting.
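A small sketch of the behaviour this comment asks to document: which pending-rollback value `rollback()` acts on in each mode, as read from the quoted diff. `PendingRollbackSourceSketch` and `effectivePending` are hypothetical, `java.util.Optional` stands in for Hudi's `Option`, and `reloadUnderLock` stands in for `reloadActiveTimeline()` followed by `getPendingRollbackInfo(...)`.

```java
import java.util.Optional;
import java.util.function.Supplier;

/** Hypothetical sketch of which pending-rollback lookup wins in each mode. */
final class PendingRollbackSourceSketch {

  /**
   * supplied: the pendingRollbackInfo the caller passed in (e.g. from rollbackFailedWrites()).
   * reloadUnderLock: re-reads the timeline after the lock is taken, as the exclusive branch does.
   */
  static Optional<String> effectivePending(boolean exclusive, Optional<String> supplied,
                                           Supplier<Optional<String>> reloadUnderLock) {
    if (!exclusive && supplied.isPresent()) {
      return supplied;              // unchanged fast path: no lock, no reload
    }
    if (exclusive) {
      return reloadUnderLock.get(); // the caller-supplied value is ignored
    }
    return Optional.empty();        // non-exclusive with nothing pending: a new plan is scheduled
  }

  private PendingRollbackSourceSketch() {}
}
```

A Javadoc note on `rollback()` stating that `pendingRollbackInfo` is only a hint when exclusive rollback is enabled would make this visible to callers.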