cccs-jc opened a new issue, #10156:
URL: https://github.com/apache/iceberg/issues/10156

   ### Apache Iceberg version
   
   1.5.0 (latest release)
   
   ### Query engine
   
   Spark
   
   ### Please describe the bug 🐞
   
   When using Spark `readStream`, the option `stream-from-timestamp` positions 
the read at the specified timestamp. The query below uses the current time, so 
it starts reading from the head of the table.
   ```python
   ts = int(time.time() * 1000)
   df = (
       spark.readStream.format("iceberg")
       .option("streaming-skip-delete-snapshots", True)
       .option("streaming-skip-overwrite-snapshots", True)
       .option("streaming-max-files-per-micro-batch", max_files)
       .option("streaming-max-rows-per-micro-batch", max_rows)
       .option("stream-from-timestamp", ts)
       .load(source_table)
   )
   ```
   
   You can kill your streaming job, wait 10 minutes, then start it again. 
The readStream loads the checkpointed offset from disk and is supposed to 
resume from that offset. However, a bug causes it to skip the commits that 
occurred during those 10 minutes; instead, the readStream reads from the 
latest commit.
   
   I can work around this bug by not specifying `stream-from-timestamp` when 
the query should resume from the checkpointed offset.
   ```python
       ts = int(time.time() * 1000)
       # Only pass stream-from-timestamp on a fresh start, i.e. when no
       # checkpoint exists yet.
       use_ts = not checkpoint_dir_exists(checkpoint_location)
       df = (
           get_spark().readStream.format("iceberg")
           .option("streaming-skip-delete-snapshots", True)
           .option("streaming-skip-overwrite-snapshots", True)
           .option("streaming-max-files-per-micro-batch", max_files)
           .option("streaming-max-rows-per-micro-batch", max_rows)
       )
       if use_ts:
           df = df.option("stream-from-timestamp", ts)
       df = df.load(source_table)
   ```
   But this is error-prone. As a user, I expect the readStream to continue from 
the last checkpointed offset.
   
   I suspect the issue might be here. 
https://github.com/apache/iceberg/blob/fc5b2b336c774b0b8b032f7d87a1fb21e76b3f20/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkMicroBatchStream.java#L326
   
   This seems to short-circuit the saved `startOffset` by checking whether the 
last snapshot in the table is older than the requested `stream-from-timestamp`. 
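   If that is the cause, the expected precedence would be: a checkpointed 
offset always wins, and `stream-from-timestamp` only applies on a fresh start. 
A minimal sketch of that precedence (the names `resolve_start_offset` and 
`oldest_snapshot_for` are illustrative, not the actual `SparkMicroBatchStream` 
API):
   ```python
   def resolve_start_offset(checkpointed_offset, stream_from_timestamp,
                            oldest_snapshot_for):
       if checkpointed_offset is not None:
           # Restart from a checkpoint: the saved offset must win and
           # stream-from-timestamp must be ignored.
           return checkpointed_offset
       if stream_from_timestamp is not None:
           # Fresh start: position at the first snapshot at or after the
           # requested timestamp.
           return oldest_snapshot_for(stream_from_timestamp)
       # No checkpoint and no timestamp: read from the start of the table.
       return "START_OF_TABLE"
   ```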
   
   @singhpk234 I have not stepped through the code to be sure. Is my theory 
plausible?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

