dramaticlly commented on code in PR #13720:
URL: https://github.com/apache/iceberg/pull/13720#discussion_r2288998248


##########
spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteTablePathSparkAction.java:
##########
@@ -494,77 +478,128 @@ public RewriteContentFileResult 
appendDeleteFile(RewriteResult<DeleteFile> r1) {
     }
   }
 
-  /** Rewrite manifest files in a distributed manner and return rewritten data 
files path pairs. */
-  private RewriteContentFileResult rewriteManifests(
+  private static class ManifestsRewriteResult {
+    private final RewriteContentFileResult contentFileResult;
+    private final Map<String, Long> rewrittenManifests;
+
+    ManifestsRewriteResult(
+        RewriteContentFileResult contentFileResult, Map<String, Long> 
rewrittenManifests) {
+      this.contentFileResult = contentFileResult;
+      this.rewrittenManifests = rewrittenManifests;
+    }
+
+    public RewriteContentFileResult getContentFileResult() {
+      return contentFileResult;
+    }
+
+    public Map<String, Long> getRewrittenManifests() {
+      return rewrittenManifests;
+    }
+  }
+
+  /**
+   * Rewrite manifest files in a distributed manner and return the resulting 
manifests and content
+   * files selected for rewriting.
+   */
+  private ManifestsRewriteResult rewriteManifests(
       Set<Snapshot> deltaSnapshots, TableMetadata tableMetadata, 
Set<ManifestFile> toRewrite) {
     if (toRewrite.isEmpty()) {
-      return new RewriteContentFileResult();
+      return new ManifestsRewriteResult(new RewriteContentFileResult(), 
Maps.newHashMap());
     }
 
     Encoder<ManifestFile> manifestFileEncoder = 
Encoders.javaSerialization(ManifestFile.class);
+    Encoder<RewriteContentFileResult> contentResultEncoder =
+        Encoders.javaSerialization(RewriteContentFileResult.class);
+    Encoder<Tuple3<String, Long, RewriteContentFileResult>> tupleEncoder =
+        Encoders.tuple(Encoders.STRING(), Encoders.LONG(), 
contentResultEncoder);
+
     Dataset<ManifestFile> manifestDS =
         spark().createDataset(Lists.newArrayList(toRewrite), 
manifestFileEncoder);
     Set<Long> deltaSnapshotIds =
         
deltaSnapshots.stream().map(Snapshot::snapshotId).collect(Collectors.toSet());
 
-    return manifestDS
-        .repartition(toRewrite.size())
-        .map(
-            toManifests(
-                tableBroadcast(),
-                sparkContext().broadcast(deltaSnapshotIds),
-                stagingDir,
-                tableMetadata.formatVersion(),
-                sourcePrefix,
-                targetPrefix),
-            Encoders.bean(RewriteContentFileResult.class))
-        // duplicates are expected here as the same data file can have 
different statuses
-        // (e.g. added and deleted)
-        .reduce((ReduceFunction<RewriteContentFileResult>) 
RewriteContentFileResult::append);
-  }
-
-  private static MapFunction<ManifestFile, RewriteContentFileResult> 
toManifests(
-      Broadcast<Table> table,
-      Broadcast<Set<Long>> deltaSnapshotIds,
-      String stagingLocation,
-      int format,
-      String sourcePrefix,
-      String targetPrefix) {
+    RewriteContentFileResult finalContentResult = new 
RewriteContentFileResult();
+    Iterator<Tuple3<String, Long, RewriteContentFileResult>> resultIterator =
+        manifestDS
+            .repartition(toRewrite.size())
+            .map(
+                toManifests(
+                    tableBroadcast(),
+                    sparkContext().broadcast(deltaSnapshotIds),
+                    stagingDir,
+                    tableMetadata.formatVersion(),
+                    sourcePrefix,
+                    targetPrefix),
+                tupleEncoder)
+            .toLocalIterator();
+
+    Map<String, Long> rewrittenManifests = Maps.newHashMap();
+
+    while (resultIterator.hasNext()) {
+      Tuple3<String, Long, RewriteContentFileResult> resultTuple = 
resultIterator.next();
+      String originalManifestPath = resultTuple._1();
+      Long rewrittenManifestLength = resultTuple._2();
+      RewriteContentFileResult contentFileResult = resultTuple._3();
+      String stagingManifestPath =
+          RewriteTablePathUtil.stagingPath(originalManifestPath, sourcePrefix, 
stagingDir);
+      String targetManifestPath =
+          RewriteTablePathUtil.newPath(originalManifestPath, sourcePrefix, 
targetPrefix);
+
+      finalContentResult.append(contentFileResult);
+      finalContentResult.copyPlan().add(Pair.of(stagingManifestPath, 
targetManifestPath));
+      rewrittenManifests.put(originalManifestPath, rewrittenManifestLength);
+    }
+
+    return new ManifestsRewriteResult(finalContentResult, rewrittenManifests);
+  }
+
+  private static MapFunction<ManifestFile, Tuple3<String, Long, 
RewriteContentFileResult>>
+      toManifests(
+          Broadcast<Table> table,
+          Broadcast<Set<Long>> deltaSnapshotIds,
+          String stagingLocation,
+          int format,
+          String sourcePrefix,
+          String targetPrefix) {
 
     return manifestFile -> {
-      RewriteContentFileResult result = new RewriteContentFileResult();
       switch (manifestFile.content()) {
         case DATA:
-          result.appendDataFile(
+          Pair<Long, RewriteResult<DataFile>> dataManifestResult =
               writeDataManifest(
                   manifestFile,
                   table,
                   deltaSnapshotIds,
                   stagingLocation,
                   format,
                   sourcePrefix,
-                  targetPrefix));
-          break;
+                  targetPrefix);
+          return Tuple3.apply(
+              manifestFile.path(),

Review Comment:
   I think some of the complexity here can be attributed to the fact that the 
size is tracked outside of RewriteResult, so we end up bundling a Tuple3 per 
ManifestFile in the MapFunction and then unzipping and reassembling it in the 
local iterator with the necessary aggregation. 
   
   Can we explore bundling the size into RewriteResult and see whether 
`Map<String, RewriteContentFileResult>` helps in this situation? 



##########
core/src/main/java/org/apache/iceberg/RewriteTablePathUtil.java:
##########
@@ -447,6 +551,58 @@ public static RewriteResult<DeleteFile> 
rewriteDeleteManifest(
     }
   }
 
+  /**
+   * Rewrite a delete manifest, replacing path references.
+   *
+   * @param manifestFile source delete manifest to rewrite
+   * @param snapshotIds snapshot ids for filtering returned delete manifest 
entries
+   * @param outputFile output file to rewrite manifest file to
+   * @param io file io
+   * @param format format of the manifest file
+   * @param specsById map of partition specs by id
+   * @param sourcePrefix source prefix that will be replaced
+   * @param targetPrefix target prefix that will replace it
+   * @param stagingLocation staging location for rewritten files (referred 
delete file will be
+   *     rewritten here)
+   * @return size of the resulting manifest file and a copy plan for the 
referenced content files
+   */
+  public static Pair<Long, RewriteResult<DeleteFile>> 
rewriteDeleteManifestWithResult(

Review Comment:
   I think there's value in this: in this PR we are doing a bottom-up rewrite 
in order to have the size, which differs from the previous top-down approach, 
so the result might need to change accordingly. We return `Pair<Long, 
RewriteResult<DeleteFile>>` when rewriting a delete manifest, carrying both the 
size (the long) and the discovered list of delete files to copy and to rewrite, 
all of which belong to a single delete manifest. 
   
   For other top-down rewrites, such as discovering the list of delta snapshots 
from a given metadata.json, the size is unknown and we can leave it null. It can 
be up to the caller to leverage the size in the rewrite result. Let's give it a 
try and see if there are any compatibility problems.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to