vaultah commented on code in PR #13720:
URL: https://github.com/apache/iceberg/pull/13720#discussion_r2298773311
##########
spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteTablePathSparkAction.java:
##########
@@ -494,36 +483,60 @@ public RewriteContentFileResult appendDeleteFile(RewriteResult<DeleteFile> r1) {
     }
   }

-  /** Rewrite manifest files in a distributed manner and return rewritten data files path pairs. */
-  private RewriteContentFileResult rewriteManifests(
+  /**
+   * Rewrite manifest files in a distributed manner and return the resulting manifests and content
+   * files selected for rewriting.
+   */
+  private Map<String, RewriteContentFileResult> rewriteManifests(
       Set<Snapshot> deltaSnapshots, TableMetadata tableMetadata, Set<ManifestFile> toRewrite) {
     if (toRewrite.isEmpty()) {
-      return new RewriteContentFileResult();
+      return Maps.newHashMap();
     }
     Encoder<ManifestFile> manifestFileEncoder = Encoders.javaSerialization(ManifestFile.class);
+    Encoder<RewriteContentFileResult> manifestResultEncoder =
+        Encoders.javaSerialization(RewriteContentFileResult.class);
+    Encoder<Tuple2<String, RewriteContentFileResult>> tupleEncoder =
+        Encoders.tuple(Encoders.STRING(), manifestResultEncoder);
+
     Dataset<ManifestFile> manifestDS =
         spark().createDataset(Lists.newArrayList(toRewrite), manifestFileEncoder);
     Set<Long> deltaSnapshotIds =
         deltaSnapshots.stream().map(Snapshot::snapshotId).collect(Collectors.toSet());
-    return manifestDS
-        .repartition(toRewrite.size())
-        .map(
-            toManifests(
-                tableBroadcast(),
-                sparkContext().broadcast(deltaSnapshotIds),
-                stagingDir,
-                tableMetadata.formatVersion(),
-                sourcePrefix,
-                targetPrefix),
-            Encoders.bean(RewriteContentFileResult.class))
-        // duplicates are expected here as the same data file can have different statuses
-        // (e.g. added and deleted)
-        .reduce((ReduceFunction<RewriteContentFileResult>) RewriteContentFileResult::append);
-  }
-
-  private static MapFunction<ManifestFile, RewriteContentFileResult> toManifests(
+    Iterator<Tuple2<String, RewriteContentFileResult>> resultIterator =
+        manifestDS
+            .repartition(toRewrite.size())
+            .map(
+                toManifests(
+                    tableBroadcast(),
+                    sparkContext().broadcast(deltaSnapshotIds),
+                    stagingDir,
+                    tableMetadata.formatVersion(),
+                    sourcePrefix,
+                    targetPrefix),
+                tupleEncoder)
+            .toLocalIterator();
Review Comment:
Thanks for the feedback. I looked into using `Dataset.reduce` as you suggested. The difficulty is that `reduce` requires the result type to match the element type, `Tuple2<String, RewriteContentFileResult>`, so it cannot build the `Map` directly. Making it work with `reduce` would mean first mapping each `Tuple2` to a single-entry `Map` and then reducing those. The reduce step would then have to serialize and shuffle all of the intermediate map objects, which seems inefficient.
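For intuition, here is a small local model of that singleton-map workaround (plain Java streams standing in for the Dataset, no Spark; `RewriteContentFileResult` is stubbed as a `String` and the manifest paths are made up for illustration). Each merge step allocates a fresh map, which is the overhead mentioned above:

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.HashMap;
import java.util.Map;
import java.util.stream.Stream;

public class SingletonMapReduceSketch {

  // Models Dataset.reduce over per-record singleton maps: each (path, result)
  // tuple becomes a one-entry Map, and reduce merges maps pairwise. Every
  // merge builds a new Map object, so N records cost O(N) intermediate maps.
  static Map<String, String> reduceSingletonMaps(Stream<SimpleEntry<String, String>> tuples) {
    return tuples
        .map(
            t -> {
              Map<String, String> single = new HashMap<>();
              single.put(t.getKey(), t.getValue());
              return single; // one short-lived map per record
            })
        .reduce(
            new HashMap<>(),
            (m1, m2) -> {
              Map<String, String> merged = new HashMap<>(m1); // fresh map per merge
              merged.putAll(m2);
              return merged;
            });
  }

  public static void main(String[] args) {
    Map<String, String> out =
        reduceSingletonMaps(
            Stream.of(
                new SimpleEntry<>("manifest-a.avro", "resultA"),
                new SimpleEntry<>("manifest-b.avro", "resultB")));
    System.out.println(out.size()); // 2 distinct manifest paths
  }
}
```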
An alternative is to use `.javaRDD().aggregate()`. It builds the map within each partition without allocating the intermediate single-entry maps, so it should perform better:
```java
private static final Function2<
Map<String, RewriteContentFileResult>,
Tuple2<String, RewriteContentFileResult>,
Map<String, RewriteContentFileResult>>
seqOp =
(accumulator, resultTuple) -> {
String originalManifestPath = resultTuple._1();
RewriteContentFileResult manifestRewriteResult = resultTuple._2();
accumulator.put(originalManifestPath, manifestRewriteResult);
return accumulator;
};
private static final Function2<
Map<String, RewriteContentFileResult>,
Map<String, RewriteContentFileResult>,
Map<String, RewriteContentFileResult>>
combOp =
(map1, map2) -> {
map1.putAll(map2);
return map1;
};
Map<String, RewriteContentFileResult> zeroValue = Maps.newHashMap();
Map<String, RewriteContentFileResult> rewrittenManifests =
manifestDS
.repartition(toRewrite.size())
.map(
toManifests(
tableBroadcast(),
sparkContext().broadcast(deltaSnapshotIds),
stagingDir,
tableMetadata.formatVersion(),
sourcePrefix,
targetPrefix),
tupleEncoder)
.javaRDD()
.aggregate(zeroValue, seqOp, combOp);
```
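For intuition, the fold that `aggregate` performs could be modeled locally like this (plain Java, no Spark; result values stubbed as `String`s, partition contents made up). Each partition folds into one mutable accumulator (`seqOp`), and only the per-partition maps are merged (`combOp`):

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class AggregateSketch {

  // Models RDD.aggregate(zeroValue, seqOp, combOp): each partition folds its
  // tuples into a single mutable map (seqOp), then the per-partition maps are
  // merged (combOp). Only one accumulator map exists per partition, unlike
  // the singleton-map reduce which allocates a map per record.
  static Map<String, String> aggregate(
      List<? extends List<? extends Map.Entry<String, String>>> partitions) {
    Map<String, String> combined = new HashMap<>(); // driver-side result
    for (List<? extends Map.Entry<String, String>> partition : partitions) {
      Map<String, String> acc = new HashMap<>(); // zeroValue, one per partition
      for (Map.Entry<String, String> t : partition) {
        acc.put(t.getKey(), t.getValue()); // seqOp: mutate in place
      }
      combined.putAll(acc); // combOp: merge partition results
    }
    return combined;
  }
}
```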
I can refactor the code to use this `aggregate` pattern if that approach
looks good.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]