yihua commented on code in PR #18448:
URL: https://github.com/apache/hudi/pull/18448#discussion_r3095366301


##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieTableServiceClient.java:
##########
@@ -1245,25 +1245,57 @@ public boolean rollback(final String commitInstantTime, Option<HoodiePendingRoll
     final Timer.Context timerContext = this.metrics.getRollbackCtx();
     try {
       HoodieTable table = createTable(config, storageConf, skipVersionCheck);
+
       Option<HoodieInstant> commitInstantOpt = Option.fromJavaOptional(table.getActiveTimeline().getCommitsTimeline().getInstantsAsStream()
           .filter(instant -> EQUALS.test(instant.requestedTime(), commitInstantTime))
           .findFirst());
-      Option<HoodieRollbackPlan> rollbackPlanOption;
-      String rollbackInstantTime;
-      if (pendingRollbackInfo.isPresent()) {
+      Option<HoodieRollbackPlan> rollbackPlanOption = Option.empty();
+      Option<String> rollbackInstantTimeOpt;
+      if (!config.isExclusiveRollbackEnabled() && pendingRollbackInfo.isPresent()) {
+        // Only case when lock can be skipped is if exclusive rollback is disabled and
+        // there is a pending rollback info available
         rollbackPlanOption = Option.of(pendingRollbackInfo.get().getRollbackPlan());
-        rollbackInstantTime = pendingRollbackInfo.get().getRollbackInstant().requestedTime();
+        rollbackInstantTimeOpt = Option.of(pendingRollbackInfo.get().getRollbackInstant().requestedTime());
       } else {
-        if (commitInstantOpt.isEmpty()) {
-          log.error("Cannot find instant {} in the timeline of table {} for rollback", commitInstantTime, config.getBasePath());
-          return false;
-        }
         if (!skipLocking) {
           txnManager.beginStateChange(Option.empty(), Option.empty());
         }
         try {
-          rollbackInstantTime = suppliedRollbackInstantTime.orElseGet(() -> createNewInstantTime(false));
-          rollbackPlanOption = table.scheduleRollback(context, rollbackInstantTime, commitInstantOpt.get(), false, config.shouldRollbackUsingMarkers(), false);
+          if (config.isExclusiveRollbackEnabled()) {
+            // Reload meta client within the lock so that the timeline is latest while executing pending rollback
+            table.getMetaClient().reloadActiveTimeline();
+            Option<HoodiePendingRollbackInfo> pendingRollbackOpt = getPendingRollbackInfo(table.getMetaClient(), commitInstantTime);
+            rollbackInstantTimeOpt = pendingRollbackOpt.map(info -> info.getRollbackInstant().requestedTime());
+            if (pendingRollbackOpt.isPresent()) {
+              // If pending rollback and heartbeat is expired, writer should start heartbeat and execute rollback
+              if (heartbeatClient.isHeartbeatExpired(rollbackInstantTimeOpt.get())) {

Review Comment:
   🤖 When `isExclusiveRollbackEnabled()` is true and no pending rollback exists 
but the commit IS in the timeline, this block falls through with 
`rollbackPlanOption` still `Option.empty()` and `rollbackInstantTimeOpt` also 
empty. No rollback plan is ever scheduled, so the code hits the `else` branch 
at line ~1297 and throws `HoodieRollbackException`. This is the "first writer" 
scenario — someone needs to create the rollback plan.
   
   The `else` branch below (line ~1288) has a dead 
`isExclusiveRollbackEnabled()` check that suggests the scheduling logic was 
intended to be shared, but due to the if/else structure it's unreachable when 
exclusive rollback is enabled.
   
   <sub><i>- Generated by an AI agent and may contain mistakes. Please verify 
any suggestions before applying.</i></sub>
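
The four outcomes the exclusive-rollback branch needs to cover can be written out as a small decision function. This is only a sketch of the control flow described in the comment above: `ExclusiveRollbackDecision`, `Action`, and `decide` are hypothetical names for illustration and are not part of Hudi. It assumes the missing "first writer" case should schedule a new rollback plan, which is what the comment suggests but the PR author would need to confirm.

```java
/** Hypothetical model of the exclusive-rollback branches; not the Hudi API. */
final class ExclusiveRollbackDecision {

  enum Action {
    EXECUTE_PENDING_PLAN,      // pending rollback exists and its heartbeat expired
    SKIP_ACTIVE_OWNER,         // pending rollback exists and another writer still owns it
    SKIP_ALREADY_ROLLED_BACK,  // no pending rollback and the commit left the timeline
    SCHEDULE_NEW_PLAN          // no pending rollback but the commit is still in the timeline
  }

  static Action decide(boolean pendingRollbackExists,
                       boolean heartbeatExpired,
                       boolean commitInTimeline) {
    if (pendingRollbackExists) {
      return heartbeatExpired ? Action.EXECUTE_PENDING_PLAN : Action.SKIP_ACTIVE_OWNER;
    }
    if (!commitInTimeline) {
      return Action.SKIP_ALREADY_ROLLED_BACK;
    }
    // First-writer case: the diff as posted has no branch that lands here,
    // so the plan stays empty and the later check throws.
    return Action.SCHEDULE_NEW_PLAN;
  }

  public static void main(String[] args) {
    // No pending rollback, commit still present: someone has to create the plan.
    System.out.println(decide(false, false, true)); // prints SCHEDULE_NEW_PLAN
  }
}
```

Enumerating the cases this way makes the gap visible: three of the four outcomes have a branch in the diff, and the fourth falls through.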



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieTableServiceClient.java:
##########
@@ -1245,25 +1245,57 @@ public boolean rollback(final String commitInstantTime, Option<HoodiePendingRoll
     final Timer.Context timerContext = this.metrics.getRollbackCtx();
     try {
       HoodieTable table = createTable(config, storageConf, skipVersionCheck);
+
       Option<HoodieInstant> commitInstantOpt = Option.fromJavaOptional(table.getActiveTimeline().getCommitsTimeline().getInstantsAsStream()
           .filter(instant -> EQUALS.test(instant.requestedTime(), commitInstantTime))
           .findFirst());
-      Option<HoodieRollbackPlan> rollbackPlanOption;
-      String rollbackInstantTime;
-      if (pendingRollbackInfo.isPresent()) {
+      Option<HoodieRollbackPlan> rollbackPlanOption = Option.empty();
+      Option<String> rollbackInstantTimeOpt;
+      if (!config.isExclusiveRollbackEnabled() && pendingRollbackInfo.isPresent()) {
+        // Only case when lock can be skipped is if exclusive rollback is disabled and
+        // there is a pending rollback info available
         rollbackPlanOption = Option.of(pendingRollbackInfo.get().getRollbackPlan());
-        rollbackInstantTime = pendingRollbackInfo.get().getRollbackInstant().requestedTime();
+        rollbackInstantTimeOpt = Option.of(pendingRollbackInfo.get().getRollbackInstant().requestedTime());
       } else {
-        if (commitInstantOpt.isEmpty()) {
-          log.error("Cannot find instant {} in the timeline of table {} for rollback", commitInstantTime, config.getBasePath());
-          return false;
-        }
         if (!skipLocking) {
           txnManager.beginStateChange(Option.empty(), Option.empty());
         }
         try {
-          rollbackInstantTime = suppliedRollbackInstantTime.orElseGet(() -> createNewInstantTime(false));
-          rollbackPlanOption = table.scheduleRollback(context, rollbackInstantTime, commitInstantOpt.get(), false, config.shouldRollbackUsingMarkers(), false);
+          if (config.isExclusiveRollbackEnabled()) {
+            // Reload meta client within the lock so that the timeline is latest while executing pending rollback
+            table.getMetaClient().reloadActiveTimeline();
+            Option<HoodiePendingRollbackInfo> pendingRollbackOpt = getPendingRollbackInfo(table.getMetaClient(), commitInstantTime);
+            rollbackInstantTimeOpt = pendingRollbackOpt.map(info -> info.getRollbackInstant().requestedTime());
+            if (pendingRollbackOpt.isPresent()) {
+              // If pending rollback and heartbeat is expired, writer should start heartbeat and execute rollback
+              if (heartbeatClient.isHeartbeatExpired(rollbackInstantTimeOpt.get())) {
+                LOG.info("Heartbeat expired for rollback instant {}, executing rollback now", rollbackInstantTimeOpt);
+                HeartbeatUtils.deleteHeartbeatFile(storage, basePath, rollbackInstantTimeOpt.get(), config);
+                heartbeatClient.start(rollbackInstantTimeOpt.get());
+                rollbackPlanOption = pendingRollbackOpt.map(HoodiePendingRollbackInfo::getRollbackPlan);
+              } else {
+                // Heartbeat is still active for another writer, ignore rollback for now
+                // TODO: ABCDEFGHI revisit return value
+                return false;
+              }
+            } else if (Option.fromJavaOptional(table.getActiveTimeline().getCommitsTimeline().getInstantsAsStream()
+                .filter(instant -> EQUALS.test(instant.requestedTime(), commitInstantTime))
+                .findFirst()).isEmpty()) {

Review Comment:
   🤖 This TODO (`// TODO: ABCDEFGHI revisit return value`) looks like a 
development placeholder that shouldn't be merged. Could you either resolve it 
or replace with a proper tracking reference (e.g., a JIRA link)?
   
   <sub><i>- Generated by an AI agent and may contain mistakes. Please verify 
any suggestions before applying.</i></sub>



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieTableServiceClient.java:
##########
@@ -1279,14 +1311,17 @@ public boolean rollback(final String commitInstantTime, Option<HoodiePendingRoll
         // is set to false since they are already deleted.
         // Execute rollback
         HoodieRollbackMetadata rollbackMetadata = commitInstantOpt.isPresent()
-            ? table.rollback(context, rollbackInstantTime, commitInstantOpt.get(), true, skipLocking)
-            : table.rollback(context, rollbackInstantTime, table.getMetaClient().createNewInstant(
+            ? table.rollback(context, rollbackInstantTimeOpt.get(), commitInstantOpt.get(), true, skipLocking)
+            : table.rollback(context, rollbackInstantTimeOpt.get(), table.getMetaClient().createNewInstant(
                 HoodieInstant.State.INFLIGHT, rollbackPlanOption.get().getInstantToRollback().getAction(), commitInstantTime),
             false, skipLocking);
         if (timerContext != null) {
           long durationInMs = metrics.getDurationInMs(timerContext.stop());
           metrics.updateRollbackMetrics(durationInMs, rollbackMetadata.getTotalFilesDeleted());
         }
+        if (config.isExclusiveRollbackEnabled()) {
+          heartbeatClient.stop(rollbackInstantTimeOpt.get());

Review Comment:
   🤖 If `table.rollback()` throws an exception, `heartbeatClient.stop()` is 
never called. The heartbeat timer thread keeps running and the heartbeat file 
remains on storage, which blocks other writers from taking ownership of this 
rollback until the heartbeat naturally expires. Could you wrap the rollback 
execution + heartbeat stop in a try/finally so the heartbeat is always cleaned 
up?
   
   <sub><i>- Generated by an AI agent and may contain mistakes. Please verify 
any suggestions before applying.</i></sub>
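
A minimal sketch of the try/finally shape suggested above. `HeartbeatCleanupSketch`, `RecordingHeartbeat`, and `runWithHeartbeat` are hypothetical stand-ins written for this illustration; they only record start/stop calls and do not reflect the real heartbeat client's behavior. The sketch also starts the heartbeat inside the helper for brevity, whereas the diff starts it earlier under the lock.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Supplier;

final class HeartbeatCleanupSketch {

  /** Hypothetical stand-in for a heartbeat client; it only records calls. */
  static final class RecordingHeartbeat {
    final List<String> events = new ArrayList<>();

    void start(String instantTime) {
      events.add("start:" + instantTime);
    }

    void stop(String instantTime) {
      events.add("stop:" + instantTime);
    }
  }

  /** Runs the rollback action and stops the heartbeat whether or not it throws. */
  static <T> T runWithHeartbeat(RecordingHeartbeat heartbeat,
                                String instantTime,
                                Supplier<T> rollback) {
    heartbeat.start(instantTime);
    try {
      return rollback.get();
    } finally {
      // Reached on both normal return and exception.
      heartbeat.stop(instantTime);
    }
  }

  public static void main(String[] args) {
    RecordingHeartbeat heartbeat = new RecordingHeartbeat();
    try {
      runWithHeartbeat(heartbeat, "001", () -> {
        throw new IllegalStateException("rollback failed");
      });
    } catch (IllegalStateException e) {
      // Expected: the failure still propagates to the caller.
    }
    System.out.println(heartbeat.events); // prints [start:001, stop:001]
  }
}
```

The exception still reaches the caller, so existing error handling is unaffected; only the cleanup becomes unconditional.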



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieTableServiceClient.java:
##########
@@ -1245,25 +1245,57 @@ public boolean rollback(final String commitInstantTime, Option<HoodiePendingRoll
     final Timer.Context timerContext = this.metrics.getRollbackCtx();
     try {
       HoodieTable table = createTable(config, storageConf, skipVersionCheck);
+
       Option<HoodieInstant> commitInstantOpt = Option.fromJavaOptional(table.getActiveTimeline().getCommitsTimeline().getInstantsAsStream()
           .filter(instant -> EQUALS.test(instant.requestedTime(), commitInstantTime))
           .findFirst());
-      Option<HoodieRollbackPlan> rollbackPlanOption;
-      String rollbackInstantTime;
-      if (pendingRollbackInfo.isPresent()) {
+      Option<HoodieRollbackPlan> rollbackPlanOption = Option.empty();
+      Option<String> rollbackInstantTimeOpt;
+      if (!config.isExclusiveRollbackEnabled() && pendingRollbackInfo.isPresent()) {
+        // Only case when lock can be skipped is if exclusive rollback is disabled and
+        // there is a pending rollback info available
         rollbackPlanOption = Option.of(pendingRollbackInfo.get().getRollbackPlan());
-        rollbackInstantTime = pendingRollbackInfo.get().getRollbackInstant().requestedTime();
+        rollbackInstantTimeOpt = Option.of(pendingRollbackInfo.get().getRollbackInstant().requestedTime());
       } else {
-        if (commitInstantOpt.isEmpty()) {
-          log.error("Cannot find instant {} in the timeline of table {} for rollback", commitInstantTime, config.getBasePath());
-          return false;
-        }
         if (!skipLocking) {
           txnManager.beginStateChange(Option.empty(), Option.empty());
         }
         try {
-          rollbackInstantTime = suppliedRollbackInstantTime.orElseGet(() -> createNewInstantTime(false));
-          rollbackPlanOption = table.scheduleRollback(context, rollbackInstantTime, commitInstantOpt.get(), false, config.shouldRollbackUsingMarkers(), false);
+          if (config.isExclusiveRollbackEnabled()) {
+            // Reload meta client within the lock so that the timeline is latest while executing pending rollback
+            table.getMetaClient().reloadActiveTimeline();
+            Option<HoodiePendingRollbackInfo> pendingRollbackOpt = getPendingRollbackInfo(table.getMetaClient(), commitInstantTime);
+            rollbackInstantTimeOpt = pendingRollbackOpt.map(info -> info.getRollbackInstant().requestedTime());
+            if (pendingRollbackOpt.isPresent()) {
+              // If pending rollback and heartbeat is expired, writer should start heartbeat and execute rollback
+              if (heartbeatClient.isHeartbeatExpired(rollbackInstantTimeOpt.get())) {
+                LOG.info("Heartbeat expired for rollback instant {}, executing rollback now", rollbackInstantTimeOpt);
+                HeartbeatUtils.deleteHeartbeatFile(storage, basePath, rollbackInstantTimeOpt.get(), config);
+                heartbeatClient.start(rollbackInstantTimeOpt.get());
+                rollbackPlanOption = pendingRollbackOpt.map(HoodiePendingRollbackInfo::getRollbackPlan);
+              } else {
+                // Heartbeat is still active for another writer, ignore rollback for now
+                // TODO: ABCDEFGHI revisit return value
+                return false;
+              }
+            } else if (Option.fromJavaOptional(table.getActiveTimeline().getCommitsTimeline().getInstantsAsStream()
+                .filter(instant -> EQUALS.test(instant.requestedTime(), commitInstantTime))
+                .findFirst()).isEmpty()) {
+              // Assume rollback is already executed since the commit is no longer present in the timeline
+              return false;
+            }
+          } else {
+            // Case where no pending rollback is present,
+            if (commitInstantOpt.isEmpty()) {
+              log.error("Cannot find instant {} in the timeline of table {} for rollback", commitInstantTime, config.getBasePath());
+              return false;
+            }
+            rollbackInstantTimeOpt = suppliedRollbackInstantTime.or(() -> Option.of(createNewInstantTime(false)));
+            if (config.isExclusiveRollbackEnabled()) {
+              heartbeatClient.start(rollbackInstantTimeOpt.get());

Review Comment:
   🤖 This `isExclusiveRollbackEnabled()` check is unreachable — it's inside the 
`else` branch of `if (config.isExclusiveRollbackEnabled())`, so it can never be 
true. This appears to be a copy-paste issue and is related to the missing 
rollback scheduling path for the exclusive rollback case when no pending 
rollback exists.
   
   <sub><i>- Generated by an AI agent and may contain mistakes. Please verify 
any suggestions before applying.</i></sub>



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieTableServiceClient.java:
##########
@@ -1245,25 +1245,57 @@ public boolean rollback(final String commitInstantTime, Option<HoodiePendingRoll
     final Timer.Context timerContext = this.metrics.getRollbackCtx();
     try {
       HoodieTable table = createTable(config, storageConf, skipVersionCheck);
+
       Option<HoodieInstant> commitInstantOpt = Option.fromJavaOptional(table.getActiveTimeline().getCommitsTimeline().getInstantsAsStream()
           .filter(instant -> EQUALS.test(instant.requestedTime(), commitInstantTime))
           .findFirst());
-      Option<HoodieRollbackPlan> rollbackPlanOption;
-      String rollbackInstantTime;
-      if (pendingRollbackInfo.isPresent()) {
+      Option<HoodieRollbackPlan> rollbackPlanOption = Option.empty();
+      Option<String> rollbackInstantTimeOpt;
+      if (!config.isExclusiveRollbackEnabled() && pendingRollbackInfo.isPresent()) {
+        // Only case when lock can be skipped is if exclusive rollback is disabled and
+        // there is a pending rollback info available
         rollbackPlanOption = Option.of(pendingRollbackInfo.get().getRollbackPlan());
-        rollbackInstantTime = pendingRollbackInfo.get().getRollbackInstant().requestedTime();
+        rollbackInstantTimeOpt = Option.of(pendingRollbackInfo.get().getRollbackInstant().requestedTime());

Review Comment:
   🤖 When exclusive rollback is enabled, the old fast-path that skips locking 
(when `pendingRollbackInfo` is present) is now bypassed — the code always takes 
the lock and re-fetches pending rollback info from the timeline. Is this 
intentional? It means callers like `rollbackFailedWrites()` that already looked 
up `pendingRollbackInfo` will have that value ignored, and the timeline reload 
+ `getPendingRollbackInfo()` call inside the lock becomes the source of truth. 
Worth documenting this behavioral change.
   
   <sub><i>- Generated by an AI agent and may contain mistakes. Please verify 
any suggestions before applying.</i></sub>
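
The behavioral change described above can be illustrated with a small model of where the pending rollback information comes from in each mode. `PendingRollbackSource` and `resolve` are hypothetical names for this sketch, with `java.util.Optional` standing in for Hudi's `Option`; the strings are placeholder instant labels, not real values.

```java
import java.util.Optional;
import java.util.function.Supplier;

/** Hypothetical model of which pending-rollback value wins; not the Hudi API. */
final class PendingRollbackSource {

  static Optional<String> resolve(boolean exclusiveRollbackEnabled,
                                  Optional<String> callerSupplied,
                                  Supplier<Optional<String>> lookupUnderLock) {
    if (exclusiveRollbackEnabled) {
      // The caller's earlier lookup is ignored; the reloaded timeline decides.
      return lookupUnderLock.get();
    }
    // Non-exclusive mode keeps the old behavior: trust the caller's value if present.
    return callerSupplied;
  }

  public static void main(String[] args) {
    Optional<String> hint = Optional.of("caller-hint");
    System.out.println(resolve(true, hint, () -> Optional.of("fresh"))); // prints Optional[fresh]
    System.out.println(resolve(false, hint, () -> Optional.of("fresh"))); // prints Optional[caller-hint]
  }
}
```

If the change is intentional, a short Javadoc note on `rollback` stating that the supplied `pendingRollbackInfo` is disregarded in exclusive mode would make this explicit for callers.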


