stevenzwu commented on code in PR #13714:
URL: https://github.com/apache/iceberg/pull/13714#discussion_r2353201142
##########
flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergWriteAggregator.java:
##########
@@ -76,11 +85,21 @@ public void open() throws Exception {
@Override
public void finish() throws IOException {
- prepareSnapshotPreBarrier(Long.MAX_VALUE);
+ prepareSnapshotPreBarrier(lastCheckpointId + 1);
}
@Override
public void prepareSnapshotPreBarrier(long checkpointId) throws IOException {
+ if (checkpointId == lastCheckpointId) {
Review Comment:
I agree that the change in the `finish` method fixes the checkpoint id for
streaming execution. But one could argue that checkpoint id `1` is incorrect
for batch execution: no checkpoints are taken in batch execution, so
`lastCheckpointId + 1` evaluates to `1` there.
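A minimal sketch of the arithmetic, assuming `lastCheckpointId` starts at `0` and only advances when a checkpoint barrier is processed. `FinishCheckpointIdSketch`, `onCheckpoint`, `finishIdFromPr` and `finishIdModeAware` are hypothetical names for illustration; this is not the real `IcebergWriteAggregator` code. It only compares the id the PR's `finish()` would use with the mode-aware variant in each execution mode.

```java
/**
 * Hypothetical model of the aggregator's checkpoint-id bookkeeping.
 * Assumption: lastCheckpointId starts at 0 and is only advanced when a
 * checkpoint barrier is processed. Not the real IcebergWriteAggregator.
 */
public class FinishCheckpointIdSketch {

  private long lastCheckpointId = 0L;

  // Called for each checkpoint barrier (only happens in streaming in this model).
  void onCheckpoint(long checkpointId) {
    lastCheckpointId = checkpointId;
  }

  // Id used by finish() with the change in this PR.
  long finishIdFromPr() {
    return lastCheckpointId + 1;
  }

  // Mode-aware variant: Long.MAX_VALUE for batch, next id for streaming.
  long finishIdModeAware(boolean isBatch) {
    return isBatch ? Long.MAX_VALUE : lastCheckpointId + 1;
  }

  public static void main(String[] args) {
    FinishCheckpointIdSketch streaming = new FinishCheckpointIdSketch();
    for (long id = 1; id <= 3; id++) {
      streaming.onCheckpoint(id);
    }
    // Streaming: the final flush gets the id after the last checkpoint.
    System.out.println("streaming, PR: " + streaming.finishIdFromPr()); // 4

    // Batch: no barriers arrive, so lastCheckpointId keeps its initial value.
    FinishCheckpointIdSketch batch = new FinishCheckpointIdSketch();
    System.out.println("batch, PR: " + batch.finishIdFromPr()); // 1
    System.out.println("batch, mode-aware: " + batch.finishIdModeAware(true)); // 9223372036854775807
  }
}
```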
I tried the following change with the
`TestFlinkTableSinkExtended#testHashDistributeMode` test, as you suggested:
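```java
@Override
public void finish() throws IOException {
  if (getRuntimeContext().getJobType() == JobType.BATCH) {
    // Batch: no checkpoints are taken, so flush with Long.MAX_VALUE.
    prepareSnapshotPreBarrier(Long.MAX_VALUE);
  } else {
    // Streaming: flush under the checkpoint id following the last one seen.
    prepareSnapshotPreBarrier(lastCheckpointId + 1);
  }
}
```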
This works correctly with the V1 sink, because
`IcebergFilesCommitter#endInput()` is called in batch execution mode and
commits the flushed data with checkpoint id `Long.MAX_VALUE`.
With the V2 sink, however, `IcebergCommitter` has no similar callback for
end of input. Is this a limitation of the V2 sink that could be improved or
fixed? I would expect the V2 sink framework to call
`Committer#commit(Collection<CommitRequest<CommT>>)` at end of input in batch
execution mode, regardless of the checkpoint id passed down. Semantically,
passing `Long.MAX_VALUE` rather than `1` as the checkpoint id seems more
correct for batch execution mode.
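To make the `Long.MAX_VALUE` semantics concrete, here is a minimal sketch, assuming a committer that buffers pending results keyed by checkpoint id and commits every entry at or below the id it is given. The class and method names (`EndOfInputCommitSketch`, `addPending`, `commitUpTo`, `endInput`) are hypothetical and do not reflect the actual `IcebergCommitter` or the V2 `Committer` API; the sketch only models why committing with `Long.MAX_VALUE` at end of input drains everything still pending, as the V1 `endInput()` path does.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.NavigableMap;
import java.util.TreeMap;

/** Hypothetical committer model; not Iceberg's IcebergCommitter or Flink's Committer API. */
public class EndOfInputCommitSketch {

  // Pending write results keyed by the checkpoint id they were flushed under.
  private final NavigableMap<Long, List<String>> pending = new TreeMap<>();

  void addPending(long checkpointId, String dataFile) {
    pending.computeIfAbsent(checkpointId, id -> new ArrayList<>()).add(dataFile);
  }

  // Commits every pending entry whose id is <= checkpointId, in id order.
  List<String> commitUpTo(long checkpointId) {
    NavigableMap<Long, List<String>> ready = pending.headMap(checkpointId, true);
    List<String> committed = new ArrayList<>();
    for (List<String> files : ready.values()) {
      committed.addAll(files);
    }
    // Clearing the head-map view also removes these entries from 'pending'.
    ready.clear();
    return committed;
  }

  // End of input in batch mode: Long.MAX_VALUE covers every pending checkpoint id.
  List<String> endInput() {
    return commitUpTo(Long.MAX_VALUE);
  }

  public static void main(String[] args) {
    EndOfInputCommitSketch committer = new EndOfInputCommitSketch();
    committer.addPending(1L, "data-1.parquet");
    committer.addPending(2L, "data-2.parquet");
    System.out.println(committer.commitUpTo(1L)); // [data-1.parquet]
    System.out.println(committer.endInput()); // [data-2.parquet]
  }
}
```

Because `Long.MAX_VALUE` is an upper bound for any real checkpoint id, it works as an end-of-input sentinel in this model without a separate callback carrying extra state.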