elopezal opened a new issue, #9504:
URL: https://github.com/apache/iceberg/issues/9504

   ### Apache Iceberg version
   
   1.4.0
   
   ### Query engine
   
   Spark
   
   ### Please describe the bug 🐞
   
   We are reading, with Spark Structured Streaming in Scala, an Iceberg table as a source to write into another Iceberg table.
   The source Iceberg table receives its data from Kafka and has compaction and snapshot-expiration maintenance jobs configured.
   
   We are using this code to read the source table:
   
   ```scala
   spark.readStream
     .format("iceberg")
     .load(s"${icebergConf.icebergTableQualifier}")
   ```
   
   After several writeStream runs and compaction maintenance operations, the source Iceberg table has the following snapshots:
   
   <img width="551" alt="Screenshot 2024-01-18 at 11 17 20" src="https://github.com/apache/iceberg/assets/157010260/9b8cca78-63f1-47df-917a-ad12e56154e8">
   
   This setup works fine and we are able to readStream from this Iceberg table into another one, but once the snapshot-expiration maintenance runs and the oldest snapshots are removed, the process fails with the following error:
   
   <img width="550" alt="Screenshot 2024-01-18 at 11 24 10" src="https://github.com/apache/iceberg/assets/157010260/80a2ad44-efe1-44db-8a0e-0b7e03e36d86">
   
   
   `Exception in thread "main" ERROR: [STREAM_FAILED] Query [id = 
c084caa6-0907-4781-b6a7-8f8991929f97, runId = 
7781194d-298d-4929-be8e-c5407bd98566] terminated with exception: Cannot load 
current offset at snapshot 1615816462090596768, the snapshot was expired or 
removed`
   
   How can we readStream from an Iceberg table whose old snapshots are expiring?
   
   We tried taking the timestamp of the newest snapshot and passing it via the `stream-from-timestamp` option, but it didn't work and we got the same error:
   
   ```scala
       if (spark.catalog.tableExists(s"${icebergConf.icebergTableQualifier}.snapshots")) {
         val latestSnapshotTimestampDF = spark.read
           .table(s"${icebergConf.icebergTableQualifier}.snapshots")
           .agg(F.max("committed_at").as("latest_snapshot_timestamp"))
   
         // committed_at is a timestamp column, so read it back as a
         // java.sql.Timestamp and take its epoch milliseconds directly
         val latestSnapshotTimestamp = latestSnapshotTimestampDF
           .head()
           .getAs[java.sql.Timestamp]("latest_snapshot_timestamp")
           .getTime
   
         logger.info(s"Latest Snapshot Timestamp: ${latestSnapshotTimestamp}")
   
         spark.readStream
           .format("iceberg")
           .option("stream-from-timestamp", latestSnapshotTimestamp)
           .load(s"${icebergConf.icebergTableQualifier}")
       }
   ```
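   As an aside, the `SimpleDateFormat` round-trip in the original attempt is fragile: `committed_at` comes back from the `.snapshots` metadata table as a `java.sql.Timestamp`, so formatting it as a string and re-parsing is unnecessary (and `getAs[Long]` on a timestamp column would throw). A minimal sketch of the direct conversion, plain Scala with no Spark and a hypothetical timestamp value for illustration:

   ```scala
   import java.sql.Timestamp

   object TimestampToMillis {
     // Convert a committed_at value (a java.sql.Timestamp, as returned by
     // row.getAs[Timestamp]("committed_at")) into the epoch-millisecond
     // Long that the stream-from-timestamp option expects.
     def toStreamFromTimestamp(committedAt: Timestamp): Long =
       committedAt.getTime

     def main(args: Array[String]): Unit = {
       // Hypothetical committed_at value for illustration only.
       val committedAt = Timestamp.valueOf("2024-01-18 11:17:20.123")
       val millis = toStreamFromTimestamp(committedAt)
       println(millis)
     }
   }
   ```

   `Timestamp.getTime` already returns milliseconds since the epoch, so no date formatting is involved at all.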
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

