zinking commented on code in PR #9724:
URL: https://github.com/apache/iceberg/pull/9724#discussion_r1498626099


##########
spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteDataFilesSparkAction.java:
##########
@@ -323,10 +350,117 @@ private Result doExecute(
 
     List<FileGroupRewriteResult> rewriteResults =
         rewrittenGroups.stream().map(RewriteFileGroup::asResult).collect(Collectors.toList());
-    return ImmutableRewriteDataFiles.Result.builder().rewriteResults(rewriteResults).build();
+    return ImmutableRewriteDataFiles.Result.builder().rewriteResults(rewriteResults);
+  }
+
+  private List<DeleteFile> removeDanglingDeletes() {
+    if (table.specs().size() == 1 && table.spec().isUnpartitioned()) {
+      // ManifestFilterManager already performs this table-wide on each commit
+      return Collections.emptyList();
+    }
+
+    DeleteFiles deleteFiles = table.newDelete();
+    List<DeleteFile> toRemove = Lists.newArrayList();
+    LOG.info("Evaluating dangling delete files for {}", table.name());
+    Dataset<Row> entries =
+        loadMetadataTable(table, MetadataTableType.ENTRIES)
+            .filter("status < 2") // live entries
+            .selectExpr(
+                "data_file.partition as partition",
+                "data_file.spec_id as spec_id",
+                "data_file.file_path as file_path",
+                "data_file.content as content",
+                "data_file.file_size_in_bytes as file_size_in_bytes",
+                "data_file.record_count as record_count",
+                "sequence_number");
+
+    toRemove.addAll(withReusableDS(entries, this::danglingDeletes));
+    toRemove.forEach(f -> LOG.debug("Removing dangling delete file {}", f.path()));
+    toRemove.forEach(deleteFiles::deleteFile);
+
+    if (!toRemove.isEmpty()) {
+      commit(deleteFiles);
+    }
+
+    return toRemove;
+  }
+
+  private List<DeleteFile> danglingDeletes(Dataset<Row> entries) {
+    List<DeleteFile> removedDeleteFiles = Lists.newArrayList();
+
+    // Minimum sequence number of data files in each partition
+    Dataset<Row> minDataSeqNumberPerPartition =
+        entries
+            .filter("content == 0") // data files
+            .groupBy("partition", "spec_id")
+            .agg(min("sequence_number"))
+            .toDF("partition", "spec_id", "min_data_sequence_number")
+            .cache();
+
+    // Dangling position delete files
+    removedDeleteFiles.addAll(danglingPositionDeletes(entries, minDataSeqNumberPerPartition));

Review Comment:
   It's a slightly different join condition: for equality (eq) deletes it is <=, while for position (pos) deletes it is <.
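   
   To make the distinction concrete, here is a minimal sketch of the two conditions. This is not the PR's actual code: the `dangling` helper shape and the `minSeqPerPartition` name (standing in for `minDataSeqNumberPerPartition` from the diff above) are illustrative assumptions.
   
   ```java
   import static org.apache.spark.sql.functions.col;
   
   import org.apache.spark.sql.Column;
   import org.apache.spark.sql.Dataset;
   import org.apache.spark.sql.Row;
   
   class DanglingDeleteSketch {
   
     // Position deletes apply to data files with an equal or lower data sequence
     // number, so a position delete dangles when seq < min data seq (strict).
     static Dataset<Row> danglingPositionDeletes(
         Dataset<Row> entries, Dataset<Row> minSeqPerPartition) {
       return dangling(entries, minSeqPerPartition, 1, // content id 1 = position deletes
           col("sequence_number").lt(col("min_data_sequence_number")));
     }
   
     // Equality deletes apply only to strictly older data files, so an equality
     // delete dangles when seq <= min data seq.
     static Dataset<Row> danglingEqualityDeletes(
         Dataset<Row> entries, Dataset<Row> minSeqPerPartition) {
       return dangling(entries, minSeqPerPartition, 2, // content id 2 = equality deletes
           col("sequence_number").leq(col("min_data_sequence_number")));
     }
   
     private static Dataset<Row> dangling(
         Dataset<Row> entries, Dataset<Row> minSeq, int contentId, Column danglingCond) {
       // Join delete entries to the per-partition minimum data sequence number
       // (computed in the diff above), then keep entries no data file can reference.
       Column sameGroup =
           entries.col("partition").equalTo(minSeq.col("partition"))
               .and(entries.col("spec_id").equalTo(minSeq.col("spec_id")));
       return entries
           .filter(col("content").equalTo(contentId))
           .join(minSeq, sameGroup)
           .filter(danglingCond);
     }
   }
   ```
   
   The asymmetry follows the Iceberg spec: a position delete can still apply to a data file with the same data sequence number, while an equality delete never applies to data files at or above its own sequence number.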



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
