stevenzwu commented on code in PR #13714:
URL: https://github.com/apache/iceberg/pull/13714#discussion_r2353906851
##########
flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergWriteAggregator.java:
##########
@@ -76,11 +85,21 @@ public void open() throws Exception {
@Override
public void finish() throws IOException {
- prepareSnapshotPreBarrier(Long.MAX_VALUE);
+ prepareSnapshotPreBarrier(lastCheckpointId + 1);
}
@Override
public void prepareSnapshotPreBarrier(long checkpointId) throws IOException {
+ if (checkpointId == lastCheckpointId) {
Review Comment:
I am looking at the linked PR
https://github.com/apache/flink/pull/26433/files
It seems to me that it is incorrect for the `CommitterOperator` to validate
the checkpoint id for the `endInput` method.
<img width="671" height="188" alt="image"
src="https://github.com/user-attachments/assets/973a6732-7729-4dc5-a839-56ce1d93e15d"
/>
As the code comment suggested, all committables should be committed here.
```
// There will be no final checkpoint, all committables should be committed here
```
It should have called
```
commitAndEmitCheckpoints(Long.MAX_VALUE)
```
If `IcebergWriteAggregator.lastCheckpointId` has an initial value larger
than 0 for batch execution, this PR would fail. With its current logic after
Flink PR 26433, the Flink `CommitterOperator` would only allow checkpointId
`1` to commit.
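
To make the concern concrete, here is a minimal toy model of the failure mode.
It is not Flink's `CommitterOperator` code: `ToyCommitter`, `add`, and
`commitUpTo` are hypothetical names, and the starting value `5` for
`lastCheckpointId` is an assumption picked only for illustration. It shows how
a commit bounded by checkpoint id `1` could leave a committable tagged
`lastCheckpointId + 1` uncommitted, while a `Long.MAX_VALUE` bound drains
everything.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.NavigableMap;
import java.util.TreeMap;

// Hypothetical toy model of a committer; NOT Flink's CommitterOperator.
class ToyCommitter {
    // Pending committables grouped by the checkpoint id they were tagged with.
    private final TreeMap<Long, List<String>> pending = new TreeMap<>();
    private final List<String> committed = new ArrayList<>();

    void add(long checkpointId, String committable) {
        pending.computeIfAbsent(checkpointId, id -> new ArrayList<>()).add(committable);
    }

    // Commits every pending committable whose checkpoint id is <= maxCheckpointId.
    void commitUpTo(long maxCheckpointId) {
        NavigableMap<Long, List<String>> ready = pending.headMap(maxCheckpointId, true);
        ready.values().forEach(committed::addAll);
        ready.clear(); // the view is backed by `pending`, so this removes the entries
    }

    List<String> committed() {
        return committed;
    }

    int pendingCount() {
        return pending.size();
    }
}

public class EndInputSketch {
    public static void main(String[] args) {
        long lastCheckpointId = 5L; // assumed non-zero initial value in batch mode

        ToyCommitter committer = new ToyCommitter();
        // The aggregator's finish() tags its output with lastCheckpointId + 1.
        committer.add(lastCheckpointId + 1, "data-file-manifest");

        // An endInput that only accepts checkpoint id 1 skips the committable.
        committer.commitUpTo(1L);
        System.out.println("bounded by 1: committed=" + committer.committed()
            + ", pending groups=" + committer.pendingCount());

        // An endInput that uses Long.MAX_VALUE commits whatever is still pending.
        committer.commitUpTo(Long.MAX_VALUE);
        System.out.println("bounded by MAX_VALUE: committed=" + committer.committed()
            + ", pending groups=" + committer.pendingCount());
    }
}
```

In this sketch the `Long.MAX_VALUE` bound makes `endInput` independent of
whatever checkpoint id the upstream operator chose, which is the behavior the
quoted code comment describes.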
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]