yihua commented on code in PR #18279:
URL: https://github.com/apache/hudi/pull/18279#discussion_r3047399149
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/RollbackHelper.java:
##########
@@ -119,72 +115,15 @@ List<Pair<String, HoodieRollbackStat>> maybeDeleteAndCollectStats(HoodieEngineCo
HoodieInstant instantToRollback,
List<SerializableHoodieRollbackRequest> rollbackRequests,
boolean doDelete, int numPartitions) {
- // The rollback requests for append only exist in table version 6 and below which require groupBy
- List<SerializableHoodieRollbackRequest> processedRollbackRequests =
- metaClient.getTableConfig().getTableVersion().greaterThanOrEquals(HoodieTableVersion.EIGHT)
- ? rollbackRequests
- : groupSerializableRollbackRequestsBasedOnFileGroup(rollbackRequests);
- final TaskContextSupplier taskContextSupplier = context.getTaskContextSupplier();
- return context.flatMap(processedRollbackRequests, (SerializableFunction<SerializableHoodieRollbackRequest, Stream<Pair<String, HoodieRollbackStat>>>) rollbackRequest -> {
+ return context.flatMap(rollbackRequests, (SerializableFunction<SerializableHoodieRollbackRequest, Stream<Pair<String, HoodieRollbackStat>>>) rollbackRequest -> {
List<String> filesToBeDeleted = rollbackRequest.getFilesToBeDeleted();
if (!filesToBeDeleted.isEmpty()) {
List<HoodieRollbackStat> rollbackStats = deleteFiles(metaClient, filesToBeDeleted, doDelete);
return rollbackStats.stream().map(entry -> Pair.of(entry.getPartitionPath(), entry));
- } else if (!rollbackRequest.getLogBlocksToBeDeleted().isEmpty()) {
- HoodieLogFormat.Writer writer = null;
- final StoragePath filePath;
- try {
- String fileId = rollbackRequest.getFileId();
- HoodieTableVersion tableVersion = metaClient.getTableConfig().getTableVersion();
-
- writer = HoodieLogFormat.newWriterBuilder()
- .onParentPath(FSUtils.constructAbsolutePath(metaClient.getBasePath(), rollbackRequest.getPartitionPath()))
- .withFileId(fileId)
- .withLogWriteToken(CommonClientUtils.generateWriteToken(taskContextSupplier))
- .withInstantTime(tableVersion.greaterThanOrEquals(HoodieTableVersion.EIGHT)
- ? instantToRollback.requestedTime() : rollbackRequest.getLatestBaseInstant()
- )
- .withStorage(metaClient.getStorage())
- .withTableVersion(tableVersion)
- .withFileExtension(HoodieLogFile.DELTA_EXTENSION).build();
-
- // generate metadata
- if (doDelete) {
- Map<HoodieLogBlock.HeaderMetadataType, String> header = generateHeader(instantToRollback.requestedTime());
- // if update belongs to an existing log file
- // use the log file path from AppendResult in case the file handle may roll over
- filePath = writer.appendBlock(new HoodieCommandBlock(header)).logFile().getPath();
- } else {
- filePath = writer.getLogFile().getPath();
- }
- } catch (IOException | InterruptedException io) {
- throw new HoodieRollbackException("Failed to rollback for instant " + instantToRollback, io);
- } finally {
- try {
- if (writer != null) {
- writer.close();
- }
- } catch (IOException io) {
- throw new HoodieIOException("Error appending rollback block", io);
- }
- }
-
- // This step is intentionally done after writer is closed. Guarantees that
- // getFileStatus would reflect correct stats and FileNotFoundException is not thrown in
- // cloud-storage : HUDI-168
- Map<StoragePathInfo, Long> filesToNumBlocksRollback = Collections.singletonMap(
- metaClient.getStorage().getPathInfo(Objects.requireNonNull(filePath)),
- 1L
- );
-
- return Stream.of(
- Pair.of(rollbackRequest.getPartitionPath(),
- HoodieRollbackStat.newBuilder()
- .withPartitionPath(rollbackRequest.getPartitionPath())
- .withRollbackBlockAppendResults(filesToNumBlocksRollback)
- .build()));
} else {
- // no action needed.
+ checkArgument(rollbackRequest.getLogBlocksToBeDeleted().isEmpty(),
+ "V8+ rollback should not have logBlocksToBeDeleted, but found for partition: "
+ + rollbackRequest.getPartitionPath() + ", fileId: " + rollbackRequest.getFileId());
Review Comment:
_⚠️ Potential issue_ | _🟠 Major_
**`collectRollbackStats()` now bypasses the V6 rollback path.**
`RollbackHelperV1` only overloads `maybeDeleteAndCollectStats(...)` with the
extra `instantTime` parameter. The inherited `collectRollbackStats(...)` still
calls this base implementation, so any V6 request with `logBlocksToBeDeleted`
will now fail the `checkArgument(...)` at line 124 instead of going through the V1
rollback-block flow. That breaks the dry-run / upgrade-downgrade stats path for V6 tables.
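The following minimal, self-contained Java sketch makes the dispatch issue concrete. `BaseHelper`, `HelperV1`, `collectStats` and `maybeDeleteAndCollect` are hypothetical, simplified stand-ins for `RollbackHelper`, `RollbackHelperV1`, `collectRollbackStats(...)` and `maybeDeleteAndCollectStats(...)`. They do not reproduce Hudi's real signatures. The sketch shows that an inherited method which calls the original signature never reaches a subclass method that only adds an overload with an extra parameter.

```java
import java.util.List;

// Hypothetical, simplified stand-ins; not Hudi's real classes or signatures.
class OverloadPitfallDemo {

  static class BaseHelper {
    // Entry point used by callers such as a dry-run / stats collection path.
    String collectStats(List<String> logBlocks) {
      // Overload resolution picks the (List, boolean) signature at compile time;
      // at runtime only an override of that exact signature could replace it.
      return maybeDeleteAndCollect(logBlocks, false);
    }

    String maybeDeleteAndCollect(List<String> logBlocks, boolean doDelete) {
      if (!logBlocks.isEmpty()) {
        throw new IllegalArgumentException("base path does not accept logBlocksToBeDeleted");
      }
      return "base";
    }
  }

  static class HelperV1 extends BaseHelper {
    // Overload, NOT an override: the extra parameter changes the signature,
    // so the inherited collectStats never dispatches here.
    String maybeDeleteAndCollect(List<String> logBlocks, boolean doDelete, String instantTime) {
      return "v1:" + instantTime;
    }
  }

  // Calls the inherited entry point and reports whether the base guard fired.
  static String run(BaseHelper helper, List<String> logBlocks) {
    try {
      return helper.collectStats(logBlocks);
    } catch (IllegalArgumentException e) {
      return "failed: " + e.getMessage();
    }
  }

  public static void main(String[] args) {
    List<String> v6Request = List.of("rollback-block");
    System.out.println(run(new HelperV1(), v6Request));
  }
}
```

Running `main` prints `failed: base path does not accept logBlocksToBeDeleted`. Inside `BaseHelper.collectStats`, Java resolves the overload at compile time. Runtime dispatch only helps when the subclass overrides that exact signature.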
Please override `collectRollbackStats(...)` in `RollbackHelperV1` or refactor
the helper signature so the V1 implementation is used for `doDelete=false` as
well.
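Below is a minimal sketch of the first suggested fix. It uses the same kind of hypothetical stand-in names, which are not Hudi's API. `HelperV1` overrides the entry point itself, so calls made through a `BaseHelper` reference reach the V1 overload even when `doDelete` is false. Passing `instantTime` through the constructor is an assumption made only to keep the sketch small.

```java
import java.util.List;

// Hypothetical, simplified stand-ins; not Hudi's real classes or signatures.
class OverrideFixDemo {

  static class BaseHelper {
    String collectStats(List<String> logBlocks) {
      return maybeDeleteAndCollect(logBlocks, false);
    }

    String maybeDeleteAndCollect(List<String> logBlocks, boolean doDelete) {
      if (!logBlocks.isEmpty()) {
        throw new IllegalArgumentException("base path does not accept logBlocksToBeDeleted");
      }
      return "base";
    }
  }

  static class HelperV1 extends BaseHelper {
    // Assumption for the sketch: the instant time is supplied at construction.
    private final String instantTime;

    HelperV1(String instantTime) {
      this.instantTime = instantTime;
    }

    // Override (same signature as the base method), so calls made through a
    // BaseHelper reference dispatch here at runtime.
    @Override
    String collectStats(List<String> logBlocks) {
      return maybeDeleteAndCollect(logBlocks, false, instantTime);
    }

    // V1-specific overload that accepts requests carrying log blocks to roll back.
    String maybeDeleteAndCollect(List<String> logBlocks, boolean doDelete, String instantTime) {
      return (doDelete ? "v1-delete:" : "v1-dry-run:") + instantTime + ":" + logBlocks.size();
    }
  }

  public static void main(String[] args) {
    BaseHelper helper = new HelperV1("001");
    System.out.println(helper.collectStats(List.of("rollback-block")));
  }
}
```

Here `main` prints `v1-dry-run:001:1`. The comment also offers an alternative: refactor the helper signature. That approach would route every entry point through a single method that `HelperV1` overrides, so a future caller could not bypass the V1 logic by accident.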
<details>
<summary>🤖 Prompt for AI Agents</summary>
```
Verify each finding against the current code and only fix it if needed.
In `@hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/RollbackHelper.java`
around lines 118 - 126, The V6 path's collectRollbackStats is incorrectly
invoking the base implementation and hitting the checkArgument for V6 requests
with logBlocksToBeDeleted; fix by overriding collectRollbackStats(...) in
RollbackHelperV1 so it delegates to the V1-specific
maybeDeleteAndCollectStats(...) overload that accepts instantTime (or refactor
the helper signature) and ensure the V1 logic is used for doDelete=false
(dry-run/upgrade-downgrade) cases instead of the base implementation that
throws on logBlocksToBeDeleted.
```
</details>
<!-- This is an auto-generated comment by CodeRabbit -->
— *CodeRabbit*
([original](https://github.com/yihua/hudi/pull/24#discussion_r3047397943))
(source:comment#3047397943)
--
This is an automated message from the Apache Git Service.