petartushev opened a new issue, #11445: URL: https://github.com/apache/iceberg/issues/11445
### Query engine I'm using flink as my engine. ### Question I have a java data streaming app that uses flink to process records from kafka that I want to store in iceberg tables with minio as blob storage using a REST iceberg catalog. I used part of the spark docker-compose from the iceberg docs to set up my iceberg and minio services, and my own kafka and flink services that are defined as: ``` version: "3" services: jobmanager: image: flink:1.19.1-java11 expose: - "6121" ports: - "8081:8081" command: jobmanager environment: - JOB_MANAGER_RPC_ADDRESS=jobmanager taskmanager: image: flink:1.19.1-java11 expose: - "6122" - "6123" depends_on: - jobmanager command: taskmanager links: - "jobmanager:jobmanager" environment: - JOB_MANAGER_RPC_ADDRESS=jobmanager zookeeper: image: wurstmeister/zookeeper:latest expose: - "2181" kafka: image: wurstmeister/kafka:2.13-2.8.1 depends_on: - zookeeper ports: - "9092:9092" expose: - "9093" environment: KAFKA_ADVERTISED_LISTENERS: INSIDE://kafka:9093,OUTSIDE://localhost:9092 KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INSIDE:PLAINTEXT,OUTSIDE:PLAINTEXT KAFKA_LISTENERS: INSIDE://0.0.0.0:9093,OUTSIDE://0.0.0.0:9092 KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181 KAFKA_INTER_BROKER_LISTENER_NAME: INSIDE rest: image: tabulario/iceberg-rest container_name: iceberg-rest networks: iceberg_net: ports: - 8181:8181 environment: - AWS_ACCESS_KEY_ID=admin - AWS_SECRET_ACCESS_KEY=password - AWS_REGION=us-east-1 - CATALOG_WAREHOUSE=s3://warehouse/ - CATALOG_IO__IMPL=org.apache.iceberg.aws.s3.S3FileIO - CATALOG_S3_ENDPOINT=http://minio:9000 minio: image: "minio/minio" container_name: "minio" environment: - MINIO_ROOT_USER=admin - MINIO_ROOT_PASSWORD=password - MINIO_DOMAIN=minio networks: iceberg_net: aliases: - warehouse.minio ports: - 9001:9001 - 9000:9000 volumes: - minio-data:/data command: ["server", "/data", "--console-address", ":9001"] mc: depends_on: - minio image: minio/mc container_name: mc networks: iceberg_net: environment: - AWS_ACCESS_KEY_ID=admin - AWS_SECRET_ACCESS_KEY=password - AWS_REGION=us-east-1 entrypoint: > /bin/sh -c " until (/usr/bin/mc config host add minio http://minio:9000 admin password) do echo 'Waiting...' && sleep 1; done; /usr/bin/mc rm -r --force minio/warehouse; /usr/bin/mc mb minio/warehouse; /usr/bin/mc policy set public minio/warehouse; tail -f /dev/null " networks: iceberg_net: volumes: minio-data: ``` My flink streaming application is defined as follows: ``` package org.example; import com.google.gson.Gson; import org.apache.flink.api.common.eventtime.WatermarkStrategy; import org.apache.flink.api.common.serialization.SimpleStringSchema; import org.apache.flink.connector.kafka.source.KafkaSource; import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer; import org.apache.flink.streaming.api.CheckpointingMode; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.table.api.DataTypes; import org.apache.flink.table.api.EnvironmentSettings; import org.apache.flink.table.api.TableSchema; import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; import org.apache.flink.types.Row; import org.apache.iceberg.Table; import org.apache.iceberg.aws.s3.S3FileIO; import org.apache.iceberg.catalog.TableIdentifier; import org.apache.iceberg.flink.CatalogLoader; import org.apache.iceberg.flink.TableLoader; import org.apache.iceberg.flink.sink.FlinkSink; import org.apache.hadoop.conf.Configuration; import pojo.Transaction; import timestamp_utils.TransactionWatermarkStrategy; import java.util.HashMap; import java.util.Map; public class ProcessTransactions { public static void main(String[] args) throws Exception { KafkaSource<String> kafkaSource = KafkaSource.<String>builder() .setBootstrapServers("localhost:9092") .setTopics("transaction") .setGroupId("transactions_group") .setStartingOffsets(OffsetsInitializer.latest()) .setValueOnlyDeserializer(new SimpleStringSchema()) .build(); StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.getConfig().setAutoWatermarkInterval(2000L); env.enableCheckpointing(30000); // e.g., 60000 for every minute env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE); StreamTableEnvironment tableEnvironment = StreamTableEnvironment.create(env); tableEnvironment.executeSql( "CREATE CATALOG bank_transactions_catalog WITH (" + " 'type'='iceberg'," + " 'catalog-impl'='org.apache.iceberg.rest.RESTCatalog'," + " 'uri'='http://localhost:8181'," + " 'warehouse'='s3://warehouse/'," + " 'io-impl'='org.apache.iceberg.aws.s3.S3FileIO'," + " 's3.endpoint'='http://localhost:9000'," + " 's3.access-key-id'='admin'," + " 's3.secret-access-key'='password'," + " 's3.path-style-access'='true'" + ")" ); DataStream<String> dataStream = env.fromSource(kafkaSource, WatermarkStrategy.noWatermarks(), "Kafka source"); DataStream<Transaction> transactionDataStream = dataStream.map(value -> new Gson().fromJson(value, Transaction.class)) .assignTimestampsAndWatermarks(new TransactionWatermarkStrategy()); TableSchema tableSchema = TableSchema.builder() .field("transactionId", DataTypes.STRING()) .field("sendingClientAccountNumber", DataTypes.STRING()) .field("receivingClientAccountNumber", DataTypes.STRING()) .field("amount", DataTypes.DOUBLE()) .build(); Map<String, String> catalogProperties = new HashMap<>(); catalogProperties.put("uri", "http://localhost:8181"); // REST Catalog URI catalogProperties.put("warehouse", "s3://warehouse/"); // S3 warehouse location catalogProperties.put("io-impl", S3FileIO.class.getName()); // Use S3 File IO catalogProperties.put("s3.endpoint", "http://minio:9000"); // MinIO endpoint catalogProperties.put("s3.access-key", "admin"); // MinIO access key catalogProperties.put("s3.secret-key", "password"); // MinIO secret key catalogProperties.put("s3.path-style-access", "true"); // Required for MinIO Configuration hadoopConf = new Configuration(); hadoopConf.set("fs.s3a.access.key", "admin"); hadoopConf.set("fs.s3a.secret.key", "password"); hadoopConf.set("fs.s3a.endpoint", "http://minio:9000"); hadoopConf.set("fs.s3a.path.style.access", "true"); CatalogLoader catalogLoader = CatalogLoader.custom( "bank_transactions_catalog", // Name of the catalog catalogProperties, // Catalog properties (e.g., S3 config) hadoopConf, // Hadoop configuration "org.apache.iceberg.rest.RESTCatalog" // Catalog implementation class (REST) ); tableEnvironment.executeSql( "CREATE TABLE transactions (" + "transactionId STRING NOT NULL, " + "sendingClientAccountNumber STRING NOT NULL, " + "receivingClientAccountNumber STRING NOT NULL, " + "amount FLOAT NOT NULL " + ")" + "WITH (" + " 'connector' = 'iceberg', " + " 'catalog-impl' = 'org.apache.iceberg.rest.RESTCatalog', " + " 'catalog-name' = 'bank_transactions_catalog', " + " 'database-name' = 'financial_transactions', " + " 'table-name' = 'transactions', " + " 'format' = 'parquet', " + " 'warehouse' = 's3://warehouse/', " + " 'io-impl' = 'org.apache.iceberg.aws.s3.S3FileIO', " + " 'catalog-rest.endpoint' = 'http://localhost:8181', " + " 's3.endpoint' = 'http://minio:9000', " + " 's3.access-key-id' = 'admin', " + " 's3.secret-access-key' = 'password', " + " 's3.path-style-access' = 'true' " + ")" ); TableLoader tableLoader = TableLoader.fromCatalog(catalogLoader, TableIdentifier.of("financial_transactions", "transactions")); DataStream<Row> rowDataStream = transactionDataStream.map( transaction -> Row.of(transaction.getTransactionId(), transaction.getSendingClientAccountNumber(), transaction.getReceivingClientAccountNumber(), transaction.getAmount()) ); FlinkSink.forRow(rowDataStream, tableSchema) .tableLoader(tableLoader) .overwrite(false) .append(); try{ env.execute(ProcessTransactions.class.getName()); } catch (Exception e){ e.printStackTrace(); } } } ``` If I run the application in debug mode, and I run the command: `tableEnvironment.executeSql("SELECT * FROM transactions").collect()` I get the error: ``` Unable to create a source for reading table 'bank_transactions_catalog.financial_transactions.transactions'. Table options are: 'catalog-impl'='org.apache.iceberg.rest.RESTCatalog' 'catalog-name'='bank_transactions_catalog' 'catalog-rest.endpoint'='http://localhost:8181' 'connector'='iceberg' 'database-name'='financial_transactions' 'format'='parquet' 'io-impl'='org.apache.iceberg.aws.s3.S3FileIO' 's3.access-key-id'='admin' 's3.endpoint'='http://minio:9000' 's3.path-style-access'='true' 's3.secret-access-key'='******' 'table-name'='transactions' 'warehouse'='s3://warehouse/' ``` with the cause being: `java.lang.NullPointerException: Invalid uri for http client: null`. However, if the program is run without debug mode it gives me the error: ``` Exception in thread "main" org.apache.iceberg.exceptions.NoSuchTableException: Table does not exist: financial_transactions.transactions ``` This is a newbie problem, but for some time I cannot figure how to debug this. Any help or general advice towards resolving this issue is welcome. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org