lkokhreidze opened a new issue, #10892:
URL: https://github.com/apache/iceberg/issues/10892

   ### Apache Iceberg version
   
   1.5.2
   
   ### Query engine
   
   Flink
   
   ### Please describe the bug 🐞
   
   When Flink's state is restored, Iceberg File Committer gets the max 
committed checkpoint ID from the table's metadata as seen 
[here](https://github.com/apache/iceberg/blob/main/flink/v1.18/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergFilesCommitter.java#L189).
 When recovering the job from the older savepoint, this behaviour might cause a 
silent data loss for Kafka Source (the one we have observed). Consider the 
following case:
   
   We have Kafka topic which is materialised as Iceberg table with checkpoint 
interval of 30 seconds.
   We create job savepoints twice per day, so we could rollback the job to 
older state if something bad happens. Snapshots for the Iceberg table is kept 
for the last 2 days.
   
   If during the normal operation the job is at checkpoint 10000, if we would 
need to restore the job to the yesterdays version from the savepoint, then 
checkpoint would be rolled back to 9000 (as an example). Savepoint also 
contains offsets of the Kafka source which were committed when savepoint was 
triggered. Upon recovery from the savepoint, Kafka source seeks the 
topic-partition offset based on committed offset in the savepoint.
   Iceberg File Committer though, upon recovery, would still get the max 
checkpoint ID as 10000 from table's  and it would skip committing any files to 
iceberg table due to 
[this](https://github.com/apache/iceberg/blob/main/flink/v1.18/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergFilesCommitter.java#L240)
 check. The problem is that there's no guarantee that Kafka source will consume 
exact same amount of data as before. So it might consume lag faster now due to 
increased parallelism, therefore in just 500 checkpoints it might consume the 
whole topic lag, while Iceberg File Committer would skip the committing 
anything new to the table since ongoing checkpoint ID will still be less then 
max checkpoint stored in the table metadata.
   
   ### Willingness to contribute
   
   - [ ] I can contribute a fix for this bug independently
   - [X] I would be willing to contribute a fix for this bug with guidance from 
the Iceberg community
   - [ ] I cannot contribute a fix for this bug at this time


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org
For additional commands, e-mail: issues-h...@iceberg.apache.org

Reply via email to