VidakM opened a new issue, #9948:
URL: https://github.com/apache/iceberg/issues/9948

   ### Query engine
   
   Flink 1.17.2 with Iceberg 1.4.2 libraries
   
   ### Question
   
   I have a few Iceberg v2 tables and a Flink job that reads them in a 
streaming fashion and writes the transformed results to another Iceberg table.
   
   If the source tables are plain append tables, subscribing to them works 
great and the SQL query runs continuously. 
   
   But if the tables are defined with `'write.upsert.enabled'='true'`, the 
subscribing Flink SQL reads only once and does not react to new snapshots, 
even if the query asks it to monitor at an interval and the streaming 
starting strategy is any of the incremental variants.
   
   Flink streaming query that normally works: 
   ```sql
   INSERT INTO iceberg.target_packaging
   SELECT
     usr.`user_id` AS `user_id`,
     usr.`adress` AS `address`,
     ord.`item_id` AS `item_id`,
     ....
   FROM
     iceberg.source_users /*+  OPTIONS('streaming'='true', 
'monitor-interval'='15s')  */ usr
   JOIN
     iceberg.source_orders /*+  OPTIONS('streaming'='true', 
'monitor-interval'='15s')  */ ord ON usr.`user_id` = ord.`user_id`;
   
   ``` 
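   For reference, this is roughly how I set the incremental streaming 
strategy mentioned above (a sketch: table and column names follow the example 
above, and the `starting-strategy` values come from the Iceberg Flink read 
options):
   ```sql
   -- Variant of the streaming read with an explicit starting strategy;
   -- 'INCREMENTAL_FROM_LATEST_SNAPSHOT' is one of the incremental options
   INSERT INTO iceberg.target_packaging
   SELECT
     usr.`user_id` AS `user_id`,
     usr.`adress` AS `address`,
     ord.`item_id` AS `item_id`
   FROM
     iceberg.source_users /*+ OPTIONS('streaming'='true',
       'monitor-interval'='15s',
       'starting-strategy'='INCREMENTAL_FROM_LATEST_SNAPSHOT') */ usr
   JOIN
     iceberg.source_orders /*+ OPTIONS('streaming'='true',
       'monitor-interval'='15s',
       'starting-strategy'='INCREMENTAL_FROM_LATEST_SNAPSHOT') */ ord
     ON usr.`user_id` = ord.`user_id`;
   ```
   The behaviour is the same with the other incremental strategies.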
   
   The streaming join works great if the source Iceberg tables are defined like 
this:
   ```sql
   CREATE TABLE iceberg.source_users (
     `user_id` STRING,
     `adress` STRING,
     ....
     PRIMARY KEY (`user_id`) NOT ENFORCED
   ) WITH ('format-version'='2');
   ```
   
   Resulting table properties example: 
   
   ```
   [current-snapshot-id=7980858807056176990,format=iceberg/parquet,format-version=2,identifier-fields=[user_id],write.parquet.compression-codec=zstd]
   ```
   
   But with upsert enabled, the streaming join runs only once and then stops 
triggering on new snapshots. The job does not finish; it just stops reacting 
to the source and produces no new records.
   ```sql
   CREATE TABLE iceberg.source_users (
     `user_id` STRING,
     `adress` STRING,
     ....
     PRIMARY KEY (`user_id`) NOT ENFORCED
   ) WITH ('format-version'='2', 'write.upsert.enabled'='true');
   ```
   
   Resulting table properties example: 
   
   ```
   [current-snapshot-id=3566387524956156231,format=iceberg/parquet,format-version=2,identifier-fields=[user_id],write.parquet.compression-codec=zstd,write.upsert.enabled=true]
   ```
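   New snapshots do keep arriving on the upsert table; a one-off batch read 
(no streaming hint) reflects the new rows each time it is run, so the writes 
are landing:
   ```sql
   -- Batch (non-streaming) read of the upsert-enabled table; each
   -- invocation scans the current snapshot, so repeated runs show the
   -- new data that the streaming read never picks up
   SELECT COUNT(*) FROM iceberg.source_users;
   ```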
   
   In my Flink job I simply define the connector and run the SQL join/insert. 
Both the source and target tables are already defined.
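   
   For context, the job registers the Iceberg catalog before running the 
insert, roughly like this (a sketch: the catalog type, URIs, and paths are 
placeholders for my actual configuration):
   ```sql
   -- Sketch of the catalog registration; 'catalog-type', 'uri' and
   -- 'warehouse' are placeholders, not my real values
   CREATE CATALOG iceberg WITH (
     'type'='iceberg',
     'catalog-type'='hive',
     'uri'='thrift://metastore-host:9083',
     'warehouse'='hdfs://namenode:8020/warehouse'
   );
   USE CATALOG iceberg;
   ```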
   
   I also noticed that a plain SQL join stops streaming too if at least one 
of its tables has upsert enabled.
   
   Looking at the documentation for both Iceberg and Flink, I don't find any 
indication that enabling upsert should alter this behaviour - but I do 
remember reading somewhere that the FLIP-27 source only supports appends, not 
updates/deletes. Is this the reason I'm seeing this behaviour?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

