aiborodin commented on PR #14182:
URL: https://github.com/apache/iceberg/pull/14182#issuecomment-3336891582

@mxm Even with this change, data loss will still occur for WriteResults with delete files in the scenario described in https://github.com/apache/iceberg/issues/14090. For example, consider the case where the `DynamicCommitter` fails after committing the first `RowDelta` in the `private void commitDeltaTxn()` method:
```java
private void commitDeltaTxn(...) {
  // ...
  RowDelta rowDelta = null;
  long checkpointId = -1;

  for (Map.Entry<Long, List<WriteResult>> e : pendingResults.entrySet()) {
    // ...
    if (rowDelta != null
        && writeResults.stream().anyMatch(writeResult -> writeResult.deleteFiles().length > 0)) {
      // ...
      // aiborodin: The data loss occurs if we fail after the first iteration.
      // Flink will re-attempt all dynamic committables from the last checkpoint and skip the
      // remaining committables on line 146 because we already committed the first committable
      // with the current checkpoint ID.
      commitOperation(
          table, branch, rowDelta, summary, "rowDelta", newFlinkJobId, operatorId, checkpointId);
      rowDelta = null;
    }
    // ...
  }
  // ...
}
```
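To make the failure mode concrete, here is a minimal, self-contained Java model of it (not Iceberg code). It assumes, as described above, that on recovery the committer skips a checkpoint whose ID is less than or equal to the highest checkpoint ID already recorded in the table's snapshot summaries. `ToyTable`, `PerWriteResultCommitDemo`, and `commitPerWriteResult` are hypothetical names used only for illustration, and a WriteResult is modelled as a plain list of file names.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for a table: each snapshot records the files it added
// and the Flink checkpoint ID stored in its summary.
class ToyTable {
  record Snapshot(List<String> files, long checkpointId) {}

  final List<Snapshot> snapshots = new ArrayList<>();

  long maxCommittedCheckpointId() {
    return snapshots.stream().mapToLong(Snapshot::checkpointId).max().orElse(-1L);
  }

  List<String> committedFiles() {
    List<String> all = new ArrayList<>();
    snapshots.forEach(s -> all.addAll(s.files()));
    return all;
  }
}

class PerWriteResultCommitDemo {
  // Commits each WriteResult of one checkpoint as a separate snapshot tagged with
  // the same checkpoint ID, optionally simulating a crash after the first commit.
  static void commitPerWriteResult(
      ToyTable table, long checkpointId, List<List<String>> writeResults, boolean crashAfterFirst) {
    if (checkpointId <= table.maxCommittedCheckpointId()) {
      return; // idempotence check: the whole checkpoint looks already committed
    }
    for (int i = 0; i < writeResults.size(); i++) {
      table.snapshots.add(new ToyTable.Snapshot(writeResults.get(i), checkpointId));
      if (crashAfterFirst && i == 0) {
        throw new IllegalStateException("simulated committer failure");
      }
    }
  }

  public static void main(String[] args) {
    ToyTable table = new ToyTable();
    List<List<String>> checkpoint5 = List.of(List.of("delete-a"), List.of("delete-b"));
    try {
      commitPerWriteResult(table, 5L, checkpoint5, true);
    } catch (IllegalStateException e) {
      // Flink restores and re-delivers the same committables for checkpoint 5.
      commitPerWriteResult(table, 5L, checkpoint5, false);
    }
    System.out.println(table.committedFiles()); // prints [delete-a]; delete-b is lost
  }
}
```

The retry is skipped because the first snapshot already carries checkpoint ID 5, so the second WriteResult is never committed.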
   
The complete solution is to aggregate _all WriteResults_ for a (checkpoint, table, branch) triplet, which I implemented in `DynamicWriteResultAggregator` in https://github.com/apache/iceberg/pull/14092. It is valid to aggregate delete files from WriteResults **within a single checkpoint** because all changes within a checkpoint are logically concurrent and get the same sequence number when committed. The non-dynamic `IcebergSink` aggregates WriteResults in the same way, within the scope of a single checkpoint.
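A minimal sketch of that per-checkpoint aggregation, assuming a simplified WriteResult that only carries data-file and delete-file names (the real `org.apache.iceberg.io.WriteResult` holds `DataFile`/`DeleteFile` objects). `TableBranch`, `SimpleWriteResult`, `TaggedResult`, and `CheckpointAggregator.aggregate` are hypothetical names, not the code from PR #14092. All results of one checkpoint that target the same (table, branch) are merged, so exactly one committable per key leaves the aggregator.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, simplified types; not Iceberg's actual classes.
record TableBranch(String table, String branch) {}

record SimpleWriteResult(List<String> dataFiles, List<String> deleteFiles) {}

record TaggedResult(TableBranch key, SimpleWriteResult result) {}

class CheckpointAggregator {
  // Merges every WriteResult produced for ONE checkpoint into a single result
  // per (table, branch), preserving arrival order of the files.
  static Map<TableBranch, SimpleWriteResult> aggregate(List<TaggedResult> checkpointResults) {
    Map<TableBranch, List<String>> data = new LinkedHashMap<>();
    Map<TableBranch, List<String>> deletes = new LinkedHashMap<>();
    for (TaggedResult tagged : checkpointResults) {
      data.computeIfAbsent(tagged.key(), k -> new ArrayList<>()).addAll(tagged.result().dataFiles());
      deletes.computeIfAbsent(tagged.key(), k -> new ArrayList<>()).addAll(tagged.result().deleteFiles());
    }
    Map<TableBranch, SimpleWriteResult> merged = new LinkedHashMap<>();
    data.forEach((key, files) -> merged.put(key, new SimpleWriteResult(files, deletes.get(key))));
    return merged;
  }
}
```

Merging deletes here is only safe because every merged result belongs to the same checkpoint; the sketch deliberately takes a single checkpoint's results as input.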
   
More generally, I think `DynamicCommitter` should not be responsible for aggregating WriteResults from a _single checkpoint_. There is a dedicated class for this, `DynamicWriteResultAggregator`, which decides how commit requests (`DynamicCommittable`s) are created. `DynamicCommitter` should commit incoming requests and rely on the contract of a single commit request per (table, branch, checkpointId) triplet. That's why, in https://github.com/apache/iceberg/pull/14092, I changed `NavigableMap<Long, List<WriteResult>> pendingResults` to `NavigableMap<Long, WriteResult> pendingResults` in `DynamicCommitter`: there should be one and only one commit request per checkpoint to maintain the idempotence contract.
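The contract can be sketched as follows, assuming a simplified model of one (table, branch) pair where a committable is a list of file names and the table state is just its committed files plus the highest committed checkpoint ID. `SingleCommitPerCheckpoint`, `addCommittable`, and `commitPending` are hypothetical names, not the code from PR #14092. Because each checkpoint maps to exactly one committable, a checkpoint is committed either entirely or not at all, so skipping checkpoints at or below the last committed ID on recovery loses nothing.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.NavigableMap;
import java.util.TreeMap;

// Hypothetical model of a committer for a single (table, branch) pair.
class SingleCommitPerCheckpoint {
  // One aggregated committable per checkpoint, mirroring NavigableMap<Long, WriteResult>.
  private final NavigableMap<Long, List<String>> pendingResults = new TreeMap<>();

  // Models table state: files and the checkpoint ID in the snapshot summary
  // are updated together by one atomic commit.
  final List<String> committedFiles = new ArrayList<>();
  long maxCommittedCheckpointId = -1L;

  // Enforces "one and only one commit request per checkpoint".
  void addCommittable(long checkpointId, List<String> aggregatedFiles) {
    if (pendingResults.putIfAbsent(checkpointId, aggregatedFiles) != null) {
      throw new IllegalStateException("Duplicate committable for checkpoint " + checkpointId);
    }
  }

  // Commits pending checkpoints in ascending order, one atomic commit each.
  void commitPending() {
    for (var entry : pendingResults.entrySet()) {
      long checkpointId = entry.getKey();
      if (checkpointId <= maxCommittedCheckpointId) {
        continue; // committed before a failure; safe to skip because it was all-or-nothing
      }
      committedFiles.addAll(entry.getValue());
      maxCommittedCheckpointId = checkpointId;
    }
    pendingResults.clear();
  }
}
```

Replaying an already committed checkpoint after a restore becomes a no-op, and a second committable for the same checkpoint is rejected instead of silently producing a partial commit.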
   
Combining WriteResults across checkpoints for appends is a different story. It is valid to do this in `DynamicCommitter` because it is the only place in the code that has context across multiple checkpoints, whereas `DynamicWriteResultAggregator` always operates within a single checkpoint.
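For contrast, a rough sketch of cross-checkpoint batching inside the committer. It loosely follows the condition in the `commitDeltaTxn` excerpt quoted earlier: consecutive checkpoints are folded into one pending commit, and the pending commit is flushed before a checkpoint that carries delete files is added. That way those deletes land in a later snapshot, with a higher sequence number, than data written in earlier checkpoints (in the Iceberg spec, equality deletes only apply to data files with a strictly lower data sequence number). `CheckpointResult`, `PlannedCommit`, and `CrossCheckpointPlanner.planCommits` are hypothetical names, and each planned commit is tagged with its highest checkpoint ID.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;

// Hypothetical: one aggregated result per checkpoint for a single (table, branch).
record CheckpointResult(List<String> dataFiles, List<String> deleteFiles) {}

// One planned snapshot: all files it adds, tagged with its highest checkpoint ID.
record PlannedCommit(long checkpointId, List<String> dataFiles, List<String> deleteFiles) {}

class CrossCheckpointPlanner {
  static List<PlannedCommit> planCommits(NavigableMap<Long, CheckpointResult> pendingResults) {
    List<PlannedCommit> commits = new ArrayList<>();
    List<String> data = new ArrayList<>();
    List<String> deletes = new ArrayList<>();
    long lastCheckpoint = -1L; // -1 means no batch has been started yet
    for (Map.Entry<Long, CheckpointResult> e : pendingResults.entrySet()) {
      boolean hasDeletes = !e.getValue().deleteFiles().isEmpty();
      if (lastCheckpoint != -1L && hasDeletes) {
        // Flush earlier checkpoints first so the new deletes get a higher sequence number.
        commits.add(new PlannedCommit(lastCheckpoint, data, deletes));
        data = new ArrayList<>();
        deletes = new ArrayList<>();
      }
      data.addAll(e.getValue().dataFiles());
      deletes.addAll(e.getValue().deleteFiles());
      lastCheckpoint = e.getKey();
    }
    if (lastCheckpoint != -1L) {
      commits.add(new PlannedCommit(lastCheckpoint, data, deletes));
    }
    return commits;
  }
}
```

Append-only checkpoints 1 and 2 become one commit tagged with checkpoint 2, while a checkpoint 3 carrying deletes starts a new commit. This kind of decision needs the multi-checkpoint view that only the committer has.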
   
I'm happy to discuss this on Slack or over a Zoom call to clarify. @pvary, would you be interested in joining as well?


-- 
This is an automated message from the Apache Git Service.