mxm commented on code in PR #13714:
URL: https://github.com/apache/iceberg/pull/13714#discussion_r2344732240
##########
flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergWriteAggregator.java:
##########
@@ -76,11 +85,21 @@ public void open() throws Exception {
@Override
public void finish() throws IOException {
- prepareSnapshotPreBarrier(Long.MAX_VALUE);
+ prepareSnapshotPreBarrier(lastCheckpointId + 1);
}
@Override
public void prepareSnapshotPreBarrier(long checkpointId) throws IOException {
+ if (checkpointId == lastCheckpointId) {
Review Comment:
Yes, this change is required for Flink 2.1 because the runtime logic changed.
`finish()` can no longer use `Long.MAX_VALUE` as the checkpoint id. Previously,
`finish()` called `prepareSnapshotPreBarrier(Long.MAX_VALUE)`. The checkpoint
id now needs to be valid, which is why we call
`prepareSnapshotPreBarrier(lastCheckpointId + 1)` instead. We restore the
checkpoint id when the job is restored.

If a checkpoint is triggered before the runtime calls `finish()`, which can
happen on shutdown, `prepareSnapshotPreBarrier` will already have been called.
We want to avoid sending out the WriteResults twice, hence the
`checkpointId == lastCheckpointId` check. After the first call we no longer
even have the WriteResults available.
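
To illustrate the guard, here is a minimal, self-contained sketch. It is not the real `IcebergWriteAggregator`: the class name `AggregatorSketch`, the `String` stand-in for WriteResults, the `pending` and `emitted` lists, and the early `return` inside the guard are all assumptions made for illustration (the diff above only shows the `if` condition). It only demonstrates that a second call with an already-handled checkpoint id is a no-op and that `finish()` derives its id from `lastCheckpointId`.

```java
import java.util.ArrayList;
import java.util.List;

/** Simplified stand-in for the aggregator; all names here are hypothetical. */
class AggregatorSketch {
    // Results buffered since the last emission (Strings stand in for WriteResults).
    private final List<String> pending = new ArrayList<>();
    // Every batch sent downstream, recorded so the behavior can be inspected.
    final List<List<String>> emitted = new ArrayList<>();
    // Last checkpoint id that was handled; assumed to be recovered on restore.
    private long lastCheckpointId;

    AggregatorSketch(long restoredCheckpointId) {
        this.lastCheckpointId = restoredCheckpointId;
    }

    void processElement(String writeResult) {
        pending.add(writeResult);
    }

    void prepareSnapshotPreBarrier(long checkpointId) {
        if (checkpointId == lastCheckpointId) {
            // Already handled this id: the buffer was drained by the first call,
            // so emitting again would only duplicate or send stale output.
            return;
        }
        emitted.add(new ArrayList<>(pending));
        pending.clear();
        lastCheckpointId = checkpointId;
    }

    void finish() {
        // Use the next valid id instead of Long.MAX_VALUE.
        prepareSnapshotPreBarrier(lastCheckpointId + 1);
    }

    public static void main(String[] args) {
        AggregatorSketch agg = new AggregatorSketch(4);
        agg.processElement("a");
        agg.prepareSnapshotPreBarrier(5); // emits [a]
        agg.prepareSnapshotPreBarrier(5); // skipped by the guard
        agg.finish();                     // handled as checkpoint 6, emits []
        System.out.println(agg.emitted);  // prints [[a], []]
    }
}
```

In this sketch, a call to `prepareSnapshotPreBarrier(6)` arriving after `finish()` would also be skipped, since `finish()` already advanced `lastCheckpointId` to 6.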