rmoff opened a new issue, #9947:
URL: https://github.com/apache/iceberg/issues/9947

   ### Query engine
   
   Flink 1.18.1
   Iceberg 1.5.0
   
   ### Question
   
   I populate an Iceberg table using `CREATE TABLE … AS SELECT` as a streaming 
query. On disk I can see data files written. 
   However, if I select from the Iceberg table through Flink SQL, **nothing is 
returned.** 
   If I then manually `INSERT` a row into the same table and query it again, I see 
the row that I inserted, but nothing else. 
   
   Why isn't the CTAS behaving as I would expect?
   
   ## Source table (Kafka topic)
   
   ```sql
   Flink SQL> CREATE TABLE t_k_test_topic (
   >   `msg` STRING
   > ) WITH (
   >   'connector' = 'kafka',
   >   'topic' = 'test_topic',
   >   'properties.bootstrap.servers' = 'broker:29092',
   >   'scan.startup.mode' = 'earliest-offset',
   >   'format' = 'raw'
   > );
   [INFO] Execute statement succeed.
   
   Flink SQL> SET 'sql-client.execution.result-mode' = 'tableau';
   [INFO] Execute statement succeed.
   
   Flink SQL> SET 'execution.runtime-mode' = 'streaming';
   [INFO] Execute statement succeed.
   
   Flink SQL> SELECT * FROM t_k_test_topic;
   +----+--------------------------------+
   | op |                            msg |
   +----+--------------------------------+
   | +I |                         foobar |
   | +I |                   foobar again |
   ^CQuery terminated, received a total of 2 rows
   
   ```
   
   ## Create and populate Iceberg table
   
   ```sql
   Flink SQL> CREATE TABLE t_iceberg_test WITH (
   >   'connector' = 'iceberg',
   >   'catalog-type'='hive',
   >   'catalog-name'='dev',
   >   'warehouse' = 's3a://warehouse',
   >   'hive-conf-dir' = './conf')
   > AS
   >   SELECT * FROM t_k_test_topic;
   [INFO] Submitting SQL update statement to the cluster...
   [INFO] SQL update statement has been successfully submitted to the cluster:
   Job ID: 13cebde34af11b7fe032c4059a813cbf
   
   
   Flink SQL> show jobs;
   
   +----------------------------------+-------------------------------------------------------------+----------+-------------------------+
   |                           job id |                                                    job name |   status |              start time |
   +----------------------------------+-------------------------------------------------------------+----------+-------------------------+
   | 13cebde34af11b7fe032c4059a813cbf | insert-into_default_catalog.default_database.t_iceberg_test |  RUNNING | 2024-03-13T12:05:25.146 |
   +----------------------------------+-------------------------------------------------------------+----------+-------------------------+
   1 row in set
   
   Flink SQL> stop job '13cebde34af11b7fe032c4059a813cbf';
   [INFO] Execute statement succeed.
   ```
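
   One thing I'm unsure about: I stopped the CTAS job without a savepoint. If 
snapshot commits only happen on checkpoints/savepoints, then stopping with the 
`WITH SAVEPOINT` clause (available since Flink 1.17) might behave differently. 
Untested sketch:

   ```sql
   -- Hypothetical variant: stop the job and take a savepoint, which should
   -- also trigger a final commit from the sink before shutdown.
   STOP JOB '13cebde34af11b7fe032c4059a813cbf' WITH SAVEPOINT;
   ```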
   
   Looking on disk, there are data files with data in them: 
   
   ```
   ❯ docker exec mc bash -c \
           "mc ls -r minio/warehouse/default_database.db/"
   [2024-03-13 12:05:46 UTC]   438B STANDARD t_iceberg_test/data/00000-0-20461db1-c6c4-4e88-95e2-463a354786a8-00001.parquet
   [2024-03-13 12:05:24 UTC] 1.0KiB STANDARD t_iceberg_test/metadata/00000-63f36419-05e4-4892-b55a-ce4bc2a764b0.metadata.json
   ```
   
   ## ❌ Query the Iceberg table: it's empty
   
   ```sql
   Flink SQL> SET 'execution.runtime-mode' = 'batch';
   [INFO] Execute statement succeed.
   
   Flink SQL> select * from t_iceberg_test;
   Empty set
   ```
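
   One thing I haven't ruled out: as I understand it, the Flink Iceberg sink only 
commits a new snapshot when a checkpoint completes, and I haven't enabled 
checkpointing in this session. If that's the cause, setting a checkpoint interval 
before running the CTAS might make the rows visible (the 60 s value below is an 
arbitrary choice):

   ```sql
   -- Enable periodic checkpoints; the streaming Iceberg writer commits a
   -- snapshot on each completed checkpoint. Without checkpointing, data
   -- files can be written but never referenced by a snapshot.
   SET 'execution.checkpointing.interval' = '60s';
   ```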
   
   ### But: there is data in the Iceberg-generated Parquet file on disk
   
   ```
   ❯ docker exec mc bash -c \
           "mc cat minio/warehouse/default_database.db/t_iceberg_test/data/00000-0-20461db1-c6c4-4e88-95e2-463a354786a8-00001.parquet" > /tmp/data.parquet && \
           duckdb :memory: "SELECT * FROM read_parquet('/tmp/data.parquet')"
   -- Loading resources from /Users/rmoff/.duckdbrc
   ┌──────────────┐
   │     msg      │
   │   varchar    │
   ├──────────────┤
   │ foobar       │
   │ foobar again │
   └──────────────┘
   ```
   
   ## Manually insert a row
   
   ```sql
   Flink SQL> DESCRIBE t_iceberg_test;
   +------+--------+------+-----+--------+-----------+
   | name |   type | null | key | extras | watermark |
   +------+--------+------+-----+--------+-----------+
   |  msg | STRING | TRUE |     |        |           |
   +------+--------+------+-----+--------+-----------+
   1 row in set
   
   Flink SQL> INSERT INTO t_iceberg_test VALUES ('snafu');
   [INFO] Submitting SQL update statement to the cluster...
   [INFO] SQL update statement has been successfully submitted to the cluster:
   Job ID: c01c01e1f1eee815173895d7b52771c5
   ```
   
   ## ✅ The manually inserted row shows up as the only row in the table
   
   ```sql
   Flink SQL> select * from t_iceberg_test;
   +-------+
   |   msg |
   +-------+
   | snafu |
   +-------+
   1 row in set
   ```
   
   ## …but the data files on disk have three rows (two from the `CTAS`, one from the `INSERT`)
   
   ```bash
   ❯ docker exec mc bash -c \
           "mc ls -r minio/warehouse/default_database.db/t_iceberg_test/"
   [2024-03-13 12:05:46 UTC]   438B STANDARD data/00000-0-20461db1-c6c4-4e88-95e2-463a354786a8-00001.parquet
   [2024-03-13 12:06:15 UTC]   421B STANDARD data/00000-0-f7410475-58e7-4a50-9866-e3938096ea8d-00001.parquet
   [2024-03-13 12:05:24 UTC] 1.0KiB STANDARD metadata/00000-63f36419-05e4-4892-b55a-ce4bc2a764b0.metadata.json
   [2024-03-13 12:06:15 UTC] 2.3KiB STANDARD metadata/00001-f4e1c293-57b4-4883-897a-ff998a371449.metadata.json
   [2024-03-13 12:06:15 UTC] 6.5KiB STANDARD metadata/bb7d5d60-4e38-4cbd-bb66-b42e956e57fd-m0.avro
   [2024-03-13 12:06:15 UTC] 4.1KiB STANDARD metadata/snap-859291865824980759-1-bb7d5d60-4e38-4cbd-bb66-b42e956e57fd.avro
   ```
   
   ```bash
   ❯ docker exec mc bash -c \
           "mc cat minio/warehouse/default_database.db/t_iceberg_test/data/00000-0-f7410475-58e7-4a50-9866-e3938096ea8d-00001.parquet" \
           > /tmp/data.parquet && \
           duckdb :memory: "SELECT * FROM read_parquet('/tmp/data.parquet')"
   -- Loading resources from /Users/rmoff/.duckdbrc
   ┌─────────┐
   │   msg   │
   │ varchar │
   ├─────────┤
   │ snafu   │
   └─────────┘
   
   ❯ docker exec mc bash -c \
           "mc cat minio/warehouse/default_database.db/t_iceberg_test/data/00000-0-20461db1-c6c4-4e88-95e2-463a354786a8-00001.parquet" > /tmp/data.parquet && \
           duckdb :memory: "SELECT * FROM read_parquet('/tmp/data.parquet')"
   -- Loading resources from /Users/rmoff/.duckdbrc
   ┌──────────────┐
   │     msg      │
   │   varchar    │
   ├──────────────┤
   │ foobar       │
   │ foobar again │
   └──────────────┘
   ```


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

