mxm commented on issue #14090: URL: https://github.com/apache/iceberg/issues/14090#issuecomment-3319216551
Thanks for reporting this issue! I just checked the code to see how the scenario you described can occur. Let me describe what happens from the `DynamicCommitter` perspective in the scenario you outlined.

**Components**

- DynamicWriter
- DynamicWriteResultAggregator
- DynamicCommitter

**Flow**

1. Checkpoint 1 gets triggered from the source. The checkpoint barrier travels through the DAG until it reaches the sink.
2. The checkpoint barrier arrives at the sink.
3. Flink's sink framework triggers `DynamicWriter#prepareCommit()`, which sends a DynamicWriteResult downstream to DynamicWriteResultAggregator.
4. The checkpoint barrier gets sent downstream and arrives at DynamicWriteResultAggregator.
5. `DynamicWriteResultAggregator#prepareSnapshotPreBarrier(1)` gets called. Manifests are written for each DynamicWriteResult and sent downstream to DynamicCommitter.
6. The checkpoint barrier gets sent downstream and arrives at DynamicCommitter.
7. `DynamicCommitter#commit()` gets called. DynamicCommitter groups the committables by `TableKey (table name, branch name) => checkpoint id => committables`.
8. The maximum committed checkpoint id is looked up per TableKey from the `flink.max-committed-checkpoint-id` snapshot property. Every table/branch has its own snapshot property for the max committed checkpoint id. Committables whose checkpoint id is less than or equal to this maximum are filtered out.
9. For each TableKey (table name / branch), we commit _each_ committable as an Iceberg table snapshot.
10. Checkpoint 1 completes.

Step (9) is where the flaw lies, because every committable (WriteResult) updates the snapshot properties with the max checkpoint id. We can fix this issue by staging all WriteResults for a given TableKey and only then committing. This will also work across branches because the max checkpoint id is already maintained per branch. Even if we fail in the process, we will only attempt to commit the WriteResults for a given TableKey which haven't been committed previously.
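
To make the proposed fix concrete, here is a minimal in-memory sketch of steps 7 to 9 with staging applied. It is not the Iceberg or Flink API: `StagedCommitSketch`, `Committable`, `Snapshot`, and the `"db.t@main"` key format are hypothetical stand-ins, and producing one snapshot per TableKey and checkpoint is only one possible reading of "staging all WriteResults for a given TableKey". The point it illustrates is that the max checkpoint id is recorded once, together with every write result it covers, so a retry never filters out write results that were not yet committed.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Illustrative only: in-memory stand-ins, not the Iceberg or Flink API. */
public class StagedCommitSketch {

  /** Hypothetical committable: one write result for a table/branch at a checkpoint. */
  static final class Committable {
    final String tableKey; // table name plus branch, e.g. "db.t@main" (assumed format)
    final long checkpointId;
    final String dataFile; // placeholder for the files carried by a WriteResult

    Committable(String tableKey, long checkpointId, String dataFile) {
      this.tableKey = tableKey;
      this.checkpointId = checkpointId;
      this.dataFile = dataFile;
    }
  }

  /** Hypothetical snapshot: the files it added and the checkpoint id it recorded. */
  static final class Snapshot {
    final long maxCommittedCheckpointId;
    final List<String> dataFiles;

    Snapshot(long maxCommittedCheckpointId, List<String> dataFiles) {
      this.maxCommittedCheckpointId = maxCommittedCheckpointId;
      this.dataFiles = dataFiles;
    }
  }

  /** Reads the recorded max checkpoint id from the latest snapshot, or -1 if none exists. */
  static long maxCommittedCheckpointId(List<Snapshot> history) {
    return history.isEmpty() ? -1L : history.get(history.size() - 1).maxCommittedCheckpointId;
  }

  /** Groups, filters, stages, and commits one snapshot per TableKey and checkpoint. */
  static void commit(Map<String, List<Snapshot>> tables, List<Committable> committables) {
    // Step 7: TableKey => checkpoint id => committables (checkpoint ids kept sorted).
    Map<String, TreeMap<Long, List<Committable>>> grouped = new HashMap<>();
    for (Committable c : committables) {
      grouped
          .computeIfAbsent(c.tableKey, k -> new TreeMap<>())
          .computeIfAbsent(c.checkpointId, k -> new ArrayList<>())
          .add(c);
    }

    for (Map.Entry<String, TreeMap<Long, List<Committable>>> perTable : grouped.entrySet()) {
      List<Snapshot> history = tables.computeIfAbsent(perTable.getKey(), k -> new ArrayList<>());

      // Step 8: keep only checkpoints strictly greater than the max committed one.
      long maxCommitted = maxCommittedCheckpointId(history);
      Map<Long, List<Committable>> pending = perTable.getValue().tailMap(maxCommitted, false);

      // Step 9 with the fix: stage every write result first, then commit once.
      for (Map.Entry<Long, List<Committable>> perCheckpoint : pending.entrySet()) {
        List<String> staged = new ArrayList<>();
        for (Committable c : perCheckpoint.getValue()) {
          staged.add(c.dataFile);
        }
        history.add(new Snapshot(perCheckpoint.getKey(), staged));
      }
    }
  }

  public static void main(String[] args) {
    Map<String, List<Snapshot>> tables = new HashMap<>();
    List<Committable> checkpoint1 = Arrays.asList(
        new Committable("db.t@main", 1L, "a.parquet"),
        new Committable("db.t@main", 1L, "b.parquet"));

    commit(tables, checkpoint1);
    commit(tables, checkpoint1); // simulated retry: everything is filtered out in step 8

    Snapshot only = tables.get("db.t@main").get(0);
    System.out.println(tables.get("db.t@main").size() + " snapshot(s): " + only.dataFiles);
  }
}
```

In this sketch both write results of checkpoint 1 land in the same snapshot, and the repeated `commit` call adds nothing. With the per-committable behaviour from step 9, the first snapshot alone would already record checkpoint id 1, so after a failure between the two commits the second write result would be filtered out on retry.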
