rdblue commented on PR #9230:
URL: https://github.com/apache/iceberg/pull/9230#issuecomment-1859376491

   @jasonf20, sorry for being unclear in my reply earlier, but I don't think that the tests in this PR reproduce the error. The tests here reproduce similar errors, but do so by manually calling `commit` on operations multiple times, which is only allowed by transactions.
   
   After looking at the problem more, I was able to produce a test case that used both the transaction and append APIs correctly and lost data. Here's the test:
   
   ```java
     @Test
     public void testTransactionRecommit() {
       // update table settings to merge when there are 3 manifests
       table.updateProperties().set(TableProperties.MANIFEST_MIN_MERGE_COUNT, "3").commit();

       // create manifests so that the next commit will trigger a merge
       table.newFastAppend().appendFile(FILE_A).commit();
       table.newFastAppend().appendFile(FILE_B).commit();

       // start a transaction with appended files that will merge
       Transaction transaction = Transactions.newTransaction(table.name(), table.ops());

       AppendFiles append = transaction.newAppend().appendFile(FILE_D);
       Snapshot pending = append.apply();

       Assert.assertEquals(
           "Should produce 1 pending merged manifest", 1, pending.allManifests(table.io()).size());

       // because a merge happened, the appended manifest is deleted by the append operation
       append.commit();

       // concurrently commit FILE_C without a transaction to cause the previous append to retry
       table.newAppend().appendFile(FILE_C).commit();
       Assert.assertEquals(
           "Should produce 1 committed merged manifest",
           1,
           table.currentSnapshot().allManifests(table.io()).size());

       transaction.commitTransaction();

       Set<String> paths =
           Sets.newHashSet(
               Iterables.transform(
                   table.newScan().planFiles(), task -> task.file().path().toString()));
       Set<String> expectedPaths =
           Sets.newHashSet(
               FILE_A.path().toString(),
               FILE_B.path().toString(),
               FILE_C.path().toString(),
               FILE_D.path().toString());

       Assert.assertEquals("Should contain all committed files", expectedPaths, paths);

       Assert.assertEquals(
           "Should produce 2 manifests",
           2,
           table.currentSnapshot().allManifests(table.io()).size());
     }
   ```
   
   The problem happens when two concurrent commits both compact the latest manifests. When that happens, the new data file manifest is removed by the operation's cleanup logic because it was not committed to the transaction. Then, when the transaction needs to re-apply the change, the reuse logic notices that the appended file manifests have already been written and reuses them. However, the cached list is no longer correct because it was filtered during cleanup.
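   
   To make the failure mode concrete, here is a minimal, hypothetical sketch of the caching pattern described above. This is not the actual `MergingSnapshotProducer` code; the class, `writeManifests`, and `deleteFile` here are simplified stand-ins:
   
   ```java
   import java.util.ArrayList;
   import java.util.List;
   import java.util.Set;

   // Hypothetical illustration of the manifest-caching pattern described above.
   // NOT the real MergingSnapshotProducer; paths stand in for manifest files.
   class ManifestCacheSketch {
     private List<String> cachedNewManifests = null;

     // called by apply(); on a retry the cached list is reused as-is
     List<String> newManifests(List<String> appendedFiles) {
       if (cachedNewManifests == null) {
         this.cachedNewManifests = writeManifests(appendedFiles);
       }
       return cachedNewManifests; // after a filtering cleanup, this list is incomplete
     }

     // called when a concurrent commit merged away some of the new manifests
     void cleanUncommitted(Set<String> committed) {
       if (cachedNewManifests != null) {
         List<String> kept = new ArrayList<>();
         for (String manifest : cachedNewManifests) {
           if (committed.contains(manifest)) {
             kept.add(manifest);
           } else {
             deleteFile(manifest); // the merged-away append manifest is removed here
           }
         }
         // BUG: keeping the filtered list means a later re-apply silently drops
         // the appended files; the fix is to null the cache instead whenever a
         // manifest was deleted, forcing the manifests to be rewritten
         this.cachedNewManifests = kept;
       }
     }

     private List<String> writeManifests(List<String> files) {
       return new ArrayList<>(files); // stand-in: pretend each file becomes a manifest
     }

     private void deleteFile(String path) {
       // stand-in for physically deleting the manifest file
     }
   }
   ```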
   
   There are a few things to take away from this. First, this doesn't affect fast appends because that operation never merges manifests. However, it's still incorrect to filter the added manifests list, so we should apply the same fix to `FastAppend` that we do to `MergingSnapshotProducer`. Second, we can't recover when any new file manifest has been deleted, so I think the solution is to detect when any manifest is deleted and rewrite the batch by setting the reused manifests to `null`. That's like your original solution, but the difference is that we need to check whether `deleteFunc` is called and only set the cached manifests to `null` if that's the case:
   
   ```diff
   diff --git a/core/src/main/java/org/apache/iceberg/MergingSnapshotProducer.java b/core/src/main/java/org/apache/iceberg/MergingSnapshotProducer.java
   index e06a491098..bde92daa4a 100644
   --- a/core/src/main/java/org/apache/iceberg/MergingSnapshotProducer.java
   +++ b/core/src/main/java/org/apache/iceberg/MergingSnapshotProducer.java
   @@ -879,16 +879,20 @@ abstract class MergingSnapshotProducer<ThisT> extends SnapshotProducer<ThisT> {
    
      private void cleanUncommittedAppends(Set<ManifestFile> committed) {
        if (cachedNewDataManifests != null) {
   +      boolean hasDelete = false;
          List<ManifestFile> committedNewDataManifests = Lists.newArrayList();
          for (ManifestFile manifest : cachedNewDataManifests) {
            if (committed.contains(manifest)) {
              committedNewDataManifests.add(manifest);
            } else {
              deleteFile(manifest.path());
   +          hasDelete = true;
            }
          }
    
   -      this.cachedNewDataManifests = committedNewDataManifests;
   +      if (hasDelete) {
   +        cachedNewDataManifests = null;
   +      }
        }
    
        ListIterator<ManifestFile> deleteManifestsIterator = cachedNewDeleteManifests.listIterator();
   ```
   
   This is going to be better for transactions than always setting the cached manifests to `null` because the manifests are _usually_ in the committed list. We also need to check whether similar logic is needed for `cachedNewDeleteManifests`, which is also modified. I think we will need it; a sketch of what that might look like follows.
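   
   For completeness, here is a hedged sketch of applying the same guard to the delete-manifest cache, following the pattern in the diff above. The surrounding structure is assumed from the snippet shown there, not verified against the actual source, and `String` paths stand in for `ManifestFile`:
   
   ```java
   import java.util.List;
   import java.util.ListIterator;
   import java.util.Set;

   // Hedged sketch only: applies the "null the cache when anything was deleted"
   // guard to the delete-manifest cache, mirroring the diff above.
   class DeleteManifestCleanupSketch {
     private List<String> cachedNewDeleteManifests;

     void cleanUncommittedDeletes(Set<String> committed) {
       if (cachedNewDeleteManifests != null) {
         boolean deletedAny = false;
         ListIterator<String> iter = cachedNewDeleteManifests.listIterator();
         while (iter.hasNext()) {
           String manifest = iter.next();
           if (!committed.contains(manifest)) {
             deleteFile(manifest);
             iter.remove();
             deletedAny = true;
           }
         }

         if (deletedAny) {
           // force a retry to rewrite the delete manifests rather than
           // reuse an incomplete, filtered list
           this.cachedNewDeleteManifests = null;
         }
       }
     }

     private void deleteFile(String path) {
       // stand-in for deleting the manifest file
     }
   }
   ```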

