elopezal opened a new issue, #9504: URL: https://github.com/apache/iceberg/issues/9504
### Apache Iceberg version

1.4.0

### Query engine

Spark

### Please describe the bug 🐞

We are reading an Iceberg table with Spark Structured Streaming in Scala, using it as a source to write into another Iceberg table. The source table receives its data from Kafka and has compaction and snapshot-expiration maintenance jobs. We read the source table with this code:

```scala
spark.readStream
  .format("iceberg")
  .load(s"${icebergConf.icebergTableQualifier}")
```

After several writeStream batches and compaction runs, the source Iceberg table has the following snapshots:

<img width="551" alt="Captura de pantalla 2024-01-18 a las 11 17 20" src="https://github.com/apache/iceberg/assets/157010260/9b8cca78-63f1-47df-917a-ad12e56154e8">

This works fine and we are able to readStream from this Iceberg table into another one, but once the expire-snapshots maintenance runs and the oldest snapshots are removed, the process fails with the following error:

<img width="550" alt="Captura de pantalla 2024-01-18 a las 11 24 10" src="https://github.com/apache/iceberg/assets/157010260/80a2ad44-efe1-44db-8a0e-0b7e03e36d86">

```
Exception in thread "main" ERROR: [STREAM_FAILED] Query [id = c084caa6-0907-4781-b6a7-8f8991929f97, runId = 7781194d-298d-4929-be8e-c5407bd98566] terminated with exception: Cannot load current offset at snapshot 1615816462090596768, the snapshot was expired or removed
```

How can we readStream from an Iceberg table whose old snapshots are expiring?
We also tried taking the timestamp of the newest snapshot and passing it via the `stream-from-timestamp` option, but that didn't work either and we got the same error:

```scala
if (spark.catalog.tableExists(s"${icebergConf.icebergTableQualifier}.snapshots")) {
  val latestSnapshotTimestampDF = spark.read
    .table(s"${icebergConf.icebergTableQualifier}.snapshots")
    .agg(F.max("committed_at").as("latest_snapshot_timestamp"))

  // Get the latest snapshot timestamp as a Long
  val latestSnapshotTimestamp = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS")
    .parse(
      latestSnapshotTimestampDF
        .head()
        .getAs[Long]("latest_snapshot_timestamp")
        .toString)
    .getTime

  logger.info(s"Latest Snapshot Timestamp: ${latestSnapshotTimestamp}")

  spark.readStream
    .format("iceberg")
    .option("stream-from-timestamp", latestSnapshotTimestamp)
    .load(s"${icebergConf.icebergTableQualifier}")
}
```
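As a side note on the workaround above: `stream-from-timestamp` expects epoch milliseconds, and `committed_at` in the snapshots metadata table is a timestamp column, so the value can be read as a `java.sql.Timestamp` and converted with `getTime`, avoiding the `SimpleDateFormat` round-trip entirely. A minimal self-contained sketch of just that conversion (the `toEpochMillis` helper name is ours, not an Iceberg or Spark API):

```scala
import java.sql.Timestamp

// stream-from-timestamp takes epoch milliseconds; a java.sql.Timestamp
// (the JVM type behind the committed_at column) already carries that
// value internally, so no string formatting or parsing is needed.
def toEpochMillis(committedAt: Timestamp): Long = committedAt.getTime

// Example: the conversion round-trips without losing milliseconds.
val ts     = Timestamp.valueOf("2024-01-18 11:17:20.123")
val millis = toEpochMillis(ts)
println(new Timestamp(millis)) // prints 2024-01-18 11:17:20.123
```

In the streaming job this would replace the `SimpleDateFormat` block with something like `latestSnapshotTimestampDF.head().getAs[Timestamp]("latest_snapshot_timestamp").getTime`, though as reported above the same expired-snapshot error occurs regardless of how the timestamp is computed.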