stevenzwu commented on issue #10892: URL: https://github.com/apache/iceberg/issues/10892#issuecomment-2277069747
Let's first look at the motivation for `maxCommittedCheckpointId`: it is used to check whether there are any unsuccessful commits from the restored checkpoint state. This problem shows a limitation of that approach.

Let's also clarify what the expected behavior should be. When users rewind the job to an older checkpoint/savepoint to reprocess older data, they are implicitly accepting duplicates. One potential fix is to reset `maxCommittedCheckpointId` to the restored checkpointId after the check. Any uncommitted attempts before or at the restored checkpointId should be committed as usual. Any uncommitted attempts after the restored checkpointId should be discarded.

Let's look at this particular scenario. Say the previous job completed checkpoint 10, but checkpoint 10 had a commit failure (e.g. a network issue with the catalog service), so the max committed checkpoint ID is 9. The job was then rewound to checkpoint 6. After checking which checkpoints were committed, the committer should reset `maxCommittedCheckpointId` to the restored checkpointId `6`. The uncommitted checkpoint 10 from the previous job should be discarded.

But this is still not ideal if we look at the table snapshot history. We would see the committed Flink checkpoints in this order (from earliest to latest):

```
5, 6, 7, 8, 9, (job rewound), 7, 8, ...
```

The checkpointIds are no longer monotonically increasing, which violates the assumption of the `getMaxCommittedCheckpointId` method. I see two potential options forward.

1. We can fix `getMaxCommittedCheckpointId` so that it only looks back as long as the checkpointIds are monotonically decreasing. E.g., when `getMaxCommittedCheckpointId` runs for a new job restored from the second checkpoint `8`, it should return `8`: the backward search should stop at the second checkpointId `7`, because the snapshot before it has a higher checkpointId of `9`.
2.
   To me, this is also an indication of the limitation of using checkpointId to detect successful/unsuccessful commits. When I implemented the Flink Iceberg sink at my previous company (Netflix), before the OSS implementation here, I generated a UUID for each checkpoint entry/commit. During state restore, the committer used the UUIDs to check whether the restored checkpoints/attempts had been committed. That approach doesn't depend on checkpoint IDs increasing monotonically.
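The reset rule proposed in this comment (commit uncommitted attempts at or before the restored checkpoint, discard those after it, then reset the watermark to the restored checkpointId) can be sketched as below. This is a minimal, hypothetical illustration, not Iceberg's committer API: the `RestoreRule` class, the `Decision` type, and modeling restored state as a `NavigableMap` from checkpoint ID to a manifest placeholder string are all assumptions.

```java
import java.util.NavigableMap;
import java.util.TreeMap;

/** Hypothetical illustration of the proposed restore rule; not Iceberg's committer API. */
final class RestoreRule {
  private RestoreRule() {}

  /** What the committer would do after restoring from a given checkpoint. */
  static final class Decision {
    final NavigableMap<Long, String> toCommit;
    final NavigableMap<Long, String> toDiscard;
    final long newMaxCommittedCheckpointId;

    Decision(NavigableMap<Long, String> toCommit, NavigableMap<Long, String> toDiscard,
        long newMaxCommittedCheckpointId) {
      this.toCommit = toCommit;
      this.toDiscard = toDiscard;
      this.newMaxCommittedCheckpointId = newMaxCommittedCheckpointId;
    }
  }

  /**
   * @param pending restored commit attempts keyed by checkpoint ID (values stand in for manifests)
   * @param maxCommittedCheckpointId highest checkpoint ID already committed to the table
   * @param restoredCheckpointId the checkpoint/savepoint the job was rewound to
   */
  static Decision onRestore(NavigableMap<Long, String> pending,
      long maxCommittedCheckpointId, long restoredCheckpointId) {
    NavigableMap<Long, String> toCommit = new TreeMap<>();
    if (maxCommittedCheckpointId < restoredCheckpointId) {
      // Uncommitted attempts at or before the restored checkpoint are committed as usual.
      toCommit.putAll(pending.subMap(
          maxCommittedCheckpointId, false, restoredCheckpointId, true));
    }
    // Attempts after the restored checkpoint belong to the abandoned run and are discarded.
    NavigableMap<Long, String> toDiscard =
        new TreeMap<>(pending.tailMap(restoredCheckpointId, false));
    // Reset the watermark so the rewound job's next checkpoints (e.g. 7, 8) are not skipped
    // as "already committed" just because the previous run reached a higher checkpoint ID.
    return new Decision(toCommit, toDiscard, restoredCheckpointId);
  }
}
```

In the scenario above, restoring from `6` with `maxCommittedCheckpointId = 9` and a pending attempt for `10` commits nothing, discards `10`, and resets the watermark to `6`. The `maxCommittedCheckpointId < restoredCheckpointId` guard is needed because `subMap` rejects a lower bound greater than the upper bound.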
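Option 1 (stop the backward search once checkpoint IDs stop decreasing) can be sketched as a walk from the newest snapshot toward the oldest. This is a minimal sketch under assumptions: the checkpoint IDs are taken to be already extracted from snapshot summaries into a list (oldest first, `null` for snapshots not written by this sink), whereas the real `getMaxCommittedCheckpointId` works on Iceberg snapshots directly; the `MaxCommittedCheckpoint` class and `latestRunMax` method are hypothetical names.

```java
import java.util.List;

/** Hypothetical sketch of option 1; not the actual Iceberg implementation. */
final class MaxCommittedCheckpoint {
  private MaxCommittedCheckpoint() {}

  /**
   * @param checkpointIdsOldestFirst checkpoint ID recorded in each snapshot summary, in commit
   *     order (oldest first); null for snapshots not written by this sink
   * @return the max committed checkpoint ID of the latest run, or -1 if none is found
   */
  static long latestRunMax(List<Long> checkpointIdsOldestFirst) {
    long max = -1L;
    Long previous = null; // checkpoint ID of the newer snapshot visited just before
    for (int i = checkpointIdsOldestFirst.size() - 1; i >= 0; i--) {
      Long id = checkpointIdsOldestFirst.get(i);
      if (id == null) {
        continue; // snapshot from another writer; keep walking back
      }
      if (previous != null && id > previous) {
        break; // an older snapshot has a higher ID: the job was rewound here, stop back-tracing
      }
      max = Math.max(max, id);
      previous = id;
    }
    return max;
  }
}
```

For the history `5, 6, 7, 8, 9, 7, 8`, the walk visits `8` and `7`, stops at `9`, and returns `8`; taking the max over the whole history would return `9` instead.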
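The UUID-based check described in the last paragraph can be sketched as follows. This is a hypothetical illustration, not the Netflix implementation or an Iceberg API: the snapshot-summary key `flink.commit-uuid`, the `Attempt` type, and the method names are all assumptions. Each attempt gets a random UUID when it is created, that UUID would be written into the snapshot summary on commit, and on restore only attempts whose UUID is absent from the table history are committed again.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;

/** Hypothetical sketch of UUID-based commit tracking; names and summary key are assumptions. */
final class UuidCommitTracking {
  private UuidCommitTracking() {}

  /** Hypothetical snapshot-summary key; not an existing Iceberg property. */
  static final String COMMIT_UUID = "flink.commit-uuid";

  /** A pending commit attempt as it would be stored in checkpoint state. */
  static final class Attempt {
    final String uuid;
    final long checkpointId;

    Attempt(String uuid, long checkpointId) {
      this.uuid = uuid;
      this.checkpointId = checkpointId;
    }

    /** Generates a fresh UUID once per checkpoint entry/commit attempt. */
    static Attempt create(long checkpointId) {
      return new Attempt(UUID.randomUUID().toString(), checkpointId);
    }
  }

  /** Returns the restored attempts whose UUID does not appear in any snapshot summary. */
  static List<Attempt> uncommitted(List<Attempt> restored,
      List<Map<String, String>> snapshotSummaries) {
    Set<String> committed = new HashSet<>();
    for (Map<String, String> summary : snapshotSummaries) {
      String uuid = summary.get(COMMIT_UUID);
      if (uuid != null) {
        committed.add(uuid);
      }
    }
    List<Attempt> result = new ArrayList<>();
    for (Attempt attempt : restored) {
      if (!committed.contains(attempt.uuid)) {
        result.add(attempt); // never committed: needs to be (re)committed
      }
    }
    return result;
  }
}
```

Because the check is a set-membership test on UUIDs, it is unaffected by checkpoint IDs going backwards after a rewind.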