Guosmilesmile opened a new pull request, #12639:
URL: https://github.com/apache/iceberg/pull/12639

   We encountered a scenario where, when using Flink source to incrementally 
consume data from Iceberg, the lastSnapshotId being consumed has already been 
cleaned up. This can happen, for example, through Spark's expire_snapshots 
(CALL iceberg.system.expire_snapshots(table => 'default.my_table', older_than 
=> TIMESTAMP '2025-03-25 00:00:00.000', retain_last => 1)) or in other cases 
where consumption is too slow and historical snapshots are cleaned up. 
   
   ```java
   
   
   Caused by: java.lang.IllegalArgumentException: Starting snapshot (exclusive) 
2444106500863389603 is not a parent ancestor of end snapshot 2357271669960485754
     at 
org.apache.iceberg.relocated.com.google.common.base.Preconditions.checkArgument(Preconditions.java:430)
     at 
org.apache.iceberg.BaseIncrementalScan.fromSnapshotIdExclusive(BaseIncrementalScan.java:179)
     at 
org.apache.iceberg.BaseIncrementalScan.planFiles(BaseIncrementalScan.java:104)
     at 
org.apache.iceberg.BaseIncrementalAppendScan.planTasks(BaseIncrementalAppendScan.java:61)
     at 
org.apache.iceberg.flink.source.FlinkSplitPlanner.planTasks(FlinkSplitPlanner.java:119)
     at 
org.apache.iceberg.flink.source.FlinkSplitPlanner.planIcebergSourceSplits(FlinkSplitPlanner.java:76)
     at 
org.apache.iceberg.flink.source.enumerator.ContinuousSplitPlannerImpl.discoverIncrementalSplits(ContinuousSplitPlannerImpl.java:135)
     at 
org.apache.iceberg.flink.source.enumerator.ContinuousSplitPlannerImpl.planSplits(ContinuousSplitPlannerImpl.java:83)
     at 
org.apache.iceberg.flink.source.enumerator.ContinuousIcebergEnumerator.discoverSplits(ContinuousIcebergEnumerator.java:130)
     at 
org.apache.flink.runtime.source.coordinator.ExecutorNotifier.lambda$notifyReadyAsync$5(ExecutorNotifier.java:130)
     at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
     at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
     at 
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
     at 
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
   
   
   ```
   
   
   In such scenarios, the Flink job will repeatedly restart due to the snapshot 
ID stored in the state being unavailable, requiring manual intervention to 
restart the job for recovery. We hope to have a mechanism that allows the job 
to recover automatically when encountering this situation, similar to how Kafka 
handles out-of-range offsets by automatically starting to consume from the 
earliest or latest available data.
   
   
   This PR mainly adds a configuration option called 
snapshot-expiration-reset-strategy. When the lastSnapshot is not the parent 
ancestor of the current snapshot, it can be handled in three ways to avoid 
manual intervention to restart the job for recovery :
   
   Default mode: Maintain the current behavior.
   Earliest mode: Start incremental consumption from the oldest snapshot as the 
lastSnapshot.
   Latest mode: Start incremental consumption from the current latest snapshot 
as the lastSnapshot.
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org
For additional commands, e-mail: issues-h...@iceberg.apache.org

Reply via email to