rdblue commented on PR #9230: URL: https://github.com/apache/iceberg/pull/9230#issuecomment-1859376491
@jasonf20, sorry for being unclear in my reply earlier, but I don't think that the tests in this PR reproduce the error. The tests here reproduce similar errors, but do it by manually calling `commit` on operations multiple times, which is only allowed by transactions. After looking at the problem more, I was able to produce a test case that uses both the transaction and append APIs correctly and still loses data. Here's the test:

```java
@Test
public void testTransactionRecommit() {
  // update table settings to merge when there are 3 manifests
  table.updateProperties().set(TableProperties.MANIFEST_MIN_MERGE_COUNT, "3").commit();

  // create manifests so that the next commit will trigger a merge
  table.newFastAppend().appendFile(FILE_A).commit();
  table.newFastAppend().appendFile(FILE_B).commit();

  // start a transaction with appended files that will merge
  Transaction transaction = Transactions.newTransaction(table.name(), table.ops());
  AppendFiles append = transaction.newAppend().appendFile(FILE_D);
  Snapshot pending = append.apply();
  Assert.assertEquals(
      "Should produce 1 pending merged manifest", 1, pending.allManifests(table.io()).size());

  // because a merge happened, the appended manifest is deleted by the append operation
  append.commit();

  // concurrently commit FILE_C without a transaction to cause the previous append to retry
  table.newAppend().appendFile(FILE_C).commit();
  Assert.assertEquals(
      "Should produce 1 committed merged manifest",
      1,
      table.currentSnapshot().allManifests(table.io()).size());

  transaction.commitTransaction();

  Set<String> paths =
      Sets.newHashSet(
          Iterables.transform(
              table.newScan().planFiles(), task -> task.file().path().toString()));
  Set<String> expectedPaths =
      Sets.newHashSet(
          FILE_A.path().toString(),
          FILE_B.path().toString(),
          FILE_C.path().toString(),
          FILE_D.path().toString());
  Assert.assertEquals("Should contain all committed files", expectedPaths, paths);
  Assert.assertEquals(
      "Should produce 2 manifests",
      2,
      table.currentSnapshot().allManifests(table.io()).size());
}
```

The problem happens when two concurrent commits both compact the latest manifests. When that happens, the new data file manifest is removed by the operation's cleanup logic because it was not committed to the transaction. Then, when the transaction needs to re-apply the change, the reuse logic notices that the appended file manifests have already been written and reuses them. However, the list is no longer correct because it was filtered.

There are a few things to take away from this. First, this doesn't affect fast appends because that operation never merges manifests. However, it's still incorrect to filter the added manifests list, so we should apply the same fix to `FastAppend` that we do to `MergingSnapshotProducer`. Second, we can't recover when any new file manifest has been deleted, so I think the solution is to catch when any manifest is deleted and rewrite the batch by setting the reused manifests to `null`. That's like your original solution, but the difference is that we need to check whether `deleteFunc` is called and only set the cached manifests to `null` if that's the case:

```diff
diff --git a/core/src/main/java/org/apache/iceberg/MergingSnapshotProducer.java b/core/src/main/java/org/apache/iceberg/MergingSnapshotProducer.java
index e06a491098..bde92daa4a 100644
--- a/core/src/main/java/org/apache/iceberg/MergingSnapshotProducer.java
+++ b/core/src/main/java/org/apache/iceberg/MergingSnapshotProducer.java
@@ -879,16 +879,20 @@ abstract class MergingSnapshotProducer<ThisT> extends SnapshotProducer<ThisT> {
 
   private void cleanUncommittedAppends(Set<ManifestFile> committed) {
     if (cachedNewDataManifests != null) {
+      boolean hasDelete = false;
       List<ManifestFile> committedNewDataManifests = Lists.newArrayList();
       for (ManifestFile manifest : cachedNewDataManifests) {
         if (committed.contains(manifest)) {
           committedNewDataManifests.add(manifest);
         } else {
           deleteFile(manifest.path());
+          hasDelete = true;
         }
       }
 
-      this.cachedNewDataManifests = committedNewDataManifests;
+      if (hasDelete) {
+        cachedNewDataManifests = null;
+      }
     }
 
     ListIterator<ManifestFile> deleteManifestsIterator =
         cachedNewDeleteManifests.listIterator();
```

This is going to be better for transactions than always setting the cached manifests to `null` because the manifests are _usually_ in the committed list. We also need to check whether similar logic is needed for `cachedNewDeleteManifests`, which is also modified. I think we will need it.
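To make the invalidation behavior concrete, here is a minimal standalone sketch of the fixed cleanup logic. The `ManifestCache` class and the string paths are made up for illustration and are not Iceberg's actual `MergingSnapshotProducer`; the point is that after any uncommitted manifest is deleted, the cache is dropped entirely rather than kept as a filtered list:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;

// Toy model of the cached-manifest cleanup (not Iceberg's real classes).
class ManifestCache {
  private List<String> cachedNewDataManifests; // manifests written by this operation
  private final List<String> deletedPaths = new ArrayList<>(); // stands in for deleteFile()

  ManifestCache(List<String> manifests) {
    this.cachedNewDataManifests = new ArrayList<>(manifests);
  }

  // Mirrors the fixed cleanUncommittedAppends: delete uncommitted manifests
  // and, if anything was deleted, invalidate the cache so the next apply()
  // rewrites the manifests instead of reusing a filtered (incorrect) list.
  void cleanUncommitted(Set<String> committed) {
    if (cachedNewDataManifests == null) {
      return;
    }

    boolean hasDelete = false;
    List<String> committedNewDataManifests = new ArrayList<>();
    for (String manifest : cachedNewDataManifests) {
      if (committed.contains(manifest)) {
        committedNewDataManifests.add(manifest);
      } else {
        deletedPaths.add(manifest); // deleteFile(manifest.path())
        hasDelete = true;
      }
    }

    if (hasDelete) {
      this.cachedNewDataManifests = null; // force re-apply to rewrite the batch
    }
  }

  boolean cacheInvalidated() {
    return cachedNewDataManifests == null;
  }

  List<String> deleted() {
    return deletedPaths;
  }
}
```

In the common transaction case every cached manifest is in the committed set, `hasDelete` stays false, and the cache survives for reuse; only a concurrent merge that strands a manifest triggers the full rewrite.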