mxm commented on code in PR #13714:
URL: https://github.com/apache/iceberg/pull/13714#discussion_r2349619546


##########
flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergWriteAggregator.java:
##########
@@ -76,11 +85,21 @@ public void open() throws Exception {
 
   @Override
   public void finish() throws IOException {
-    prepareSnapshotPreBarrier(Long.MAX_VALUE);
+    prepareSnapshotPreBarrier(lastCheckpointId + 1);
   }
 
   @Override
   public void prepareSnapshotPreBarrier(long checkpointId) throws IOException {
+    if (checkpointId == lastCheckpointId) {

Review Comment:
   Got it!
   
   >2. avoid double flush in streaming execution when finish is called before 
checkpoint (for partially closed DAG). this is just a minor improvement.
   
   Avoiding the double flush is important because we would otherwise flush 
twice with the same checkpoint id!
   
   >For 1, curious what's the jira/PR from the Flink side?
   
   The change was done through 
https://issues.apache.org/jira/browse/FLINK-37605 and 
https://github.com/apache/flink/pull/26433. It turns out that we will also 
need the fix from this PR for Flink `2.0.1` and `1.20.2`, because the Flink 
change was backported to those versions. It could probably be considered a 
blocker for the Iceberg 1.9.0 release, because flushing might be broken for 
those versions.
   
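To make the double-flush concern concrete, here is a minimal sketch of the guard. It is not the real `IcebergWriteAggregator`: the class name `FlushGuardSketch`, the `flushedCheckpointIds` list, the initial value of `lastCheckpointId`, and the early `return` inside the guard are all assumptions for illustration, since the diff above only shows the `if` condition.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical stand-in for the aggregator; only the guard logic is modelled. */
public class FlushGuardSketch {
  // Assumption: starts below any checkpoint id the runtime will pass in.
  private long lastCheckpointId = 0L;

  // Records every checkpoint id for which a flush actually happened.
  private final List<Long> flushedCheckpointIds = new ArrayList<>();

  /** End of input: flush under the id that the next checkpoint would use. */
  public void finish() {
    prepareSnapshotPreBarrier(lastCheckpointId + 1);
  }

  public void prepareSnapshotPreBarrier(long checkpointId) {
    if (checkpointId == lastCheckpointId) {
      // Assumed guard body: this id was already flushed, so skip it.
      return;
    }
    flushedCheckpointIds.add(checkpointId); // stands in for the real flush
    lastCheckpointId = checkpointId;
  }

  public List<Long> flushedCheckpointIds() {
    return flushedCheckpointIds;
  }

  public static void main(String[] args) {
    FlushGuardSketch op = new FlushGuardSketch();
    op.prepareSnapshotPreBarrier(1L); // regular checkpoint
    op.finish();                      // flushes with id 2
    op.prepareSnapshotPreBarrier(2L); // checkpoint after finish: skipped
    System.out.println(op.flushedCheckpointIds()); // prints [1, 2]
  }
}
```

Without the guard, the last call in `main` would flush a second time under checkpoint id 2, which is the case described above.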



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]