dll02 opened a new issue, #10453: URL: https://github.com/apache/iceberg/issues/10453
### Apache Iceberg version

1.4.2

### Query engine

Flink

### Please describe the bug 🐞

I am using the AWS managed Flink service to read data from Iceberg + Glue tables for real-time streaming joins. However, when monitoring the real-time ingestion of new data, I cannot properly read incremental data from tables that have unique-key (primary key) constraints.

Flink version: 1.18 and 1.16
Iceberg version: 1.4.2

Table:

```sql
CREATE CATALOG pro_catalog WITH (
  'type'='iceberg',
  'warehouse'='s3://xxxx-data/iceberg-data/',
  'catalog-impl'='org.apache.iceberg.aws.glue.GlueCatalog',
  'io-impl'='org.apache.iceberg.aws.s3.S3FileIO'
);

CREATE TABLE `pro_catalog`.`dim`.`dim_xxx_st` (
  `col_1` VARCHAR NOT NULL,
  `col_2` BIGINT NOT NULL,
  `col_3` TIMESTAMP(6),
  `col_4` DECIMAL(38, 15),
  `col_5` VARCHAR NOT NULL,
  PRIMARY KEY (`col_1`, `col_5`, `col_2`) NOT ENFORCED
) PARTITIONED BY (`col_5`) WITH (
  'write.parquet.row-group-size-bytes' = '33554432',
  'write.format.default' = 'parquet',
  'write.parquet.compression-codec' = 'zstd',
  'write.target-file-size-bytes' = '67108864',
  'write.distribution-mode' = 'hash',
  'read.split.target-size' = '33554432'
);
```

Java code:

```java
public static void main(String[] args) throws Exception {
    final StreamExecutionEnvironment env2 = StreamExecutionEnvironment.getExecutionEnvironment();
    env2.setParallelism(40);
    env2.setMaxParallelism(40);

    final StreamTableEnvironment tenv = StreamTableEnvironment.create(env2);
    tenv.getConfig().getConfiguration().setString("execution.type", "streaming");

    String database = "sink";
    String catalogName = "pro_catalog";
    String warehousePath = "s3://xxxxx/";
    String catalogImpl = "org.apache.iceberg.aws.glue.GlueCatalog";
    String ioImpl = "org.apache.iceberg.aws.s3.S3FileIO";

    // Load GlueCatalog
    Configuration hadoopConf = new Configuration(false);
    Map<String, String> catalogProperties = new HashMap<>();
    catalogProperties.put("warehouse", warehousePath);
    catalogProperties.put("catalog-impl", catalogImpl);
    catalogProperties.put("io-impl", ioImpl);

    CatalogLoader catalogLoader = CatalogLoader.custom(catalogName, catalogProperties, hadoopConf, catalogImpl);
    Catalog flinkCatalog = new FlinkCatalog(catalogName, database, Namespace.empty(), catalogLoader, true, -1);
    tenv.registerCatalog(catalogName, flinkCatalog);
    tenv.useCatalog(catalogName);

    String priceSql = "select * from pro_catalog.dim.dim_xxxxx /*+ OPTIONS('streaming'='true', 'monitor-interval'='60s') */";
    Table sourceTable = tenv.sqlQuery(priceSql);
    DataStream<Row> resultStream = tenv.toAppendStream(sourceTable, Row.class);
    resultStream.print();

    env2.execute("Print Flink SQL query result");
}
```

I cannot read real-time data from the incremental files. The monitor source keeps refreshing the table metadata, but no new rows are emitted. The log looks like this:

```
14> +I[dao, 1717576320, 2024-06-05T08:32, 0.730100000000000, 2024-06-05]
2024-06-05 18:23:16,972 INFO [Legacy Source Thread - Source: Iceberg table (pro_catalog.dim.dim_xxxxxx) monitor (1/1)#0] iceberg.BaseMetastoreTableOperations (BaseMetastoreTableOperations.java:refreshFromMetadataLocation(199)) - Refreshing table metadata from new version: s3://xxxx-warehouse/dim.db/dim_xxxx/metadata/00013-9e576a92-20df-41ca-a35b-c1041098711a.metadata.json
2024-06-05 18:24:18,911 INFO [Legacy Source Thread - Source: Iceberg table (pro_catalog.dim.dim_xxxxxx) monitor (1/1)#0] iceberg.BaseMetastoreTableOperations (BaseMetastoreTableOperations.java:refreshFromMetadataLocation(199)) - Refreshing table metadata from new version: s3://xxxx-warehouse/dim.db/dim_xxxx/metadata/00014-3df59b12-76d6-4e19-9f4c-5656d8b491ec.metadata.json
2024-06-05 18:25:20,580 INFO [Legacy Source Thread - Source: Iceberg table (pro_catalog.dim.dim_xxxxxx) monitor (1/1)#0] iceberg.BaseMetastoreTableOperations (BaseMetastoreTableOperations.java:refreshFromMetadataLocation(199)) - Refreshing table metadata from new version: s3://xxxx-warehouse/dim.db/dim_xxxx/metadata/00015-6eef0b39-60da-4c1c-80b5-06095fd084a4.metadata.json
2024-06-05 18:26:22,788 INFO [Legacy Source Thread - Source: Iceberg table (pro_catalog.dim.dim_xxxxxx) monitor (1/1)#0] iceberg.BaseMetastoreTableOperations (BaseMetastoreTableOperations.java:refreshFromMetadataLocation(199)) - Refreshing table metadata from new version: s3://xxxx-warehouse/dim.db/dim_xxxx/metadata/00016-8ea0e74d-9676-4f49-9c57-568828eaa97b.metadata.json
2024-06-05 18:27:24,538 INFO [Legacy Source Thread - Source: Iceberg table (pro_catalog.dim.dim_xxxxxx) monitor (1/1)#0] iceberg.BaseMetastoreTableOperations (BaseMetastoreTableOperations.java:refreshFromMetadataLocation(199)) - Refreshing table metadata from new version: s3://pond-warehouse/dev.db/dim_main_token_price_st/metadata/00017-a5311b9d-0eea-4631-9130-2f2aa3787023.metadata.json
```

I am not sure whether this issue stems from AWS or Iceberg. Could you help me investigate how to resolve it? Thank you.

--
This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.
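For reference, one way to check whether the monitor source is picking up new snapshots at all is to start the streaming read explicitly from a known snapshot via the `start-snapshot-id` read option (a sketch only; `1234567890123456789` is a placeholder snapshot id that would be taken from the table's history or metadata files):

```sql
SELECT *
FROM `pro_catalog`.`dim`.`dim_xxx_st`
/*+ OPTIONS('streaming'='true', 'monitor-interval'='60s', 'start-snapshot-id'='1234567890123456789') */;
```

If rows committed after that snapshot still do not appear, the problem is likely in how the streaming source handles the commits themselves (e.g. the equality-delete snapshots produced by upserts into a primary-key table) rather than in snapshot discovery.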