dll02 opened a new issue, #10453:
URL: https://github.com/apache/iceberg/issues/10453

   ### Apache Iceberg version
   
   1.4.2
   
   ### Query engine
   
   Flink
   
   ### Please describe the bug 🐞
   
   I am using the AWS managed Flink service to read data from Iceberg tables
   in a Glue catalog for real-time streaming joins. However, I have run into a
   problem when monitoring tables for newly ingested data: the new data is not
   read properly, particularly for tables that have unique key constraints.
   
   Flink version: 1.18 and 1.16
   
   Iceberg version: 1.4.2
   
   Catalog and table definitions:
   ```sql
   
   CREATE CATALOG pro_catalog WITH (
      'type'='iceberg',
      'warehouse'='s3://xxxx-data/iceberg-data/',
      'catalog-impl'='org.apache.iceberg.aws.glue.GlueCatalog',
      'io-impl'='org.apache.iceberg.aws.s3.S3FileIO'
     );
   
   
    CREATE TABLE `pro_catalog`.`dim`.`dim_xxx_st` (
     `col_1` VARCHAR NOT NULL,
     `col_2` BIGINT NOT NULL,
     `col_3` TIMESTAMP(6),
     `col_4` DECIMAL(38, 15),
     `col_5` VARCHAR NOT NULL,
     PRIMARY KEY (`col_1`, `col_5`, `col_2`) NOT ENFORCED
   ) PARTITIONED BY (`col_5`)
   WITH (
     'write.parquet.row-group-size-bytes' = '33554432',
     'write.format.default' = 'parquet',
     'write.parquet.compression-codec' = 'zstd',
     'write.target-file-size-bytes' = '67108864',
     'write.distribution-mode' = 'hash',
     'read.split.target-size' = '33554432'
   )
   ```
   
   Java code:
   ```java
       public static void main(String[] args) throws Exception {
   
           final StreamExecutionEnvironment env2 =
                   StreamExecutionEnvironment.getExecutionEnvironment();
           env2.setParallelism(40);
           env2.setMaxParallelism(40);
           final StreamTableEnvironment tenv =
                   StreamTableEnvironment.create(env2);
   
           tenv.getConfig().getConfiguration()
                   .setString("execution.type", "streaming");
   
           String database = "sink";
           String catalogName = "pro_catalog";
           String warehousePath = "s3://xxxxx/";
           String catalogImpl = "org.apache.iceberg.aws.glue.GlueCatalog";
           String ioImpl = "org.apache.iceberg.aws.s3.S3FileIO";
   
           // Load GlueCatalog
           Configuration hadoopConf = new Configuration(false);
           Map<String, String> catalogProperties = new HashMap<>();
           catalogProperties.put("warehouse", warehousePath);
           catalogProperties.put("catalog-impl", catalogImpl);
           catalogProperties.put("io-impl", ioImpl);
   
           CatalogLoader catalogLoader = CatalogLoader.custom(
                   catalogName, catalogProperties, hadoopConf, catalogImpl);
           Catalog flinkCatalog = new FlinkCatalog(
                   catalogName, database, Namespace.empty(), catalogLoader, true, -1);
   
           tenv.registerCatalog(catalogName, flinkCatalog);
           tenv.useCatalog(catalogName);
   
           String priceSql = "select * from pro_catalog.dim.dim_xxxxx "
                   + "/*+ OPTIONS('streaming'='true', 'monitor-interval'='60s') */";
           Table sourceTable = tenv.sqlQuery(priceSql);
           DataStream<Row> resultStream =
                   tenv.toAppendStream(sourceTable, Row.class);
           
           resultStream.print();
   
           env2.execute("Print Flink SQL query result");
       }
   ```
   
   The job cannot read real-time data from the incremental files.
   The log looks like this:
   14> +I[dao, 1717576320, 2024-06-05T08:32, 0.730100000000000, 2024-06-05]
   2024-06-05 18:23:16,972 INFO  [Legacy Source Thread - Source: Iceberg table (pro_catalog.dim.dim_xxxxxx) monitor (1/1)#0] iceberg.BaseMetastoreTableOperations (BaseMetastoreTableOperations.java:refreshFromMetadataLocation(199)) - Refreshing table metadata from new version: s3://xxxx-warehouse/dim.db/dim_xxxx/metadata/00013-9e576a92-20df-41ca-a35b-c1041098711a.metadata.json
   2024-06-05 18:24:18,911 INFO  [Legacy Source Thread - Source: Iceberg table (pro_catalog.dim.dim_xxxxxx) monitor (1/1)#0] iceberg.BaseMetastoreTableOperations (BaseMetastoreTableOperations.java:refreshFromMetadataLocation(199)) - Refreshing table metadata from new version: s3://xxxx-warehouse/dim.db/dim_xxxx/metadata/00014-3df59b12-76d6-4e19-9f4c-5656d8b491ec.metadata.json
   2024-06-05 18:25:20,580 INFO  [Legacy Source Thread - Source: Iceberg table (pro_catalog.dim.dim_xxxxxx) monitor (1/1)#0] iceberg.BaseMetastoreTableOperations (BaseMetastoreTableOperations.java:refreshFromMetadataLocation(199)) - Refreshing table metadata from new version: s3://xxxx-warehouse/dim.db/dim_xxxx/metadata/00015-6eef0b39-60da-4c1c-80b5-06095fd084a4.metadata.json
   2024-06-05 18:26:22,788 INFO  [Legacy Source Thread - Source: Iceberg table (pro_catalog.dim.dim_xxxxxx) monitor (1/1)#0] iceberg.BaseMetastoreTableOperations (BaseMetastoreTableOperations.java:refreshFromMetadataLocation(199)) - Refreshing table metadata from new version: s3://xxxx-warehouse/dim.db/dim_xxxx/metadata/00016-8ea0e74d-9676-4f49-9c57-568828eaa97b.metadata.json
   2024-06-05 18:27:24,538 INFO  [Legacy Source Thread - Source: Iceberg table (pro_catalog.dim.dim_xxxxxx) monitor (1/1)#0] iceberg.BaseMetastoreTableOperations (BaseMetastoreTableOperations.java:refreshFromMetadataLocation(199)) - Refreshing table metadata from new version: s3://pond-warehouse/dev.db/dim_main_token_price_st/metadata/00017-a5311b9d-0eea-4631-9130-2f2aa3787023.metadata.json
   
   
   I am not sure whether this issue stems from AWS or from Iceberg. Could you
   help me investigate how to resolve it?
   Thank you.



