yihua commented on code in PR #18448:
URL: https://github.com/apache/hudi/pull/18448#discussion_r3095294402


##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieTableServiceClient.java:
##########
@@ -1245,25 +1245,57 @@ public boolean rollback(final String commitInstantTime, 
Option<HoodiePendingRoll
     final Timer.Context timerContext = this.metrics.getRollbackCtx();
     try {
       HoodieTable table = createTable(config, storageConf, skipVersionCheck);
+
       Option<HoodieInstant> commitInstantOpt = 
Option.fromJavaOptional(table.getActiveTimeline().getCommitsTimeline().getInstantsAsStream()
           .filter(instant -> EQUALS.test(instant.requestedTime(), 
commitInstantTime))
           .findFirst());
-      Option<HoodieRollbackPlan> rollbackPlanOption;
-      String rollbackInstantTime;
-      if (pendingRollbackInfo.isPresent()) {
+      Option<HoodieRollbackPlan> rollbackPlanOption = Option.empty();
+      Option<String> rollbackInstantTimeOpt;
+      if (!config.isExclusiveRollbackEnabled() && 
pendingRollbackInfo.isPresent()) {
+        // Only case when lock can be skipped is if exclusive rollback is 
disabled and
+        // there is a pending rollback info available
         rollbackPlanOption = 
Option.of(pendingRollbackInfo.get().getRollbackPlan());
-        rollbackInstantTime = 
pendingRollbackInfo.get().getRollbackInstant().requestedTime();
+        rollbackInstantTimeOpt = 
Option.of(pendingRollbackInfo.get().getRollbackInstant().requestedTime());
       } else {
-        if (commitInstantOpt.isEmpty()) {
-          log.error("Cannot find instant {} in the timeline of table {} for 
rollback", commitInstantTime, config.getBasePath());
-          return false;
-        }
         if (!skipLocking) {
           txnManager.beginStateChange(Option.empty(), Option.empty());
         }
         try {
-          rollbackInstantTime = suppliedRollbackInstantTime.orElseGet(() -> 
createNewInstantTime(false));
-          rollbackPlanOption = table.scheduleRollback(context, 
rollbackInstantTime, commitInstantOpt.get(), false, 
config.shouldRollbackUsingMarkers(), false);
+          if (config.isExclusiveRollbackEnabled()) {
+            // Reload meta client within the lock so that the timeline is 
latest while executing pending rollback
+            table.getMetaClient().reloadActiveTimeline();
+            Option<HoodiePendingRollbackInfo> pendingRollbackOpt = 
getPendingRollbackInfo(table.getMetaClient(), commitInstantTime);
+            rollbackInstantTimeOpt = pendingRollbackOpt.map(info -> 
info.getRollbackInstant().requestedTime());
+            if (pendingRollbackOpt.isPresent()) {
+              // If pending rollback and heartbeat is expired, writer should 
start heartbeat and execute rollback
+              if 
(heartbeatClient.isHeartbeatExpired(rollbackInstantTimeOpt.get())) {
+                LOG.info("Heartbeat expired for rollback instant {}, executing 
rollback now", rollbackInstantTimeOpt);
+                HeartbeatUtils.deleteHeartbeatFile(storage, basePath, 
rollbackInstantTimeOpt.get(), config);
+                heartbeatClient.start(rollbackInstantTimeOpt.get());
+                rollbackPlanOption = 
pendingRollbackOpt.map(HoodiePendingRollbackInfo::getRollbackPlan);
+              } else {
+                // Heartbeat is still active for another writer, ignore 
rollback for now
+                // TODO: ABCDEFGHI revisit return value
+                return false;
+              }
+            } else if 
(Option.fromJavaOptional(table.getActiveTimeline().getCommitsTimeline().getInstantsAsStream()
+                .filter(instant -> EQUALS.test(instant.requestedTime(), 
commitInstantTime))
+                .findFirst()).isEmpty()) {
+              // Assume rollback is already executed since the commit is no 
longer present in the timeline
+              return false;
+            }
+          } else {
+            // Case where no pending rollback is present,
+            if (commitInstantOpt.isEmpty()) {
+              log.error("Cannot find instant {} in the timeline of table {} 
for rollback", commitInstantTime, config.getBasePath());
+              return false;
+            }
+            rollbackInstantTimeOpt = suppliedRollbackInstantTime.or(() -> 
Option.of(createNewInstantTime(false)));
+            if (config.isExclusiveRollbackEnabled()) {
+              heartbeatClient.start(rollbackInstantTimeOpt.get());
+            }
+            rollbackPlanOption = table.scheduleRollback(context, 
rollbackInstantTimeOpt.get(), commitInstantOpt.get(), false, 
config.shouldRollbackUsingMarkers(), false);
+          }
         } finally {
           if (!skipLocking) {
             txnManager.endStateChange(Option.empty());

Review Comment:
   🤖 [Line 1306] nit: the comment 'Case where no pending rollback is present,' 
is incomplete (it ends with a dangling comma and gives no further context); 
consider clarifying that exclusive rollback is also disabled in this branch.
   
   <sub><i>- Generated by an AI agent and may contain mistakes. Please verify 
any suggestions before applying.</i></sub>



##########
hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestClientRollback.java:
##########
@@ -825,6 +827,202 @@ public void testRollbackWithRequestedRollbackPlan(boolean 
enableMetadataTable, b
     }
   }
 
+  /**
+   * Test exclusive rollback with multi-writer: when a pending rollback exists 
with an expired heartbeat
+   * (no heartbeat file present → returns 0L → always expired), the current 
writer should take ownership
+   * and execute the rollback.
+   */
+  @Test
+  public void testExclusiveRollbackPendingRollbackHeartbeatExpired() throws 
Exception {
+    final String p1 = "2016/05/01";
+    final String p2 = "2016/05/02";
+    final String commitTime1 = "20160501010101";
+    final String commitTime2 = "20160502020601";
+    final String commitTime3 = "20160506030611";

Review Comment:
   🤖 nit: the double-brace initialization (anonymous class with initializer 
block) for creating maps is verbose; if Java 9+ is available, consider using 
`Map.of(p1, "id11", p2, "id12")` instead.
   
   <sub><i>- Generated by an AI agent and may contain mistakes. Please verify 
any suggestions before applying.</i></sub>



##########
hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestClientRollback.java:
##########
@@ -825,6 +827,202 @@ public void testRollbackWithRequestedRollbackPlan(boolean 
enableMetadataTable, b
     }
   }
 
+  /**
+   * Test exclusive rollback with multi-writer: when a pending rollback exists 
with an expired heartbeat
+   * (no heartbeat file present → returns 0L → always expired), the current 
writer should take ownership
+   * and execute the rollback.
+   */
+  @Test

Review Comment:
   🤖 nit: test variable names like `p1`, `p2` and `id11`, `id12` are generic; 
consider more descriptive names like `partition1`, `partition2`, `fileId1_1` to 
improve test clarity.
   
   <sub><i>- Generated by an AI agent and may contain mistakes. Please verify 
any suggestions before applying.</i></sub>



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieTableServiceClient.java:
##########
@@ -1245,25 +1245,57 @@ public boolean rollback(final String commitInstantTime, 
Option<HoodiePendingRoll
     final Timer.Context timerContext = this.metrics.getRollbackCtx();
     try {
       HoodieTable table = createTable(config, storageConf, skipVersionCheck);
+
       Option<HoodieInstant> commitInstantOpt = 
Option.fromJavaOptional(table.getActiveTimeline().getCommitsTimeline().getInstantsAsStream()
           .filter(instant -> EQUALS.test(instant.requestedTime(), 
commitInstantTime))
           .findFirst());
-      Option<HoodieRollbackPlan> rollbackPlanOption;
-      String rollbackInstantTime;
-      if (pendingRollbackInfo.isPresent()) {
+      Option<HoodieRollbackPlan> rollbackPlanOption = Option.empty();
+      Option<String> rollbackInstantTimeOpt;
+      if (!config.isExclusiveRollbackEnabled() && 
pendingRollbackInfo.isPresent()) {
+        // Only case when lock can be skipped is if exclusive rollback is 
disabled and
+        // there is a pending rollback info available
         rollbackPlanOption = 
Option.of(pendingRollbackInfo.get().getRollbackPlan());
-        rollbackInstantTime = 
pendingRollbackInfo.get().getRollbackInstant().requestedTime();
+        rollbackInstantTimeOpt = 
Option.of(pendingRollbackInfo.get().getRollbackInstant().requestedTime());
       } else {
-        if (commitInstantOpt.isEmpty()) {
-          log.error("Cannot find instant {} in the timeline of table {} for 
rollback", commitInstantTime, config.getBasePath());
-          return false;
-        }
         if (!skipLocking) {
           txnManager.beginStateChange(Option.empty(), Option.empty());
         }
         try {
-          rollbackInstantTime = suppliedRollbackInstantTime.orElseGet(() -> 
createNewInstantTime(false));
-          rollbackPlanOption = table.scheduleRollback(context, 
rollbackInstantTime, commitInstantOpt.get(), false, 
config.shouldRollbackUsingMarkers(), false);
+          if (config.isExclusiveRollbackEnabled()) {
+            // Reload meta client within the lock so that the timeline is 
latest while executing pending rollback
+            table.getMetaClient().reloadActiveTimeline();
+            Option<HoodiePendingRollbackInfo> pendingRollbackOpt = 
getPendingRollbackInfo(table.getMetaClient(), commitInstantTime);
+            rollbackInstantTimeOpt = pendingRollbackOpt.map(info -> 
info.getRollbackInstant().requestedTime());
+            if (pendingRollbackOpt.isPresent()) {
+              // If pending rollback and heartbeat is expired, writer should 
start heartbeat and execute rollback
+              if 
(heartbeatClient.isHeartbeatExpired(rollbackInstantTimeOpt.get())) {
+                LOG.info("Heartbeat expired for rollback instant {}, executing 
rollback now", rollbackInstantTimeOpt);
+                HeartbeatUtils.deleteHeartbeatFile(storage, basePath, 
rollbackInstantTimeOpt.get(), config);
+                heartbeatClient.start(rollbackInstantTimeOpt.get());
+                rollbackPlanOption = 
pendingRollbackOpt.map(HoodiePendingRollbackInfo::getRollbackPlan);
+              } else {
+                // Heartbeat is still active for another writer, ignore 
rollback for now
+                // TODO: ABCDEFGHI revisit return value
+                return false;
+              }
+            } else if 
(Option.fromJavaOptional(table.getActiveTimeline().getCommitsTimeline().getInstantsAsStream()
+                .filter(instant -> EQUALS.test(instant.requestedTime(), 
commitInstantTime))
+                .findFirst()).isEmpty()) {

Review Comment:
   🤖 nit: passing `rollbackInstantTimeOpt` (an Option) to LOG.info logs the 
Option wrapper's string form rather than the bare instant time; use `.get()` or 
extract the string first for cleaner logs.
   
   <sub><i>- Generated by an AI agent and may contain mistakes. Please verify 
any suggestions before applying.</i></sub>



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieTableServiceClient.java:
##########
@@ -1245,25 +1245,57 @@ public boolean rollback(final String commitInstantTime, 
Option<HoodiePendingRoll
     final Timer.Context timerContext = this.metrics.getRollbackCtx();
     try {
       HoodieTable table = createTable(config, storageConf, skipVersionCheck);
+
       Option<HoodieInstant> commitInstantOpt = 
Option.fromJavaOptional(table.getActiveTimeline().getCommitsTimeline().getInstantsAsStream()
           .filter(instant -> EQUALS.test(instant.requestedTime(), 
commitInstantTime))
           .findFirst());
-      Option<HoodieRollbackPlan> rollbackPlanOption;
-      String rollbackInstantTime;
-      if (pendingRollbackInfo.isPresent()) {
+      Option<HoodieRollbackPlan> rollbackPlanOption = Option.empty();
+      Option<String> rollbackInstantTimeOpt;
+      if (!config.isExclusiveRollbackEnabled() && 
pendingRollbackInfo.isPresent()) {
+        // Only case when lock can be skipped is if exclusive rollback is 
disabled and
+        // there is a pending rollback info available
         rollbackPlanOption = 
Option.of(pendingRollbackInfo.get().getRollbackPlan());
-        rollbackInstantTime = 
pendingRollbackInfo.get().getRollbackInstant().requestedTime();
+        rollbackInstantTimeOpt = 
Option.of(pendingRollbackInfo.get().getRollbackInstant().requestedTime());
       } else {
-        if (commitInstantOpt.isEmpty()) {
-          log.error("Cannot find instant {} in the timeline of table {} for 
rollback", commitInstantTime, config.getBasePath());
-          return false;
-        }
         if (!skipLocking) {
           txnManager.beginStateChange(Option.empty(), Option.empty());
         }
         try {
-          rollbackInstantTime = suppliedRollbackInstantTime.orElseGet(() -> 
createNewInstantTime(false));
-          rollbackPlanOption = table.scheduleRollback(context, 
rollbackInstantTime, commitInstantOpt.get(), false, 
config.shouldRollbackUsingMarkers(), false);
+          if (config.isExclusiveRollbackEnabled()) {
+            // Reload meta client within the lock so that the timeline is 
latest while executing pending rollback
+            table.getMetaClient().reloadActiveTimeline();
+            Option<HoodiePendingRollbackInfo> pendingRollbackOpt = 
getPendingRollbackInfo(table.getMetaClient(), commitInstantTime);
+            rollbackInstantTimeOpt = pendingRollbackOpt.map(info -> 
info.getRollbackInstant().requestedTime());
+            if (pendingRollbackOpt.isPresent()) {
+              // If pending rollback and heartbeat is expired, writer should 
start heartbeat and execute rollback
+              if 
(heartbeatClient.isHeartbeatExpired(rollbackInstantTimeOpt.get())) {
+                LOG.info("Heartbeat expired for rollback instant {}, executing 
rollback now", rollbackInstantTimeOpt);
+                HeartbeatUtils.deleteHeartbeatFile(storage, basePath, 
rollbackInstantTimeOpt.get(), config);
+                heartbeatClient.start(rollbackInstantTimeOpt.get());
+                rollbackPlanOption = 
pendingRollbackOpt.map(HoodiePendingRollbackInfo::getRollbackPlan);
+              } else {
+                // Heartbeat is still active for another writer, ignore 
rollback for now
+                // TODO: ABCDEFGHI revisit return value
+                return false;
+              }
+            } else if 
(Option.fromJavaOptional(table.getActiveTimeline().getCommitsTimeline().getInstantsAsStream()
+                .filter(instant -> EQUALS.test(instant.requestedTime(), 
commitInstantTime))
+                .findFirst()).isEmpty()) {
+              // Assume rollback is already executed since the commit is no 
longer present in the timeline
+              return false;
+            }
+          } else {
+            // Case where no pending rollback is present,
+            if (commitInstantOpt.isEmpty()) {
+              log.error("Cannot find instant {} in the timeline of table {} 
for rollback", commitInstantTime, config.getBasePath());
+              return false;
+            }
+            rollbackInstantTimeOpt = suppliedRollbackInstantTime.or(() -> 
Option.of(createNewInstantTime(false)));
+            if (config.isExclusiveRollbackEnabled()) {
+              heartbeatClient.start(rollbackInstantTimeOpt.get());
+            }

Review Comment:
   🤖 nit: the TODO comment contains 'ABCDEFGHI', which looks like a placeholder; 
consider replacing it with a real issue reference (e.g., HUDI-####) or a more 
meaningful description of what needs to be revisited.
   
   <sub><i>- Generated by an AI agent and may contain mistakes. Please verify 
any suggestions before applying.</i></sub>



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieTableServiceClient.java:
##########
@@ -1245,25 +1245,57 @@ public boolean rollback(final String commitInstantTime, 
Option<HoodiePendingRoll
     final Timer.Context timerContext = this.metrics.getRollbackCtx();
     try {
       HoodieTable table = createTable(config, storageConf, skipVersionCheck);
+
       Option<HoodieInstant> commitInstantOpt = 
Option.fromJavaOptional(table.getActiveTimeline().getCommitsTimeline().getInstantsAsStream()
           .filter(instant -> EQUALS.test(instant.requestedTime(), 
commitInstantTime))
           .findFirst());
-      Option<HoodieRollbackPlan> rollbackPlanOption;
-      String rollbackInstantTime;
-      if (pendingRollbackInfo.isPresent()) {
+      Option<HoodieRollbackPlan> rollbackPlanOption = Option.empty();
+      Option<String> rollbackInstantTimeOpt;
+      if (!config.isExclusiveRollbackEnabled() && 
pendingRollbackInfo.isPresent()) {
+        // Only case when lock can be skipped is if exclusive rollback is 
disabled and
+        // there is a pending rollback info available
         rollbackPlanOption = 
Option.of(pendingRollbackInfo.get().getRollbackPlan());
-        rollbackInstantTime = 
pendingRollbackInfo.get().getRollbackInstant().requestedTime();
+        rollbackInstantTimeOpt = 
Option.of(pendingRollbackInfo.get().getRollbackInstant().requestedTime());
       } else {
-        if (commitInstantOpt.isEmpty()) {
-          log.error("Cannot find instant {} in the timeline of table {} for 
rollback", commitInstantTime, config.getBasePath());
-          return false;
-        }
         if (!skipLocking) {
           txnManager.beginStateChange(Option.empty(), Option.empty());
         }
         try {
-          rollbackInstantTime = suppliedRollbackInstantTime.orElseGet(() -> 
createNewInstantTime(false));
-          rollbackPlanOption = table.scheduleRollback(context, 
rollbackInstantTime, commitInstantOpt.get(), false, 
config.shouldRollbackUsingMarkers(), false);
+          if (config.isExclusiveRollbackEnabled()) {
+            // Reload meta client within the lock so that the timeline is 
latest while executing pending rollback
+            table.getMetaClient().reloadActiveTimeline();
+            Option<HoodiePendingRollbackInfo> pendingRollbackOpt = 
getPendingRollbackInfo(table.getMetaClient(), commitInstantTime);
+            rollbackInstantTimeOpt = pendingRollbackOpt.map(info -> 
info.getRollbackInstant().requestedTime());
+            if (pendingRollbackOpt.isPresent()) {
+              // If pending rollback and heartbeat is expired, writer should 
start heartbeat and execute rollback
+              if 
(heartbeatClient.isHeartbeatExpired(rollbackInstantTimeOpt.get())) {
+                LOG.info("Heartbeat expired for rollback instant {}, executing 
rollback now", rollbackInstantTimeOpt);
+                HeartbeatUtils.deleteHeartbeatFile(storage, basePath, 
rollbackInstantTimeOpt.get(), config);
+                heartbeatClient.start(rollbackInstantTimeOpt.get());
+                rollbackPlanOption = 
pendingRollbackOpt.map(HoodiePendingRollbackInfo::getRollbackPlan);
+              } else {
+                // Heartbeat is still active for another writer, ignore 
rollback for now
+                // TODO: ABCDEFGHI revisit return value
+                return false;
+              }
+            } else if 
(Option.fromJavaOptional(table.getActiveTimeline().getCommitsTimeline().getInstantsAsStream()
+                .filter(instant -> EQUALS.test(instant.requestedTime(), 
commitInstantTime))
+                .findFirst()).isEmpty()) {
+              // Assume rollback is already executed since the commit is no 
longer present in the timeline
+              return false;
+            }
+          } else {
+            // Case where no pending rollback is present,
+            if (commitInstantOpt.isEmpty()) {
+              log.error("Cannot find instant {} in the timeline of table {} 
for rollback", commitInstantTime, config.getBasePath());
+              return false;
+            }
+            rollbackInstantTimeOpt = suppliedRollbackInstantTime.or(() -> 
Option.of(createNewInstantTime(false)));
+            if (config.isExclusiveRollbackEnabled()) {
+              heartbeatClient.start(rollbackInstantTimeOpt.get());
+            }
+            rollbackPlanOption = table.scheduleRollback(context, 
rollbackInstantTimeOpt.get(), commitInstantOpt.get(), false, 
config.shouldRollbackUsingMarkers(), false);
+          }
         } finally {

Review Comment:
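   🤖 nit: this code duplicates the commit-in-timeline check from lines 
1254-1257; consider extracting it into a helper method. Reuse the earlier 
result only if it is still valid here, since it was computed before the lock 
was taken and before the timeline was reloaded.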
   
   <sub><i>- Generated by an AI agent and may contain mistakes. Please verify 
any suggestions before applying.</i></sub>



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to