stevenzwu commented on code in PR #13714:
URL: https://github.com/apache/iceberg/pull/13714#discussion_r2349785830
##########
flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergWriteAggregator.java:
##########
@@ -76,11 +85,21 @@ public void open() throws Exception {
@Override
public void finish() throws IOException {
- prepareSnapshotPreBarrier(Long.MAX_VALUE);
+ prepareSnapshotPreBarrier(lastCheckpointId + 1);
}
@Override
public void prepareSnapshotPreBarrier(long checkpointId) throws IOException {
+ if (checkpointId == lastCheckpointId) {
Review Comment:
> Avoid the double flushing is important because we would otherwise flush
twice with the same checkpoint id!
I was earlier thinking about the writer operator; I realized that this is the
aggregator operator for the v2 sink.
> Turns out that for 2.0.1 and 1.20.2 we will also need that fix because it
was backported to those versions.
Agreed. We need to backport this change to the 2.0 and 1.20 modules.
> Probably it could be considered somewhat a blocker for the Iceberg 1.9.0
release because flushing might be broken for those versions.
Do you mean the Iceberg 1.10.0 release? It has already been announced.
> but it is 100% required for batch to ensure all data gets committed to the
table on shutdown. Showing checkpoint id 1 might surprise users who normally
saw Long.MAX_VALUE
This is still my concern. Is it possible to know the execution mode
(streaming or batch) and set the checkpoint id differently based on the mode?
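For illustration, a minimal standalone sketch of the guard being discussed (hypothetical class names, not the actual `IcebergWriteAggregator`): `finish()` flushes with `lastCheckpointId + 1` instead of `Long.MAX_VALUE`, and `prepareSnapshotPreBarrier` skips a checkpoint id it has already flushed, which avoids the double flush when both are invoked with the same id.

```java
// Hypothetical sketch of the double-flush guard; names and structure are
// assumptions for illustration, not the real Iceberg/Flink operator code.
class AggregatorSketch {
  long lastCheckpointId = 0;
  int flushCount = 0;

  // Batch shutdown path: flush once with the next checkpoint id rather
  // than Long.MAX_VALUE.
  void finishOp() {
    prepareSnapshotPreBarrier(lastCheckpointId + 1);
  }

  void prepareSnapshotPreBarrier(long checkpointId) {
    if (checkpointId == lastCheckpointId) {
      // Already flushed for this checkpoint id: skip to avoid double flushing.
      return;
    }
    flushCount++;
    lastCheckpointId = checkpointId;
  }
}
```

With this guard, a repeated call for the same checkpoint id becomes a no-op, while a subsequent `finish()` still produces exactly one final flush.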
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]