mxm commented on code in PR #13714:
URL: https://github.com/apache/iceberg/pull/13714#discussion_r2358483021


##########
flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergWriteAggregator.java:
##########
@@ -76,11 +85,21 @@ public void open() throws Exception {
 
   @Override
   public void finish() throws IOException {
-    prepareSnapshotPreBarrier(Long.MAX_VALUE);
+    prepareSnapshotPreBarrier(lastCheckpointId + 1);
   }
 
   @Override
   public void prepareSnapshotPreBarrier(long checkpointId) throws IOException {
+    if (checkpointId == lastCheckpointId) {

Review Comment:
   I think you're right that the patch releases (Flink 1.19.3, 1.20.2, 2.0.1) 
should not have this behavioral change.
   
   I've created a thread on the mailing list: 
https://lists.apache.org/thread/ymsgmrf7mmw3qkwtp34b32fc9nhx9oy0 and an 
associated JIRA: https://issues.apache.org/jira/browse/FLINK-38370
   
   The crux is that `finish()` gets called in both batch and streaming 
pipelines. In theory, streaming pipelines are affected just like batch 
pipelines, but in practice they are not, because they have full checkpoint 
support. In the tests, we use a special bounded source for streaming that 
triggers a checkpoint before shutting down, which ensures that the final 
checkpoint completes before `finish()` gets called. Any subsequent checkpoints 
in streaming, which still happen even after operators have finished, do not 
flush anything, so the issue does not surface. This behavior is fine in 
streaming because a proper shutdown of a streaming job via stop-with-savepoint 
also ensures that a final checkpoint is taken before `finish()` gets called. A 
plain cancel without a savepoint discards the state written after the last 
checkpoint, which is expected. So the issue is batch-only.
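
   To make the streaming vs. batch difference concrete, here is a simplified, 
hypothetical model of the checkpoint-id handling. It is not the actual 
`IcebergWriteAggregator` code: the class name, the buffering, the initial 
`lastCheckpointId` of 0, and the early return inside the 
`checkpointId == lastCheckpointId` guard are all assumptions for illustration, 
since the diff above does not show the guard's body.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified model of the checkpoint-id handling in this PR.
// NOT the real IcebergWriteAggregator: the initial lastCheckpointId of 0 and
// the early return inside the guard are assumptions for illustration.
final class FinishSketch {
  private long lastCheckpointId = 0L; // assumption: nothing checkpointed yet
  private final List<String> pending = new ArrayList<>();
  private final List<Long> committedIds = new ArrayList<>();

  // Buffer a write result until the next flush.
  void write(String result) {
    pending.add(result);
  }

  // Mirrors the new finish(): flush under the id after the last one seen.
  void finish() {
    prepareSnapshotPreBarrier(lastCheckpointId + 1);
  }

  void prepareSnapshotPreBarrier(long checkpointId) {
    if (checkpointId == lastCheckpointId) {
      return; // assumption: this id was already handled (e.g. by finish())
    }
    if (!pending.isEmpty()) {
      committedIds.add(checkpointId); // one commit for this checkpoint id
      pending.clear();
    }
    lastCheckpointId = checkpointId;
  }

  List<Long> committedIds() {
    return committedIds;
  }

  public static void main(String[] args) {
    // Streaming: the bounded test source triggers checkpoint 1 before shutdown.
    FinishSketch streaming = new FinishSketch();
    streaming.write("data-file-1");
    streaming.prepareSnapshotPreBarrier(1);
    streaming.finish(); // uses id 2, but nothing is pending
    streaming.prepareSnapshotPreBarrier(2); // skipped by the guard
    streaming.prepareSnapshotPreBarrier(3); // later checkpoint, nothing to flush
    System.out.println("streaming commits: " + streaming.committedIds());

    // Batch: no checkpoint runs, so finish() performs the only flush.
    FinishSketch batch = new FinishSketch();
    batch.write("data-file-1");
    batch.finish();
    System.out.println("batch commits: " + batch.committedIds());
  }
}
```

   In this model the streaming job commits once, at the checkpoint taken before 
shutdown, and the later checkpoints are no-ops, while the batch job relies 
entirely on the flush triggered by `finish()`.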
   
   For Flink 2.1.0, I think the changed behavior is fine. At this point, we're 
mainly concerned about a checkpoint id of 1 for batch pipelines. It may look 
unfamiliar, but I think it makes perfect sense: there is exactly one checkpoint 
for batch pipelines. I suggest going ahead and merging this PR, because it is 
unlikely we will be able to change the Flink behavior for Flink 2.1.0.
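
   A minimal sketch of where the checkpoint id of 1 comes from, contrasting the 
id that `finish()` passes to `prepareSnapshotPreBarrier` before and after this 
PR. The helper names are hypothetical, and it assumes that in a batch job no 
checkpoint completes before `finish()`, so `lastCheckpointId` is still at an 
assumed initial value of 0.

```java
// Hypothetical sketch: which checkpoint id finish() hands to
// prepareSnapshotPreBarrier() before and after this PR.
// Assumption: lastCheckpointId starts at 0 and is never advanced in batch.
final class BatchCheckpointIdSketch {
  // Before the PR: finish() always used Long.MAX_VALUE (argument ignored).
  static long oldFinishCheckpointId(long lastCheckpointId) {
    return Long.MAX_VALUE;
  }

  // After the PR: finish() uses the id right after the last one seen.
  static long newFinishCheckpointId(long lastCheckpointId) {
    return lastCheckpointId + 1;
  }

  public static void main(String[] args) {
    long lastCheckpointId = 0L; // assumed initial value in a batch job
    System.out.println("old: " + oldFinishCheckpointId(lastCheckpointId));
    System.out.println("new: " + newFinishCheckpointId(lastCheckpointId));
  }
}
```

   Under that assumption, the single flush of a batch job is labeled with 
checkpoint id 1 instead of `Long.MAX_VALUE`.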



-- 
This is an automated message from the Apache Git Service.