fqaiser94 opened a new issue, #6514:
URL: https://github.com/apache/iceberg/issues/6514

   ### Feature Request / Improvement
   
   TLDR: I'd like to propose adding a new `void commitTransaction(CommitCondition commitCondition)` method to the `Transaction` interface so users can specify the conditions under which a transaction can be safely committed, given that other transactions may have changed the table concurrently. This would enable use-cases such as monotonically increasing watermarks in table properties. 
   
   # General Problem
   
   I want to start by describing the challenge I currently face using the 
existing `Transaction.commitTransaction()` API.
   
   Consider the following example situation: 
   - We have an iceberg table and we are maintaining a custom watermark in the 
table properties. 
     The expectation is that every time we append new data to the iceberg 
table, the custom watermark should be incremented. 
     Let's say the current value of this `"custom_watermark"` is `"0"` in our 
iceberg table's properties. 
   - Now let's say we have two transactions running concurrently against this 
Iceberg table. 
     Both transactions are appending a different datafile to the iceberg table: 
     `txn.newAppend().appendFile(dataFile).commit()`
     As part of each transaction, we also advance the `"custom_watermark"` table property to the next value, which can be easily calculated by reading the latest value from the existing table properties and adding one, i.e. `"1"`: 
     `txn.updateProperties().set("custom_watermark", "1").commit()`
   - What happens next is that one of the transactions will "win" and be 
committed to the table first. 
     We have new data in the table and the custom watermark has been correctly 
incremented from `"0"` to `"1"`. So far so good.
   - The "loser" transaction will fail with `CommitFailedException` and be 
automatically retried (assuming `TableProperties.COMMIT_NUM_RETRIES >= 1`). The 
retry mechanism is where things get interesting.
   - First, Iceberg will refresh its view of the table. Internally this is 
done by calling `TableOperations.refresh()`, which returns an up-to-date 
`TableMetadata`. 
   - Then Iceberg will re-apply the updates from the "losing" transaction on top 
of the refreshed `TableMetadata`, i.e. 
     it will again attempt to append data to the table and set the custom 
watermark table property to `"1"`.
     You can already see the problem here: our custom watermark isn't being 
incremented correctly! 
   - Unfortunately, all Iceberg sees is a set of updates (from the "losing" 
transaction) that, as far as it can tell, do not conflict with the changes from 
the "winning" transaction in any way it cares about, so it will happily attempt 
to commit the updated `TableMetadata` a second time and succeed in doing so. 
     Now we are in a "bad" state: although the table has new data, the custom 
watermark has not been advanced. 
   
   This scenario can be demonstrated in code like so: 
   
   ```java
   @Test  
   public void 
watermarkIsNotIncrementedCorrectlyWithConcurrentTransactionCommits() throws 
Exception {  
     String customWatermarkKey = "custom_watermark";  
     
     table.updateProperties().set(customWatermarkKey, "0").commit();  
     Integer initialExpectedVersion = 1;  
     Assert.assertEquals("Table should be on version 1", 
initialExpectedVersion, version());  
     Assert.assertEquals(  
       "Initial custom watermark value is",  
       "0",  
       table.properties().get(customWatermarkKey));  
     
     Supplier<String> nextWatermarkValue = () ->  
       Optional.ofNullable(table.properties().get(customWatermarkKey))  
       .map(Integer::parseInt)  
       .map(x -> x + 1)  
       .map(String::valueOf)  
       .orElse("0");  
     
     Function<DataFile, Thread> makeThread = (dataFile) -> {  
       Transaction txn = table.newTransaction();  
       txn.newAppend().appendFile(dataFile).commit();  
       txn.updateProperties().set(customWatermarkKey, 
nextWatermarkValue.get()).commit();  
       return new Thread(txn::commitTransaction);  
     };  
     
     Thread thread1 = makeThread.apply(FILE_A);  
     Thread thread2 = makeThread.apply(FILE_B);  
     
     thread1.start();  
     thread2.start();  
     
     thread1.join();  
     thread2.join();  
     
     Assert.assertEquals(  
       "Table should be on two versions ahead as two transactions have been 
committed successfully",  
       initialExpectedVersion + 2,  
       (int) version());  
     Assert.assertEquals(  
       "We want custom_watermark to also be incremented twice but in fact it 
appears to have been incremented only once",  
       "1",  
       table.properties().get(customWatermarkKey));  
   }
   ```
   
   You might think at this point that a simple solution to this problem is to 
set `TableProperties.COMMIT_NUM_RETRIES = 0`. 
   Setting this property ensures Iceberg will just throw a 
`CommitFailedException`, instead of retrying "losing" transactions and putting 
us in a bad state. In that sense, this is an improvement. 
   Unfortunately, this only works for the specific sequence of events described 
above, and is not a general solution. 
   This is because before Iceberg even attempts to perform the atomic commit 
operation the first time (i.e. not on a retry attempt), it will first check 
whether the `TableMetadata` is up-to-date, and if not it will refresh the 
`TableMetadata` before applying the updates and attempting the commit. 
   Put another way, the automatic refresh of `TableMetadata` is not part of the 
retry mechanism, so you can still hit this problem even without any retries. 
This situation can also be reproduced in code, as follows: 
   
   ```java
   @Test  
   public void removingRetriesIsNotAGeneralSolution() throws Exception {  
     String customWatermarkKey = "custom_watermark";  
     
     table.updateProperties().set(TableProperties.COMMIT_NUM_RETRIES, 
"0").set(customWatermarkKey, "0").commit();  
     Integer initialExpectedVersion = 1;  
     Assert.assertEquals("Table should be on version 1", 
initialExpectedVersion, version());  
     Assert.assertEquals("Initial custom watermark value is", "0", 
table.properties().get(customWatermarkKey));  
     
     Supplier<String> nextWatermarkValue = () ->  
       Optional.ofNullable(table.properties().get(customWatermarkKey))  
       .map(Integer::parseInt)  
       .map(x -> x + 1)  
       .map(String::valueOf)  
       .orElse("0");  
     
     Transaction txn1 = table.newTransaction();  
     txn1.newAppend().appendFile(FILE_A).commit();  
     txn1.updateProperties().set(customWatermarkKey, 
nextWatermarkValue.get()).commit();  
     
     // concurrent transaction which is committed before the first transaction 
ever calls .commit  
     Transaction txn2 = table.newTransaction();  
     txn2.newAppend().appendFile(FILE_B).commit();  
     txn2.updateProperties().set(customWatermarkKey, 
nextWatermarkValue.get()).commit();  
     txn2.commitTransaction();  
     Assert.assertEquals("Table should be on next version", 
initialExpectedVersion + 1, (int) version());  
     Assert.assertEquals("Table watermark is incremented to 1", "1", 
table.properties().get(customWatermarkKey));  
     
     txn1.commitTransaction();  
     Assert.assertEquals("Table should be on next version", 
initialExpectedVersion + 2, (int) version());  
     Assert.assertEquals("Table watermark has seemingly not been incremented", 
"1", table.properties().get(customWatermarkKey));  
   }
   ```
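   The refresh-then-reapply behaviour at the heart of both repros can be modeled in a few lines. This is NOT Iceberg's actual code, just a self-contained sketch: table metadata is reduced to a property map and the atomic commit to a `compareAndSet`: 

   ```java
   import java.util.HashMap;
   import java.util.Map;
   import java.util.concurrent.atomic.AtomicReference;

   // Rough model of the commit path described above: refresh, re-apply the
   // transaction's updates on top of the refreshed state, then atomically swap.
   public class RefreshReapplyModel {

     // Shared "catalog" holding the current table properties.
     static final AtomicReference<Map<String, String>> catalog =
         new AtomicReference<>(Map.of("custom_watermark", "0"));

     // A transaction's pending update: blindly set the watermark to a value
     // that was computed from the table state at transaction *creation* time.
     static void commit(String precomputedWatermark) {
       while (true) {
         Map<String, String> base = catalog.get();          // refresh
         Map<String, String> updated = new HashMap<>(base); // re-apply updates
         updated.put("custom_watermark", precomputedWatermark);
         if (catalog.compareAndSet(base, Map.copyOf(updated))) { // atomic commit
           return; // succeeds even though a concurrent commit already wrote "1"
         }
       }
     }

     public static void main(String[] args) {
       // Both "transactions" computed "1" from the initial state "0".
       commit("1");
       commit("1"); // re-applies cleanly on top of the refreshed state: lost update
       System.out.println(catalog.get().get("custom_watermark")); // "1", not "2"
     }
   }
   ```

   The lost update happens precisely because the re-applied changes are stale values, not functions of the refreshed state.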
   
   If anyone has easy ideas for solving this issue, I would love to hear them. 
   Otherwise, please read on for the solution I'm proposing. 
   
   # Proposed Solution
   
   One way to view this problem is as a case of missing information. 
   While Iceberg does perform some validation checks internally to ensure 
updates don't conflict (a `ValidationException` is thrown in these cases), 
these obviously can't cover use-case-specific conditions such as custom 
watermarks. 
   The only way Iceberg could know about these conditions is if it is told. 
   Hence I'm proposing we expose an API that allows users to give Iceberg this 
information. 
   
   To me, it made the most sense to add this as an overloaded 
`commitTransaction` method to the existing `Transaction` interface. 
   
   ```java
   interface CommitCondition {  
       boolean check(Table baseReadOnlyTable, Table newReadOnlyTable);  
   }
   
   interface Transaction {
       // ... existing methods ...
       void commitTransaction(CommitCondition commitCondition);
   }
   ```
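   To make the intent concrete, here is a self-contained sketch of how a `CommitCondition` could protect the custom watermark. Table state is modeled here as a plain `Map<String, String>` of properties rather than the read-only `Table` handles in the proposal, and all names are illustrative, not final API: 

   ```java
   import java.util.Map;
   import java.util.Objects;

   public class CommitConditionSketch {

     @FunctionalInterface
     interface CommitCondition {
       // base = the table state this transaction's updates were computed
       // against; current = the (possibly refreshed) state we are about to
       // commit on top of.
       boolean check(Map<String, String> base, Map<String, String> current);
     }

     // The watermark update is safe to commit only if no concurrent
     // transaction advanced the watermark underneath us.
     static final CommitCondition WATERMARK_UNCHANGED =
         (base, current) ->
             Objects.equals(base.get("custom_watermark"), current.get("custom_watermark"));

     public static void main(String[] args) {
       Map<String, String> base = Map.of("custom_watermark", "0");
       Map<String, String> unchanged = Map.of("custom_watermark", "0");
       Map<String, String> advancedByWinner = Map.of("custom_watermark", "1");

       // No concurrent change: safe to commit.
       System.out.println(WATERMARK_UNCHANGED.check(base, unchanged));        // true
       // A concurrent "winner" already bumped the watermark: the commit (or
       // retry) should be rejected instead of silently writing a stale value.
       System.out.println(WATERMARK_UNCHANGED.check(base, advancedByWinner)); // false
     }
   }
   ```

   With such a condition in place, the retry path would fail the check after refreshing, instead of happily re-applying the stale watermark value.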
   
   I think I have a working proof-of-concept of this idea as a pull request 
here: https://github.com/apache/iceberg/pull/6513
   There are plenty of design decisions remaining to be discussed here: 
   - Does it make sense to also add an overloaded `void commit(CommitCondition 
commitCondition)` on the `PendingUpdate` interface as well so non-`Transaction` 
API users can also take advantage of conditional commits? 
   - What should the `CommitCondition` interface look like? 
        - Should we expose the new state of the table that we're attempting to 
commit? 
          Or just the state of the base table (i.e. the `Table` changes were 
based on top of)? 
        - Should users be allowed to throw their own exceptions inside 
`CommitCondition.check`? 
          Or should `CommitCondition.check` just return a `boolean`?
   
   Some of these are implementation details but I just want to make clear that 
I haven't figured all of this out yet. 
   If I can get buy-in that this is an actual problem worth solving and if the 
general approach in the PR makes sense, I would be happy to figure out the 
remaining details to take this draft pull request to the finish line. 
   
   # Specific Use-case: Committer Fencing to Enable Exactly-Once Commits
   
   I've tried to describe the problem and solution above in as general a 
fashion as possible because I think this API could be used to enable many and 
varied use-cases beyond just custom watermarks. 
   It might be helpful to understand the specific use-case I'm trying to solve 
for. 
   I have a "datafile-committer" application which does the following: 
   - Reads messages from a Kafka topic
   - Extracts the `DataFile` from each message 
   - Append-commits the `DataFile` to the appropriate Iceberg table
   
   The challenge for us is that we would like to execute this logic in an 
exactly-once fashion. 
   Unfortunately, "datafile-committer" is a distributed Kafka consumer 
application, so it's possible for multiple instances of the application to 
occasionally handle the same message/`DataFile` in exceptional scenarios such 
as: 
   - When there's a lot of rebalancing going on, or
   - When an application instance becomes disconnected, i.e. a zombie process 
   
   Currently, in these exceptional scenarios, we can end up append-committing 
the same `DataFile` to an Iceberg table multiple times (once for each instance 
that is handling the same message/`DataFile`). 
   Since each `DataFile` can contain hundreds of thousands of records, the 
resulting Iceberg tables can contain a very high number of duplicated records. 
   This is obviously bad. 
   
   While it is possible to minimize how often these events happen, it is nearly 
impossible to guarantee that they will never happen. 
   However, since Kafka messages are associated with monotonically increasing 
offsets, it's possible to record the last committed offset as a sort of 
`custom_watermark` in the Iceberg table properties. A `CommitCondition` can 
then reference this at commit time to ensure we only ever commit a `DataFile` 
whose offset is greater than the last committed offset. 
   In this way, we could achieve effectively-once guarantees (actually, a 
little more logic would be needed to fence out zombie committers and get the 
desired exactly-once guarantees, but that is just more logic in the 
`CommitCondition`). 
   
   Hopefully that helps explain where I'm coming from. 
   
   ### Query engine
   
   None

