nbalajee commented on code in PR #18279:
URL: https://github.com/apache/hudi/pull/18279#discussion_r3047929388
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/RollbackHelper.java:
##########
@@ -229,6 +227,9 @@ protected List<HoodieRollbackStat>
deleteFiles(HoodieTableMetaClient metaClient,
}).collect(Collectors.toList());
}
+ /**
+ * Generates the header for a rollback command block. Used by {@link
RollbackHelperV1} for V6 rollback.
+ */
Review Comment:
Done.
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/RollbackHelper.java:
##########
@@ -194,6 +133,65 @@ List<Pair<String, HoodieRollbackStat>>
maybeDeleteAndCollectStats(HoodieEngineCo
}, numPartitions);
}
+ /**
+ * Builds the lookup key for pre-computed log versions. Used by {@link
RollbackHelperV1} for V6 rollback.
+ */
+ protected static String logVersionLookupKey(String partitionPath, String
fileId, String commitTime) {
+ return partitionPath + "|" + fileId + "|" + commitTime;
+ }
+
+ /**
+ * Pre-compute the latest log version for each (partition, fileId,
deltaCommitTime) tuple
+ * by listing each unique partition directory once. This replaces N
per-request listing
+ * calls (one per rollback request) with P per-partition listings (where P
is much less than N).
+ *
+ * <p>Used by {@link RollbackHelperV1} for V6 rollback where log blocks are
appended.
+ */
+ protected Map<String, Pair<Integer, String>> preComputeLogVersions(
Review Comment:
Moved the following functions and changed their visibility from protected
to private:
preComputeLogVersions(), logVersionLookupKey(), generateHeader()
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/RollbackHelper.java:
##########
@@ -119,72 +115,15 @@ List<Pair<String, HoodieRollbackStat>>
maybeDeleteAndCollectStats(HoodieEngineCo
HoodieInstant instantToRollback,
List<SerializableHoodieRollbackRequest> rollbackRequests,
boolean
doDelete, int numPartitions) {
- // The rollback requests for append only exist in table version 6 and
below which require groupBy
- List<SerializableHoodieRollbackRequest> processedRollbackRequests =
-
metaClient.getTableConfig().getTableVersion().greaterThanOrEquals(HoodieTableVersion.EIGHT)
- ? rollbackRequests
- :
groupSerializableRollbackRequestsBasedOnFileGroup(rollbackRequests);
- final TaskContextSupplier taskContextSupplier =
context.getTaskContextSupplier();
- return context.flatMap(processedRollbackRequests,
(SerializableFunction<SerializableHoodieRollbackRequest, Stream<Pair<String,
HoodieRollbackStat>>>) rollbackRequest -> {
+ return context.flatMap(rollbackRequests,
(SerializableFunction<SerializableHoodieRollbackRequest, Stream<Pair<String,
HoodieRollbackStat>>>) rollbackRequest -> {
List<String> filesToBeDeleted = rollbackRequest.getFilesToBeDeleted();
if (!filesToBeDeleted.isEmpty()) {
List<HoodieRollbackStat> rollbackStats = deleteFiles(metaClient,
filesToBeDeleted, doDelete);
return rollbackStats.stream().map(entry ->
Pair.of(entry.getPartitionPath(), entry));
- } else if (!rollbackRequest.getLogBlocksToBeDeleted().isEmpty()) {
- HoodieLogFormat.Writer writer = null;
- final StoragePath filePath;
- try {
- String fileId = rollbackRequest.getFileId();
- HoodieTableVersion tableVersion =
metaClient.getTableConfig().getTableVersion();
-
- writer = HoodieLogFormat.newWriterBuilder()
-
.onParentPath(FSUtils.constructAbsolutePath(metaClient.getBasePath(),
rollbackRequest.getPartitionPath()))
- .withFileId(fileId)
-
.withLogWriteToken(CommonClientUtils.generateWriteToken(taskContextSupplier))
-
.withInstantTime(tableVersion.greaterThanOrEquals(HoodieTableVersion.EIGHT)
- ? instantToRollback.requestedTime() :
rollbackRequest.getLatestBaseInstant()
- )
- .withStorage(metaClient.getStorage())
- .withTableVersion(tableVersion)
- .withFileExtension(HoodieLogFile.DELTA_EXTENSION).build();
-
- // generate metadata
- if (doDelete) {
- Map<HoodieLogBlock.HeaderMetadataType, String> header =
generateHeader(instantToRollback.requestedTime());
- // if update belongs to an existing log file
- // use the log file path from AppendResult in case the file handle
may roll over
- filePath = writer.appendBlock(new
HoodieCommandBlock(header)).logFile().getPath();
- } else {
- filePath = writer.getLogFile().getPath();
- }
- } catch (IOException | InterruptedException io) {
- throw new HoodieRollbackException("Failed to rollback for instant "
+ instantToRollback, io);
- } finally {
- try {
- if (writer != null) {
- writer.close();
- }
- } catch (IOException io) {
- throw new HoodieIOException("Error appending rollback block", io);
- }
- }
-
- // This step is intentionally done after writer is closed. Guarantees
that
- // getFileStatus would reflect correct stats and FileNotFoundException
is not thrown in
- // cloud-storage : HUDI-168
- Map<StoragePathInfo, Long> filesToNumBlocksRollback =
Collections.singletonMap(
-
metaClient.getStorage().getPathInfo(Objects.requireNonNull(filePath)),
- 1L
- );
-
- return Stream.of(
- Pair.of(rollbackRequest.getPartitionPath(),
- HoodieRollbackStat.newBuilder()
- .withPartitionPath(rollbackRequest.getPartitionPath())
-
.withRollbackBlockAppendResults(filesToNumBlocksRollback)
- .build()));
} else {
- // no action needed.
+ checkArgument(rollbackRequest.getLogBlocksToBeDeleted().isEmpty(),
+ "V8+ rollback should not have logBlocksToBeDeleted, but found for
partition: "
+ + rollbackRequest.getPartitionPath() + ", fileId: " +
rollbackRequest.getFileId());
Review Comment:
Added an @Override of collectRollbackStats() that calls the V1-specific
six-parameter maybeDeleteAndCollectStats() with doDelete=false. For the
instantTime parameter (needed for marker initialization), an EMPTY_STRING
placeholder is passed, since doDelete=false never triggers marker creation.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]