aiborodin commented on PR #14182: URL: https://github.com/apache/iceberg/pull/14182#issuecomment-3336891582
@mxm Even with this change, the data loss will still occur for WriteResults with delete files in the scenario described in https://github.com/apache/iceberg/issues/14090. For example, consider the case when the `DynamicCommitter` fails after the first committed `RowDelta` in the `private void commitDeltaTxn()` method:

```java
private void commitDeltaTxn(...) {
  // ...
  RowDelta rowDelta = null;
  long checkpointId = -1;
  for (Map.Entry<Long, List<WriteResult>> e : pendingResults.entrySet()) {
    // ...
    if (rowDelta != null
        && writeResults.stream().anyMatch(writeResult -> writeResult.deleteFiles().length > 0)) {
      // ...
      // aiborodin: The data loss occurs if we fail after the first iteration.
      // Flink will re-attempt all dynamic committables from the last checkpoint and skip the
      // remaining committables on line 146 because we already committed the first committable
      // with the current checkpoint ID
      commitOperation(
          table, branch, rowDelta, summary, "rowDelta", newFlinkJobId, operatorId, checkpointId);
      rowDelta = null;
    }
```

The complete solution is to aggregate _all WriteResults_ for a (checkpoint, table, branch) triplet, which I implemented in https://github.com/apache/iceberg/pull/14092 in `DynamicWriteResultAggregator`. It is valid to aggregate delete files from WriteResults **within a single checkpoint** because all changes within a checkpoint are logically concurrent and get the same sequence number when committed. The non-dynamic `IcebergSink` aggregates WriteResults in the same way within the scope of the same checkpoint.

More generally, I think `DynamicCommitter` should not be responsible for aggregating WriteResults from a _single checkpoint_. There is a dedicated class for this, `DynamicWriteResultAggregator`, which decides how commit requests (DynamicCommittables) are created. `DynamicCommitter` should commit incoming requests and rely on a contract of a single commit request per (table, branch, checkpointId) triplet.
That's why in https://github.com/apache/iceberg/pull/14092, I changed `NavigableMap<Long, List<WriteResult>> pendingResults` to `NavigableMap<Long, WriteResult> pendingResults` in the `DynamicCommitter`: there should be one and only one commit request per checkpoint to maintain the idempotence contract.

Combining WriteResults across checkpoints for appends is a different story. It is valid to do this in `DynamicCommitter` because it is the only logical place in the code that has the context across multiple checkpoints, while `DynamicWriteResultAggregator` always operates within a single checkpoint.

I'm happy to discuss this online in Slack or over a Zoom call to clarify this. @pvary, would you be interested in joining as well?
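
To make the failure mode concrete, here is a minimal, self-contained sketch of the retry scenario. It does not use the Iceberg or Flink APIs: `FakeTable`, `commitCheckpoint`, `aggregate`, and `runWithRetry` are hypothetical names invented for illustration, and the checkpoint-ID skip is a simplified stand-in for the committer's "already committed" check. It only shows why several commits per checkpoint can lose data after a failure, while a single aggregated commit per checkpoint does not.

```java
import java.util.ArrayList;
import java.util.List;

public class CheckpointCommitSketch {

  /** Hypothetical stand-in for a table: committed files plus the last committed checkpoint ID. */
  static final class FakeTable {
    final List<String> committedFiles = new ArrayList<>();
    long maxCommittedCheckpointId = -1;

    void commit(List<String> files, long checkpointId) {
      committedFiles.addAll(files);
      maxCommittedCheckpointId = checkpointId;
    }
  }

  /**
   * Commits every request of one checkpoint as a separate commit.
   * Assumption: a checkpoint whose ID is already recorded in the table is skipped entirely.
   */
  static void commitCheckpoint(
      FakeTable table, long checkpointId, List<List<String>> requests, boolean failAfterFirst) {
    if (checkpointId <= table.maxCommittedCheckpointId) {
      return; // idempotence check: this checkpoint is considered committed
    }
    for (int i = 0; i < requests.size(); i++) {
      table.commit(requests.get(i), checkpointId);
      if (failAfterFirst && i == 0) {
        throw new IllegalStateException("simulated failure after the first commit");
      }
    }
  }

  /** Merges all requests of one checkpoint into a single commit request. */
  static List<List<String>> aggregate(List<List<String>> requests) {
    List<String> merged = new ArrayList<>();
    requests.forEach(merged::addAll);
    return List.of(merged);
  }

  /** First attempt fails after one commit; the retry replays the same checkpoint. */
  static List<String> runWithRetry(List<List<String>> requests) {
    FakeTable table = new FakeTable();
    try {
      commitCheckpoint(table, 1L, requests, true);
    } catch (IllegalStateException e) {
      // The job restarts and re-attempts the committables of checkpoint 1.
    }
    commitCheckpoint(table, 1L, requests, false);
    return table.committedFiles;
  }

  public static void main(String[] args) {
    List<List<String>> perWriteResult =
        List.of(List.of("data-1", "delete-1"), List.of("data-2", "delete-2"));

    // Two commits for the same checkpoint: the second one is lost on retry.
    System.out.println(runWithRetry(perWriteResult)); // [data-1, delete-1]

    // One aggregated commit per checkpoint: nothing is lost.
    System.out.println(runWithRetry(aggregate(perWriteResult)));
    // [data-1, delete-1, data-2, delete-2]
  }
}
```

In the first run the retry sees checkpoint 1 as committed and skips the second request; in the second run the only commit either happened completely or not at all, so the skip is safe.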
