dramaticlly commented on code in PR #9724:
URL: https://github.com/apache/iceberg/pull/9724#discussion_r1498475146
##########
spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteDataFilesSparkAction.java:
##########
@@ -507,4 +645,54 @@ public int totalGroupCount() {
       return totalGroupCount;
     }
   }
+
+  private static class MakeDeleteFile implements MapFunction<Row, DeleteFile> {
+
+    private final boolean posDeletes;
+    private final Types.StructType partitionType;
+    private final Map<Integer, PartitionSpec> specsById;
+
+    /**
+     * Map function that transforms entries table rows into {@link DeleteFile}
+     *
+     * @param posDeletes true for position deletes, false for equality deletes
+     * @param partitionType partition type of table
+     * @param specsById table's partition specs
+     */
+    MakeDeleteFile(
+        boolean posDeletes, Types.StructType partitionType, Map<Integer, PartitionSpec> specsById) {
+      this.posDeletes = posDeletes;
+      this.partitionType = partitionType;
+      this.specsById = specsById;
+    }
+
+    @Override
+    public DeleteFile call(Row row) throws Exception {
+      PartitionData partition = new PartitionData(partitionType);
+      GenericRowWithSchema partitionRow = row.getAs(0);
+
+      for (int i = 0; i < partitionRow.length(); i++) {
+        partition.set(i, partitionRow.get(i));
+      }
+
+      int specId = row.getAs(1);
+      String path = row.getAs(2);
+      long fileSize = row.getAs(3);
+      long recordCount = row.getAs(4);
+
+      FileMetadata.Builder builder = FileMetadata.deleteFileBuilder(specsById.get(specId));

Review Comment:
I see what you mean. Because the join condition matches on both spec_id and partition data, this will not remove dangling deletes after partition evolution, i.e. once rewrite data files has rewritten the data files into the new partition spec. In the quick example below, the equality delete with sequence number 3 is smaller than the minimum data sequence number of 5, yet because of partition evolution it never matches any (spec_id, partition) row of the data files, so this dangling equality delete is not evaluated at all.
Instead of projecting the partition data into a new struct, maybe we can look into how to remove delete files based on file path.

```
equality delete entries()
+---------+-------+--------------------+-------+------------------+------------+---------------+
|partition|spec_id|           file_path|content|file_size_in_bytes|record_count|sequence_number|
+---------+-------+--------------------+-------+------------------+------------+---------------+
|{2, NULL}|      0|file:/var/folders...|      2|               465|           1|              4|
|{1, NULL}|      0|file:/var/folders...|      2|               465|           1|              3|
+---------+-------+--------------------+-------+------------------+------------+---------------+
```

```
minSeqNumberPerDataFilesPartition
+---------+-------+------------------------+
|partition|spec_id|min_data_sequence_number|
+---------+-------+------------------------+
|{1, CCCC}|      1|                       5|
|{1, BBBB}|      1|                       5|
|{2, DDDD}|      1|                       5|
+---------+-------+------------------------+
```
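To make the failure mode concrete, here is a minimal sketch in plain Java (no Spark or Iceberg dependencies) that replays the two tables above through a lookup keyed on (spec_id, partition). The record types `DeleteEntry`, `MinSeq` and `Key` and the methods `isDanglingByJoin` and `evaluateExample` are hypothetical stand-ins, not the PR's code. It assumes, as the comment describes, that a delete entry with no matching row is simply not evaluated (so it is kept), and it uses the Iceberg spec rule that an equality delete applies only to data files whose data sequence number is strictly smaller than the delete's own, so such a delete is dangling once the minimum data sequence number is greater than or equal to its sequence number.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class DanglingDeleteJoinSketch {

  // Hypothetical stand-in for one row of "equality delete entries()".
  record DeleteEntry(List<?> partition, int specId, long sequenceNumber) {}

  // Hypothetical stand-in for one row of "minSeqNumberPerDataFilesPartition".
  record MinSeq(List<?> partition, int specId, long minDataSequenceNumber) {}

  // Join key mirroring the condition under review: spec_id AND partition must both match.
  record Key(int specId, List<?> partition) {}

  // Rows copied from the two tables; NULL partition fields are modeled as Java null.
  static final List<DeleteEntry> DELETES = List.of(
      new DeleteEntry(Arrays.asList(2, null), 0, 4),
      new DeleteEntry(Arrays.asList(1, null), 0, 3));

  static final List<MinSeq> MIN_SEQS = List.of(
      new MinSeq(Arrays.asList(1, "CCCC"), 1, 5),
      new MinSeq(Arrays.asList(1, "BBBB"), 1, 5),
      new MinSeq(Arrays.asList(2, "DDDD"), 1, 5));

  /** Equality-delete check under a (spec_id, partition) join; unmatched rows are kept (assumed). */
  static boolean isDanglingByJoin(DeleteEntry delete, Map<Key, Long> minSeqByKey) {
    Long minSeq = minSeqByKey.get(new Key(delete.specId(), delete.partition()));
    if (minSeq == null) {
      return false; // no matching row: never evaluated, never removed
    }
    // Applies only to data with a strictly smaller sequence number, hence "<=" for dangling.
    return delete.sequenceNumber() <= minSeq;
  }

  /** Runs the join-based check over DELETES, in table order. */
  static List<Boolean> evaluateExample() {
    Map<Key, Long> minSeqByKey = new HashMap<>();
    for (MinSeq m : MIN_SEQS) {
      minSeqByKey.put(new Key(m.specId(), m.partition()), m.minDataSequenceNumber());
    }
    List<Boolean> results = new ArrayList<>();
    for (DeleteEntry d : DELETES) {
      results.add(isDanglingByJoin(d, minSeqByKey));
    }
    return results;
  }

  public static void main(String[] args) {
    // Table-wide lower bound over every partition's min_data_sequence_number.
    long tableMin =
        MIN_SEQS.stream().mapToLong(MinSeq::minDataSequenceNumber).min().orElseThrow();
    List<Boolean> byJoin = evaluateExample();
    for (int i = 0; i < DELETES.size(); i++) {
      DeleteEntry d = DELETES.get(i);
      System.out.printf("partition=%s seq=%d danglingByJoin=%b seq<=tableMin(%d)=%b%n",
          d.partition(), d.sequenceNumber(), byJoin.get(i), tableMin,
          d.sequenceNumber() <= tableMin);
    }
  }
}
```

Running `main` should report `danglingByJoin=false` for both deletes but `seq<=tableMin(5)=true` for both: measured against the table-wide minimum of 5, neither delete (sequence numbers 4 and 3) can apply to any remaining data file, yet the spec 0 keys `[2, null]` and `[1, null]` never meet the spec 1 keys, so the join never considers them.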