VidakM opened a new issue, #9948: URL: https://github.com/apache/iceberg/issues/9948
### Query engine

Flink 1.17.2 with Iceberg 1.4.2 libraries

### Question

I have a few Iceberg v2 tables defined and a Flink job that reads them in a streaming fashion before transforming them into another Iceberg table.

If the source tables are basic, subscribing to them works great and the SQL query runs continuously. But if the tables are defined with `'write.upsert.enabled'='true'`, the subscribing Flink SQL reads only once and does not react to new snapshots, even though the SQL definition asks it to monitor at an interval and the streaming strategy is one of the incremental variants.

Flink streaming query that normally works:

```sql
INSERT INTO iceberg.target_packaging
SELECT
  usr.`user_id` AS `user_id`,
  usr.`adress` AS `address`,
  ord.`item_id` AS `item_id`,
  ....
FROM iceberg.source_users /*+ OPTIONS('streaming'='true', 'monitor-interval'='15s') */ usr
JOIN iceberg.source_orders /*+ OPTIONS('streaming'='true', 'monitor-interval'='15s') */ ord
ON usr.`user_id` = ord.`user_id`;
```

The streaming join works great if the source Iceberg tables are defined like this:

```sql
CREATE TABLE iceberg.source_users (
  `user_id` STRING,
  `adress` STRING,
  ....
  PRIMARY KEY (`user_id`) NOT ENFORCED
) WITH ('format-version'='2');
```

Resulting table properties example:

```
[current-snapshot-id=7980858807056176990,format=iceberg/parquet,format-version=2,identifier-fields=[user_id],write.parquet.compression-codec=zstd]
```

But the streaming join runs only once and then stops triggering on new snapshots if the tables are defined like this. It does not finish, though; it just stops reacting to the source and produces no new records.

```sql
CREATE TABLE iceberg.source_users (
  `user_id` STRING,
  `adress` STRING,
  ....
  PRIMARY KEY (`user_id`) NOT ENFORCED
) WITH ('format-version'='2', 'write.upsert.enabled'='true');
```

Resulting table properties example:

```
[current-snapshot-id=3566387524956156231,format=iceberg/parquet,format-version=2,identifier-fields=[user_id],write.parquet.compression-codec=zstd,write.upsert.enabled=true]
```

In my Flink job I simply define the connector and run the SQL join/insert. Both the source and target tables are already defined. I also noticed that an SQL join on its own stops streaming too if at least one of its tables has upsert enabled.

Looking at the documentation for both Iceberg and Flink, I don't find any indication that enabling upsert should alter this behaviour - but I do remember reading somewhere that the FLIP-27 source only supports appends, not updates/deletes. Is this the reason I'm seeing this behaviour?
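One thing I tried while narrowing this down was checking what the upsert writer actually commits. Below is a minimal sketch, assuming the Flink metadata-table syntax (`table$snapshots`) from the Iceberg docs is available in this setup; the table name mirrors my example above.

```sql
-- Minimal sketch (assumes the table$snapshots metadata-table syntax works here):
-- list each snapshot's operation to see whether the upsert commits show up as
-- plain 'append' snapshots or as 'overwrite'/'delete' snapshots.
SELECT committed_at, snapshot_id, operation, summary
FROM iceberg.`source_users$snapshots`
ORDER BY committed_at;
```

If the operations turn out not to be plain appends, that would line up with my suspicion that the FLIP-27 streaming source only picks up append snapshots, but I'd appreciate confirmation.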