mxm commented on code in PR #13714:
URL: https://github.com/apache/iceberg/pull/13714#discussion_r2348129511


##########
flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergWriteAggregator.java:
##########
@@ -76,11 +85,21 @@ public void open() throws Exception {
 
   @Override
   public void finish() throws IOException {
-    prepareSnapshotPreBarrier(Long.MAX_VALUE);
+    prepareSnapshotPreBarrier(lastCheckpointId + 1);
   }
 
   @Override
   public void prepareSnapshotPreBarrier(long checkpointId) throws IOException {
+    if (checkpointId == lastCheckpointId) {

Review Comment:
   `prepareSnapshotPreBarrier` is necessary for flushing any records which 
should be part of the next snapshot. The method is called before the barrier is 
sent downstream, which ensures the records are processed before the snapshot is 
taken. If we did not send those records downstream, they would have to be saved 
as part of the checkpointed state and processed after the checkpoint finishes.
   
   In the context of the Iceberg sink, flushing state in 
`prepareSnapshotPreBarrier` means that the data files written will be committed 
to the Iceberg table as part of the current Flink checkpoint. If we did not do 
that, those files would only be committed with the next checkpoint, so the table 
changes would not be visible until that later checkpoint completes.
   
   >What about batch execution? I think we added the prepareSnapshotPreBarrier 
mainly for batch execution. Now, batch execution can commit to Iceberg with 
checkpointId as 1, which is a valid checkpoint id. I don't see a correctness 
problem with that, but it does feel weird when inspecting the table snapshot 
summary.
   
   It is also useful in streaming mode, to make the table changes visible 
faster, but it is 100% required for batch to ensure all data gets committed to 
the table on shutdown. Showing checkpoint id 1 might surprise users who 
previously saw `Long.MAX_VALUE`, but I don't see a big issue with that, since it 
reflects what's actually happening in Flink.
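
   To make the behavior concrete, here is a minimal sketch of the flush logic discussed above. It is not the actual `IcebergWriteAggregator`: the class name `WriteAggregatorSketch`, the string-based "committables", the initial `lastCheckpointId` of 0 (inferred from batch committing with id 1), and the early return for a repeated checkpoint id are all assumptions made for illustration.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified model of the aggregator. Only the two method
// names and the lastCheckpointId field come from the diff above.
class WriteAggregatorSketch {
  // Assumption: starts at 0, so a batch job that never checkpoints
  // flushes with id 1 in finish().
  private long lastCheckpointId = 0L;
  private final List<String> pendingFiles = new ArrayList<>();
  private final List<String> emitted = new ArrayList<>();

  // Buffers a written data file until the next flush.
  void processElement(String dataFile) {
    pendingFiles.add(dataFile);
  }

  // Called before the barrier is forwarded: everything buffered so far is
  // sent downstream so it is committed with this checkpoint.
  void prepareSnapshotPreBarrier(long checkpointId) {
    if (checkpointId == lastCheckpointId) {
      // Assumption: this id was already flushed, so do nothing.
      return;
    }
    emitted.add("checkpoint=" + checkpointId + " files=" + pendingFiles);
    pendingFiles.clear();
    lastCheckpointId = checkpointId;
  }

  // End of input: flush the remainder under the next checkpoint id
  // instead of Long.MAX_VALUE.
  void finish() {
    prepareSnapshotPreBarrier(lastCheckpointId + 1);
  }

  List<String> emitted() {
    return emitted;
  }
}
```

   In this sketch, a batch run that calls `processElement("a.parquet")` followed by `finish()` emits a single entry tagged with checkpoint id 1, and a repeated `prepareSnapshotPreBarrier(5)` call in streaming mode emits only once.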



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

