dramaticlly commented on code in PR #13720:
URL: https://github.com/apache/iceberg/pull/13720#discussion_r2252698558


##########
core/src/main/java/org/apache/iceberg/RewriteTablePathUtil.java:
##########
@@ -87,6 +87,24 @@ public Set<Pair<String, String>> copyPlan() {
     }
   }
 
+  public static class RewrittenFileInfo implements Serializable {

Review Comment:
   Since this is in Iceberg core, and this class is only used as a static 
holder class for a record and in the SparkAction, I am wondering whether we 
want to define it here. 



##########
core/src/main/java/org/apache/iceberg/RewriteTablePathUtil.java:
##########
@@ -357,6 +438,55 @@ public static RewriteResult<DataFile> rewriteDataManifest(
     }
   }
 
+
+  /**
+   * Rewrite a data manifest, replacing path references.
+   *
+   * @param manifestFile source manifest file to rewrite
+   * @param snapshotIds snapshot ids for filtering returned data manifest entries
+   * @param outputFile output file to rewrite manifest file to
+   * @param io file io
+   * @param format format of the manifest file
+   * @param specsById map of partition specs by id
+   * @param sourcePrefix source prefix that will be replaced
+   * @param targetPrefix target prefix that will replace it
+   * @return rewritten manifest file and a copy plan for the referenced content files
+   */
+  public static Pair<ManifestFile, RewriteResult<DataFile>> rewriteDataManifestWithResult(
+      ManifestFile manifestFile,
+      Set<Long> snapshotIds,
+      OutputFile outputFile,
+      FileIO io,
+      int format,
+      Map<Integer, PartitionSpec> specsById,
+      String sourcePrefix,
+      String targetPrefix)
+      throws IOException {
+    PartitionSpec spec = specsById.get(manifestFile.partitionSpecId());
+    ManifestWriter<DataFile> writer =
+            ManifestFiles.write(format, spec, outputFile, manifestFile.snapshotId());
+    RewriteResult<DataFile> rewriteResult = null;
+
+    try (ManifestWriter<DataFile> dataManifestWriter = writer;
+         ManifestReader<DataFile> reader =
+            ManifestFiles.read(manifestFile, io, specsById)
+                .select(Arrays.asList("*"))) {
+       rewriteResult =
+          StreamSupport.stream(reader.entries().spliterator(), false)
+            .map(
+                entry ->
+                    writeDataFileEntry(
+                        entry,
+                        snapshotIds,
+                        spec,
+                        sourcePrefix,
+                        targetPrefix,
+                        writer))
+            .reduce(new RewriteResult<>(), RewriteResult::append);
+    }
+    return Pair.of(writer.toManifestFile(), rewriteResult);

Review Comment:
   Looks like we are returning the entire new `ManifestFile` on top of the list 
of data files in the copy plan, but later in the SparkAction we only use its new 
location and length. Can we do better memory-wise?
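
   A minimal sketch of what I mean, assuming the action really only needs the 
rewritten manifest's location and length. `RewrittenManifestRef` is a 
hypothetical name, not an existing Iceberg class, and it deliberately avoids 
Iceberg types so that it stays self-contained:

```java
import java.io.Serializable;
import java.util.Objects;

// Hypothetical holder (not part of Iceberg): keeps only the two values the
// manifest-list rewrite is assumed to need, rather than a full ManifestFile.
final class RewrittenManifestRef implements Serializable {
  private static final long serialVersionUID = 1L;

  private final String newPath;
  private final long length;

  RewrittenManifestRef(String newPath, long length) {
    this.newPath = Objects.requireNonNull(newPath, "newPath");
    if (length < 0) {
      throw new IllegalArgumentException("length must be non-negative: " + length);
    }
    this.length = length;
  }

  // Location of the rewritten manifest.
  String newPath() {
    return newPath;
  }

  // Size in bytes of the rewritten manifest.
  long length() {
    return length;
  }
}
```

   Shipping something this small between executors and the driver should cost 
far less than serializing a whole manifest entry per file, though that is worth 
measuring rather than assuming.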



##########
core/src/main/java/org/apache/iceberg/RewriteTablePathUtil.java:
##########
@@ -274,6 +294,65 @@ public static RewriteResult<ManifestFile> rewriteManifestList(
     }
   }
 
+  /**
+   * Rewrite a manifest list representing a snapshot, replacing path references.
+   * @param snapshot snapshot represented by the manifest list
+   * @param io file io
+   * @param tableMetadata metadata of table
+   * @param rewrittenManifests information about rewritten manifest files
+   * @param sourcePrefix source prefix that will be replaced
+   * @param targetPrefix target prefix that will replace it
+   * @param stagingDir staging directory
+   * @param outputPath location to write the manifest list
+   */
+  public static void rewriteManifestList(
+      Snapshot snapshot,
+      FileIO io,
+      TableMetadata tableMetadata,
+      Map<String, RewrittenFileInfo> rewrittenManifests,
+      String sourcePrefix,
+      String targetPrefix,
+      String stagingDir,

Review Comment:
   I believe this `stagingDir` is no longer used, since the path now comes from 
`rewrittenManifests.get(file.path()).getNewPath()`.



##########
spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteTablePathSparkAction.java:
##########
@@ -415,54 +421,85 @@ private Set<Pair<String, String>> statsFileCopyPlan(
    * @param snapshot snapshot represented by the manifest list
    * @param tableMetadata metadata of table
    * @param manifestsToRewrite filter of manifests to rewrite.
-   * @return a result including a copy plan for the manifests contained in the manifest list, as
-   *     well as for the manifest list itself
+   * @return a result including a copy plan for the manifest list itself
    */
   private RewriteResult<ManifestFile> rewriteManifestList(
-      Snapshot snapshot, TableMetadata tableMetadata, Set<String> manifestsToRewrite) {
+      Snapshot snapshot,
+      TableMetadata tableMetadata,
+      Map<String, RewrittenFileInfo> rewrittenManifests) {
     RewriteResult<ManifestFile> result = new RewriteResult<>();
 
     String path = snapshot.manifestListLocation();
     String outputPath = RewriteTablePathUtil.stagingPath(path, stagingDir);
-    RewriteResult<ManifestFile> rewriteResult =
-        RewriteTablePathUtil.rewriteManifestList(
-            snapshot,
-            table.io(),
-            tableMetadata,
-            manifestsToRewrite,
-            sourcePrefix,
-            targetPrefix,
-            stagingDir,
-            outputPath);
-
-    result.append(rewriteResult);
+    RewriteTablePathUtil.rewriteManifestList(
+        snapshot,
+        table.io(),
+        tableMetadata,
+        rewrittenManifests,
+        sourcePrefix,
+        targetPrefix,
+        stagingDir,
+        outputPath);
+
     // add the manifest list copy plan itself to the result
     result
         .copyPlan()
         .add(Pair.of(outputPath, RewriteTablePathUtil.newPath(path, sourcePrefix, targetPrefix)));
     return result;
   }
 
-  private Set<String> manifestsToRewrite(
+  private Set<ManifestFile> manifestsToRewrite(
       Set<Snapshot> deltaSnapshots, TableMetadata startMetadata) {
     try {
       Table endStaticTable = newStaticTable(endVersionName, table.io());
-      Dataset<Row> lastVersionFiles = manifestDS(endStaticTable).select("path");
+      Dataset<Row> allManifestsDF = manifestDS(endStaticTable).select("path");
+      Set<String> expectedManifestPaths;
+
       if (startMetadata == null) {
-        return Sets.newHashSet(lastVersionFiles.distinct().as(Encoders.STRING()).collectAsList());
+        expectedManifestPaths =
+            Sets.newHashSet(allManifestsDF.distinct().as(Encoders.STRING()).collectAsList());
       } else {
         Set<Long> deltaSnapshotIds =
             deltaSnapshots.stream().map(Snapshot::snapshotId).collect(Collectors.toSet());
-        return Sets.newHashSet(
-            lastVersionFiles
-                .distinct()
-                .filter(
-                    functions
-                        .column(ManifestFile.SNAPSHOT_ID.name())
-                        .isInCollection(deltaSnapshotIds))
-                .as(Encoders.STRING())
-                .collectAsList());
+        expectedManifestPaths =
+            Sets.newHashSet(
+                allManifestsDF
+                    .distinct()
+                    .filter(
+                        functions
+                            .column(ManifestFile.SNAPSHOT_ID.name())
+                            .isInCollection(deltaSnapshotIds))
+                    .as(Encoders.STRING())
+                    .collectAsList());
       }
+
+      Set<ManifestFile> foundManifests =
+          deltaSnapshots.stream()
+              .flatMap(
+                  s -> {
+                    try {
+                      return s.allManifests(table.io()).stream();
+                    } catch (NotFoundException e) {
+                      LOG.warn(
+                          "Skipping snapshot {} as its manifest list is missing (likely expired).",
+                          s.snapshotId(),
+                          e);
+                      return Stream.empty();
+                    }
+                  })
+              .collect(Collectors.toSet());
+
+      Set<String> foundManifestPaths =
+          foundManifests.stream().map(ManifestFile::path).collect(Collectors.toSet());
+      Set<String> missingPaths = Sets.difference(expectedManifestPaths, foundManifestPaths);
+      Preconditions.checkState(
+          missingPaths.isEmpty(),
+          "Could not find all expected manifests. Missing files: %s",
+          String.join(", ", missingPaths));
+
+      return foundManifests.stream()
+          .filter(m -> expectedManifestPaths.contains(m.path()))
+          .collect(Collectors.toSet());

Review Comment:
   If you are looking for the set of manifests filtered by snapshot IDs, I 
think this can probably be replaced with something similar to 
https://github.com/apache/iceberg/blob/main/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/actions/BaseSparkAction.java#L182-L190
 



##########
spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteTablePathSparkAction.java:
##########
@@ -491,77 +528,128 @@ public RewriteContentFileResult appendDeleteFile(RewriteResult<DeleteFile> r1) {
     }
   }
 
-  /** Rewrite manifest files in a distributed manner and return rewritten data files path pairs. */
-  private RewriteContentFileResult rewriteManifests(
+  public static class ManifestsRewriteResult {
+    private final RewriteContentFileResult contentFileResult;
+    private final Map<String, RewrittenFileInfo> rewrittenManifests;
+
+    ManifestsRewriteResult(
+        RewriteContentFileResult contentFileResult,
+        Map<String, RewrittenFileInfo> rewrittenManifests) {
+      this.contentFileResult = contentFileResult;
+      this.rewrittenManifests = rewrittenManifests;
+    }
+
+    public RewriteContentFileResult getContentFileResult() {
+      return contentFileResult;
+    }
+
+    public Map<String, RewrittenFileInfo> getRewrittenManifests() {
+      return rewrittenManifests;
+    }
+  }

Review Comment:
   I believe we also need something similar for position deletes, where we need 
to write the delete files before writing the delete manifests, so I am wondering 
if we can generalize this result class to take both into account.
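
   One possible shape, as a rough sketch only: a result type that is generic 
over both the downstream result and the per-file info, so the same class could 
describe rewritten manifests or rewritten position delete files. 
`StagedRewriteResult` and its accessors are hypothetical names, and plain Java 
types stand in for the Iceberg ones:

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

// Hypothetical generalization (not an existing Iceberg class).
// R: the downstream result, e.g. a copy plan for the referenced files.
// I: info recorded per rewritten file, e.g. its new path and length.
final class StagedRewriteResult<R, I> {
  private final R downstreamResult;
  private final Map<String, I> rewrittenByOriginalPath;

  StagedRewriteResult(R downstreamResult, Map<String, I> rewrittenByOriginalPath) {
    this.downstreamResult = downstreamResult;
    // Defensive copy so later changes to the caller's map are not visible here.
    this.rewrittenByOriginalPath =
        Collections.unmodifiableMap(new HashMap<>(rewrittenByOriginalPath));
  }

  R downstreamResult() {
    return downstreamResult;
  }

  // Keyed by the original (source) path of each rewritten file.
  Map<String, I> rewrittenByOriginalPath() {
    return rewrittenByOriginalPath;
  }
}
```

   With something like this, the manifest stage and the position delete stage 
could each return the same type with different type arguments, instead of 
adding one result class per stage.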



##########
spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteTablePathSparkAction.java:
##########
@@ -282,23 +287,25 @@ private String rebuildMetadata() {
     RewriteResult<Snapshot> rewriteVersionResult = rewriteVersionFiles(endMetadata);
     Set<Snapshot> deltaSnapshots = deltaSnapshots(startMetadata, rewriteVersionResult.toRewrite());
 
-    Set<String> manifestsToRewrite = manifestsToRewrite(deltaSnapshots, startMetadata);
-    Set<Snapshot> validSnapshots =
-        Sets.difference(snapshotSet(endMetadata), snapshotSet(startMetadata));
+    // rebuild manifest files
+    Set<ManifestFile> manifestsToRewrite = manifestsToRewrite(deltaSnapshots, startMetadata);
+    ManifestsRewriteResult rewriteManifestResult =
+        rewriteManifests(deltaSnapshots, endMetadata, manifestsToRewrite);

Review Comment:
   Just some notes about the behavior change here and its potential performance 
implications.
   
   Before this change, we do the rewrite top-down in one pass: version file -> 
manifest list -> parallelized rewrite of manifest files as well as position 
deletes. So it is optimized to rewrite the top layer, discover the subset of the 
bottom layer that needs to be rewritten, and gradually expand.
   
   However, in order to get the sizes right, we need to reverse some of the 
rewrite ordering by rewriting the manifests first and then the manifest list. 
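
   To illustrate the ordering constraint, here is a toy sketch that uses plain 
strings and maps instead of Iceberg types. The class and method names are 
hypothetical, and "length" is just the character count of the rewritten string. 
The only point is that the manifest list cannot record a manifest's new length 
until that manifest has been rewritten:

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Toy model only: strings stand in for manifest contents and list entries.
final class BottomUpRewriteSketch {
  // Swap the source prefix for the target prefix on a single path.
  static String newPath(String path, String sourcePrefix, String targetPrefix) {
    if (!path.startsWith(sourcePrefix)) {
      throw new IllegalArgumentException(
          "Path " + path + " does not start with " + sourcePrefix);
    }
    return targetPrefix + path.substring(sourcePrefix.length());
  }

  // Step 1: rewrite every manifest and record its new length by original path.
  static Map<String, Long> rewriteManifests(
      Map<String, String> manifestContentByPath, String sourcePrefix, String targetPrefix) {
    Map<String, Long> lengths = new LinkedHashMap<>();
    for (Map.Entry<String, String> entry : manifestContentByPath.entrySet()) {
      String rewritten = entry.getValue().replace(sourcePrefix, targetPrefix);
      lengths.put(entry.getKey(), (long) rewritten.length());
    }
    return lengths;
  }

  // Step 2: build manifest-list entries ("newPath:length") from step 1's output.
  static List<String> rewriteManifestList(
      List<String> manifestPaths,
      Map<String, Long> lengths,
      String sourcePrefix,
      String targetPrefix) {
    List<String> entries = new ArrayList<>();
    for (String path : manifestPaths) {
      Long length = lengths.get(path);
      if (length == null) {
        throw new IllegalStateException("Manifest not rewritten yet: " + path);
      }
      entries.add(newPath(path, sourcePrefix, targetPrefix) + ":" + length);
    }
    return entries;
  }
}
```

   In this model the target prefix is longer than the source prefix, so the 
rewritten manifest is longer than the original, which is why the length has to 
come from the rewritten file rather than the source manifest.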



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to