mxm commented on code in PR #13714:
URL: https://github.com/apache/iceberg/pull/13714#discussion_r2348129511
##########
flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergWriteAggregator.java:
##########
@@ -76,11 +85,21 @@ public void open() throws Exception {
@Override
public void finish() throws IOException {
- prepareSnapshotPreBarrier(Long.MAX_VALUE);
+ prepareSnapshotPreBarrier(lastCheckpointId + 1);
}
@Override
public void prepareSnapshotPreBarrier(long checkpointId) throws IOException {
+ if (checkpointId == lastCheckpointId) {
Review Comment:
`prepareSnapshotPreBarrier` is necessary for flushing any records which
should be part of the next snapshot. The method is called before the barrier is
sent downstream, which ensures the records are processed before the snapshot is
taken. If we did not send those records downstream, they would have to be saved as
part of the checkpointed state and processed after the checkpoint finishes.
In the context of the Iceberg sink, flushing state in
`prepareSnapshotPreBarrier` means that the data files written will be committed
to the Iceberg table as part of the Flink checkpoint. If we did not do that,
those files would be part of the next snapshot, which would mean that the table
changes aren't visible until the next Flink checkpoint.
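To make the timing concrete, here is a Flink-free Java simulation. It assumes a simplified model in which the committer commits everything it received before a barrier with that barrier's checkpoint id, and in which an operator that does not flush pre-barrier emits its buffered files only after the barrier has passed. `Committer`, `Aggregator`, `checkpoint()` and `run()` are hypothetical stand-ins, not Flink or Iceberg APIs.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical simulation: which checkpoint a buffered data file is committed with,
// depending on whether the upstream operator flushes before forwarding the barrier.
public class PreBarrierFlushDemo {

  /** Hypothetical committer: commits everything received before each barrier. */
  static final class Committer {
    private long nextCheckpointId = 1;
    private final List<String> pending = new ArrayList<>();
    final Map<Long, List<String>> committed = new TreeMap<>();

    void receive(String dataFile) {
      pending.add(dataFile);
    }

    // The barrier arrives: commit whatever was received before it.
    void onBarrier() {
      committed.put(nextCheckpointId, new ArrayList<>(pending));
      pending.clear();
      nextCheckpointId++;
    }
  }

  /** Hypothetical aggregator that buffers data files written by upstream writers. */
  static final class Aggregator {
    private final List<String> buffered = new ArrayList<>();
    private final Committer downstream;
    private final boolean flushPreBarrier;

    Aggregator(Committer downstream, boolean flushPreBarrier) {
      this.downstream = downstream;
      this.flushPreBarrier = flushPreBarrier;
    }

    void processElement(String dataFile) {
      buffered.add(dataFile);
    }

    void checkpoint() {
      if (flushPreBarrier) {
        emitBuffered(); // the role prepareSnapshotPreBarrier plays
      }
      downstream.onBarrier(); // barrier forwarded downstream
      if (!flushPreBarrier) {
        emitBuffered(); // files were held in state and only emitted after the barrier
      }
    }

    private void emitBuffered() {
      for (String f : buffered) {
        downstream.receive(f);
      }
      buffered.clear();
    }
  }

  /** Writes one file, then takes two checkpoints; returns checkpoint id -> committed files. */
  static Map<Long, List<String>> run(boolean flushPreBarrier) {
    Committer committer = new Committer();
    Aggregator aggregator = new Aggregator(committer, flushPreBarrier);
    aggregator.processElement("a.parquet");
    aggregator.checkpoint(); // checkpoint 1
    aggregator.checkpoint(); // checkpoint 2
    return committer.committed;
  }

  public static void main(String[] args) {
    System.out.println("with pre-barrier flush:    " + run(true));
    System.out.println("without pre-barrier flush: " + run(false));
  }
}
```

Running `main` prints `{1=[a.parquet], 2=[]}` for the flushing variant and `{1=[], 2=[a.parquet]}` for the other: without the pre-barrier flush, the file only becomes visible with the following checkpoint.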
> What about batch execution? I think we added prepareSnapshotPreBarrier
> mainly for batch execution. Now, batch execution can commit to Iceberg with
> checkpointId as 1, which is a valid checkpoint id. I don't see a correctness
> problem with that, but it does feel weird when inspecting the table snapshot
> summary.
It is also useful in streaming mode, to make the table changes visible
faster, but it is 100% required for batch to ensure all data gets committed to
the table on shutdown. Showing checkpoint id 1 might surprise users who are
used to seeing Long.MAX_VALUE, but I don't see a big issue with that, since it
reflects what's actually happening in Flink.
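A minimal sketch of how the new `finish()` and the `checkpointId == lastCheckpointId` guard could interact. The diff excerpt is truncated, so this assumes the guard returns early, that `lastCheckpointId` starts at 0 and is updated on every flush, and that the operator emits one (possibly empty) result per checkpoint; `Aggregator` and `emitted` are hypothetical names, not the actual Iceberg code.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical simulation of the finish()/prepareSnapshotPreBarrier interplay; not the Iceberg code.
public class FinishCheckpointIdDemo {

  static final class Aggregator {
    // Assumption: 0 means "no checkpoint flushed yet".
    private long lastCheckpointId = 0;
    private final List<String> buffered = new ArrayList<>();
    // One entry per flush, formatted as "<checkpointId>=<data files>".
    final List<String> emitted = new ArrayList<>();

    void processElement(String dataFile) {
      buffered.add(dataFile);
    }

    void prepareSnapshotPreBarrier(long checkpointId) {
      // Assumed guard behavior: this id was already flushed, so skip a duplicate result.
      if (checkpointId == lastCheckpointId) {
        return;
      }
      emitted.add(checkpointId + "=" + buffered); // emitted even when nothing is buffered
      buffered.clear();
      lastCheckpointId = checkpointId;
    }

    void finish() {
      // Previously prepareSnapshotPreBarrier(Long.MAX_VALUE).
      prepareSnapshotPreBarrier(lastCheckpointId + 1);
    }
  }

  public static void main(String[] args) {
    // Batch execution: no checkpoints, so finish() flushes with checkpoint id 1.
    Aggregator batch = new Aggregator();
    batch.processElement("a.parquet");
    batch.finish();
    System.out.println(batch.emitted); // [1=[a.parquet]]

    // Streaming: after checkpoint 5, finish() flushes with id 6,
    // and a later pre-barrier call for checkpoint 6 is skipped.
    Aggregator streaming = new Aggregator();
    streaming.processElement("b.parquet");
    streaming.prepareSnapshotPreBarrier(5);
    streaming.processElement("c.parquet");
    streaming.finish();
    streaming.prepareSnapshotPreBarrier(6);
    System.out.println(streaming.emitted); // [5=[b.parquet], 6=[c.parquet]]
  }
}
```

Under these assumptions, a batch job's final commit carries checkpoint id 1 instead of Long.MAX_VALUE, and a streaming job's final flush uses the id right after the last completed flush.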