mxm commented on code in PR #13714:
URL: https://github.com/apache/iceberg/pull/13714#discussion_r2349619546
##########
flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergWriteAggregator.java:
##########
@@ -76,11 +85,21 @@ public void open() throws Exception {
@Override
public void finish() throws IOException {
- prepareSnapshotPreBarrier(Long.MAX_VALUE);
+ prepareSnapshotPreBarrier(lastCheckpointId + 1);
}
@Override
public void prepareSnapshotPreBarrier(long checkpointId) throws IOException {
+ if (checkpointId == lastCheckpointId) {
Review Comment:
Got it!
> 2. Avoid double flush in streaming execution when finish is called before checkpoint (for partially closed DAG). This is just a minor improvement.
Avoiding the double flush is important because otherwise we would flush
twice with the same checkpoint ID!
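To illustrate why the guard matters, here is a minimal, hypothetical sketch (not the actual `IcebergWriteAggregator`). It assumes that the `checkpointId == lastCheckpointId` branch returns early and that `lastCheckpointId` is updated after each flush; the diff excerpt above does not show those lines. `FlushGuardSketch` and `flushedCheckpointIds` are illustrative names only.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical stand-in for an operator that flushes once per checkpoint ID. */
class FlushGuardSketch {
  // Assumption: no checkpoint has been processed yet, so start at 0.
  private long lastCheckpointId = 0L;
  // Records the checkpoint ID used for each flush, to make double flushes visible.
  private final List<Long> flushedCheckpointIds = new ArrayList<>();

  /** End of input: flush under the ID the next checkpoint will carry. */
  public void finish() {
    prepareSnapshotPreBarrier(lastCheckpointId + 1);
  }

  /** Called before the checkpoint barrier is forwarded downstream. */
  public void prepareSnapshotPreBarrier(long checkpointId) {
    if (checkpointId == lastCheckpointId) {
      // Already flushed for this ID (e.g. finish() ran before the checkpoint): skip.
      return;
    }
    flushedCheckpointIds.add(checkpointId);
    lastCheckpointId = checkpointId;
  }

  public List<Long> flushedCheckpointIds() {
    return flushedCheckpointIds;
  }

  public static void main(String[] args) {
    FlushGuardSketch op = new FlushGuardSketch();
    op.prepareSnapshotPreBarrier(1); // regular checkpoint 1
    op.finish();                     // input ends: flush as checkpoint 2
    op.prepareSnapshotPreBarrier(2); // barrier for checkpoint 2 arrives: skipped
    System.out.println(op.flushedCheckpointIds()); // prints [1, 2]
  }
}
```

Without the early return, the last call would flush a second time under checkpoint ID 2, which is exactly the duplicate described above.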
> For 1, curious what's the JIRA/PR from the Flink side?
The change was made in
https://issues.apache.org/jira/browse/FLINK-37605 and
https://github.com/apache/flink/pull/26433. It turns out that we will also
need this fix for Flink `2.0.1` and `1.20.2`, because the change was
backported to those versions. It could probably be considered something of a
blocker for the Iceberg 1.9.0 release, since flushing might be broken on those
versions.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.