mxm commented on code in PR #13714:
URL: https://github.com/apache/iceberg/pull/13714#discussion_r2344732240


##########
flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergWriteAggregator.java:
##########
@@ -76,11 +85,21 @@ public void open() throws Exception {
 
   @Override
   public void finish() throws IOException {
-    prepareSnapshotPreBarrier(Long.MAX_VALUE);
+    prepareSnapshotPreBarrier(lastCheckpointId + 1);
   }
 
   @Override
   public void prepareSnapshotPreBarrier(long checkpointId) throws IOException {
+    if (checkpointId == lastCheckpointId) {

Review Comment:
   Yes, this change is required for Flink 2.1, where the runtime logic changed. 
`finish()` can no longer use `Long.MAX_VALUE` as the checkpoint id. Previously, 
`finish()` called `prepareSnapshotPreBarrier(Long.MAX_VALUE)`. The checkpoint 
id needs to be valid, which is why we now call 
`prepareSnapshotPreBarrier(lastCheckpointId + 1)`. We restore the last 
checkpoint id when the operator is restored.
   
   When a checkpoint is triggered before the runtime calls `finish()`, which 
can happen on shutdown, `prepareSnapshotPreBarrier` will already have been 
called. We want to avoid sending out the WriteResults twice. After the first 
call we no longer even have the WriteResults available.
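
   To illustrate the idea, here is a minimal, self-contained sketch. It is not 
the actual `IcebergWriteAggregator`: the class `AggregatorSketch`, its 
`processElement` and `emitted` helpers, and the early `return` inside the guard 
are assumptions made for illustration, and no Flink APIs are used. It only shows 
how a `lastCheckpointId` guard can make a repeated call for the same checkpoint 
id a no-op, and how `finish()` can derive a valid id from the last one seen.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for the operator; not the real IcebergWriteAggregator.
class AggregatorSketch {
  // Write results collected since the last emission (plain strings here).
  private final List<String> pendingResults = new ArrayList<>();
  // What was sent downstream, recorded as "<checkpointId>:<results>".
  private final List<String> emitted = new ArrayList<>();
  // Assumed to be restored from state in the real operator.
  private long lastCheckpointId;

  AggregatorSketch(long restoredCheckpointId) {
    this.lastCheckpointId = restoredCheckpointId;
  }

  void processElement(String writeResult) {
    pendingResults.add(writeResult);
  }

  void prepareSnapshotPreBarrier(long checkpointId) {
    // Assumption: a repeated call for the same checkpoint id is skipped,
    // because the results for it were already sent and then cleared.
    if (checkpointId == lastCheckpointId) {
      return;
    }
    emitted.add(checkpointId + ":" + String.join(",", pendingResults));
    pendingResults.clear();
    lastCheckpointId = checkpointId;
  }

  void finish() {
    // Use the next valid checkpoint id instead of Long.MAX_VALUE.
    prepareSnapshotPreBarrier(lastCheckpointId + 1);
  }

  List<String> emitted() {
    return emitted;
  }
}
```

   In this sketch, calling `prepareSnapshotPreBarrier(5)` twice emits once, and 
a later `finish()` emits under id 6, so a subsequent call with id 6 is also 
skipped.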



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

