stevenzwu commented on code in PR #13714:
URL: https://github.com/apache/iceberg/pull/13714#discussion_r2353201142


##########
flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergWriteAggregator.java:
##########
@@ -76,11 +85,21 @@ public void open() throws Exception {
 
   @Override
   public void finish() throws IOException {
-    prepareSnapshotPreBarrier(Long.MAX_VALUE);
+    prepareSnapshotPreBarrier(lastCheckpointId + 1);
   }
 
   @Override
   public void prepareSnapshotPreBarrier(long checkpointId) throws IOException {
+    if (checkpointId == lastCheckpointId) {

Review Comment:
   Agree that the change in the `finish` method fixes the checkpoint ID for
streaming execution. But checkpoint ID `1` is arguably incorrect for batch
execution.
   
   I tried the `TestFlinkTableSinkExtended#testHashDistributeMode` test as you
suggested, with this `finish()` implementation:
   ```
     @Override
     public void finish() throws IOException {
       if (getRuntimeContext().getJobType() == JobType.BATCH) {
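         // Batch: tag the final flush with the end-of-input checkpoint ID.
         prepareSnapshotPreBarrier(Long.MAX_VALUE);
       } else {
         // Streaming: tag the final flush with the next checkpoint ID.
         prepareSnapshotPreBarrier(lastCheckpointId + 1);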
       }
     }
   ```
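   The two ways of choosing the final-flush checkpoint ID can be compared with a small self-contained model. `FinalFlushSketch` and its `Mode` enum are hypothetical stand-ins, not Flink or Iceberg APIs; the initial `lastCheckpointId` of `0` and the early `return` in the duplicate-ID guard are assumptions, since the diff hunk above ends at the `if` line.

   ```java
   import java.util.ArrayList;
   import java.util.List;

   /** Hypothetical model (not Iceberg or Flink code) of the final flush in finish(). */
   public class FinalFlushSketch {

     enum Mode { BATCH, STREAMING }

     private final Mode mode;
     private long lastCheckpointId = 0L; // assumption: no checkpoint seen yet
     final List<Long> flushedCheckpointIds = new ArrayList<>();

     FinalFlushSketch(Mode mode) {
       this.mode = mode;
     }

     void prepareSnapshotPreBarrier(long checkpointId) {
       // Assumed guard: skip a second flush for an ID that was already flushed,
       // e.g. finish() followed by the final checkpoint carrying the same ID.
       if (checkpointId == lastCheckpointId) {
         return;
       }
       flushedCheckpointIds.add(checkpointId);
       lastCheckpointId = checkpointId;
     }

     void finish() {
       // Batch: Long.MAX_VALUE marks end of input. Streaming: next checkpoint ID.
       prepareSnapshotPreBarrier(mode == Mode.BATCH ? Long.MAX_VALUE : lastCheckpointId + 1);
     }

     public static void main(String[] args) {
       FinalFlushSketch streaming = new FinalFlushSketch(Mode.STREAMING);
       streaming.prepareSnapshotPreBarrier(1);
       streaming.prepareSnapshotPreBarrier(2);
       streaming.finish();                      // flushes under checkpoint 3
       streaming.prepareSnapshotPreBarrier(3);  // final checkpoint: skipped by the guard
       System.out.println(streaming.flushedCheckpointIds); // [1, 2, 3]

       FinalFlushSketch batch = new FinalFlushSketch(Mode.BATCH);
       batch.finish();                          // batch mode sees no checkpoints
       System.out.println(batch.flushedCheckpointIds);     // [9223372036854775807]
     }
   }
   ```

   In this model, a job that finishes before any checkpoint flushes under ID `1` (`0 + 1`, given the assumed initial value) unless the `BATCH` branch substitutes `Long.MAX_VALUE`.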
   
   It works correctly with the V1 sink, because `IcebergFileCommitter#endInput()`
is called in batch execution mode and commits the data flushed with checkpoint
ID `Long.MAX_VALUE`.
   
   But with the V2 sink, `IcebergCommitter` has no similar end-of-input
callback. Is this a limitation of the V2 sink that can be improved or fixed?
Shouldn't `Committer#commit(Collection<CommitRequest<CommT>>)` be called at
end of input in the V2 sink framework?
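   A minimal sketch of why data flushed under `Long.MAX_VALUE` depends on an end-of-input callback, assuming a committer that stages results per checkpoint ID and commits everything up to a checkpoint when that checkpoint completes. `PendingCommitSketch`, `stage`, `commitUpTo` and `pendingCount` are hypothetical names; this is not the code of `IcebergFileCommitter` or `IcebergCommitter`.

   ```java
   import java.util.ArrayList;
   import java.util.List;
   import java.util.NavigableMap;
   import java.util.TreeMap;

   /** Hypothetical committer model that stages write results per checkpoint ID. */
   public class PendingCommitSketch {

     // Staged results keyed by the checkpoint ID they were flushed under.
     private final NavigableMap<Long, List<String>> pending = new TreeMap<>();
     final List<String> committed = new ArrayList<>();

     void stage(long checkpointId, String dataFile) {
       pending.computeIfAbsent(checkpointId, id -> new ArrayList<>()).add(dataFile);
     }

     // Checkpoint-driven commit: everything staged up to and including this ID.
     void notifyCheckpointComplete(long checkpointId) {
       commitUpTo(checkpointId);
     }

     // End-of-input hook (modeled on the V1 endInput() behavior): commits the rest,
     // including results staged under Long.MAX_VALUE.
     void endInput() {
       commitUpTo(Long.MAX_VALUE);
     }

     private void commitUpTo(long checkpointId) {
       NavigableMap<Long, List<String>> ready = pending.headMap(checkpointId, true);
       for (List<String> files : ready.values()) {
         committed.addAll(files);
       }
       ready.clear(); // headMap is a view, so this also removes the entries from pending
     }

     int pendingCount() {
       return pending.values().stream().mapToInt(List::size).sum();
     }

     public static void main(String[] args) {
       PendingCommitSketch committer = new PendingCommitSketch();
       committer.stage(Long.MAX_VALUE, "batch-file-1.parquet"); // batch-mode final flush

       committer.notifyCheckpointComplete(1L);       // a real checkpoint ID never reaches Long.MAX_VALUE
       System.out.println(committer.committed);      // []
       System.out.println(committer.pendingCount()); // 1

       committer.endInput();                         // only the end-of-input hook commits it
       System.out.println(committer.committed);      // [batch-file-1.parquet]
     }
   }
   ```

   The ordered map turns "everything up to this checkpoint" into a single `headMap` range, so `Long.MAX_VALUE` acts as an end-of-input sentinel: in this model only an explicit end-of-input call ever covers it.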
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
