yihua commented on code in PR #18279:
URL: https://github.com/apache/hudi/pull/18279#discussion_r3048275546
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/RollbackHelper.java:
##########
@@ -146,14 +151,25 @@ List<Pair<String, HoodieRollbackStat>> maybeDeleteAndCollectStats(HoodieEngineCo
)
.withStorage(metaClient.getStorage())
.withTableVersion(tableVersion)
- .withFileExtension(HoodieLogFile.DELTA_EXTENSION).build();
+ .withFileExtension(HoodieLogFile.DELTA_EXTENSION);
+
Review Comment:
🤖 Agree with this observation. I traced the code paths and confirmed: for
V8+, `getLogBlocksToBeDeleted()` is always empty in `RollbackHelper` because
(a) marker-based rollbacks use `IOType.CREATE` for log files, and (b)
listing-based rollbacks guard with `isTableVersionLessThanEight`. The latest
revision correctly removes this dead code from `RollbackHelper` and keeps the
log-block-append logic only in `RollbackHelperV1`.
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/RollbackHelperV1.java:
##########
@@ -168,6 +255,23 @@ public List<HoodieRollbackStat> performRollback(HoodieEngineContext context, Str
return addLogFilesFromPreviousFailedRollbacksToStat(context, mergedRollbackStatByPartitionPath, logPaths);
}
+ /**
+ * Collect all file info that needs to be rolled back, using the V1-specific
+ * 6-param {@code maybeDeleteAndCollectStats} so V6 log-block requests are handled correctly.
+ */
+ @Override
+ public List<HoodieRollbackStat> collectRollbackStats(HoodieEngineContext context, HoodieInstant instantToRollback,
+ List<HoodieRollbackRequest> rollbackRequests) {
+ int parallelism = Math.max(Math.min(rollbackRequests.size(), config.getRollbackParallelism()), 1);
+ context.setJobStatus(this.getClass().getSimpleName(), "Collect rollback stats: " + config.getTableName());
+ List<SerializableHoodieRollbackRequest> serializableRequests = rollbackRequests.stream()
+ .map(SerializableHoodieRollbackRequest::new).collect(Collectors.toList());
+ return context.reduceByKey(
+ maybeDeleteAndCollectStats(context, EMPTY_STRING, instantToRollback,
+ serializableRequests, false, parallelism),
Review Comment:
🤖 The new `collectRollbackStats` override passes `EMPTY_STRING` as
`instantTime` to `maybeDeleteAndCollectStats`. This `instantTime` is used at
L207 to create `WriteMarkers`, and the `withFileCreationCallback` at L217
creates markers under `.hoodie/.temp/<instantTime>/...`. With an empty string,
markers end up under a malformed path (`.hoodie/.temp//...`). Could you pass
`instantToRollback.requestedTime()` instead, or avoid creating markers when
`doDelete=false`?
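
To make the concern concrete, here is a minimal, self-contained sketch of how an
empty instant time collapses a path segment. It does not use Hudi classes:
`MarkerPathSketch`, `markerDir`, and `markerPath` are hypothetical helpers, and
the layout is only assumed to follow the `.hoodie/.temp/<instantTime>/...`
convention described above.

```java
class MarkerPathSketch {
  // Assumption: mirrors the `.hoodie/.temp/<instantTime>/...` layout from the comment.
  static final String TEMP_FOLDER = ".hoodie/.temp";

  // Hypothetical helper: builds the marker directory for a given instant time.
  static String markerDir(String basePath, String instantTime) {
    return basePath + "/" + TEMP_FOLDER + "/" + instantTime;
  }

  // Hypothetical helper: builds the marker location for a file relative to the table base path.
  static String markerPath(String basePath, String instantTime, String relativeFile) {
    return markerDir(basePath, instantTime) + "/" + relativeFile;
  }

  public static void main(String[] args) {
    // With a real instant time, the marker lands in its own per-instant directory.
    System.out.println(markerPath("/tbl", "20240101120000", "2024/01/file1"));
    // With an empty instant time, the segment disappears and a double slash appears.
    System.out.println(markerPath("/tbl", "", "2024/01/file1"));
  }
}
```

Whether the storage layer normalizes the double slash or rejects it depends on
the file system implementation, which is why passing a real instant time (or
skipping marker creation entirely) is the safer option.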
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/RollbackHelperV1.java:
##########
@@ -80,6 +81,92 @@ public RollbackHelperV1(HoodieTable table, HoodieWriteConfig config) {
super(table, config);
}
+ /**
+ * Builds the lookup key for pre-computed log versions.
+ */
+ static String logVersionLookupKey(String partitionPath, String fileId, String commitTime) {
+ return partitionPath + "|" + fileId + "|" + commitTime;
+ }
+
+ /**
+ * Pre-compute the latest log version for each (partition, fileId, deltaCommitTime) tuple
+ * by listing each unique partition directory once. This replaces N per-request listing
+ * calls (one per rollback request) with P per-partition listings (where P is much less than N).
+ *
+ * <p>For file groups with no existing log files in a successfully listed partition, a sentinel
+ * of (LOGFILE_BASE_VERSION, UNKNOWN_WRITE_TOKEN) is inserted so the caller avoids a redundant
+ * per-request listing. If a partition listing fails (IOException), no sentinels are inserted
+ * and the caller falls back to per-request listing naturally.
+ */
+ Map<String, Pair<Integer, String>> preComputeLogVersions(
+ List<SerializableHoodieRollbackRequest> rollbackRequests) {
+ List<SerializableHoodieRollbackRequest> logBlockRequests = rollbackRequests.stream()
+ .filter(req -> !req.getLogBlocksToBeDeleted().isEmpty())
+ .collect(Collectors.toList());
+
+ if (logBlockRequests.isEmpty()) {
+ return Collections.emptyMap();
+ }
+
+ Map<String, Set<String>> expectedKeysByPartition = new HashMap<>();
+ for (SerializableHoodieRollbackRequest req : logBlockRequests) {
+ String key = logVersionLookupKey(req.getPartitionPath(), req.getFileId(), req.getLatestBaseInstant());
+ expectedKeysByPartition.computeIfAbsent(req.getPartitionPath(), k -> new HashSet<>()).add(key);
+ }
+
+ log.info("Pre-computing log versions for {} partition(s) to avoid per-request listStatus calls",
+ expectedKeysByPartition.size());
+
+ Map<String, Pair<Integer, String>> logVersionMap = new HashMap<>();
+
+ for (Map.Entry<String, Set<String>> entry : expectedKeysByPartition.entrySet()) {
Review Comment:
🤖 The listing filter uses
`path.getName().contains(HoodieLogFile.DELTA_EXTENSION)`, which matches any file
name containing `.log`, so it could also pick up files like `.log.crc` or
`.log.backup`. The `InvalidHoodiePathException` catch handles most of these,
but have you considered using a stricter filter (e.g., `endsWith` or a regex)
to avoid parsing non-log files in the first place?
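
As a rough illustration of the stricter-filter idea, the sketch below compares a
`contains`-style check with a whole-name regex. It is self-contained and does
not call Hudi: `LogFileFilterSketch`, `looseMatch`, `strictMatch`, and
`STRICT_LOG_NAME` are hypothetical, the pattern is a simplified assumption about
log file naming (not Hudi's actual pattern), and the sample file names are made
up for the example.

```java
import java.util.Arrays;
import java.util.List;
import java.util.regex.Pattern;
import java.util.stream.Collectors;

class LogFileFilterSketch {
  // Assumption: stands in for HoodieLogFile.DELTA_EXTENSION.
  static final String DELTA_EXTENSION = ".log";

  // Hypothetical, simplified pattern: .<fileId>_<instant>.log.<version>[_<writeToken>]
  static final Pattern STRICT_LOG_NAME =
      Pattern.compile("^\\.(.+)_(.+)\\.log\\.(\\d+)(_\\d+-\\d+-\\d+)?$");

  // Loose check: true for any name that merely contains ".log".
  static boolean looseMatch(String fileName) {
    return fileName.contains(DELTA_EXTENSION);
  }

  // Strict check: the whole name must fit the assumed log file shape.
  static boolean strictMatch(String fileName) {
    return STRICT_LOG_NAME.matcher(fileName).matches();
  }

  // Keeps only the names that pass the strict check.
  static List<String> filterStrict(List<String> names) {
    return names.stream().filter(LogFileFilterSketch::strictMatch).collect(Collectors.toList());
  }

  public static void main(String[] args) {
    List<String> names = Arrays.asList(
        ".f1_20240101.log.1_1-0-1",      // made-up log file name
        "..f1_20240101.log.1_1-0-1.crc", // made-up checksum sidecar
        ".f1_20240101.log.backup");      // made-up backup copy
    for (String name : names) {
      System.out.println(name + " loose=" + looseMatch(name) + " strict=" + strictMatch(name));
    }
  }
}
```

Filtering on the full name shape keeps sidecar and backup files out of the
parsing path, so the exception handler is left for genuinely unexpected names
rather than routine ones.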