stevenzwu commented on code in PR #13720:
URL: https://github.com/apache/iceberg/pull/13720#discussion_r2291756180


##########
core/src/main/java/org/apache/iceberg/RewriteTablePathUtil.java:
##########
@@ -85,6 +86,14 @@ public Set<T> toRewrite() {
     public Set<Pair<String, String>> copyPlan() {
       return copyPlan;
     }
+
+    public Long size() {
+      return size;
+    }
+
+    public void setSize(long newSize) {

Review Comment:
   nit: Iceberg style skips the `set`. so this will be `void size(long newSize)`



##########
core/src/main/java/org/apache/iceberg/RewriteTablePathUtil.java:
##########
@@ -276,6 +287,62 @@ public static RewriteResult<ManifestFile> 
rewriteManifestList(
     }
   }
 
+  /**
+   * Rewrite a manifest list representing a snapshot, replacing path 
references.
+   *
+   * @param snapshot snapshot represented by the manifest list
+   * @param io file io
+   * @param tableMetadata metadata of table
+   * @param rewrittenManifestLengths rewritten manifest files and their sizes
+   * @param sourcePrefix source prefix that will be replaced
+   * @param targetPrefix target prefix that will replace it
+   * @param outputPath location to write the manifest list
+   */
+  public static void rewriteManifestList(

Review Comment:
   I know you mentioned that `RewriteResult` doesn't fit the manifest list 
rewrite. but `void` return type would also be inconsistent with other `rewrite` 
methods in this class.
   
   What do you think about moving this inside the 
`RewriteTablePathSparkAction`?  I see this method similar to the 
`rewriteVersionFile` method in the Spark class.
   
   This way, we have one less public API to worry about when we are going to 
refactor this non-intuitive rewrite result.



##########
spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteTablePathSparkAction.java:
##########
@@ -417,55 +424,37 @@ private Set<Pair<String, String>> statsFileCopyPlan(
    *
    * @param snapshot snapshot represented by the manifest list
    * @param tableMetadata metadata of table
-   * @param manifestsToRewrite filter of manifests to rewrite.
-   * @return a result including a copy plan for the manifests contained in the 
manifest list, as
-   *     well as for the manifest list itself
+   * @param rewrittenManifestLengths lengths of rewritten manifests
+   * @return a result including a copy plan for the manifest list itself
    */
   private RewriteResult<ManifestFile> rewriteManifestList(
-      Snapshot snapshot, TableMetadata tableMetadata, Set<String> 
manifestsToRewrite) {
+      Snapshot snapshot, TableMetadata tableMetadata, Map<String, Long> 
rewrittenManifestLengths) {
     RewriteResult<ManifestFile> result = new RewriteResult<>();
 
     String path = snapshot.manifestListLocation();
     String outputPath = RewriteTablePathUtil.stagingPath(path, sourcePrefix, 
stagingDir);
-    RewriteResult<ManifestFile> rewriteResult =
-        RewriteTablePathUtil.rewriteManifestList(
-            snapshot,
-            table.io(),
-            tableMetadata,
-            manifestsToRewrite,
-            sourcePrefix,
-            targetPrefix,
-            stagingDir,
-            outputPath);
-
-    result.append(rewriteResult);
+    RewriteTablePathUtil.rewriteManifestList(
+        snapshot,
+        table.io(),
+        tableMetadata,
+        rewrittenManifestLengths,
+        sourcePrefix,
+        targetPrefix,
+        outputPath);
+
     // add the manifest list copy plan itself to the result
     result
         .copyPlan()
         .add(Pair.of(outputPath, RewriteTablePathUtil.newPath(path, 
sourcePrefix, targetPrefix)));
     return result;
   }
 
-  private Set<String> manifestsToRewrite(
-      Set<Snapshot> deltaSnapshots, TableMetadata startMetadata) {
+  private Set<ManifestFile> manifestsToRewrite(Set<Snapshot> liveSnapshots) {

Review Comment:
   previously this method extract manifest files from the delta snapshots only. 
Right now, it returns manifest files from all live snapshots. Is the method 
name `manifestsToRewrite` still accurate?



##########
core/src/main/java/org/apache/iceberg/RewriteTablePathUtil.java:
##########
@@ -447,6 +551,58 @@ public static RewriteResult<DeleteFile> 
rewriteDeleteManifest(
     }
   }
 
+  /**
+   * Rewrite a delete manifest, replacing path references.
+   *
+   * @param manifestFile source delete manifest to rewrite
+   * @param snapshotIds snapshot ids for filtering returned delete manifest 
entries
+   * @param outputFile output file to rewrite manifest file to
+   * @param io file io
+   * @param format format of the manifest file
+   * @param specsById map of partition specs by id
+   * @param sourcePrefix source prefix that will be replaced
+   * @param targetPrefix target prefix that will replace it
+   * @param stagingLocation staging location for rewritten files (referred 
delete file will be
+   *     rewritten here)
+   * @return size of the resulting manifest file and a copy plan for the 
referenced content files
+   */
+  public static Pair<Long, RewriteResult<DeleteFile>> 
rewriteDeleteManifestWithResult(

Review Comment:
   > the way RewriteResult is currently implemented and used is 
counterintuitive to me. 
   
   Definitely agree. A larger refactoring can really help. but that probably 
can be tackled as a follow-up.



##########
spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteTablePathSparkAction.java:
##########
@@ -494,36 +483,60 @@ public RewriteContentFileResult 
appendDeleteFile(RewriteResult<DeleteFile> r1) {
     }
   }
 
-  /** Rewrite manifest files in a distributed manner and return rewritten data 
files path pairs. */
-  private RewriteContentFileResult rewriteManifests(
+  /**
+   * Rewrite manifest files in a distributed manner and return the resulting 
manifests and content
+   * files selected for rewriting.
+   */
+  private Map<String, RewriteContentFileResult> rewriteManifests(
       Set<Snapshot> deltaSnapshots, TableMetadata tableMetadata, 
Set<ManifestFile> toRewrite) {
     if (toRewrite.isEmpty()) {
-      return new RewriteContentFileResult();
+      return Maps.newHashMap();
     }
 
     Encoder<ManifestFile> manifestFileEncoder = 
Encoders.javaSerialization(ManifestFile.class);
+    Encoder<RewriteContentFileResult> manifestResultEncoder =
+        Encoders.javaSerialization(RewriteContentFileResult.class);
+    Encoder<Tuple2<String, RewriteContentFileResult>> tupleEncoder =
+        Encoders.tuple(Encoders.STRING(), manifestResultEncoder);
+
     Dataset<ManifestFile> manifestDS =
         spark().createDataset(Lists.newArrayList(toRewrite), 
manifestFileEncoder);
     Set<Long> deltaSnapshotIds =
         
deltaSnapshots.stream().map(Snapshot::snapshotId).collect(Collectors.toSet());
 
-    return manifestDS
-        .repartition(toRewrite.size())
-        .map(
-            toManifests(
-                tableBroadcast(),
-                sparkContext().broadcast(deltaSnapshotIds),
-                stagingDir,
-                tableMetadata.formatVersion(),
-                sourcePrefix,
-                targetPrefix),
-            Encoders.bean(RewriteContentFileResult.class))
-        // duplicates are expected here as the same data file can have 
different statuses
-        // (e.g. added and deleted)
-        .reduce((ReduceFunction<RewriteContentFileResult>) 
RewriteContentFileResult::append);
-  }
-
-  private static MapFunction<ManifestFile, RewriteContentFileResult> 
toManifests(
+    Iterator<Tuple2<String, RewriteContentFileResult>> resultIterator =
+        manifestDS
+            .repartition(toRewrite.size())
+            .map(
+                toManifests(
+                    tableBroadcast(),
+                    sparkContext().broadcast(deltaSnapshotIds),
+                    stagingDir,
+                    tableMetadata.formatVersion(),
+                    sourcePrefix,
+                    targetPrefix),
+                tupleEncoder)
+            .toLocalIterator();

Review Comment:
   will we have memory concern with `toLocalIterator`?



##########
spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteTablePathSparkAction.java:
##########
@@ -281,24 +281,32 @@ private String rebuildMetadata() {
     // rebuild version files
     RewriteResult<Snapshot> rewriteVersionResult = 
rewriteVersionFiles(endMetadata);
     Set<Snapshot> deltaSnapshots = deltaSnapshots(startMetadata, 
rewriteVersionResult.toRewrite());
-
-    Set<String> manifestsToRewrite = manifestsToRewrite(deltaSnapshots, 
startMetadata);
     Set<Snapshot> validSnapshots =
         Sets.difference(snapshotSet(endMetadata), snapshotSet(startMetadata));
 
+    // rebuild manifest files
+    Set<ManifestFile> manifestsToRewrite = manifestsToRewrite(validSnapshots);

Review Comment:
   previously, `manifestsToRewrite` contains manifest files from delta 
snapshots. now it contains manifest files from all live snapshots. Can you help 
me understand how is the incremental rewrite still maintained in this case?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to