dramaticlly commented on code in PR #9724:
URL: https://github.com/apache/iceberg/pull/9724#discussion_r1498475146


##########
spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteDataFilesSparkAction.java:
##########
@@ -507,4 +645,54 @@ public int totalGroupCount() {
       return totalGroupCount;
     }
   }
+
+  private static class MakeDeleteFile implements MapFunction<Row, DeleteFile> {
+
+    private final boolean posDeletes;
+    private final Types.StructType partitionType;
+    private final Map<Integer, PartitionSpec> specsById;
+
+    /**
+     * Map function that transforms entries table rows into {@link DeleteFile}
+     *
+     * @param posDeletes true for position deletes, false for equality deletes
+     * @param partitionType partition type of table
+     * @param specsById table's partition specs
+     */
+    MakeDeleteFile(
+        boolean posDeletes, Types.StructType partitionType, Map<Integer, PartitionSpec> specsById) {
+      this.posDeletes = posDeletes;
+      this.partitionType = partitionType;
+      this.specsById = specsById;
+    }
+
+    @Override
+    public DeleteFile call(Row row) throws Exception {
+      PartitionData partition = new PartitionData(partitionType);
+      GenericRowWithSchema partitionRow = row.getAs(0);
+
+      for (int i = 0; i < partitionRow.length(); i++) {
+        partition.set(i, partitionRow.get(i));
+      }
+
+      int specId = row.getAs(1);
+      String path = row.getAs(2);
+      long fileSize = row.getAs(3);
+      long recordCount = row.getAs(4);
+
+      FileMetadata.Builder builder = FileMetadata.deleteFileBuilder(specsById.get(specId));

Review Comment:
   I see what you mean. Because the join condition matches on both specId and 
partition data, it will not remove dangling deletes after partition evolution, 
when rewrite data files has rewritten the data files into a new partition spec. 
In the quick example below, the equality delete has a sequence number of 3, 
which is smaller than the data files' min sequence number of 5, but because of 
partition evolution its (partition, spec_id) pair matches no row on the data 
file side, so the dangling equality delete is never evaluated. 
   
   Instead of looking into projecting the partition data into a new struct, 
maybe we can look at how to remove delete files based on file path. 
   
   ```
   equality delete entries()
   +---------+-------+--------------------+-------+------------------+------------+---------------+
   |partition|spec_id|           file_path|content|file_size_in_bytes|record_count|sequence_number|
   +---------+-------+--------------------+-------+------------------+------------+---------------+
   |{2, NULL}|      0|file:/var/folders...|      2|               465|           1|              4|
   |{1, NULL}|      0|file:/var/folders...|      2|               465|           1|              3|
   +---------+-------+--------------------+-------+------------------+------------+---------------+
   ```
   
   ```
   minSeqNumberPerDataFilesPartition
   +---------+-------+------------------------+
   |partition|spec_id|min_data_sequence_number|
   +---------+-------+------------------------+
   |{1, CCCC}|      1|                       5|
   |{1, BBBB}|      1|                       5|
   |{2, DDDD}|      1|                       5|
   +---------+-------+------------------------+
   ```
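
To make the mismatch concrete, here is a minimal in-memory sketch of the two tables above. It is not the Spark join from this PR: `DanglingDeleteSketch`, `Entry`, the string join key, and the file names (`eq-delete-a`, `data-a`, ...) are hypothetical stand-ins, and the data-file rows simply reuse the min sequence number 5 from the second table. The global-minimum check is included only for contrast, to show that both deletes are older than every data file. It is not a proposed fix.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class DanglingDeleteSketch {

  /** Simplified, hypothetical stand-in for one row of the entries table. */
  static final class Entry {
    final String partition;
    final int specId;
    final String path;
    final long sequenceNumber;

    Entry(String partition, int specId, String path, long sequenceNumber) {
      this.partition = partition;
      this.specId = specId;
      this.path = path;
      this.sequenceNumber = sequenceNumber;
    }

    /** Join key mirroring a join on (spec_id, partition). */
    String joinKey() {
      return specId + "|" + partition;
    }
  }

  /** Equality deletes written under the old spec (spec_id = 0). */
  static List<Entry> deleteEntries() {
    return Arrays.asList(
        new Entry("{2, NULL}", 0, "eq-delete-a", 4),
        new Entry("{1, NULL}", 0, "eq-delete-b", 3));
  }

  /** Data files rewritten into the new spec (spec_id = 1). */
  static List<Entry> dataEntries() {
    return Arrays.asList(
        new Entry("{1, CCCC}", 1, "data-a", 5),
        new Entry("{1, BBBB}", 1, "data-b", 5),
        new Entry("{2, DDDD}", 1, "data-c", 5));
  }

  /** Min data sequence number per (spec_id, partition). */
  static Map<String, Long> minSeqPerPartition(List<Entry> dataFiles) {
    Map<String, Long> minSeq = new HashMap<>();
    for (Entry e : dataFiles) {
      minSeq.merge(e.joinKey(), e.sequenceNumber, Long::min);
    }
    return minSeq;
  }

  /** Deletes flagged as dangling when joining on (spec_id, partition). */
  static List<String> danglingByJoin(List<Entry> deletes, List<Entry> dataFiles) {
    Map<String, Long> minSeq = minSeqPerPartition(dataFiles);
    List<String> dangling = new ArrayList<>();
    for (Entry d : deletes) {
      Long min = minSeq.get(d.joinKey());
      // A delete with no matching key is never compared at all.
      if (min != null && d.sequenceNumber < min) {
        dangling.add(d.path);
      }
    }
    return dangling;
  }

  /** Contrast only: deletes older than every data file, ignoring partitions. */
  static List<String> belowGlobalMin(List<Entry> deletes, List<Entry> dataFiles) {
    long globalMin = Long.MAX_VALUE;
    for (Entry e : dataFiles) {
      globalMin = Math.min(globalMin, e.sequenceNumber);
    }
    List<String> dangling = new ArrayList<>();
    for (Entry d : deletes) {
      if (d.sequenceNumber < globalMin) {
        dangling.add(d.path);
      }
    }
    return dangling;
  }

  public static void main(String[] args) {
    // prints: join on (spec_id, partition): []
    System.out.println(
        "join on (spec_id, partition): " + danglingByJoin(deleteEntries(), dataEntries()));
    // prints: below global min sequence: [eq-delete-a, eq-delete-b]
    System.out.println(
        "below global min sequence: " + belowGlobalMin(deleteEntries(), dataEntries()));
  }
}
```

Neither delete's key (`0|{2, NULL}`, `0|{1, NULL}`) appears on the data side, so the join-based check returns nothing even though both sequence numbers are below 5.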
   
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

