mxm commented on code in PR #13714:
URL: https://github.com/apache/iceberg/pull/13714#discussion_r2351547795
##########
flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergWriteAggregator.java:
##########
@@ -76,11 +85,21 @@ public void open() throws Exception {
 
   @Override
   public void finish() throws IOException {
-    prepareSnapshotPreBarrier(Long.MAX_VALUE);
+    prepareSnapshotPreBarrier(lastCheckpointId + 1);
   }
 
   @Override
   public void prepareSnapshotPreBarrier(long checkpointId) throws IOException {
+    if (checkpointId == lastCheckpointId) {
Review Comment:
>do you mean Iceberg 1.10.0 release? it was announced already.
Yes, I meant 1.10.0 :) We may have to fix it for 1.10.1 then.
>This is still my concern. is it possible to know the execution mode (streaming or batch) and set the checkpoint id differently based on the mode?
I tried again setting the checkpoint id to `Long.MAX_VALUE` in the
`finish()` method, but that no longer works. Counter-intuitively, it does work
in streaming mode, but not in batch mode. Previously, state stored under the
special checkpoint id `Long.MAX_VALUE` was factored in, but that is no longer
the case. We need the actual next checkpoint id for the state to be flushed;
otherwise, the latest snapshot will not include all data. You can verify this
by running a test like `TestFlinkTableSinkExtended#testHashDistributeMode`.
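For context, here is a minimal sketch of the guard this PR adds around the
flush, which makes a duplicate call for the same checkpoint id a no-op (the
`flush` helper and the field initialization are my assumptions, not the actual
Iceberg code):
```java
// Sketch: remember the last checkpoint id that was flushed, so that
// finish() can call prepareSnapshotPreBarrier(lastCheckpointId + 1)
// and a duplicate barrier for an already-flushed id is ignored.
private long lastCheckpointId = 0L;

@Override
public void prepareSnapshotPreBarrier(long checkpointId) throws IOException {
  if (checkpointId == lastCheckpointId) {
    // Already flushed for this checkpoint id, e.g. finish() ran first.
    return;
  }

  flush(checkpointId); // assumed helper that emits the buffered write results
  lastCheckpointId = checkpointId;
}
```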
The following works, but I think it's not what you want:
```java
@Override
public void finish() throws IOException {
  if (getRuntimeContext().getJobType() == JobType.STREAMING) {
    prepareSnapshotPreBarrier(Long.MAX_VALUE);
  } else {
    prepareSnapshotPreBarrier(lastCheckpointId + 1);
  }
}
```
Just to be clear, this does NOT work (fails in batch mode):
```java
@Override
public void finish() throws IOException {
  if (getRuntimeContext().getJobType() == JobType.BATCH) {
    prepareSnapshotPreBarrier(Long.MAX_VALUE);
  } else {
    prepareSnapshotPreBarrier(lastCheckpointId + 1);
  }
}
```
All in all, this new behavior raises some questions, but I think it is
probably a good idea to start using the correct checkpoint id and to move away
from markers like `Long.MAX_VALUE`. This is also what other two-phase commit
sinks in Flink do:
```java
@Override
public void finish() throws IOException {
  prepareSnapshotPreBarrier(lastCheckpointId + 1);
}
```
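If it helps to double-check the batch path, here is a hedged sketch of the
kind of end-to-end verification the test above performs (the table names and
setup are placeholders, not the actual test code):
```java
// Placeholder batch-mode check, loosely modeled on
// TestFlinkTableSinkExtended#testHashDistributeMode (not the actual test).
TableEnvironment tEnv = TableEnvironment.create(EnvironmentSettings.inBatchMode());
tEnv.executeSql("INSERT INTO iceberg_sink SELECT * FROM source_table").await();
// With finish() flushing via lastCheckpointId + 1, the final snapshot should
// contain all rows; with the Long.MAX_VALUE marker, rows can be missing.
```
Using the real next checkpoint id also keeps the end-of-input flush on the
same code path as regular checkpoint flushes, in both execution modes.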