yihua commented on code in PR #18448:
URL: https://github.com/apache/hudi/pull/18448#discussion_r3067037101
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieTableServiceClient.java:
##########
@@ -1160,25 +1160,59 @@ public boolean rollback(final String commitInstantTime, Option<HoodiePendingRoll
final Timer.Context timerContext = this.metrics.getRollbackCtx();
try {
HoodieTable table = createTable(config, storageConf, skipVersionCheck);
+
Option<HoodieInstant> commitInstantOpt = Option.fromJavaOptional(table.getActiveTimeline().getCommitsTimeline().getInstantsAsStream()
.filter(instant -> EQUALS.test(instant.requestedTime(), commitInstantTime))
.findFirst());
- Option<HoodieRollbackPlan> rollbackPlanOption;
- String rollbackInstantTime;
- if (pendingRollbackInfo.isPresent()) {
+ Option<HoodieRollbackPlan> rollbackPlanOption = Option.empty();
+ Option<String> rollbackInstantTimeOpt;
Review Comment:
_⚠️ Potential issue_ | _🟠 Major_
**Refresh `commitInstantOpt` after reloading the timeline.**
The exclusive branch rechecks commit presence on the reloaded timeline, but
Line 1205 still uses the pre-lock `commitInstantOpt`. If the first snapshot
missed the instant and the refreshed one finds it, this ends up dereferencing
an empty option instead of scheduling the rollback.
Proposed fix:
```diff
Option<HoodieInstant> commitInstantOpt = Option.fromJavaOptional(table.getActiveTimeline().getCommitsTimeline().getInstantsAsStream()
.filter(instant -> EQUALS.test(instant.requestedTime(), commitInstantTime))
.findFirst());
@@
if (config.isExclusiveRollbackEnabled()) {
// Reload meta client within the lock so that the timeline is latest while executing pending rollback
table.getMetaClient().reloadActiveTimeline();
+ commitInstantOpt = Option.fromJavaOptional(table.getActiveTimeline().getCommitsTimeline().getInstantsAsStream()
+ .filter(instant -> EQUALS.test(instant.requestedTime(), commitInstantTime))
+ .findFirst());
Option<HoodiePendingRollbackInfo> pendingRollbackOpt = getPendingRollbackInfo(table.getMetaClient(), commitInstantTime);
rollbackInstantTimeOpt = pendingRollbackOpt.map(info -> info.getRollbackInstant().requestedTime());
@@
- } else if (Option.fromJavaOptional(table.getActiveTimeline().getCommitsTimeline().getInstantsAsStream()
- .filter(instant -> EQUALS.test(instant.requestedTime(), commitInstantTime))
- .findFirst()).isEmpty()) {
+ } else if (commitInstantOpt.isEmpty()) {
// Assume rollback is already executed since the commit is no longer present in the timeline
return false;
} else {
```
Also applies to: 1179-1205
— *CodeRabbit*
([original](https://github.com/yihua/hudi/pull/34#discussion_r3067036668))
(source:comment#3067036668)
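A minimal sketch of the stale-snapshot problem described in the CodeRabbit comment above, assuming a simplified model rather than Hudi's real classes: `storage`, `Timeline`, and `find` are hypothetical stand-ins for the table's instants, the active timeline, and its `getInstantsAsStream()` filter, and `java.util.Optional` stands in for Hudi's `Option`. A lookup taken before `reload()` keeps its old answer, so it has to be re-queried after the reload, which is what the proposed fix does for `commitInstantOpt`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;

public class RefreshAfterReload {

  // Hypothetical stand-in for table storage: instants committed by any writer.
  public static final List<String> storage = new ArrayList<>();

  // Hypothetical stand-in for an active timeline: a cached snapshot that changes only on reload().
  public static final class Timeline {
    private List<String> snapshot = new ArrayList<>(storage);

    public void reload() {
      snapshot = new ArrayList<>(storage);
    }

    public Optional<String> find(String requestedTime) {
      return snapshot.stream().filter(requestedTime::equals).findFirst();
    }
  }

  public static void main(String[] args) {
    storage.clear();
    Timeline timeline = new Timeline();
    // Pre-lock lookup: the commit is not in the snapshot yet.
    Optional<String> commitInstantOpt = timeline.find("001");
    // Another writer commits "001"; this writer then acquires the lock and reloads.
    storage.add("001");
    timeline.reload();
    System.out.println(commitInstantOpt.isPresent()); // false: stale pre-lock result
    // Re-query against the reloaded snapshot before using the result.
    commitInstantOpt = timeline.find("001");
    System.out.println(commitInstantOpt.isPresent()); // true
  }
}
```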
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieTableServiceClient.java:
##########
@@ -1160,25 +1160,59 @@ public boolean rollback(final String commitInstantTime, Option<HoodiePendingRoll
final Timer.Context timerContext = this.metrics.getRollbackCtx();
try {
HoodieTable table = createTable(config, storageConf, skipVersionCheck);
+
Option<HoodieInstant> commitInstantOpt = Option.fromJavaOptional(table.getActiveTimeline().getCommitsTimeline().getInstantsAsStream()
.filter(instant -> EQUALS.test(instant.requestedTime(), commitInstantTime))
.findFirst());
- Option<HoodieRollbackPlan> rollbackPlanOption;
- String rollbackInstantTime;
- if (pendingRollbackInfo.isPresent()) {
+ Option<HoodieRollbackPlan> rollbackPlanOption = Option.empty();
+ Option<String> rollbackInstantTimeOpt;
+ if (!config.isExclusiveRollbackEnabled() && pendingRollbackInfo.isPresent()) {
+ // Only case when lock can be skipped is if exclusive rollback is disabled and
+ // there is a pending rollback info available
rollbackPlanOption = Option.of(pendingRollbackInfo.get().getRollbackPlan());
- rollbackInstantTime = pendingRollbackInfo.get().getRollbackInstant().requestedTime();
+ rollbackInstantTimeOpt = Option.of(pendingRollbackInfo.get().getRollbackInstant().requestedTime());
} else {
- if (commitInstantOpt.isEmpty()) {
- log.error("Cannot find instant {} in the timeline of table {} for rollback", commitInstantTime, config.getBasePath());
- return false;
- }
if (!skipLocking) {
txnManager.beginStateChange(Option.empty(), Option.empty());
}
try {
- rollbackInstantTime = suppliedRollbackInstantTime.orElseGet(() -> createNewInstantTime(false));
- rollbackPlanOption = table.scheduleRollback(context, rollbackInstantTime, commitInstantOpt.get(), false, config.shouldRollbackUsingMarkers(), false);
+ if (config.isExclusiveRollbackEnabled()) {
+ // Reload meta client within the lock so that the timeline is latest while executing pending rollback
+ table.getMetaClient().reloadActiveTimeline();
+ Option<HoodiePendingRollbackInfo> pendingRollbackOpt = getPendingRollbackInfo(table.getMetaClient(), commitInstantTime);
+ rollbackInstantTimeOpt = pendingRollbackOpt.map(info -> info.getRollbackInstant().requestedTime());
+ if (pendingRollbackOpt.isPresent()) {
+ // If pending rollback and heartbeat is expired, writer should start heartbeat and execute rollback
+ if (heartbeatClient.isHeartbeatExpired(rollbackInstantTimeOpt.get())) {
+ LOG.info("Heartbeat expired for rollback instant {}, executing rollback now", rollbackInstantTimeOpt);
+ HeartbeatUtils.deleteHeartbeatFile(storage, basePath, rollbackInstantTimeOpt.get(), config);
+ heartbeatClient.start(rollbackInstantTimeOpt.get());
+ rollbackPlanOption = pendingRollbackOpt.map(HoodiePendingRollbackInfo::getRollbackPlan);
+ } else {
+ // Heartbeat is still active for another writer, ignore rollback for now
+ // TODO: ABCDEFGHI revisit return value
+ return false;
Review Comment:
P2: **Leftover placeholder TODO comment**
The comment `// TODO: ABCDEFGHI revisit return value` looks like a
development-time placeholder that was left in accidentally. Based on the test
`testExclusiveRollbackDefersToActiveHeartbeat`, returning `false` when another
writer's heartbeat is active is the intended and correct behavior. If there is
still an open design question here, the TODO should use a proper JIRA issue
reference instead of `ABCDEFGHI`.
```suggestion
} else {
// Heartbeat is still active for another writer, so skip rollback for now.
// The active writer will complete the rollback; the caller should retry later.
return false;
```
— *Greptile*
([original](https://github.com/yihua/hudi/pull/34#discussion_r3067036706))
(source:comment#3067036706)
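To make the intended deferral behavior concrete, here is a hedged sketch of the decision in the exclusive path: take over a pending rollback only if its heartbeat has expired, otherwise return `false` so the caller can retry later. `Heartbeats`, `tryTakeOverPendingRollback`, and the fixed tolerance are hypothetical; the real `heartbeatClient` checks heartbeats persisted in storage rather than an in-memory map.

```java
import java.util.HashMap;
import java.util.Map;

public class ExclusiveRollbackDecision {

  // Hypothetical in-memory heartbeat registry: instant time -> last heartbeat (epoch millis).
  public static final class Heartbeats {
    private final Map<String, Long> lastBeatMs = new HashMap<>();
    private final long toleranceMs;

    public Heartbeats(long toleranceMs) {
      this.toleranceMs = toleranceMs;
    }

    public void beat(String instant, long nowMs) {
      lastBeatMs.put(instant, nowMs);
    }

    // Expired if no heartbeat was ever recorded or the last one is older than the tolerance.
    public boolean isExpired(String instant, long nowMs) {
      Long last = lastBeatMs.get(instant);
      return last == null || nowMs - last > toleranceMs;
    }
  }

  /**
   * Returns true if this writer takes over the pending rollback (its heartbeat expired),
   * or false if another writer's heartbeat is still live and this writer should defer.
   */
  public static boolean tryTakeOverPendingRollback(Heartbeats heartbeats, String rollbackInstant, long nowMs) {
    if (heartbeats.isExpired(rollbackInstant, nowMs)) {
      // Claim ownership by starting our own heartbeat before executing the rollback.
      heartbeats.beat(rollbackInstant, nowMs);
      return true;
    }
    // Another writer owns the rollback; the caller should retry later.
    return false;
  }
}
```

With a 1000 ms tolerance and a last beat at 0 ms, a writer checking at 500 ms defers, while one checking at 2000 ms takes over and refreshes the heartbeat.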
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieTableServiceClient.java:
##########
@@ -1160,25 +1160,59 @@ public boolean rollback(final String commitInstantTime, Option<HoodiePendingRoll
final Timer.Context timerContext = this.metrics.getRollbackCtx();
try {
HoodieTable table = createTable(config, storageConf, skipVersionCheck);
+
Option<HoodieInstant> commitInstantOpt = Option.fromJavaOptional(table.getActiveTimeline().getCommitsTimeline().getInstantsAsStream()
.filter(instant -> EQUALS.test(instant.requestedTime(), commitInstantTime))
.findFirst());
- Option<HoodieRollbackPlan> rollbackPlanOption;
- String rollbackInstantTime;
- if (pendingRollbackInfo.isPresent()) {
+ Option<HoodieRollbackPlan> rollbackPlanOption = Option.empty();
+ Option<String> rollbackInstantTimeOpt;
+ if (!config.isExclusiveRollbackEnabled() && pendingRollbackInfo.isPresent()) {
+ // Only case when lock can be skipped is if exclusive rollback is disabled and
+ // there is a pending rollback info available
rollbackPlanOption = Option.of(pendingRollbackInfo.get().getRollbackPlan());
- rollbackInstantTime = pendingRollbackInfo.get().getRollbackInstant().requestedTime();
+ rollbackInstantTimeOpt = Option.of(pendingRollbackInfo.get().getRollbackInstant().requestedTime());
} else {
- if (commitInstantOpt.isEmpty()) {
- log.error("Cannot find instant {} in the timeline of table {} for rollback", commitInstantTime, config.getBasePath());
- return false;
- }
if (!skipLocking) {
txnManager.beginStateChange(Option.empty(), Option.empty());
}
try {
- rollbackInstantTime = suppliedRollbackInstantTime.orElseGet(() -> createNewInstantTime(false));
- rollbackPlanOption = table.scheduleRollback(context, rollbackInstantTime, commitInstantOpt.get(), false, config.shouldRollbackUsingMarkers(), false);
+ if (config.isExclusiveRollbackEnabled()) {
+ // Reload meta client within the lock so that the timeline is latest while executing pending rollback
+ table.getMetaClient().reloadActiveTimeline();
+ Option<HoodiePendingRollbackInfo> pendingRollbackOpt = getPendingRollbackInfo(table.getMetaClient(), commitInstantTime);
+ rollbackInstantTimeOpt = pendingRollbackOpt.map(info -> info.getRollbackInstant().requestedTime());
+ if (pendingRollbackOpt.isPresent()) {
+ // If pending rollback and heartbeat is expired, writer should start heartbeat and execute rollback
+ if (heartbeatClient.isHeartbeatExpired(rollbackInstantTimeOpt.get())) {
+ LOG.info("Heartbeat expired for rollback instant {}, executing rollback now", rollbackInstantTimeOpt);
Review Comment:
P0: **Compilation error: `LOG` is undefined; it should be `log`**
The class is annotated with `@Slf4j`, which generates a field named `log`
(lowercase). There is no `LOG` field defined here or in any parent class
(`BaseHoodieClient` also uses `log`). This line will fail to compile.
```suggestion
log.info("Heartbeat expired for rollback instant {}, executing rollback now", rollbackInstantTimeOpt);
```
— *Greptile*
([original](https://github.com/yihua/hudi/pull/34#discussion_r3067036686))
(source:comment#3067036686)
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieTableServiceClient.java:
##########
@@ -1160,25 +1160,59 @@ public boolean rollback(final String commitInstantTime, Option<HoodiePendingRoll
final Timer.Context timerContext = this.metrics.getRollbackCtx();
try {
HoodieTable table = createTable(config, storageConf, skipVersionCheck);
+
Option<HoodieInstant> commitInstantOpt = Option.fromJavaOptional(table.getActiveTimeline().getCommitsTimeline().getInstantsAsStream()
.filter(instant -> EQUALS.test(instant.requestedTime(), commitInstantTime))
.findFirst());
- Option<HoodieRollbackPlan> rollbackPlanOption;
- String rollbackInstantTime;
- if (pendingRollbackInfo.isPresent()) {
+ Option<HoodieRollbackPlan> rollbackPlanOption = Option.empty();
+ Option<String> rollbackInstantTimeOpt;
+ if (!config.isExclusiveRollbackEnabled() && pendingRollbackInfo.isPresent()) {
+ // Only case when lock can be skipped is if exclusive rollback is disabled and
+ // there is a pending rollback info available
rollbackPlanOption = Option.of(pendingRollbackInfo.get().getRollbackPlan());
- rollbackInstantTime = pendingRollbackInfo.get().getRollbackInstant().requestedTime();
+ rollbackInstantTimeOpt = Option.of(pendingRollbackInfo.get().getRollbackInstant().requestedTime());
} else {
- if (commitInstantOpt.isEmpty()) {
- log.error("Cannot find instant {} in the timeline of table {} for rollback", commitInstantTime, config.getBasePath());
- return false;
- }
if (!skipLocking) {
txnManager.beginStateChange(Option.empty(), Option.empty());
}
try {
- rollbackInstantTime = suppliedRollbackInstantTime.orElseGet(() -> createNewInstantTime(false));
- rollbackPlanOption = table.scheduleRollback(context, rollbackInstantTime, commitInstantOpt.get(), false, config.shouldRollbackUsingMarkers(), false);
+ if (config.isExclusiveRollbackEnabled()) {
+ // Reload meta client within the lock so that the timeline is latest while executing pending rollback
+ table.getMetaClient().reloadActiveTimeline();
+ Option<HoodiePendingRollbackInfo> pendingRollbackOpt = getPendingRollbackInfo(table.getMetaClient(), commitInstantTime);
+ rollbackInstantTimeOpt = pendingRollbackOpt.map(info -> info.getRollbackInstant().requestedTime());
+ if (pendingRollbackOpt.isPresent()) {
+ // If pending rollback and heartbeat is expired, writer should start heartbeat and execute rollback
+ if (heartbeatClient.isHeartbeatExpired(rollbackInstantTimeOpt.get())) {
+ LOG.info("Heartbeat expired for rollback instant {}, executing rollback now", rollbackInstantTimeOpt);
+ HeartbeatUtils.deleteHeartbeatFile(storage, basePath, rollbackInstantTimeOpt.get(), config);
+ heartbeatClient.start(rollbackInstantTimeOpt.get());
+ rollbackPlanOption = pendingRollbackOpt.map(HoodiePendingRollbackInfo::getRollbackPlan);
Review Comment:
_⚠️ Potential issue_ | _🟠 Major_
**Always stop the rollback heartbeat when this writer started it.**
If `scheduleRollback(...)` or `table.rollback(...)` throws after
`heartbeatClient.start(...)`, the heartbeat never gets stopped. In the
exclusive path that leaves a live rollback heartbeat behind, so other writers
will keep deferring until it expires.
Proposed fix:
```diff
Option<HoodieRollbackPlan> rollbackPlanOption = Option.empty();
Option<String> rollbackInstantTimeOpt;
+ boolean startedRollbackHeartbeat = false;
@@
if (heartbeatClient.isHeartbeatExpired(rollbackInstantTimeOpt.get())) {
LOG.info("Heartbeat expired for rollback instant {}, executing rollback now", rollbackInstantTimeOpt);
HeartbeatUtils.deleteHeartbeatFile(storage, basePath, rollbackInstantTimeOpt.get(), config);
heartbeatClient.start(rollbackInstantTimeOpt.get());
+ startedRollbackHeartbeat = true;
rollbackPlanOption = pendingRollbackOpt.map(HoodiePendingRollbackInfo::getRollbackPlan);
@@
rollbackInstantTimeOpt = suppliedRollbackInstantTime.or(() -> Option.of(createNewInstantTime(false)));
heartbeatClient.start(rollbackInstantTimeOpt.get());
+ startedRollbackHeartbeat = true;
rollbackPlanOption = table.scheduleRollback(context, rollbackInstantTimeOpt.get(), commitInstantOpt.get(), false, config.shouldRollbackUsingMarkers(), false);
@@
- HoodieRollbackMetadata rollbackMetadata = commitInstantOpt.isPresent()
- ? table.rollback(context, rollbackInstantTimeOpt.get(), commitInstantOpt.get(), true, skipLocking)
- : table.rollback(context, rollbackInstantTimeOpt.get(), table.getMetaClient().createNewInstant(
- HoodieInstant.State.INFLIGHT, rollbackPlanOption.get().getInstantToRollback().getAction(), commitInstantTime),
- false, skipLocking);
- if (timerContext != null) {
- long durationInMs = metrics.getDurationInMs(timerContext.stop());
- metrics.updateRollbackMetrics(durationInMs, rollbackMetadata.getTotalFilesDeleted());
- }
- if (config.isExclusiveRollbackEnabled()) {
- heartbeatClient.stop(rollbackInstantTimeOpt.get());
- }
- return true;
+ try {
+ HoodieRollbackMetadata rollbackMetadata = commitInstantOpt.isPresent()
+ ? table.rollback(context, rollbackInstantTimeOpt.get(), commitInstantOpt.get(), true, skipLocking)
+ : table.rollback(context, rollbackInstantTimeOpt.get(), table.getMetaClient().createNewInstant(
+ HoodieInstant.State.INFLIGHT, rollbackPlanOption.get().getInstantToRollback().getAction(), commitInstantTime),
+ false, skipLocking);
+ if (timerContext != null) {
+ long durationInMs = metrics.getDurationInMs(timerContext.stop());
+ metrics.updateRollbackMetrics(durationInMs, rollbackMetadata.getTotalFilesDeleted());
+ }
+ return true;
+ } finally {
+ if (config.isExclusiveRollbackEnabled() && startedRollbackHeartbeat && rollbackInstantTimeOpt.isPresent()) {
+ heartbeatClient.stop(rollbackInstantTimeOpt.get());
+ }
+ }
```
Also applies to: 1203-1205, 1230-1241
— *CodeRabbit*
([original](https://github.com/yihua/hudi/pull/34#discussion_r3067036671))
(source:comment#3067036671)
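As written, the proposed `finally` appears to wrap only the `table.rollback(...)` call, while the comment also names `scheduleRollback(...)` as a failure point. A hedged sketch of one way to cover both: open the `try` before the heartbeat is started and record ownership in a flag, so `finally` stops only a heartbeat this writer started. `HeartbeatClient`, `runRollback`, and its parameters are hypothetical stand-ins, not Hudi APIs.

```java
import java.util.HashSet;
import java.util.Set;
import java.util.function.Supplier;

public class HeartbeatGuard {

  // Hypothetical heartbeat client: tracks which instants currently have a live heartbeat.
  public static final class HeartbeatClient {
    public final Set<String> live = new HashSet<>();

    public void start(String instant) {
      live.add(instant);
    }

    public void stop(String instant) {
      live.remove(instant);
    }
  }

  /**
   * Optionally starts a heartbeat for rollbackInstant, then schedules and executes the rollback.
   * The flag records whether this call started the heartbeat, so finally stops it on success or
   * on an exception from either step, but never stops a heartbeat owned by another writer.
   */
  public static boolean runRollback(HeartbeatClient client, String rollbackInstant, boolean startOwnHeartbeat,
                                    Runnable schedule, Supplier<Boolean> execute) {
    boolean startedHeartbeat = false;
    try {
      if (startOwnHeartbeat) {
        client.start(rollbackInstant);
        startedHeartbeat = true;
      }
      schedule.run();       // stands in for table.scheduleRollback(...)
      return execute.get(); // stands in for table.rollback(...)
    } finally {
      if (startedHeartbeat) {
        client.stop(rollbackInstant);
      }
    }
  }
}
```

Keeping the flag, as the proposed fix does, matters for the branch that only executes a pending plan: it lets the same `finally` be shared by paths that did and did not start a heartbeat.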
--
This is an automated message from the Apache Git Service.