mxm commented on code in PR #13714:
URL: https://github.com/apache/iceberg/pull/13714#discussion_r2358483021
##########
flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergWriteAggregator.java:
##########
@@ -76,11 +85,21 @@ public void open() throws Exception {
@Override
public void finish() throws IOException {
- prepareSnapshotPreBarrier(Long.MAX_VALUE);
+ prepareSnapshotPreBarrier(lastCheckpointId + 1);
}
@Override
public void prepareSnapshotPreBarrier(long checkpointId) throws IOException {
+ if (checkpointId == lastCheckpointId) {
Review Comment:
I think you're right that the patch releases (Flink 1.19.3, 1.20.2, 2.0.1)
should not have this behavioral change.
I've created a thread on the mailing list:
https://lists.apache.org/thread/ymsgmrf7mmw3qkwtp34b32fc9nhx9oy0 and an
associated JIRA: https://issues.apache.org/jira/browse/FLINK-38370
The crux is that `finish()` gets called in both batch and streaming
pipelines. In theory, streaming pipelines are affected just as much as batch
pipelines, but in practice they are not because they have full checkpoint
support. In the tests, we use a special bounded source for streaming which
triggers a checkpoint before shutting down, which will always ensure that the
checkpoint goes through before `finish()` gets called. Any subsequent
checkpoints in streaming, which still happen even when operators are finished,
do not flush anything, so the issue is not apparent. This behavior is fine in
streaming because a proper shutdown of a streaming job via stop-with-savepoint
also ensures that a final checkpoint is created before `finish()`
gets called. A simple cancel without a savepoint would discard the state
written after the last checkpoint, which is expected. So the issue is
batch-only.
For Flink 2.1.0, I think the changed behavior is fine. At this point, we're
mainly concerned about a checkpoint id of 1 for batch pipelines. It may look
unfamiliar, but I think it makes perfect sense. There is exactly one checkpoint
for batch pipelines. I suggest going ahead and merging this PR because it
is unlikely that we will be able to change the Flink behavior for Flink 2.1.0.
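
To make the checkpoint id reasoning concrete, here is a minimal, self-contained sketch of the two cases. It is not the Iceberg code: `AggregatorSketch`, `add`, and `emitted` are hypothetical names, the initial `lastCheckpointId` of 0 is an assumption, and so is the guard body (an early return when the id was already flushed), since the hunk above only shows the `if` condition.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical stand-in for the aggregator; only the checkpoint id bookkeeping is modeled. */
class AggregatorSketch {
  // Assumption: no checkpoint has been seen yet, so the id starts at 0.
  private long lastCheckpointId = 0L;
  private final List<String> pending = new ArrayList<>();
  private final List<String> emitted = new ArrayList<>();

  void add(String result) {
    pending.add(result);
  }

  // Mirrors the changed finish(): flush under the next unused checkpoint id.
  void finish() {
    prepareSnapshotPreBarrier(lastCheckpointId + 1);
  }

  void prepareSnapshotPreBarrier(long checkpointId) {
    if (checkpointId == lastCheckpointId) {
      // Assumption: this id was already flushed (e.g. by finish()), so skip it.
      return;
    }
    emitted.add(checkpointId + ":" + pending);
    pending.clear();
    lastCheckpointId = checkpointId;
  }

  List<String> emitted() {
    return emitted;
  }

  public static void main(String[] args) {
    // Batch: no checkpoint ever happens, finish() flushes everything under id 1.
    AggregatorSketch batch = new AggregatorSketch();
    batch.add("a");
    batch.add("b");
    batch.finish();
    System.out.println(batch.emitted());

    // Streaming: checkpoint 1 flushes the data, finish() uses id 2,
    // and a later checkpoint with id 2 is skipped by the guard.
    AggregatorSketch streaming = new AggregatorSketch();
    streaming.add("a");
    streaming.prepareSnapshotPreBarrier(1L);
    streaming.finish();
    streaming.prepareSnapshotPreBarrier(2L);
    System.out.println(streaming.emitted());
  }
}
```

In the batch case the single flush carries checkpoint id 1, which is the "exactly one checkpoint" behavior described above. In the streaming case the data already went out with checkpoint 1, so the flush from `finish()` is empty.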
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]