yihua commented on code in PR #18448:
URL: https://github.com/apache/hudi/pull/18448#discussion_r3095307390
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieTableServiceClient.java:
##########
@@ -1245,25 +1245,57 @@ public boolean rollback(final String commitInstantTime,
Option<HoodiePendingRoll
final Timer.Context timerContext = this.metrics.getRollbackCtx();
try {
HoodieTable table = createTable(config, storageConf, skipVersionCheck);
+
Option<HoodieInstant> commitInstantOpt =
Option.fromJavaOptional(table.getActiveTimeline().getCommitsTimeline().getInstantsAsStream()
.filter(instant -> EQUALS.test(instant.requestedTime(),
commitInstantTime))
.findFirst());
- Option<HoodieRollbackPlan> rollbackPlanOption;
- String rollbackInstantTime;
- if (pendingRollbackInfo.isPresent()) {
+ Option<HoodieRollbackPlan> rollbackPlanOption = Option.empty();
+ Option<String> rollbackInstantTimeOpt;
+ if (!config.isExclusiveRollbackEnabled() &&
pendingRollbackInfo.isPresent()) {
+ // Only case when lock can be skipped is if exclusive rollback is
disabled and
+ // there is a pending rollback info available
rollbackPlanOption =
Option.of(pendingRollbackInfo.get().getRollbackPlan());
- rollbackInstantTime =
pendingRollbackInfo.get().getRollbackInstant().requestedTime();
+ rollbackInstantTimeOpt =
Option.of(pendingRollbackInfo.get().getRollbackInstant().requestedTime());
} else {
- if (commitInstantOpt.isEmpty()) {
- log.error("Cannot find instant {} in the timeline of table {} for
rollback", commitInstantTime, config.getBasePath());
- return false;
- }
if (!skipLocking) {
txnManager.beginStateChange(Option.empty(), Option.empty());
}
try {
- rollbackInstantTime = suppliedRollbackInstantTime.orElseGet(() ->
createNewInstantTime(false));
- rollbackPlanOption = table.scheduleRollback(context,
rollbackInstantTime, commitInstantOpt.get(), false,
config.shouldRollbackUsingMarkers(), false);
+ if (config.isExclusiveRollbackEnabled()) {
+ // Reload meta client within the lock so that the timeline is
latest while executing pending rollback
+ table.getMetaClient().reloadActiveTimeline();
+ Option<HoodiePendingRollbackInfo> pendingRollbackOpt =
getPendingRollbackInfo(table.getMetaClient(), commitInstantTime);
+ rollbackInstantTimeOpt = pendingRollbackOpt.map(info ->
info.getRollbackInstant().requestedTime());
+ if (pendingRollbackOpt.isPresent()) {
+ // If pending rollback and heartbeat is expired, writer should
start heartbeat and execute rollback
+ if
(heartbeatClient.isHeartbeatExpired(rollbackInstantTimeOpt.get())) {
+ LOG.info("Heartbeat expired for rollback instant {}, executing
rollback now", rollbackInstantTimeOpt);
+ HeartbeatUtils.deleteHeartbeatFile(storage, basePath,
rollbackInstantTimeOpt.get(), config);
+ heartbeatClient.start(rollbackInstantTimeOpt.get());
+ rollbackPlanOption =
pendingRollbackOpt.map(HoodiePendingRollbackInfo::getRollbackPlan);
+ } else {
Review Comment:
🤖 `HeartbeatUtils.deleteHeartbeatFile(storage, basePath,
rollbackInstantTimeOpt.get(), config)` only deletes the file if
`config.getFailedWritesCleanPolicy().isLazy()`. Is this intentional for the
exclusive rollback use case? If the policy isn't lazy, the stale heartbeat file
from the previous writer won't be deleted before `heartbeatClient.start()`
overwrites it. That might be fine, since `start()` overwrites the file anyway,
but I wanted to confirm the intent.
<sub><i>- Generated by an AI agent and may contain mistakes. Please verify
any suggestions before applying.</i></sub>
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieTableServiceClient.java:
##########
@@ -1245,25 +1245,57 @@ public boolean rollback(final String commitInstantTime,
Option<HoodiePendingRoll
final Timer.Context timerContext = this.metrics.getRollbackCtx();
try {
HoodieTable table = createTable(config, storageConf, skipVersionCheck);
+
Option<HoodieInstant> commitInstantOpt =
Option.fromJavaOptional(table.getActiveTimeline().getCommitsTimeline().getInstantsAsStream()
.filter(instant -> EQUALS.test(instant.requestedTime(),
commitInstantTime))
.findFirst());
- Option<HoodieRollbackPlan> rollbackPlanOption;
- String rollbackInstantTime;
- if (pendingRollbackInfo.isPresent()) {
+ Option<HoodieRollbackPlan> rollbackPlanOption = Option.empty();
+ Option<String> rollbackInstantTimeOpt;
+ if (!config.isExclusiveRollbackEnabled() &&
pendingRollbackInfo.isPresent()) {
+ // Only case when lock can be skipped is if exclusive rollback is
disabled and
+ // there is a pending rollback info available
rollbackPlanOption =
Option.of(pendingRollbackInfo.get().getRollbackPlan());
- rollbackInstantTime =
pendingRollbackInfo.get().getRollbackInstant().requestedTime();
+ rollbackInstantTimeOpt =
Option.of(pendingRollbackInfo.get().getRollbackInstant().requestedTime());
} else {
- if (commitInstantOpt.isEmpty()) {
- log.error("Cannot find instant {} in the timeline of table {} for
rollback", commitInstantTime, config.getBasePath());
- return false;
- }
if (!skipLocking) {
txnManager.beginStateChange(Option.empty(), Option.empty());
}
try {
- rollbackInstantTime = suppliedRollbackInstantTime.orElseGet(() ->
createNewInstantTime(false));
- rollbackPlanOption = table.scheduleRollback(context,
rollbackInstantTime, commitInstantOpt.get(), false,
config.shouldRollbackUsingMarkers(), false);
+ if (config.isExclusiveRollbackEnabled()) {
+ // Reload meta client within the lock so that the timeline is
latest while executing pending rollback
+ table.getMetaClient().reloadActiveTimeline();
+ Option<HoodiePendingRollbackInfo> pendingRollbackOpt =
getPendingRollbackInfo(table.getMetaClient(), commitInstantTime);
+ rollbackInstantTimeOpt = pendingRollbackOpt.map(info ->
info.getRollbackInstant().requestedTime());
+ if (pendingRollbackOpt.isPresent()) {
+ // If pending rollback and heartbeat is expired, writer should
start heartbeat and execute rollback
+ if
(heartbeatClient.isHeartbeatExpired(rollbackInstantTimeOpt.get())) {
+ LOG.info("Heartbeat expired for rollback instant {}, executing
rollback now", rollbackInstantTimeOpt);
+ HeartbeatUtils.deleteHeartbeatFile(storage, basePath,
rollbackInstantTimeOpt.get(), config);
+ heartbeatClient.start(rollbackInstantTimeOpt.get());
+ rollbackPlanOption =
pendingRollbackOpt.map(HoodiePendingRollbackInfo::getRollbackPlan);
+ } else {
+ // Heartbeat is still active for another writer, ignore
rollback for now
+ // TODO: ABCDEFGHI revisit return value
+ return false;
+ }
+ } else if
(Option.fromJavaOptional(table.getActiveTimeline().getCommitsTimeline().getInstantsAsStream()
+ .filter(instant -> EQUALS.test(instant.requestedTime(),
commitInstantTime))
+ .findFirst()).isEmpty()) {
+ // Assume rollback is already executed since the commit is no
longer present in the timeline
+ return false;
Review Comment:
🤖 When exclusive rollback is enabled, no pending rollback exists, and the
commit IS still in the timeline, neither `rollbackPlanOption` nor
`rollbackInstantTimeOpt` ends up holding a value — both are `Option.empty()`. The
code then falls through to the `rollbackPlanOption.isPresent()` check, which is
false, so a `HoodieRollbackException` is thrown. This is the
first-writer-to-rollback case, and it is completely unhandled. It seems the
scheduling logic in the `else` branch (lines 1290–1298) was also meant to be
reachable from the exclusive path, but the current `if/else` structure prevents it.
<sub><i>- Generated by an AI agent and may contain mistakes. Please verify
any suggestions before applying.</i></sub>
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieTableServiceClient.java:
##########
@@ -1245,25 +1245,57 @@ public boolean rollback(final String commitInstantTime,
Option<HoodiePendingRoll
final Timer.Context timerContext = this.metrics.getRollbackCtx();
try {
HoodieTable table = createTable(config, storageConf, skipVersionCheck);
+
Option<HoodieInstant> commitInstantOpt =
Option.fromJavaOptional(table.getActiveTimeline().getCommitsTimeline().getInstantsAsStream()
.filter(instant -> EQUALS.test(instant.requestedTime(),
commitInstantTime))
.findFirst());
- Option<HoodieRollbackPlan> rollbackPlanOption;
- String rollbackInstantTime;
- if (pendingRollbackInfo.isPresent()) {
+ Option<HoodieRollbackPlan> rollbackPlanOption = Option.empty();
+ Option<String> rollbackInstantTimeOpt;
+ if (!config.isExclusiveRollbackEnabled() &&
pendingRollbackInfo.isPresent()) {
+ // Only case when lock can be skipped is if exclusive rollback is
disabled and
+ // there is a pending rollback info available
rollbackPlanOption =
Option.of(pendingRollbackInfo.get().getRollbackPlan());
- rollbackInstantTime =
pendingRollbackInfo.get().getRollbackInstant().requestedTime();
+ rollbackInstantTimeOpt =
Option.of(pendingRollbackInfo.get().getRollbackInstant().requestedTime());
} else {
- if (commitInstantOpt.isEmpty()) {
- log.error("Cannot find instant {} in the timeline of table {} for
rollback", commitInstantTime, config.getBasePath());
- return false;
- }
if (!skipLocking) {
txnManager.beginStateChange(Option.empty(), Option.empty());
}
try {
- rollbackInstantTime = suppliedRollbackInstantTime.orElseGet(() ->
createNewInstantTime(false));
- rollbackPlanOption = table.scheduleRollback(context,
rollbackInstantTime, commitInstantOpt.get(), false,
config.shouldRollbackUsingMarkers(), false);
+ if (config.isExclusiveRollbackEnabled()) {
+ // Reload meta client within the lock so that the timeline is
latest while executing pending rollback
+ table.getMetaClient().reloadActiveTimeline();
+ Option<HoodiePendingRollbackInfo> pendingRollbackOpt =
getPendingRollbackInfo(table.getMetaClient(), commitInstantTime);
+ rollbackInstantTimeOpt = pendingRollbackOpt.map(info ->
info.getRollbackInstant().requestedTime());
+ if (pendingRollbackOpt.isPresent()) {
+ // If pending rollback and heartbeat is expired, writer should
start heartbeat and execute rollback
+ if
(heartbeatClient.isHeartbeatExpired(rollbackInstantTimeOpt.get())) {
+ LOG.info("Heartbeat expired for rollback instant {}, executing
rollback now", rollbackInstantTimeOpt);
+ HeartbeatUtils.deleteHeartbeatFile(storage, basePath,
rollbackInstantTimeOpt.get(), config);
+ heartbeatClient.start(rollbackInstantTimeOpt.get());
+ rollbackPlanOption =
pendingRollbackOpt.map(HoodiePendingRollbackInfo::getRollbackPlan);
+ } else {
+ // Heartbeat is still active for another writer, ignore
rollback for now
+ // TODO: ABCDEFGHI revisit return value
+ return false;
+ }
+ } else if
(Option.fromJavaOptional(table.getActiveTimeline().getCommitsTimeline().getInstantsAsStream()
Review Comment:
🤖 `// TODO: ABCDEFGHI revisit return value` — this looks like a placeholder
that shouldn't be merged as part of the final PR.
<sub><i>- Generated by an AI agent and may contain mistakes. Please verify
any suggestions before applying.</i></sub>
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieTableServiceClient.java:
##########
@@ -1245,25 +1245,57 @@ public boolean rollback(final String commitInstantTime,
Option<HoodiePendingRoll
final Timer.Context timerContext = this.metrics.getRollbackCtx();
try {
HoodieTable table = createTable(config, storageConf, skipVersionCheck);
+
Option<HoodieInstant> commitInstantOpt =
Option.fromJavaOptional(table.getActiveTimeline().getCommitsTimeline().getInstantsAsStream()
.filter(instant -> EQUALS.test(instant.requestedTime(),
commitInstantTime))
.findFirst());
- Option<HoodieRollbackPlan> rollbackPlanOption;
- String rollbackInstantTime;
- if (pendingRollbackInfo.isPresent()) {
+ Option<HoodieRollbackPlan> rollbackPlanOption = Option.empty();
+ Option<String> rollbackInstantTimeOpt;
+ if (!config.isExclusiveRollbackEnabled() &&
pendingRollbackInfo.isPresent()) {
+ // Only case when lock can be skipped is if exclusive rollback is
disabled and
+ // there is a pending rollback info available
rollbackPlanOption =
Option.of(pendingRollbackInfo.get().getRollbackPlan());
- rollbackInstantTime =
pendingRollbackInfo.get().getRollbackInstant().requestedTime();
+ rollbackInstantTimeOpt =
Option.of(pendingRollbackInfo.get().getRollbackInstant().requestedTime());
} else {
- if (commitInstantOpt.isEmpty()) {
- log.error("Cannot find instant {} in the timeline of table {} for
rollback", commitInstantTime, config.getBasePath());
- return false;
- }
if (!skipLocking) {
txnManager.beginStateChange(Option.empty(), Option.empty());
}
try {
- rollbackInstantTime = suppliedRollbackInstantTime.orElseGet(() ->
createNewInstantTime(false));
- rollbackPlanOption = table.scheduleRollback(context,
rollbackInstantTime, commitInstantOpt.get(), false,
config.shouldRollbackUsingMarkers(), false);
+ if (config.isExclusiveRollbackEnabled()) {
+ // Reload meta client within the lock so that the timeline is
latest while executing pending rollback
+ table.getMetaClient().reloadActiveTimeline();
+ Option<HoodiePendingRollbackInfo> pendingRollbackOpt =
getPendingRollbackInfo(table.getMetaClient(), commitInstantTime);
+ rollbackInstantTimeOpt = pendingRollbackOpt.map(info ->
info.getRollbackInstant().requestedTime());
+ if (pendingRollbackOpt.isPresent()) {
+ // If pending rollback and heartbeat is expired, writer should
start heartbeat and execute rollback
+ if
(heartbeatClient.isHeartbeatExpired(rollbackInstantTimeOpt.get())) {
+ LOG.info("Heartbeat expired for rollback instant {}, executing
rollback now", rollbackInstantTimeOpt);
+ HeartbeatUtils.deleteHeartbeatFile(storage, basePath,
rollbackInstantTimeOpt.get(), config);
+ heartbeatClient.start(rollbackInstantTimeOpt.get());
+ rollbackPlanOption =
pendingRollbackOpt.map(HoodiePendingRollbackInfo::getRollbackPlan);
+ } else {
+ // Heartbeat is still active for another writer, ignore
rollback for now
+ // TODO: ABCDEFGHI revisit return value
+ return false;
+ }
+ } else if
(Option.fromJavaOptional(table.getActiveTimeline().getCommitsTimeline().getInstantsAsStream()
+ .filter(instant -> EQUALS.test(instant.requestedTime(),
commitInstantTime))
+ .findFirst()).isEmpty()) {
+ // Assume rollback is already executed since the commit is no
longer present in the timeline
+ return false;
+ }
+ } else {
+ // Case where no pending rollback is present,
+ if (commitInstantOpt.isEmpty()) {
+ log.error("Cannot find instant {} in the timeline of table {}
for rollback", commitInstantTime, config.getBasePath());
+ return false;
+ }
+ rollbackInstantTimeOpt = suppliedRollbackInstantTime.or(() ->
Option.of(createNewInstantTime(false)));
+ if (config.isExclusiveRollbackEnabled()) {
Review Comment:
🤖 The body of this `if (config.isExclusiveRollbackEnabled())` check is
unreachable. We're already inside the `else` branch of
`if (config.isExclusiveRollbackEnabled())` at line 1268, so the condition is
always false here, and the `heartbeatClient.start(...)` call will never execute.
This looks related to the missing case above, where the exclusive path doesn't
schedule a new rollback.
<sub><i>- Generated by an AI agent and may contain mistakes. Please verify
any suggestions before applying.</i></sub>
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieTableServiceClient.java:
##########
@@ -1279,14 +1311,17 @@ public boolean rollback(final String commitInstantTime,
Option<HoodiePendingRoll
// is set to false since they are already deleted.
// Execute rollback
HoodieRollbackMetadata rollbackMetadata = commitInstantOpt.isPresent()
- ? table.rollback(context, rollbackInstantTime,
commitInstantOpt.get(), true, skipLocking)
- : table.rollback(context, rollbackInstantTime,
table.getMetaClient().createNewInstant(
+ ? table.rollback(context, rollbackInstantTimeOpt.get(),
commitInstantOpt.get(), true, skipLocking)
+ : table.rollback(context, rollbackInstantTimeOpt.get(),
table.getMetaClient().createNewInstant(
HoodieInstant.State.INFLIGHT,
rollbackPlanOption.get().getInstantToRollback().getAction(), commitInstantTime),
false, skipLocking);
if (timerContext != null) {
long durationInMs = metrics.getDurationInMs(timerContext.stop());
metrics.updateRollbackMetrics(durationInMs,
rollbackMetadata.getTotalFilesDeleted());
}
+ if (config.isExclusiveRollbackEnabled()) {
+ heartbeatClient.stop(rollbackInstantTimeOpt.get());
+ }
return true;
Review Comment:
🤖 If `table.rollback()` throws an exception, the heartbeat started at line
1277 is never stopped, because the `heartbeatClient.stop()` call at line 1327 is
only reached on the success path. A leaked heartbeat timer and heartbeat file
would prevent other writers from performing the rollback (they would see an
active heartbeat). Could you move the `stop()` call into a `finally` block?
<sub><i>- Generated by an AI agent and may contain mistakes. Please verify
any suggestions before applying.</i></sub>
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]