yihua commented on code in PR #18448:
URL: https://github.com/apache/hudi/pull/18448#discussion_r3095307390


##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieTableServiceClient.java:
##########
@@ -1245,25 +1245,57 @@ public boolean rollback(final String commitInstantTime, 
Option<HoodiePendingRoll
     final Timer.Context timerContext = this.metrics.getRollbackCtx();
     try {
       HoodieTable table = createTable(config, storageConf, skipVersionCheck);
+
       Option<HoodieInstant> commitInstantOpt = 
Option.fromJavaOptional(table.getActiveTimeline().getCommitsTimeline().getInstantsAsStream()
           .filter(instant -> EQUALS.test(instant.requestedTime(), 
commitInstantTime))
           .findFirst());
-      Option<HoodieRollbackPlan> rollbackPlanOption;
-      String rollbackInstantTime;
-      if (pendingRollbackInfo.isPresent()) {
+      Option<HoodieRollbackPlan> rollbackPlanOption = Option.empty();
+      Option<String> rollbackInstantTimeOpt;
+      if (!config.isExclusiveRollbackEnabled() && 
pendingRollbackInfo.isPresent()) {
+        // Only case when lock can be skipped is if exclusive rollback is 
disabled and
+        // there is a pending rollback info available
         rollbackPlanOption = 
Option.of(pendingRollbackInfo.get().getRollbackPlan());
-        rollbackInstantTime = 
pendingRollbackInfo.get().getRollbackInstant().requestedTime();
+        rollbackInstantTimeOpt = 
Option.of(pendingRollbackInfo.get().getRollbackInstant().requestedTime());
       } else {
-        if (commitInstantOpt.isEmpty()) {
-          log.error("Cannot find instant {} in the timeline of table {} for 
rollback", commitInstantTime, config.getBasePath());
-          return false;
-        }
         if (!skipLocking) {
           txnManager.beginStateChange(Option.empty(), Option.empty());
         }
         try {
-          rollbackInstantTime = suppliedRollbackInstantTime.orElseGet(() -> 
createNewInstantTime(false));
-          rollbackPlanOption = table.scheduleRollback(context, 
rollbackInstantTime, commitInstantOpt.get(), false, 
config.shouldRollbackUsingMarkers(), false);
+          if (config.isExclusiveRollbackEnabled()) {
+            // Reload meta client within the lock so that the timeline is 
latest while executing pending rollback
+            table.getMetaClient().reloadActiveTimeline();
+            Option<HoodiePendingRollbackInfo> pendingRollbackOpt = 
getPendingRollbackInfo(table.getMetaClient(), commitInstantTime);
+            rollbackInstantTimeOpt = pendingRollbackOpt.map(info -> 
info.getRollbackInstant().requestedTime());
+            if (pendingRollbackOpt.isPresent()) {
+              // If pending rollback and heartbeat is expired, writer should 
start heartbeat and execute rollback
+              if 
(heartbeatClient.isHeartbeatExpired(rollbackInstantTimeOpt.get())) {
+                LOG.info("Heartbeat expired for rollback instant {}, executing 
rollback now", rollbackInstantTimeOpt);
+                HeartbeatUtils.deleteHeartbeatFile(storage, basePath, 
rollbackInstantTimeOpt.get(), config);
+                heartbeatClient.start(rollbackInstantTimeOpt.get());
+                rollbackPlanOption = 
pendingRollbackOpt.map(HoodiePendingRollbackInfo::getRollbackPlan);
+              } else {

Review Comment:
   🤖 `HeartbeatUtils.deleteHeartbeatFile(storage, basePath, 
rollbackInstantTimeOpt.get(), config)` only deletes if 
`config.getFailedWritesCleanPolicy().isLazy()`. Is this intentional for the 
exclusive rollback use case? If the policy isn't lazy, the stale heartbeat file 
from the previous writer won't be deleted before `heartbeatClient.start()` 
overwrites it. Might be fine since `start()` overwrites, but wanted to confirm 
the intent.
   
   <sub><i>- Generated by an AI agent and may contain mistakes. Please verify 
any suggestions before applying.</i></sub>



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieTableServiceClient.java:
##########
@@ -1245,25 +1245,57 @@ public boolean rollback(final String commitInstantTime, 
Option<HoodiePendingRoll
     final Timer.Context timerContext = this.metrics.getRollbackCtx();
     try {
       HoodieTable table = createTable(config, storageConf, skipVersionCheck);
+
       Option<HoodieInstant> commitInstantOpt = 
Option.fromJavaOptional(table.getActiveTimeline().getCommitsTimeline().getInstantsAsStream()
           .filter(instant -> EQUALS.test(instant.requestedTime(), 
commitInstantTime))
           .findFirst());
-      Option<HoodieRollbackPlan> rollbackPlanOption;
-      String rollbackInstantTime;
-      if (pendingRollbackInfo.isPresent()) {
+      Option<HoodieRollbackPlan> rollbackPlanOption = Option.empty();
+      Option<String> rollbackInstantTimeOpt;
+      if (!config.isExclusiveRollbackEnabled() && 
pendingRollbackInfo.isPresent()) {
+        // Only case when lock can be skipped is if exclusive rollback is 
disabled and
+        // there is a pending rollback info available
         rollbackPlanOption = 
Option.of(pendingRollbackInfo.get().getRollbackPlan());
-        rollbackInstantTime = 
pendingRollbackInfo.get().getRollbackInstant().requestedTime();
+        rollbackInstantTimeOpt = 
Option.of(pendingRollbackInfo.get().getRollbackInstant().requestedTime());
       } else {
-        if (commitInstantOpt.isEmpty()) {
-          log.error("Cannot find instant {} in the timeline of table {} for 
rollback", commitInstantTime, config.getBasePath());
-          return false;
-        }
         if (!skipLocking) {
           txnManager.beginStateChange(Option.empty(), Option.empty());
         }
         try {
-          rollbackInstantTime = suppliedRollbackInstantTime.orElseGet(() -> 
createNewInstantTime(false));
-          rollbackPlanOption = table.scheduleRollback(context, 
rollbackInstantTime, commitInstantOpt.get(), false, 
config.shouldRollbackUsingMarkers(), false);
+          if (config.isExclusiveRollbackEnabled()) {
+            // Reload meta client within the lock so that the timeline is 
latest while executing pending rollback
+            table.getMetaClient().reloadActiveTimeline();
+            Option<HoodiePendingRollbackInfo> pendingRollbackOpt = 
getPendingRollbackInfo(table.getMetaClient(), commitInstantTime);
+            rollbackInstantTimeOpt = pendingRollbackOpt.map(info -> 
info.getRollbackInstant().requestedTime());
+            if (pendingRollbackOpt.isPresent()) {
+              // If pending rollback and heartbeat is expired, writer should 
start heartbeat and execute rollback
+              if 
(heartbeatClient.isHeartbeatExpired(rollbackInstantTimeOpt.get())) {
+                LOG.info("Heartbeat expired for rollback instant {}, executing 
rollback now", rollbackInstantTimeOpt);
+                HeartbeatUtils.deleteHeartbeatFile(storage, basePath, 
rollbackInstantTimeOpt.get(), config);
+                heartbeatClient.start(rollbackInstantTimeOpt.get());
+                rollbackPlanOption = 
pendingRollbackOpt.map(HoodiePendingRollbackInfo::getRollbackPlan);
+              } else {
+                // Heartbeat is still active for another writer, ignore 
rollback for now
+                // TODO: ABCDEFGHI revisit return value
+                return false;
+              }
+            } else if 
(Option.fromJavaOptional(table.getActiveTimeline().getCommitsTimeline().getInstantsAsStream()
+                .filter(instant -> EQUALS.test(instant.requestedTime(), 
commitInstantTime))
+                .findFirst()).isEmpty()) {
+              // Assume rollback is already executed since the commit is no 
longer present in the timeline
+              return false;

Review Comment:
   🤖 When exclusive rollback is enabled, no pending rollback exists, but the 
commit IS still in the timeline, no rollback is ever scheduled: 
`rollbackPlanOption` keeps its initial `Option.empty()`, and 
`rollbackInstantTimeOpt` is also empty because it is derived from the empty 
`pendingRollbackOpt` via `map(...)`. The code 
falls through to `rollbackPlanOption.isPresent()` which is false, throwing a 
`HoodieRollbackException`. This is the first-writer-to-rollback case and it's 
completely unhandled. It seems like the scheduling logic in the `else` branch 
(line 1290–1298) was intended to also be reachable from the exclusive path, but 
the `if/else` structure prevents it.
   
   <sub><i>- Generated by an AI agent and may contain mistakes. Please verify 
any suggestions before applying.</i></sub>



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieTableServiceClient.java:
##########
@@ -1245,25 +1245,57 @@ public boolean rollback(final String commitInstantTime, 
Option<HoodiePendingRoll
     final Timer.Context timerContext = this.metrics.getRollbackCtx();
     try {
       HoodieTable table = createTable(config, storageConf, skipVersionCheck);
+
       Option<HoodieInstant> commitInstantOpt = 
Option.fromJavaOptional(table.getActiveTimeline().getCommitsTimeline().getInstantsAsStream()
           .filter(instant -> EQUALS.test(instant.requestedTime(), 
commitInstantTime))
           .findFirst());
-      Option<HoodieRollbackPlan> rollbackPlanOption;
-      String rollbackInstantTime;
-      if (pendingRollbackInfo.isPresent()) {
+      Option<HoodieRollbackPlan> rollbackPlanOption = Option.empty();
+      Option<String> rollbackInstantTimeOpt;
+      if (!config.isExclusiveRollbackEnabled() && 
pendingRollbackInfo.isPresent()) {
+        // Only case when lock can be skipped is if exclusive rollback is 
disabled and
+        // there is a pending rollback info available
         rollbackPlanOption = 
Option.of(pendingRollbackInfo.get().getRollbackPlan());
-        rollbackInstantTime = 
pendingRollbackInfo.get().getRollbackInstant().requestedTime();
+        rollbackInstantTimeOpt = 
Option.of(pendingRollbackInfo.get().getRollbackInstant().requestedTime());
       } else {
-        if (commitInstantOpt.isEmpty()) {
-          log.error("Cannot find instant {} in the timeline of table {} for 
rollback", commitInstantTime, config.getBasePath());
-          return false;
-        }
         if (!skipLocking) {
           txnManager.beginStateChange(Option.empty(), Option.empty());
         }
         try {
-          rollbackInstantTime = suppliedRollbackInstantTime.orElseGet(() -> 
createNewInstantTime(false));
-          rollbackPlanOption = table.scheduleRollback(context, 
rollbackInstantTime, commitInstantOpt.get(), false, 
config.shouldRollbackUsingMarkers(), false);
+          if (config.isExclusiveRollbackEnabled()) {
+            // Reload meta client within the lock so that the timeline is 
latest while executing pending rollback
+            table.getMetaClient().reloadActiveTimeline();
+            Option<HoodiePendingRollbackInfo> pendingRollbackOpt = 
getPendingRollbackInfo(table.getMetaClient(), commitInstantTime);
+            rollbackInstantTimeOpt = pendingRollbackOpt.map(info -> 
info.getRollbackInstant().requestedTime());
+            if (pendingRollbackOpt.isPresent()) {
+              // If pending rollback and heartbeat is expired, writer should 
start heartbeat and execute rollback
+              if 
(heartbeatClient.isHeartbeatExpired(rollbackInstantTimeOpt.get())) {
+                LOG.info("Heartbeat expired for rollback instant {}, executing 
rollback now", rollbackInstantTimeOpt);
+                HeartbeatUtils.deleteHeartbeatFile(storage, basePath, 
rollbackInstantTimeOpt.get(), config);
+                heartbeatClient.start(rollbackInstantTimeOpt.get());
+                rollbackPlanOption = 
pendingRollbackOpt.map(HoodiePendingRollbackInfo::getRollbackPlan);
+              } else {
+                // Heartbeat is still active for another writer, ignore 
rollback for now
+                // TODO: ABCDEFGHI revisit return value
+                return false;
+              }
+            } else if 
(Option.fromJavaOptional(table.getActiveTimeline().getCommitsTimeline().getInstantsAsStream()

Review Comment:
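   🤖 `// TODO: ABCDEFGHI revisit return value` — `ABCDEFGHI` looks like a 
placeholder rather than a real issue ID. Could you replace it with the actual 
tracking issue (or resolve the TODO) before this PR is merged?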
   
   <sub><i>- Generated by an AI agent and may contain mistakes. Please verify 
any suggestions before applying.</i></sub>



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieTableServiceClient.java:
##########
@@ -1245,25 +1245,57 @@ public boolean rollback(final String commitInstantTime, 
Option<HoodiePendingRoll
     final Timer.Context timerContext = this.metrics.getRollbackCtx();
     try {
       HoodieTable table = createTable(config, storageConf, skipVersionCheck);
+
       Option<HoodieInstant> commitInstantOpt = 
Option.fromJavaOptional(table.getActiveTimeline().getCommitsTimeline().getInstantsAsStream()
           .filter(instant -> EQUALS.test(instant.requestedTime(), 
commitInstantTime))
           .findFirst());
-      Option<HoodieRollbackPlan> rollbackPlanOption;
-      String rollbackInstantTime;
-      if (pendingRollbackInfo.isPresent()) {
+      Option<HoodieRollbackPlan> rollbackPlanOption = Option.empty();
+      Option<String> rollbackInstantTimeOpt;
+      if (!config.isExclusiveRollbackEnabled() && 
pendingRollbackInfo.isPresent()) {
+        // Only case when lock can be skipped is if exclusive rollback is 
disabled and
+        // there is a pending rollback info available
         rollbackPlanOption = 
Option.of(pendingRollbackInfo.get().getRollbackPlan());
-        rollbackInstantTime = 
pendingRollbackInfo.get().getRollbackInstant().requestedTime();
+        rollbackInstantTimeOpt = 
Option.of(pendingRollbackInfo.get().getRollbackInstant().requestedTime());
       } else {
-        if (commitInstantOpt.isEmpty()) {
-          log.error("Cannot find instant {} in the timeline of table {} for 
rollback", commitInstantTime, config.getBasePath());
-          return false;
-        }
         if (!skipLocking) {
           txnManager.beginStateChange(Option.empty(), Option.empty());
         }
         try {
-          rollbackInstantTime = suppliedRollbackInstantTime.orElseGet(() -> 
createNewInstantTime(false));
-          rollbackPlanOption = table.scheduleRollback(context, 
rollbackInstantTime, commitInstantOpt.get(), false, 
config.shouldRollbackUsingMarkers(), false);
+          if (config.isExclusiveRollbackEnabled()) {
+            // Reload meta client within the lock so that the timeline is 
latest while executing pending rollback
+            table.getMetaClient().reloadActiveTimeline();
+            Option<HoodiePendingRollbackInfo> pendingRollbackOpt = 
getPendingRollbackInfo(table.getMetaClient(), commitInstantTime);
+            rollbackInstantTimeOpt = pendingRollbackOpt.map(info -> 
info.getRollbackInstant().requestedTime());
+            if (pendingRollbackOpt.isPresent()) {
+              // If pending rollback and heartbeat is expired, writer should 
start heartbeat and execute rollback
+              if 
(heartbeatClient.isHeartbeatExpired(rollbackInstantTimeOpt.get())) {
+                LOG.info("Heartbeat expired for rollback instant {}, executing 
rollback now", rollbackInstantTimeOpt);
+                HeartbeatUtils.deleteHeartbeatFile(storage, basePath, 
rollbackInstantTimeOpt.get(), config);
+                heartbeatClient.start(rollbackInstantTimeOpt.get());
+                rollbackPlanOption = 
pendingRollbackOpt.map(HoodiePendingRollbackInfo::getRollbackPlan);
+              } else {
+                // Heartbeat is still active for another writer, ignore 
rollback for now
+                // TODO: ABCDEFGHI revisit return value
+                return false;
+              }
+            } else if 
(Option.fromJavaOptional(table.getActiveTimeline().getCommitsTimeline().getInstantsAsStream()
+                .filter(instant -> EQUALS.test(instant.requestedTime(), 
commitInstantTime))
+                .findFirst()).isEmpty()) {
+              // Assume rollback is already executed since the commit is no 
longer present in the timeline
+              return false;
+            }
+          } else {
+            // Case where no pending rollback is present,
+            if (commitInstantOpt.isEmpty()) {
+              log.error("Cannot find instant {} in the timeline of table {} 
for rollback", commitInstantTime, config.getBasePath());
+              return false;
+            }
+            rollbackInstantTimeOpt = suppliedRollbackInstantTime.or(() -> 
Option.of(createNewInstantTime(false)));
+            if (config.isExclusiveRollbackEnabled()) {

Review Comment:
   🤖 This `if (config.isExclusiveRollbackEnabled())` check can never be true 
here — we're inside the `else` branch of `if (config.isExclusiveRollbackEnabled())` 
at line 1268 — so its body is dead code. The `heartbeatClient.start(...)` call 
will never execute. This looks related to the missing case above where the 
exclusive path doesn't schedule a new rollback.
   
   <sub><i>- Generated by an AI agent and may contain mistakes. Please verify 
any suggestions before applying.</i></sub>



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieTableServiceClient.java:
##########
@@ -1279,14 +1311,17 @@ public boolean rollback(final String commitInstantTime, 
Option<HoodiePendingRoll
         // is set to false since they are already deleted.
         // Execute rollback
         HoodieRollbackMetadata rollbackMetadata = commitInstantOpt.isPresent()
-            ? table.rollback(context, rollbackInstantTime, 
commitInstantOpt.get(), true, skipLocking)
-            : table.rollback(context, rollbackInstantTime, 
table.getMetaClient().createNewInstant(
+            ? table.rollback(context, rollbackInstantTimeOpt.get(), 
commitInstantOpt.get(), true, skipLocking)
+            : table.rollback(context, rollbackInstantTimeOpt.get(), 
table.getMetaClient().createNewInstant(
                 HoodieInstant.State.INFLIGHT, 
rollbackPlanOption.get().getInstantToRollback().getAction(), commitInstantTime),
             false, skipLocking);
         if (timerContext != null) {
           long durationInMs = metrics.getDurationInMs(timerContext.stop());
           metrics.updateRollbackMetrics(durationInMs, 
rollbackMetadata.getTotalFilesDeleted());
         }
+        if (config.isExclusiveRollbackEnabled()) {
+          heartbeatClient.stop(rollbackInstantTimeOpt.get());
+        }
         return true;

Review Comment:
   🤖 If `table.rollback()` throws an exception, the heartbeat started at line 
1277 is never stopped — the `heartbeatClient.stop()` at line 1327 is only 
reached on the success path. A leaked heartbeat timer + file would prevent 
other writers from performing the rollback (they'd see an active heartbeat). 
Could you move the `stop()` into a `finally` block?
   
   <sub><i>- Generated by an AI agent and may contain mistakes. Please verify 
any suggestions before applying.</i></sub>



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to