lkokhreidze opened a new issue, #10892: URL: https://github.com/apache/iceberg/issues/10892
### Apache Iceberg version

1.5.2

### Query engine

Flink

### Please describe the bug 🐞

When Flink's state is restored, the Iceberg File Committer gets the max committed checkpoint ID from the table's metadata, as seen [here](https://github.com/apache/iceberg/blob/main/flink/v1.18/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergFilesCommitter.java#L189). When the job is recovered from an older savepoint, this behaviour can cause silent data loss. We have observed it with the Kafka source.

Consider the following case. We have a Kafka topic that is materialised as an Iceberg table, with a checkpoint interval of 30 seconds. We create job savepoints twice per day, so that we can roll the job back to an older state if something bad happens. Snapshots for the Iceberg table are kept for the last 2 days.

Suppose that during normal operation the job is at checkpoint 10000. If we need to restore the job to yesterday's version from the savepoint, the checkpoint ID is rolled back to 9000 (as an example). The savepoint also contains the offsets of the Kafka source that were committed when the savepoint was triggered. Upon recovery from the savepoint, the Kafka source seeks each topic-partition to the committed offset stored in the savepoint. The Iceberg File Committer, however, still gets 10000 as the max checkpoint ID from the table's metadata upon recovery, and it skips committing any files to the Iceberg table because of [this](https://github.com/apache/iceberg/blob/main/flink/v1.18/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergFilesCommitter.java#L240) check.

The problem is that there is no guarantee that the Kafka source will consume exactly the same amount of data per checkpoint as before. It might consume the lag faster now, for example due to increased parallelism. In that case it could consume the whole topic lag in just 500 checkpoints, while the Iceberg File Committer would skip committing anything new to the table, since the ongoing checkpoint ID would still be less than the max checkpoint ID stored in the table metadata.
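To make the scenario concrete, here is a minimal, self-contained sketch of the comparison described above. It is a simplified model and not the actual `IcebergFilesCommitter` code: the class `CheckpointSkipSketch`, the nested `ModelCommitter`, and the `replay` helper are hypothetical names introduced only for illustration, and the only behaviour assumed is "commit when the checkpoint ID is greater than the max committed checkpoint ID read from the table, otherwise skip".

```java
import java.util.ArrayList;
import java.util.List;

// Simplified model of the behaviour described in this issue.
// All names here are hypothetical; this is not Iceberg's implementation.
class CheckpointSkipSketch {

    // Stand-in for the committer: remembers the max committed checkpoint ID.
    static final class ModelCommitter {
        private long maxCommittedCheckpointId;
        private final List<Long> committed = new ArrayList<>();
        private final List<Long> skipped = new ArrayList<>();

        // On restore, the starting value comes from the table metadata,
        // not from the savepoint the job was restored from.
        ModelCommitter(long maxCommittedCheckpointIdFromTable) {
            this.maxCommittedCheckpointId = maxCommittedCheckpointIdFromTable;
        }

        void notifyCheckpointComplete(long checkpointId) {
            if (checkpointId > maxCommittedCheckpointId) {
                committed.add(checkpointId);
                maxCommittedCheckpointId = checkpointId;
            } else {
                // Files written for this checkpoint never reach the table.
                skipped.add(checkpointId);
            }
        }

        int committedCount() {
            return committed.size();
        }

        int skippedCount() {
            return skipped.size();
        }
    }

    // Replays the checkpoints that complete after restoring from a savepoint.
    static ModelCommitter replay(long maxIdInTable, long savepointCheckpointId, int checkpointsAfterRestore) {
        ModelCommitter committer = new ModelCommitter(maxIdInTable);
        long last = savepointCheckpointId + checkpointsAfterRestore;
        for (long id = savepointCheckpointId + 1; id <= last; id++) {
            committer.notifyCheckpointComplete(id);
        }
        return committer;
    }

    public static void main(String[] args) {
        // Table metadata says 10000, the savepoint was taken at checkpoint 9000,
        // and the Kafka lag is fully consumed within the next 500 checkpoints.
        ModelCommitter committer = replay(10_000L, 9_000L, 500);
        System.out.println("committed=" + committer.committedCount()
                + " skipped=" + committer.skippedCount());
        // prints "committed=0 skipped=500"
    }
}
```

In this model, every checkpoint from 9001 through 9500 is skipped. If the source has already caught up by then, the data read during those checkpoints is never committed to the table, which matches the silent loss described above.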
### Willingness to contribute

- [ ] I can contribute a fix for this bug independently
- [X] I would be willing to contribute a fix for this bug with guidance from the Iceberg community
- [ ] I cannot contribute a fix for this bug at this time