mxm commented on code in PR #13714:
URL: https://github.com/apache/iceberg/pull/13714#discussion_r2353581979
##########
flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergWriteAggregator.java:
##########
@@ -76,11 +85,21 @@ public void open() throws Exception {
@Override
public void finish() throws IOException {
- prepareSnapshotPreBarrier(Long.MAX_VALUE);
+ prepareSnapshotPreBarrier(lastCheckpointId + 1);
}
@Override
public void prepareSnapshotPreBarrier(long checkpointId) throws IOException {
+ if (checkpointId == lastCheckpointId) {
Review Comment:
>Agree that the change in the `finish` method fixes the checkpoint id for
streaming execution. But it is arguable that checkpoint id (1) is incorrect for
batch execution.
I agree that it might confuse users if they no longer see `Long.MAX_VALUE`
for batch pipelines. Apart from that, a checkpoint id of 1 is reasonable for
batch, because a batch pipeline technically has only a single checkpoint, at
the point where it finishes.
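To make the id arithmetic concrete, here is a simplified, hypothetical model of the aggregator logic in the diff above (not the actual `IcebergWriteAggregator`). It assumes `lastCheckpointId` starts at 0, that the `checkpointId == lastCheckpointId` guard returns early so a repeated call is a no-op, and that `lastCheckpointId` is updated after each flush; the `flushedCheckpointIds` list is a stand-in for emitting a committable.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of the checkpoint-id handling in the diff; names other than
// finish/prepareSnapshotPreBarrier/lastCheckpointId are made up for illustration.
class AggregatorIdSketch {
  private long lastCheckpointId = 0L; // assumption: no checkpoint has been taken yet
  final List<Long> flushedCheckpointIds = new ArrayList<>(); // stand-in for emitted committables

  void prepareSnapshotPreBarrier(long checkpointId) {
    if (checkpointId == lastCheckpointId) {
      // Already flushed for this id (e.g. by finish()); skip to avoid a duplicate committable.
      return;
    }
    flushedCheckpointIds.add(checkpointId);
    lastCheckpointId = checkpointId;
  }

  void finish() {
    // Mirrors the diff: use the next checkpoint id instead of Long.MAX_VALUE.
    prepareSnapshotPreBarrier(lastCheckpointId + 1);
  }

  public static void main(String[] args) {
    // Batch: no checkpoints before end of input, so finish() flushes with id 1.
    AggregatorIdSketch batch = new AggregatorIdSketch();
    batch.finish();
    System.out.println("batch: " + batch.flushedCheckpointIds); // prints "batch: [1]"

    // Streaming: checkpoints 1 and 2 were taken, then the job finishes with id 3.
    AggregatorIdSketch streaming = new AggregatorIdSketch();
    streaming.prepareSnapshotPreBarrier(1);
    streaming.prepareSnapshotPreBarrier(2);
    streaming.finish();
    streaming.prepareSnapshotPreBarrier(3); // same id again: skipped by the guard
    System.out.println("streaming: " + streaming.flushedCheckpointIds); // prints "streaming: [1, 2, 3]"
  }
}
```

In this model the batch case naturally ends up with exactly one checkpoint id, 1.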
As you observed, this is specifically a limitation of the V2 commit
framework.
The reason why `Long.MAX_VALUE` no longer works is here:
https://github.com/apache/flink/blob/5ad464d8156b5094e0aba7712e4aa22b9f44e84d/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/operators/sink/CommitterOperator.java#L154
Flink's `CommitterOperator` keeps track of the last completed checkpoint id
and filters out any committables that have a higher checkpoint id:
https://github.com/apache/flink/blob/5ad464d8156b5094e0aba7712e4aa22b9f44e84d/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/operators/sink/CommitterOperator.java#L167
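A rough model of that filtering, to show why a committable tagged with `Long.MAX_VALUE` gets stuck. This is a hypothetical sketch, not Flink's code: committables are grouped by checkpoint id, and completing a checkpoint releases only those with an id less than or equal to the last completed one. It assumes, as discussed above, that a batch job ends with checkpoint id 1 completing.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical, simplified model of the committer-side filtering; not CommitterOperator itself.
class CommitterFilterSketch {
  private final TreeMap<Long, List<String>> pending = new TreeMap<>();
  private long lastCompletedCheckpointId = 0L;

  void add(long checkpointId, String committable) {
    pending.computeIfAbsent(checkpointId, id -> new ArrayList<>()).add(committable);
  }

  // Marks a checkpoint as complete and returns the committables that may now be committed.
  List<String> completeCheckpoint(long checkpointId) {
    lastCompletedCheckpointId = Math.max(lastCompletedCheckpointId, checkpointId);
    // headMap(id, true) holds every entry with key <= id; higher ids stay pending.
    Map<Long, List<String>> ready = pending.headMap(lastCompletedCheckpointId, true);
    List<String> committed = new ArrayList<>();
    ready.values().forEach(committed::addAll);
    ready.clear(); // the view is backed by `pending`, so this removes the committed entries
    return committed;
  }

  int pendingCount() {
    return pending.values().stream().mapToInt(List::size).sum();
  }

  public static void main(String[] args) {
    // Old behavior: the final committable is tagged with Long.MAX_VALUE.
    CommitterFilterSketch old = new CommitterFilterSketch();
    old.add(Long.MAX_VALUE, "final-batch-files");
    System.out.println(old.completeCheckpoint(1)); // prints "[]": filtered out, never committed

    // Changed behavior: the final committable is tagged with lastCheckpointId + 1 == 1.
    CommitterFilterSketch fixed = new CommitterFilterSketch();
    fixed.add(1, "final-batch-files");
    System.out.println(fixed.completeCheckpoint(1)); // prints "[final-batch-files]"
  }
}
```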
I think what we can do is write `Long.MAX_VALUE` to the manifest for
batch pipelines and thereby keep the current behavior. The runtime itself can
still use a different checkpoint id for batch.
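A minimal sketch of that proposal, assuming we can tell at this point whether the job runs in batch mode (passed in here as a plain boolean; how it is derived is left open). The class and method names are hypothetical: the committable keeps the runtime id so Flink's committer does not filter it out, while the id recorded in the manifest stays `Long.MAX_VALUE` for batch.

```java
// Hypothetical helper; not part of the PR or of Iceberg's API.
final class CheckpointIdChoice {
  private CheckpointIdChoice() {}

  // Id the committable is tagged with at finish(), so the committer accepts it.
  static long runtimeCheckpointId(long lastCheckpointId) {
    return lastCheckpointId + 1;
  }

  // Id written to the manifest: Long.MAX_VALUE for batch (current behavior), runtime id otherwise.
  static long manifestCheckpointId(boolean isBatch, long runtimeCheckpointId) {
    return isBatch ? Long.MAX_VALUE : runtimeCheckpointId;
  }

  public static void main(String[] args) {
    long runtimeId = runtimeCheckpointId(0L); // batch: no earlier checkpoints
    System.out.println(runtimeId); // prints 1
    System.out.println(manifestCheckpointId(true, runtimeId)); // prints 9223372036854775807
    System.out.println(manifestCheckpointId(false, 5L)); // prints 5
  }
}
```

Keeping the two ids separate means users still see `Long.MAX_VALUE` for batch, while the runtime only ever deals with ids it has actually completed.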
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.