dramaticlly commented on code in PR #13720:
URL: https://github.com/apache/iceberg/pull/13720#discussion_r2287136446
##########
core/src/main/java/org/apache/iceberg/RewriteTablePathUtil.java:
##########
@@ -335,7 +393,9 @@ public static RewriteResult<DataFile> rewriteDataManifest(
* @param sourcePrefix source prefix that will be replaced
* @param targetPrefix target prefix that will replace it
* @return a copy plan of content files in the manifest that was rewritten
+ * @deprecated since 1.10.0, will be removed in 1.11.0
*/
+ @Deprecated
Review Comment:
@vaultah normally we would need to go through the deprecation cycle for a
public method, but as Steven suggested, both of these methods (LINE 398 and
LINE 521) were introduced in
https://github.com/apache/iceberg/commit/62d9ff5d043a5571efe020b9177998ae763a41a0
(which has never been released), so we can remove them directly.
##########
spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteTablePathSparkAction.java:
##########
@@ -494,77 +478,128 @@ public RewriteContentFileResult
appendDeleteFile(RewriteResult<DeleteFile> r1) {
}
}
- /** Rewrite manifest files in a distributed manner and return rewritten data
files path pairs. */
- private RewriteContentFileResult rewriteManifests(
+ private static class ManifestsRewriteResult {
+ private final RewriteContentFileResult contentFileResult;
+ private final Map<String, Long> rewrittenManifests;
+
+ ManifestsRewriteResult(
+ RewriteContentFileResult contentFileResult, Map<String, Long>
rewrittenManifests) {
+ this.contentFileResult = contentFileResult;
+ this.rewrittenManifests = rewrittenManifests;
+ }
+
+ public RewriteContentFileResult getContentFileResult() {
+ return contentFileResult;
+ }
+
+ public Map<String, Long> getRewrittenManifests() {
+ return rewrittenManifests;
+ }
+ }
+
+ /**
+ * Rewrite manifest files in a distributed manner and return the resulting
manifests and content
+ * files selected for rewriting.
+ */
+ private ManifestsRewriteResult rewriteManifests(
Set<Snapshot> deltaSnapshots, TableMetadata tableMetadata,
Set<ManifestFile> toRewrite) {
if (toRewrite.isEmpty()) {
- return new RewriteContentFileResult();
+ return new ManifestsRewriteResult(new RewriteContentFileResult(),
Maps.newHashMap());
}
Encoder<ManifestFile> manifestFileEncoder =
Encoders.javaSerialization(ManifestFile.class);
+ Encoder<RewriteContentFileResult> contentResultEncoder =
+ Encoders.javaSerialization(RewriteContentFileResult.class);
+ Encoder<Tuple3<String, Long, RewriteContentFileResult>> tupleEncoder =
+ Encoders.tuple(Encoders.STRING(), Encoders.LONG(),
contentResultEncoder);
+
Dataset<ManifestFile> manifestDS =
spark().createDataset(Lists.newArrayList(toRewrite),
manifestFileEncoder);
Set<Long> deltaSnapshotIds =
deltaSnapshots.stream().map(Snapshot::snapshotId).collect(Collectors.toSet());
- return manifestDS
- .repartition(toRewrite.size())
- .map(
- toManifests(
- tableBroadcast(),
- sparkContext().broadcast(deltaSnapshotIds),
- stagingDir,
- tableMetadata.formatVersion(),
- sourcePrefix,
- targetPrefix),
- Encoders.bean(RewriteContentFileResult.class))
- // duplicates are expected here as the same data file can have
different statuses
- // (e.g. added and deleted)
- .reduce((ReduceFunction<RewriteContentFileResult>)
RewriteContentFileResult::append);
- }
-
- private static MapFunction<ManifestFile, RewriteContentFileResult>
toManifests(
- Broadcast<Table> table,
- Broadcast<Set<Long>> deltaSnapshotIds,
- String stagingLocation,
- int format,
- String sourcePrefix,
- String targetPrefix) {
+ RewriteContentFileResult finalContentResult = new
RewriteContentFileResult();
+ Iterator<Tuple3<String, Long, RewriteContentFileResult>> resultIterator =
+ manifestDS
+ .repartition(toRewrite.size())
+ .map(
+ toManifests(
+ tableBroadcast(),
+ sparkContext().broadcast(deltaSnapshotIds),
+ stagingDir,
+ tableMetadata.formatVersion(),
+ sourcePrefix,
+ targetPrefix),
+ tupleEncoder)
+ .toLocalIterator();
+
+ Map<String, Long> rewrittenManifests = Maps.newHashMap();
+
+ while (resultIterator.hasNext()) {
+ Tuple3<String, Long, RewriteContentFileResult> resultTuple =
resultIterator.next();
+ String originalManifestPath = resultTuple._1();
+ Long rewrittenManifestLength = resultTuple._2();
+ RewriteContentFileResult contentFileResult = resultTuple._3();
+ String stagingManifestPath =
+ RewriteTablePathUtil.stagingPath(originalManifestPath, sourcePrefix,
stagingDir);
+ String targetManifestPath =
+ RewriteTablePathUtil.newPath(originalManifestPath, sourcePrefix,
targetPrefix);
+
+ finalContentResult.append(contentFileResult);
+ finalContentResult.copyPlan().add(Pair.of(stagingManifestPath,
targetManifestPath));
+ rewrittenManifests.put(originalManifestPath, rewrittenManifestLength);
+ }
+
+ return new ManifestsRewriteResult(finalContentResult, rewrittenManifests);
+ }
+
+ private static MapFunction<ManifestFile, Tuple3<String, Long,
RewriteContentFileResult>>
+ toManifests(
+ Broadcast<Table> table,
+ Broadcast<Set<Long>> deltaSnapshotIds,
+ String stagingLocation,
+ int format,
+ String sourcePrefix,
+ String targetPrefix) {
return manifestFile -> {
- RewriteContentFileResult result = new RewriteContentFileResult();
switch (manifestFile.content()) {
case DATA:
- result.appendDataFile(
+ Pair<Long, RewriteResult<DataFile>> dataManifestResult =
writeDataManifest(
manifestFile,
table,
deltaSnapshotIds,
stagingLocation,
format,
sourcePrefix,
- targetPrefix));
- break;
+ targetPrefix);
+ return Tuple3.apply(
+ manifestFile.path(),
Review Comment:
I think we can give this a try.
##########
core/src/main/java/org/apache/iceberg/RewriteTablePathUtil.java:
##########
@@ -414,7 +516,9 @@ public static RewriteResult<DeleteFile>
rewriteDeleteManifest(
* @param stagingLocation staging location for rewritten files (referred
delete file will be
* rewritten here)
* @return a copy plan of content files in the manifest that was rewritten
+ * @deprecated since 1.10.0, will be removed in 1.11.0
*/
+ @Deprecated
Review Comment:
agreed!
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]