stevenzwu commented on issue #10892:
URL: https://github.com/apache/iceberg/issues/10892#issuecomment-2277069747

   Let's first understand the motivation for `maxCommittedCheckpointId`. It is 
used to check whether the restored checkpoint state contains any unsuccessful 
commits. But this problem shows a limitation of that approach.
   
   Let's also clarify the expected behavior. When users rewind the job to an 
older checkpoint/savepoint to reprocess older data, it indicates that they 
accept duplicates.
   
   One potential fix is to reset the `maxCommittedCheckpointId` to the restored 
checkpointId after the check. Any uncommitted attempts (before or at the 
restored checkpointId) should be committed as usual. Any uncommitted attempts 
after the restored checkpointId should be discarded.
   
   Let's look at this particular scenario. Say the previous job completed 
checkpoint 10, but the commit for checkpoint 10 failed (say, due to a network 
issue with the catalog service). So the `maxCommittedCheckpointId` is 9. The 
job was then rewound to checkpoint 6.
   
   After checking which checkpoints have been committed, the committer should 
reset `maxCommittedCheckpointId` to the restored checkpointId `6`. The 
uncommitted checkpoint 10 from the previous job should be discarded.
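
   As a rough illustration of that reset, here is a simplified Python model. 
It is not the actual committer code: `plan_restore` and its parameters are 
hypothetical names, and checkpoint state is reduced to plain integers.

```python
def plan_restore(pending_ids, max_committed_in_table, restored_id):
    """Split restored pending checkpoint IDs into commit and discard lists.

    Hypothetical helper for illustration only.
    """
    # Anything at or below the table's max committed ID is already committed.
    uncommitted = sorted(c for c in pending_ids if c > max_committed_in_table)
    # Attempts before or at the restored checkpoint are committed as usual.
    to_commit = [c for c in uncommitted if c <= restored_id]
    # Attempts after the restored checkpoint are dropped.
    to_discard = [c for c in uncommitted if c > restored_id]
    # After the check, the committer continues from the restored checkpoint ID.
    new_max_committed = restored_id
    return to_commit, to_discard, new_max_committed


# Scenario above: the table's max committed ID is 9, checkpoint 10 failed to
# commit, and the job is rewound to checkpoint 6.
print(plan_restore({10}, max_committed_in_table=9, restored_id=6))
# ([], [10], 6)
```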
   
   But this is also not an ideal situation if we look at the table snapshot 
history. We would see this order (from earliest to latest) of committed Flink 
checkpoints.
   ```
   5, 6, 7, 8, 9, (job rewound), 7, 8, ...
   ```
   
   The checkpointIds are no longer monotonically increasing, which violates 
the assumption of the `getMaxCommittedCheckpointId` method. 
   
   I see two potential ways forward.
   
   1. We can fix `getMaxCommittedCheckpointId` so that it only looks back 
until the monotonically decreasing sequence ends. E.g., when 
`getMaxCommittedCheckpointId` runs for a new job restored from the 2nd 
checkpoint 8, it should return `8`, because the backward search should stop at 
the 2nd checkpointId `7`: the snapshot before it has a higher checkpointId of 
`9`, so backtracking should stop there.
   
   2. To me, this is also an indication of the limitation of using 
checkpointId to check for successful/unsuccessful commits. When I implemented 
the Flink Iceberg sink at my previous company (Netflix) before the OSS 
implementation here, I generated a UUID for each checkpoint entry/commit. 
During state restore, the committer used the UUIDs to check whether the 
restored checkpoints/attempts had been committed. There we didn't care about 
monotonically increasing checkpoint IDs.
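
   Both options can be sketched in a few lines of Python. This is only a 
simplified model under stated assumptions, not the Iceberg or Netflix 
implementation: the snapshot history is reduced to a list of checkpoint IDs, 
`None` stands for a snapshot that carries no Flink checkpoint ID, `-1` is an 
assumed "nothing committed" sentinel, and all function names and the 
`commit_uuid` field are hypothetical.

```python
import uuid


def current_lineage(history):
    """Option 1: scan backwards and stop where the IDs stop decreasing.

    `history` lists committed checkpoint IDs from earliest to latest.
    """
    run = []
    for cid in reversed(history):
        if cid is None:
            continue  # snapshot without a Flink checkpoint ID
        if run and cid >= run[-1]:
            break  # equal or higher ID: written before the rewind
        run.append(cid)
    return run[::-1]  # back to earliest-to-latest order


def max_committed_checkpoint_id(history):
    """Max committed checkpoint ID within the current lineage only."""
    run = current_lineage(history)
    return run[-1] if run else -1


def new_commit_entry(checkpoint_id):
    """Option 2: tag every checkpoint entry/commit with its own UUID."""
    return {"checkpoint_id": checkpoint_id, "commit_uuid": str(uuid.uuid4())}


def uncommitted_entries(restored_entries, committed_uuids):
    """Entries whose UUID does not appear in the table history."""
    return [e for e in restored_entries if e["commit_uuid"] not in committed_uuids]


history = [5, 6, 7, 8, 9, 7, 8]
print(current_lineage(history))              # [7, 8]
print(max_committed_checkpoint_id(history))  # 8

entries = [new_commit_entry(7), new_commit_entry(8)]
committed = {entries[0]["commit_uuid"]}  # pretend only checkpoint 7 landed
print([e["checkpoint_id"] for e in uncommitted_entries(entries, committed)])  # [8]
```

   With the UUID lookup, the order of checkpoint IDs in the history no longer 
matters, which is the point of option 2.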
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

