dramaticlly commented on code in PR #13720:
URL: https://github.com/apache/iceberg/pull/13720#discussion_r2292030624
##########
spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteTablePathSparkAction.java:
##########
@@ -281,24 +281,32 @@ private String rebuildMetadata() {
// rebuild version files
RewriteResult<Snapshot> rewriteVersionResult =
rewriteVersionFiles(endMetadata);
Set<Snapshot> deltaSnapshots = deltaSnapshots(startMetadata,
rewriteVersionResult.toRewrite());
-
- Set<String> manifestsToRewrite = manifestsToRewrite(deltaSnapshots,
startMetadata);
Set<Snapshot> validSnapshots =
Sets.difference(snapshotSet(endMetadata), snapshotSet(startMetadata));
+ // rebuild manifest files
+ Set<ManifestFile> manifestsToRewrite = manifestsToRewrite(validSnapshots);
+
+ Map<String, RewriteContentFileResult> rewriteManifestResult =
+ rewriteManifests(deltaSnapshots, endMetadata, manifestsToRewrite);
+
+ RewriteContentFileResult allManifestsResult = new
RewriteContentFileResult();
+ Map<String, Long> rewrittenManifestLengths = Maps.newHashMap();
+ rewriteManifestResult.forEach(
+ (path, rewriteResult) -> {
+ rewrittenManifestLengths.put(path, rewriteResult.size());
+ allManifestsResult.append(rewriteResult);
+ });
+
// rebuild manifest-list files
RewriteResult<ManifestFile> rewriteManifestListResult =
validSnapshots.stream()
- .map(snapshot -> rewriteManifestList(snapshot, endMetadata,
manifestsToRewrite))
+ .map(snapshot -> rewriteManifestList(snapshot, endMetadata,
rewrittenManifestLengths))
.reduce(new RewriteResult<>(), RewriteResult::append);
- // rebuild manifest files
- RewriteContentFileResult rewriteManifestResult =
- rewriteManifests(deltaSnapshots, endMetadata,
rewriteManifestListResult.toRewrite());
-
// rebuild position delete files
Set<DeleteFile> deleteFiles =
- rewriteManifestResult.toRewrite().stream()
+ allManifestsResult.toRewrite().stream()
Review Comment:
I think building both the aggregated result and the length mapping in a
single pass over rewriteManifestResult helps, but it makes the control flow a
bit hard to follow. Here's what I have in mind:
```java
// Extract manifest file sizes for manifest list rewriting
Map<String, Long> rewrittenManifestLengths =
    rewriteManifestResult.entrySet().stream()
        .collect(Collectors.toMap(Map.Entry::getKey, entry -> entry.getValue().size()));

// rebuild manifest-list files
RewriteResult<ManifestFile> rewriteManifestListResult =
    validSnapshots.stream()
        .map(snapshot -> rewriteManifestList(snapshot, endMetadata, rewrittenManifestLengths))
        .reduce(new RewriteResult<>(), RewriteResult::append);

// Aggregate all manifest rewrite results
RewriteContentFileResult allManifestsResult =
    rewriteManifestResult.values().stream()
        .reduce(new RewriteContentFileResult(), RewriteContentFileResult::append);

// rebuild position delete files
Set<DeleteFile> deleteFiles =
    allManifestsResult.toRewrite().stream()
        .filter(e -> e instanceof DeleteFile)
        .map(e -> (DeleteFile) e)
        .collect(Collectors.toSet());
rewritePositionDeletes(endMetadata, deleteFiles);
```
I moved allManifestsResult down to where it is actually used, in rebuilding
the position delete files. Apart from the manifest length in the manifest list,
which this PR fixes, the length of position delete files recorded in delete
manifests also needs to be fixed. That belongs in a separate change, where we
might need to move this block before the manifest rebuild.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]