Guosmilesmile opened a new pull request, #12639: URL: https://github.com/apache/iceberg/pull/12639
We encountered a scenario where, when using the Flink source to incrementally consume data from Iceberg, the lastSnapshotId being consumed had already been cleaned up. This can happen, for example, through Spark's expire_snapshots procedure (`CALL iceberg.system.expire_snapshots(table => 'default.my_table', older_than => TIMESTAMP '2025-03-25 00:00:00.000', retain_last => 1)`), or in other cases where consumption is too slow and historical snapshots are expired before they are read.

```java
Caused by: java.lang.IllegalArgumentException: Starting snapshot (exclusive) 2444106500863389603 is not a parent ancestor of end snapshot 2357271669960485754
    at org.apache.iceberg.relocated.com.google.common.base.Preconditions.checkArgument(Preconditions.java:430)
    at org.apache.iceberg.BaseIncrementalScan.fromSnapshotIdExclusive(BaseIncrementalScan.java:179)
    at org.apache.iceberg.BaseIncrementalScan.planFiles(BaseIncrementalScan.java:104)
    at org.apache.iceberg.BaseIncrementalAppendScan.planTasks(BaseIncrementalAppendScan.java:61)
    at org.apache.iceberg.flink.source.FlinkSplitPlanner.planTasks(FlinkSplitPlanner.java:119)
    at org.apache.iceberg.flink.source.FlinkSplitPlanner.planIcebergSourceSplits(FlinkSplitPlanner.java:76)
    at org.apache.iceberg.flink.source.enumerator.ContinuousSplitPlannerImpl.discoverIncrementalSplits(ContinuousSplitPlannerImpl.java:135)
    at org.apache.iceberg.flink.source.enumerator.ContinuousSplitPlannerImpl.planSplits(ContinuousSplitPlannerImpl.java:83)
    at org.apache.iceberg.flink.source.enumerator.ContinuousIcebergEnumerator.discoverSplits(ContinuousIcebergEnumerator.java:130)
    at org.apache.flink.runtime.source.coordinator.ExecutorNotifier.lambda$notifyReadyAsync$5(ExecutorNotifier.java:130)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
```

In such scenarios, the Flink job restarts repeatedly because the snapshot ID stored in its state is no longer available, and recovery requires manual intervention. We would like a mechanism that lets the job recover automatically in this situation, similar to how Kafka handles out-of-range offsets by automatically resuming from the earliest or latest available data.

This PR mainly adds a configuration option called `snapshot-expiration-reset-strategy`. When the lastSnapshot is not a parent ancestor of the current snapshot, the situation is handled in one of three ways:

- Default mode: Maintain the current behavior (the job fails and must be recovered manually).
- Earliest mode: Start incremental consumption using the oldest snapshot as the lastSnapshot.
- Latest mode: Start incremental consumption using the current latest snapshot as the lastSnapshot.
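To make the three modes of the PR description concrete, here is a minimal sketch of how such a reset decision could look. It is not the code from this PR and does not use the Iceberg or Flink APIs: the class `SnapshotResetSketch`, the enum `ResetStrategy`, and the `parentOf` map (snapshot ID to parent ID, containing only retained snapshots) are all hypothetical names introduced for illustration. The sketch takes the description literally and returns the oldest retained snapshot as the new lastSnapshot in earliest mode; whether that snapshot's own data is read depends on the real implementation.

```java
import java.util.Map;

// Hypothetical, simplified model of the reset decision; not the Iceberg/Flink API.
final class SnapshotResetSketch {

  // Assumed names mirroring the "default", "earliest" and "latest" modes.
  enum ResetStrategy { DEFAULT, EARLIEST, LATEST }

  // True if some retained snapshot on the lineage ending at `end` has `candidate` as its parent.
  // `parentOf` maps each retained snapshot ID to its parent ID (null for a root).
  static boolean isParentAncestorOf(Map<Long, Long> parentOf, long end, long candidate) {
    Long current = end;
    while (current != null && parentOf.containsKey(current)) {
      Long parent = parentOf.get(current);
      if (parent != null && parent == candidate) {
        return true;
      }
      current = parent;
    }
    return false;
  }

  // Walks back from `end` until the parent is missing or no longer retained.
  static long oldestAncestor(Map<Long, Long> parentOf, long end) {
    long current = end;
    Long parent = parentOf.get(current);
    while (parent != null && parentOf.containsKey(parent)) {
      current = parent;
      parent = parentOf.get(current);
    }
    return current;
  }

  // Returns the snapshot ID to use as the (exclusive) start of the next incremental scan.
  static long resolveLastSnapshot(
      Map<Long, Long> parentOf, long lastConsumed, long currentSnapshot, ResetStrategy strategy) {
    if (lastConsumed == currentSnapshot
        || isParentAncestorOf(parentOf, currentSnapshot, lastConsumed)) {
      return lastConsumed; // lineage intact, nothing to reset
    }
    switch (strategy) {
      case EARLIEST:
        return oldestAncestor(parentOf, currentSnapshot);
      case LATEST:
        return currentSnapshot;
      default:
        // Default mode keeps the current behavior: fail the scan.
        throw new IllegalArgumentException(
            String.format(
                "Starting snapshot (exclusive) %d is not a parent ancestor of end snapshot %d",
                lastConsumed, currentSnapshot));
    }
  }
}
```

With retained snapshots 2, 3 and 4 (where 2's parent, snapshot 1, has expired) and a stored lastSnapshot that is no longer reachable, the sketch returns 2 in earliest mode and 4 in latest mode, and throws in default mode.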