amogh-jahagirdar commented on code in PR #10962: URL: https://github.com/apache/iceberg/pull/10962#discussion_r1746200353
########## core/src/test/java/org/apache/iceberg/TestRewriteFiles.java: ########## @@ -384,6 +386,116 @@ public void testRewriteDataAndAssignOldSequenceNumber() { assertThat(listManifestFiles()).hasSize(4); } + @TestTemplate + public void testRewriteDataAndAssignOldSequenceNumbersShouldNotDropDeleteFiles() { + assumeThat(formatVersion) + .as("Sequence number is only supported in iceberg format v2 or later") + .isGreaterThan(1); + assertThat(listManifestFiles()).isEmpty(); + + commit(table, table.newRowDelta().addRows(FILE_A).addDeletes(FILE_A2_DELETES), branch); + + long firstRewriteSequenceNumber = latestSnapshot(table, branch).sequenceNumber(); + + commit( + table, + table.newRowDelta().addRows(FILE_B).addRows(FILE_B).addDeletes(FILE_B2_DELETES), + branch); + commit( + table, + table.newRowDelta().addRows(FILE_B).addRows(FILE_C).addDeletes(FILE_C2_DELETES), + branch); + + long secondRewriteSequenceNumber = latestSnapshot(table, branch).sequenceNumber(); + + commit( + table, + table + .newRewrite() + .addFile(FILE_D) + .deleteFile(FILE_B) + .deleteFile(FILE_C) + .dataSequenceNumber(secondRewriteSequenceNumber), + branch); + + TableMetadata base = readMetadata(); + Snapshot baseSnap = latestSnapshot(base, branch); + long baseSnapshotId = baseSnap.snapshotId(); + + Comparator<ManifestFile> sequenceNumberOrdering = + new Comparator<>() { + @Override + public int compare(ManifestFile o1, ManifestFile o2) { + return (int) (o1.sequenceNumber() - o2.sequenceNumber()); + } + }; + + // FILE_B2_DELETES and FILE_A2_DELETES should not be removed as the rewrite specifies + // `firstRewriteSequenceNumber` + // explicitly which is the same as that of A2_DELETES and before B2_DELETES + + // Technically A1_DELETES could be removed since it's an equality delete and should apply on Review Comment: Sorry not sure I really follow the comment? There is no A1_DELETES....there's a FILE_A_DELETES but that's a positional delete. I don't see those referenced in the above operations. I think it's true though that we should be able to drop equality deletes older than the minimum sequence number but that's already happening in the existing MergingSnapshotProducer check no? Don't think anything needs to distinguish there between equality and positional delete ########## core/src/main/java/org/apache/iceberg/MergingSnapshotProducer.java: ########## @@ -833,7 +833,17 @@ public List<ManifestFile> apply(TableMetadata base, Snapshot snapshot) { filterManager.filterManifests( SnapshotUtil.schemaFor(base, targetBranch()), snapshot != null ? snapshot.dataManifests(ops.io()) : null); - long minDataSequenceNumber = + + long minNewFileSequenceNumber = + addedDataFiles().stream() + .filter(x -> x.dataSequenceNumber() != null && x.dataSequenceNumber() >= 0) + .mapToLong(ContentFile::dataSequenceNumber) + .reduce( + newDataFilesDataSequenceNumber != null + ? newDataFilesDataSequenceNumber + : base.nextSequenceNumber(), + Math::min); Review Comment: Do we actually need to iterate through the `addedDataFiles`? If I understood the issue correctly, the problem is that it's possible for a user to commit a rewrite operation and specify an older data sequence number, and the current logic would drop delete files which actually need to still be referenced in the new commit since it's not considering the specified data file sequence number. So I *think* all we would need to do here is keep the existing logic for determining minDataSequenceNumber and then also min that with the `newDataFilesDataSequenceNumber` if it's not null ``` long minNewDataSequenceNumber = <Existing logic> if (newDataFilesDataSequenceNumber != null) { minNewDataSequenceNumber = Math.min(minNewDataSequenceNumber, newDataFilesDataSequenceNumber); } ``` ########## core/src/main/java/org/apache/iceberg/MergingSnapshotProducer.java: ########## @@ -833,7 +833,17 @@ public List<ManifestFile> apply(TableMetadata base, Snapshot snapshot) { filterManager.filterManifests( SnapshotUtil.schemaFor(base, targetBranch()), snapshot != null ? snapshot.dataManifests(ops.io()) : null); - long minDataSequenceNumber = + + long minNewFileSequenceNumber = + addedDataFiles().stream() + .filter(x -> x.dataSequenceNumber() != null && x.dataSequenceNumber() >= 0) + .mapToLong(ContentFile::dataSequenceNumber) + .reduce( + newDataFilesDataSequenceNumber != null + ? newDataFilesDataSequenceNumber + : base.nextSequenceNumber(), + Math::min); Review Comment: I tried the above and the new unit test still passes, though let me know if you see a flaw in my reasoning. cc @rdblue @aokolnychyi @findepi -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org