aokolnychyi commented on code in PR #9724:
URL: https://github.com/apache/iceberg/pull/9724#discussion_r1498711912


##########
spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteDataFilesSparkAction.java:
##########
@@ -323,10 +350,117 @@ private Result doExecute(
 
     List<FileGroupRewriteResult> rewriteResults =
         
rewrittenGroups.stream().map(RewriteFileGroup::asResult).collect(Collectors.toList());
-    return 
ImmutableRewriteDataFiles.Result.builder().rewriteResults(rewriteResults).build();
+    return 
ImmutableRewriteDataFiles.Result.builder().rewriteResults(rewriteResults);
+  }
+
+  private List<DeleteFile> removeDanglingDeletes() {
+    if (table.specs().size() == 1 && table.spec().isUnpartitioned()) {
+      // ManifestFilterManager already performs this table-wide on each commit
+      return Collections.emptyList();
+    }
+
+    DeleteFiles deleteFiles = table.newDelete();
+    List<DeleteFile> toRemove = Lists.newArrayList();
+    LOG.info("Evaluating dangling delete files for {}", table.name());
+    Dataset<Row> entries =
+        loadMetadataTable(table, MetadataTableType.ENTRIES)
+            .filter("status < 2") // live entries
+            .selectExpr(
+                "data_file.partition as partition",
+                "data_file.spec_id as spec_id",
+                "data_file.file_path as file_path",
+                "data_file.content as content",
+                "data_file.file_size_in_bytes as file_size_in_bytes",
+                "data_file.record_count as record_count",
+                "sequence_number");
+
+    toRemove.addAll(withReusableDS(entries, this::danglingDeletes));
+    toRemove.forEach(f -> LOG.debug("Removing dangling delete file {}", 
f.path()));
+    toRemove.forEach(deleteFiles::deleteFile);
+
+    if (!toRemove.isEmpty()) {
+      commit(deleteFiles);
+    }
+
+    return toRemove;
+  }
+
+  private List<DeleteFile> danglingDeletes(Dataset<Row> entries) {
+    List<DeleteFile> removedDeleteFiles = Lists.newArrayList();
+
+    // Minimum sequence number of data files in each partition
+    Dataset<Row> minDataSeqNumberPerPartition =
+        entries
+            .filter("content == 0") // data files
+            .groupBy("partition", "spec_id")
+            .agg(min("sequence_number"))
+            .toDF("partition", "spec_id", "min_data_sequence_number")
+            .cache();
+
+    // Dangling position delete files
+    removedDeleteFiles.addAll(danglingPositionDeletes(entries, 
minDataSeqNumberPerPartition));

Review Comment:
   Is it the join or the filter condition? I think the join is on the partition 
and spec ID in both cases, and we can come up with a filter that checks the 
delete type and uses `<=` or `<` accordingly.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org
For additional commands, e-mail: issues-h...@iceberg.apache.org

Reply via email to