yihua commented on code in PR #18448:
URL: https://github.com/apache/hudi/pull/18448#discussion_r3067037101


##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieTableServiceClient.java:
##########
@@ -1160,25 +1160,59 @@ public boolean rollback(final String commitInstantTime, Option<HoodiePendingRoll
     final Timer.Context timerContext = this.metrics.getRollbackCtx();
     try {
       HoodieTable table = createTable(config, storageConf, skipVersionCheck);
+
       Option<HoodieInstant> commitInstantOpt = Option.fromJavaOptional(table.getActiveTimeline().getCommitsTimeline().getInstantsAsStream()
           .filter(instant -> EQUALS.test(instant.requestedTime(), commitInstantTime))
           .findFirst());
-      Option<HoodieRollbackPlan> rollbackPlanOption;
-      String rollbackInstantTime;
-      if (pendingRollbackInfo.isPresent()) {
+      Option<HoodieRollbackPlan> rollbackPlanOption = Option.empty();
+      Option<String> rollbackInstantTimeOpt;

Review Comment:
   _⚠️ Potential issue_ | _🟠 Major_
   
   **Refresh `commitInstantOpt` after reloading the timeline.**
   
   The exclusive branch rechecks commit presence on the reloaded timeline, but line 1205 still uses the pre-lock `commitInstantOpt`. If the first snapshot missed the instant and the refreshed one finds it, this ends up dereferencing an empty option instead of scheduling the rollback.
   
   
   <details>
   <summary>Proposed fix</summary>
   
   ```diff
          Option<HoodieInstant> commitInstantOpt = Option.fromJavaOptional(table.getActiveTimeline().getCommitsTimeline().getInstantsAsStream()
              .filter(instant -> EQUALS.test(instant.requestedTime(), commitInstantTime))
              .findFirst());
   @@
              if (config.isExclusiveRollbackEnabled()) {
                // Reload meta client within the lock so that the timeline is latest while executing pending rollback
                table.getMetaClient().reloadActiveTimeline();
   +            commitInstantOpt = Option.fromJavaOptional(table.getActiveTimeline().getCommitsTimeline().getInstantsAsStream()
   +                .filter(instant -> EQUALS.test(instant.requestedTime(), commitInstantTime))
   +                .findFirst());
                Option<HoodiePendingRollbackInfo> pendingRollbackOpt = getPendingRollbackInfo(table.getMetaClient(), commitInstantTime);
                rollbackInstantTimeOpt = pendingRollbackOpt.map(info -> info.getRollbackInstant().requestedTime());
   @@
   -            } else if (Option.fromJavaOptional(table.getActiveTimeline().getCommitsTimeline().getInstantsAsStream()
   -                .filter(instant -> EQUALS.test(instant.requestedTime(), commitInstantTime))
   -                .findFirst()).isEmpty()) {
   +            } else if (commitInstantOpt.isEmpty()) {
                  // Assume rollback is already executed since the commit is no longer present in the timeline
                  return false;
                } else {
   ```
   </details>
   
   
   Also applies to: 1179-1205
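To make the failure mode concrete, here is a minimal, self-contained Java sketch of the pattern the fix relies on: any value derived from a timeline snapshot taken before the lock must be recomputed after the timeline is reloaded inside the lock. It does not use Hudi's API. `RequeryAfterReloadSketch`, the list-of-strings "timeline", and the `ReentrantLock` are hypothetical stand-ins for the active timeline and the transaction manager's lock.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical model, not Hudi's API: the "timeline" is a shared list of instant times,
// and the lock stands in for the transaction manager's state-change lock.
final class RequeryAfterReloadSketch {
  static final List<String> TIMELINE = new CopyOnWriteArrayList<>();
  static final ReentrantLock LOCK = new ReentrantLock();

  // Looks up an instant in one snapshot of the timeline (cf. filtering getInstantsAsStream()).
  static Optional<String> findInstant(List<String> snapshot, String instantTime) {
    return snapshot.stream().filter(instantTime::equals).findFirst();
  }

  // Returns the instant to roll back, or empty if the commit is not (or no longer) on the timeline.
  static Optional<String> resolveUnderLock(String instantTime) {
    LOCK.lock();
    try {
      // "Reload" inside the lock, then derive the lookup from the fresh snapshot only.
      List<String> reloaded = new ArrayList<>(TIMELINE);
      return findInstant(reloaded, instantTime);
    } finally {
      LOCK.unlock();
    }
  }
}
```

A lookup taken before another writer's commit lands returns empty, while `resolveUnderLock` sees the commit; calling `get()` on the stale, empty value is the dereference the comment warns about.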
   
   <details>
   <summary>🤖 Prompt for AI Agents</summary>
   
   ```
   Verify each finding against the current code and only fix it if needed.
   
   In `@hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieTableServiceClient.java`
   around lines 1164 - 1168, the code may use a stale commitInstantOpt after
   reloading the timeline. Update the logic in BaseHoodieTableServiceClient so
   that, after calling table.getMetaClient().reloadActiveTimeline() in the
   exclusive-rollback branch, commitInstantOpt is re-evaluated by re-querying
   table.getActiveTimeline().getCommitsTimeline().getInstantsAsStream() (with
   the same EQUALS.test(instant.requestedTime(), commitInstantTime) filter)
   before it is dereferenced or used to schedule the rollback. Ensure that the
   subsequent table.scheduleRollback(...) call, which produces
   rollbackPlanOption, uses this refreshed commitInstantOpt.
   ```
   
   </details>
   
   
   — *CodeRabbit* 
([original](https://github.com/yihua/hudi/pull/34#discussion_r3067036668)) 
(source:comment#3067036668)



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieTableServiceClient.java:
##########
@@ -1160,25 +1160,59 @@ public boolean rollback(final String commitInstantTime, Option<HoodiePendingRoll
     final Timer.Context timerContext = this.metrics.getRollbackCtx();
     try {
       HoodieTable table = createTable(config, storageConf, skipVersionCheck);
+
       Option<HoodieInstant> commitInstantOpt = Option.fromJavaOptional(table.getActiveTimeline().getCommitsTimeline().getInstantsAsStream()
           .filter(instant -> EQUALS.test(instant.requestedTime(), commitInstantTime))
           .findFirst());
-      Option<HoodieRollbackPlan> rollbackPlanOption;
-      String rollbackInstantTime;
-      if (pendingRollbackInfo.isPresent()) {
+      Option<HoodieRollbackPlan> rollbackPlanOption = Option.empty();
+      Option<String> rollbackInstantTimeOpt;
+      if (!config.isExclusiveRollbackEnabled() && pendingRollbackInfo.isPresent()) {
+        // Only case when lock can be skipped is if exclusive rollback is disabled and
+        // there is a pending rollback info available
         rollbackPlanOption = Option.of(pendingRollbackInfo.get().getRollbackPlan());
-        rollbackInstantTime = pendingRollbackInfo.get().getRollbackInstant().requestedTime();
+        rollbackInstantTimeOpt = Option.of(pendingRollbackInfo.get().getRollbackInstant().requestedTime());
       } else {
-        if (commitInstantOpt.isEmpty()) {
-          log.error("Cannot find instant {} in the timeline of table {} for rollback", commitInstantTime, config.getBasePath());
-          return false;
-        }
         if (!skipLocking) {
           txnManager.beginStateChange(Option.empty(), Option.empty());
         }
         try {
-          rollbackInstantTime = suppliedRollbackInstantTime.orElseGet(() -> createNewInstantTime(false));
-          rollbackPlanOption = table.scheduleRollback(context, rollbackInstantTime, commitInstantOpt.get(), false, config.shouldRollbackUsingMarkers(), false);
+          if (config.isExclusiveRollbackEnabled()) {
+            // Reload meta client within the lock so that the timeline is latest while executing pending rollback
+            table.getMetaClient().reloadActiveTimeline();
+            Option<HoodiePendingRollbackInfo> pendingRollbackOpt = getPendingRollbackInfo(table.getMetaClient(), commitInstantTime);
+            rollbackInstantTimeOpt = pendingRollbackOpt.map(info -> info.getRollbackInstant().requestedTime());
+            if (pendingRollbackOpt.isPresent()) {
+              // If pending rollback and heartbeat is expired, writer should start heartbeat and execute rollback
+              if (heartbeatClient.isHeartbeatExpired(rollbackInstantTimeOpt.get())) {
+                LOG.info("Heartbeat expired for rollback instant {}, executing rollback now", rollbackInstantTimeOpt);
+                HeartbeatUtils.deleteHeartbeatFile(storage, basePath, rollbackInstantTimeOpt.get(), config);
+                heartbeatClient.start(rollbackInstantTimeOpt.get());
+                rollbackPlanOption = pendingRollbackOpt.map(HoodiePendingRollbackInfo::getRollbackPlan);
+              } else {
+                // Heartbeat is still active for another writer, ignore rollback for now
+                // TODO: ABCDEFGHI revisit return value
+                return false;

Review Comment:
   _P2_ | **Leftover placeholder TODO comment**
   
   The comment `// TODO: ABCDEFGHI revisit return value` looks like a development-time placeholder that was left in accidentally. Based on the test `testExclusiveRollbackDefersToActiveHeartbeat`, returning `false` when another writer's heartbeat is active is the intended and correct behavior. If there is still an open design question here, the TODO should use a proper JIRA issue reference instead of `ABCDEFGHI`.
   
   ```suggestion
                 } else {
                   // Heartbeat is still active for another writer; skip rollback for now.
                   // The active writer will complete the rollback; caller should retry later.
                   return false;
   ```
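The deferral rule discussed above (defer while another writer's heartbeat is live, take over once it has expired) can be summarized as a small decision function. This is a hypothetical sketch, not Hudi code: `ExclusiveRollbackDecisionSketch` and `Decision` are invented names, and the expiry rule (a heartbeat counts as expired once more than `intervalMs * toleratedMisses` has elapsed since the last beat) is an assumed simplification of `heartbeatClient.isHeartbeatExpired(...)`. The four outcomes mirror the branches visible in the diffs in this thread.

```java
// Hypothetical sketch of the exclusive-rollback branch; not Hudi's implementation.
final class ExclusiveRollbackDecisionSketch {
  enum Decision {
    EXECUTE_PENDING,      // pending rollback whose heartbeat expired: take it over and run it
    DEFER,                // pending rollback still owned by a live writer: return false
    ALREADY_ROLLED_BACK,  // no pending rollback and the commit is gone: return false
    SCHEDULE_NEW          // no pending rollback and the commit is present: schedule a rollback
  }

  // Assumed expiry rule: more than intervalMs * toleratedMisses since the last beat.
  static boolean isHeartbeatExpired(long lastBeatMs, long nowMs, long intervalMs, int toleratedMisses) {
    return nowMs - lastBeatMs > intervalMs * toleratedMisses;
  }

  static Decision decide(boolean pendingRollbackExists, boolean commitPresent,
                         long lastBeatMs, long nowMs, long intervalMs, int toleratedMisses) {
    if (pendingRollbackExists) {
      return isHeartbeatExpired(lastBeatMs, nowMs, intervalMs, toleratedMisses)
          ? Decision.EXECUTE_PENDING
          : Decision.DEFER;
    }
    return commitPresent ? Decision.SCHEDULE_NEW : Decision.ALREADY_ROLLED_BACK;
  }
}
```

In the diffs shown here, both `DEFER` and `ALREADY_ROLLED_BACK` surface to the caller as the same `false` return value, so a caller cannot tell the two cases apart from the return value alone.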
   
   — *Greptile* 
([original](https://github.com/yihua/hudi/pull/34#discussion_r3067036706)) 
(source:comment#3067036706)



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieTableServiceClient.java:
##########
@@ -1160,25 +1160,59 @@ public boolean rollback(final String commitInstantTime, Option<HoodiePendingRoll
     final Timer.Context timerContext = this.metrics.getRollbackCtx();
     try {
       HoodieTable table = createTable(config, storageConf, skipVersionCheck);
+
       Option<HoodieInstant> commitInstantOpt = Option.fromJavaOptional(table.getActiveTimeline().getCommitsTimeline().getInstantsAsStream()
           .filter(instant -> EQUALS.test(instant.requestedTime(), commitInstantTime))
           .findFirst());
-      Option<HoodieRollbackPlan> rollbackPlanOption;
-      String rollbackInstantTime;
-      if (pendingRollbackInfo.isPresent()) {
+      Option<HoodieRollbackPlan> rollbackPlanOption = Option.empty();
+      Option<String> rollbackInstantTimeOpt;
+      if (!config.isExclusiveRollbackEnabled() && pendingRollbackInfo.isPresent()) {
+        // Only case when lock can be skipped is if exclusive rollback is disabled and
+        // there is a pending rollback info available
         rollbackPlanOption = Option.of(pendingRollbackInfo.get().getRollbackPlan());
-        rollbackInstantTime = pendingRollbackInfo.get().getRollbackInstant().requestedTime();
+        rollbackInstantTimeOpt = Option.of(pendingRollbackInfo.get().getRollbackInstant().requestedTime());
       } else {
-        if (commitInstantOpt.isEmpty()) {
-          log.error("Cannot find instant {} in the timeline of table {} for rollback", commitInstantTime, config.getBasePath());
-          return false;
-        }
         if (!skipLocking) {
           txnManager.beginStateChange(Option.empty(), Option.empty());
         }
         try {
-          rollbackInstantTime = suppliedRollbackInstantTime.orElseGet(() -> createNewInstantTime(false));
-          rollbackPlanOption = table.scheduleRollback(context, rollbackInstantTime, commitInstantOpt.get(), false, config.shouldRollbackUsingMarkers(), false);
+          if (config.isExclusiveRollbackEnabled()) {
+            // Reload meta client within the lock so that the timeline is latest while executing pending rollback
+            table.getMetaClient().reloadActiveTimeline();
+            Option<HoodiePendingRollbackInfo> pendingRollbackOpt = getPendingRollbackInfo(table.getMetaClient(), commitInstantTime);
+            rollbackInstantTimeOpt = pendingRollbackOpt.map(info -> info.getRollbackInstant().requestedTime());
+            if (pendingRollbackOpt.isPresent()) {
+              // If pending rollback and heartbeat is expired, writer should start heartbeat and execute rollback
+              if (heartbeatClient.isHeartbeatExpired(rollbackInstantTimeOpt.get())) {
+                LOG.info("Heartbeat expired for rollback instant {}, executing rollback now", rollbackInstantTimeOpt);

Review Comment:
   _P0_ | **Compilation error: `LOG` is undefined; it should be `log`**
   
   The class is annotated with `@Slf4j`, which generates a field named `log` (lowercase). There is no `LOG` field defined here or in any parent class (`BaseHoodieClient` also uses `log`). This line will fail to compile.
   
   ```suggestion
                   log.info("Heartbeat expired for rollback instant {}, executing rollback now", rollbackInstantTimeOpt);
   ```
   
   — *Greptile* 
([original](https://github.com/yihua/hudi/pull/34#discussion_r3067036686)) 
(source:comment#3067036686)



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieTableServiceClient.java:
##########
@@ -1160,25 +1160,59 @@ public boolean rollback(final String commitInstantTime, Option<HoodiePendingRoll
     final Timer.Context timerContext = this.metrics.getRollbackCtx();
     try {
       HoodieTable table = createTable(config, storageConf, skipVersionCheck);
+
       Option<HoodieInstant> commitInstantOpt = Option.fromJavaOptional(table.getActiveTimeline().getCommitsTimeline().getInstantsAsStream()
           .filter(instant -> EQUALS.test(instant.requestedTime(), commitInstantTime))
           .findFirst());
-      Option<HoodieRollbackPlan> rollbackPlanOption;
-      String rollbackInstantTime;
-      if (pendingRollbackInfo.isPresent()) {
+      Option<HoodieRollbackPlan> rollbackPlanOption = Option.empty();
+      Option<String> rollbackInstantTimeOpt;
+      if (!config.isExclusiveRollbackEnabled() && pendingRollbackInfo.isPresent()) {
+        // Only case when lock can be skipped is if exclusive rollback is disabled and
+        // there is a pending rollback info available
         rollbackPlanOption = Option.of(pendingRollbackInfo.get().getRollbackPlan());
-        rollbackInstantTime = pendingRollbackInfo.get().getRollbackInstant().requestedTime();
+        rollbackInstantTimeOpt = Option.of(pendingRollbackInfo.get().getRollbackInstant().requestedTime());
       } else {
-        if (commitInstantOpt.isEmpty()) {
-          log.error("Cannot find instant {} in the timeline of table {} for rollback", commitInstantTime, config.getBasePath());
-          return false;
-        }
         if (!skipLocking) {
           txnManager.beginStateChange(Option.empty(), Option.empty());
         }
         try {
-          rollbackInstantTime = suppliedRollbackInstantTime.orElseGet(() -> createNewInstantTime(false));
-          rollbackPlanOption = table.scheduleRollback(context, rollbackInstantTime, commitInstantOpt.get(), false, config.shouldRollbackUsingMarkers(), false);
+          if (config.isExclusiveRollbackEnabled()) {
+            // Reload meta client within the lock so that the timeline is latest while executing pending rollback
+            table.getMetaClient().reloadActiveTimeline();
+            Option<HoodiePendingRollbackInfo> pendingRollbackOpt = getPendingRollbackInfo(table.getMetaClient(), commitInstantTime);
+            rollbackInstantTimeOpt = pendingRollbackOpt.map(info -> info.getRollbackInstant().requestedTime());
+            if (pendingRollbackOpt.isPresent()) {
+              // If pending rollback and heartbeat is expired, writer should start heartbeat and execute rollback
+              if (heartbeatClient.isHeartbeatExpired(rollbackInstantTimeOpt.get())) {
+                LOG.info("Heartbeat expired for rollback instant {}, executing rollback now", rollbackInstantTimeOpt);
+                HeartbeatUtils.deleteHeartbeatFile(storage, basePath, rollbackInstantTimeOpt.get(), config);
+                heartbeatClient.start(rollbackInstantTimeOpt.get());
+                rollbackPlanOption = pendingRollbackOpt.map(HoodiePendingRollbackInfo::getRollbackPlan);

Review Comment:
   _⚠️ Potential issue_ | _🟠 Major_
   
   **Always stop the rollback heartbeat when this writer started it.**
   
   If `scheduleRollback(...)` or `table.rollback(...)` throws after `heartbeatClient.start(...)`, the heartbeat never gets stopped. In the exclusive path that leaves a live rollback heartbeat behind, so other writers will keep deferring until it expires.
   
   
   <details>
   <summary>Proposed fix</summary>
   
   ```diff
          Option<HoodieRollbackPlan> rollbackPlanOption = Option.empty();
          Option<String> rollbackInstantTimeOpt;
   +      boolean startedRollbackHeartbeat = false;
   @@
                  if (heartbeatClient.isHeartbeatExpired(rollbackInstantTimeOpt.get())) {
                    LOG.info("Heartbeat expired for rollback instant {}, executing rollback now", rollbackInstantTimeOpt);
                    HeartbeatUtils.deleteHeartbeatFile(storage, basePath, rollbackInstantTimeOpt.get(), config);
                    heartbeatClient.start(rollbackInstantTimeOpt.get());
   +                startedRollbackHeartbeat = true;
                    rollbackPlanOption = pendingRollbackOpt.map(HoodiePendingRollbackInfo::getRollbackPlan);
   @@
                  rollbackInstantTimeOpt = suppliedRollbackInstantTime.or(() -> Option.of(createNewInstantTime(false)));
                  heartbeatClient.start(rollbackInstantTimeOpt.get());
   +              startedRollbackHeartbeat = true;
                  rollbackPlanOption = table.scheduleRollback(context, rollbackInstantTimeOpt.get(), commitInstantOpt.get(), false, config.shouldRollbackUsingMarkers(), false);
   @@
   -        HoodieRollbackMetadata rollbackMetadata = commitInstantOpt.isPresent()
   -            ? table.rollback(context, rollbackInstantTimeOpt.get(), commitInstantOpt.get(), true, skipLocking)
   -            : table.rollback(context, rollbackInstantTimeOpt.get(), table.getMetaClient().createNewInstant(
   -                HoodieInstant.State.INFLIGHT, rollbackPlanOption.get().getInstantToRollback().getAction(), commitInstantTime),
   -            false, skipLocking);
   -        if (timerContext != null) {
   -          long durationInMs = metrics.getDurationInMs(timerContext.stop());
   -          metrics.updateRollbackMetrics(durationInMs, rollbackMetadata.getTotalFilesDeleted());
   -        }
   -        if (config.isExclusiveRollbackEnabled()) {
   -          heartbeatClient.stop(rollbackInstantTimeOpt.get());
   -        }
   -        return true;
   +        try {
   +          HoodieRollbackMetadata rollbackMetadata = commitInstantOpt.isPresent()
   +              ? table.rollback(context, rollbackInstantTimeOpt.get(), commitInstantOpt.get(), true, skipLocking)
   +              : table.rollback(context, rollbackInstantTimeOpt.get(), table.getMetaClient().createNewInstant(
   +                  HoodieInstant.State.INFLIGHT, rollbackPlanOption.get().getInstantToRollback().getAction(), commitInstantTime),
   +              false, skipLocking);
   +          if (timerContext != null) {
   +            long durationInMs = metrics.getDurationInMs(timerContext.stop());
   +            metrics.updateRollbackMetrics(durationInMs, rollbackMetadata.getTotalFilesDeleted());
   +          }
   +          return true;
   +        } finally {
   +          if (config.isExclusiveRollbackEnabled() && startedRollbackHeartbeat && rollbackInstantTimeOpt.isPresent()) {
   +            heartbeatClient.stop(rollbackInstantTimeOpt.get());
   +          }
   +        }
   ```
   </details>
   
   
   Also applies to: 1203-1205, 1230-1241
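A minimal Java sketch of the pattern this comment asks for, with the `try` opened immediately after the heartbeat starts so that a failure in either the scheduling step or the execution step still stops it. `HeartbeatScopeSketch`, its `LIVE_HEARTBEATS` set, and the `Runnable`/`Supplier` parameters are hypothetical stand-ins for `heartbeatClient`, `scheduleRollback(...)`, and `table.rollback(...)`; this is not Hudi's API.

```java
import java.util.HashSet;
import java.util.Set;
import java.util.function.Supplier;

// Hypothetical stand-in for a heartbeat client plus the schedule/execute steps; not Hudi's API.
final class HeartbeatScopeSketch {
  // Instants that currently have a live heartbeat.
  static final Set<String> LIVE_HEARTBEATS = new HashSet<>();

  static void start(String instant) { LIVE_HEARTBEATS.add(instant); }

  static void stop(String instant) { LIVE_HEARTBEATS.remove(instant); }

  // Starts the heartbeat, schedules and executes, and always stops a heartbeat this call started.
  static boolean runWithHeartbeat(String rollbackInstant, Runnable schedule, Supplier<Boolean> execute) {
    boolean startedHeartbeat = false;
    try {
      start(rollbackInstant);
      startedHeartbeat = true;
      schedule.run();        // may throw, like scheduleRollback(...)
      return execute.get();  // may throw, like table.rollback(...)
    } finally {
      if (startedHeartbeat) {
        stop(rollbackInstant);
      }
    }
  }
}
```

As far as the proposed diff shows, its new `try` opens only at the `table.rollback(...)` call, so covering the `scheduleRollback(...)` failure named above would require the `try` to begin right after `heartbeatClient.start(...)`, as in the sketch.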
   
   <details>
   <summary>🤖 Prompt for AI Agents</summary>
   
   ```
   Verify each finding against the current code and only fix it if needed.
   
   In `@hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieTableServiceClient.java`
   around lines 1184 - 1190, the rollback heartbeat started via
   heartbeatClient.start(rollbackInstantTimeOpt.get()) in
   BaseHoodieTableServiceClient may be left running if subsequent calls (e.g.,
   scheduleRollback(...) or table.rollback(...)) throw. Wrap the sequence that
   starts the heartbeat and then performs rollback scheduling/execution in a
   try/finally (or catch and rethrow) to ensure
   heartbeatClient.stop(rollbackInstantTimeOpt.get()) is always called on
   failure. Apply the same pattern to the other similar blocks referenced (the
   instances that call heartbeatClient.start(...) around lines ~1203-1205 and
   ~1230-1241) so that any early exception stops the rollback heartbeat before
   the error propagates.
   ```
   
   </details>
   
   
   — *CodeRabbit* 
([original](https://github.com/yihua/hudi/pull/34#discussion_r3067036671)) 
(source:comment#3067036671)


