petartushev opened a new issue, #11445:
URL: https://github.com/apache/iceberg/issues/11445

   ### Query engine
   
   I'm using Flink as my engine.
   
   ### Question
   
   I have a Java data-streaming app that uses Flink to process records from Kafka, which I want to store in Iceberg tables with MinIO as blob storage, using a REST Iceberg catalog. I used part of the Spark docker-compose from the Iceberg docs to set up my Iceberg and MinIO services, plus my own Kafka and Flink services, defined as:
   
   ```
   version: "3"
   
   services:
       jobmanager:
           image: flink:1.19.1-java11
           expose:
               - "6121"
           ports:
               - "8081:8081"
           command: jobmanager
           environment:
               - JOB_MANAGER_RPC_ADDRESS=jobmanager
   
       taskmanager:
           image: flink:1.19.1-java11
           expose:
               - "6122"
               - "6123"
           depends_on:
               - jobmanager
           command: taskmanager
           links:
               - "jobmanager:jobmanager"
           environment:
               - JOB_MANAGER_RPC_ADDRESS=jobmanager
           
       zookeeper:
           image: wurstmeister/zookeeper:latest
           expose:
               - "2181"
   
       kafka:
           image: wurstmeister/kafka:2.13-2.8.1
           depends_on:
               - zookeeper
           ports:
               - "9092:9092"
           expose:
               - "9093"
           environment:
                KAFKA_ADVERTISED_LISTENERS: INSIDE://kafka:9093,OUTSIDE://localhost:9092
                KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INSIDE:PLAINTEXT,OUTSIDE:PLAINTEXT
               KAFKA_LISTENERS: INSIDE://0.0.0.0:9093,OUTSIDE://0.0.0.0:9092
               KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
               KAFKA_INTER_BROKER_LISTENER_NAME: INSIDE
   
       rest:
           image: tabulario/iceberg-rest
           container_name: iceberg-rest
           networks:
               iceberg_net:
           ports: 
               - 8181:8181
           environment:
               - AWS_ACCESS_KEY_ID=admin
               - AWS_SECRET_ACCESS_KEY=password
               - AWS_REGION=us-east-1
               - CATALOG_WAREHOUSE=s3://warehouse/
               - CATALOG_IO__IMPL=org.apache.iceberg.aws.s3.S3FileIO
               - CATALOG_S3_ENDPOINT=http://minio:9000
       minio:
           image: "minio/minio"
           container_name: "minio"
           environment:
               - MINIO_ROOT_USER=admin
               - MINIO_ROOT_PASSWORD=password
               - MINIO_DOMAIN=minio
           networks:
               iceberg_net:
                   aliases:
                       - warehouse.minio
           ports:
               - 9001:9001
               - 9000:9000
           volumes:
               - minio-data:/data
           command: ["server", "/data", "--console-address", ":9001"]
   
   
       mc:
           depends_on: 
               - minio
           image: minio/mc
           container_name: mc
           networks:
               iceberg_net:
           environment:
               - AWS_ACCESS_KEY_ID=admin
               - AWS_SECRET_ACCESS_KEY=password
               - AWS_REGION=us-east-1
           entrypoint: >
               /bin/sh -c "
                until (/usr/bin/mc config host add minio http://minio:9000 admin password) do echo 'Waiting...' && sleep 1; done;
               /usr/bin/mc rm -r --force minio/warehouse;
               /usr/bin/mc mb minio/warehouse;
               /usr/bin/mc policy set public minio/warehouse;
               tail -f /dev/null
               "
   
   
   networks:
       iceberg_net:
   
   volumes:
       minio-data:
   ```
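
    Not directly related to the question, but before involving Flink at all I sanity-check from the host that the REST catalog answers on its published port. This is my own snippet, not from the Iceberg docs, and it assumes the `/v1/config` route from the Iceberg REST catalog spec:
    ```
    import java.net.URI;
    import java.net.http.HttpClient;
    import java.net.http.HttpRequest;
    import java.net.http.HttpResponse;

    // Hypothetical helper, not part of the app: confirms http://localhost:8181
    // is reachable from the host before debugging anything inside Flink.
    public class RestCatalogPing {
        public static void main(String[] args) throws Exception {
            HttpRequest request = HttpRequest.newBuilder()
                    .uri(URI.create("http://localhost:8181/v1/config"))
                    .GET()
                    .build();
            HttpResponse<String> response = HttpClient.newHttpClient()
                    .send(request, HttpResponse.BodyHandlers.ofString());
            // A healthy catalog answers 200 with a small JSON config payload.
            System.out.println(response.statusCode() + " " + response.body());
        }
    }
    ```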
   
    My Flink streaming application is defined as follows:
   ```
   package org.example;
   
   import com.google.gson.Gson;
   import org.apache.flink.api.common.eventtime.WatermarkStrategy;
   import org.apache.flink.api.common.serialization.SimpleStringSchema;
   import org.apache.flink.connector.kafka.source.KafkaSource;
    import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
   import org.apache.flink.streaming.api.CheckpointingMode;
   import org.apache.flink.streaming.api.datastream.DataStream;
   import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
   import org.apache.flink.table.api.DataTypes;
   import org.apache.flink.table.api.EnvironmentSettings;
   import org.apache.flink.table.api.TableSchema;
   import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
   import org.apache.flink.types.Row;
   import org.apache.iceberg.Table;
   import org.apache.iceberg.aws.s3.S3FileIO;
   import org.apache.iceberg.catalog.TableIdentifier;
   import org.apache.iceberg.flink.CatalogLoader;
   import org.apache.iceberg.flink.TableLoader;
   import org.apache.iceberg.flink.sink.FlinkSink;
   import org.apache.hadoop.conf.Configuration;
   import pojo.Transaction;
   import timestamp_utils.TransactionWatermarkStrategy;
   
   import java.util.HashMap;
   import java.util.Map;
   
   
   public class ProcessTransactions {
       public static void main(String[] args) throws Exception {
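
            // Note: localhost:9092 is the OUTSIDE listener that the compose file
            // above publishes to the host, which is where I run this app from.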
   
           KafkaSource<String> kafkaSource = KafkaSource.<String>builder()
                   .setBootstrapServers("localhost:9092")
                   .setTopics("transaction")
                   .setGroupId("transactions_group")
                   .setStartingOffsets(OffsetsInitializer.latest())
                   .setValueOnlyDeserializer(new SimpleStringSchema())
                   .build();
   
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
   
           env.getConfig().setAutoWatermarkInterval(2000L);
            env.enableCheckpointing(30000);  // e.g., 60000 for every minute
            env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
   
            StreamTableEnvironment tableEnvironment = StreamTableEnvironment.create(env);
   
            tableEnvironment.executeSql(
                    "CREATE CATALOG bank_transactions_catalog WITH (" +
                            "  'type'='iceberg'," +
                            "  'catalog-impl'='org.apache.iceberg.rest.RESTCatalog'," +
                            "  'uri'='http://localhost:8181'," +
                            "  'warehouse'='s3://warehouse/'," +
                            "  'io-impl'='org.apache.iceberg.aws.s3.S3FileIO'," +
                            "  's3.endpoint'='http://localhost:9000'," +
                            "  's3.access-key-id'='admin'," +
                            "  's3.secret-access-key'='password'," +
                            "  's3.path-style-access'='true'" +
                            ")"
            );
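
            // As I understand it, this registers the catalog under the name
            // bank_transactions_catalog, so its tables resolve as
            // bank_transactions_catalog.financial_transactions.transactions
            // unless I switch with USE CATALOG first.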
            DataStream<String> dataStream = env.fromSource(kafkaSource,
                    WatermarkStrategy.noWatermarks(), "Kafka source");
   
            DataStream<Transaction> transactionDataStream = dataStream
                    .map(value -> new Gson().fromJson(value, Transaction.class))
                    .assignTimestampsAndWatermarks(new TransactionWatermarkStrategy());
   
   
           TableSchema tableSchema = TableSchema.builder()
                   .field("transactionId", DataTypes.STRING())
                   .field("sendingClientAccountNumber", DataTypes.STRING())
                   .field("receivingClientAccountNumber", DataTypes.STRING())
                   .field("amount", DataTypes.DOUBLE())
                   .build();
   
           Map<String, String> catalogProperties = new HashMap<>();
            catalogProperties.put("uri", "http://localhost:8181");  // REST catalog URI
            catalogProperties.put("warehouse", "s3://warehouse/");  // S3 warehouse location
            catalogProperties.put("io-impl", S3FileIO.class.getName());  // Use S3FileIO
            catalogProperties.put("s3.endpoint", "http://minio:9000");  // MinIO endpoint
            catalogProperties.put("s3.access-key", "admin");  // MinIO access key
            catalogProperties.put("s3.secret-key", "password");  // MinIO secret key
            catalogProperties.put("s3.path-style-access", "true");  // Required for MinIO
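
            // (My understanding: the fs.s3a.* keys below configure Hadoop's
            // S3AFileSystem, which is separate from Iceberg's S3FileIO
            // configured via io-impl above.)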
   
           Configuration hadoopConf = new Configuration();
           hadoopConf.set("fs.s3a.access.key", "admin");
           hadoopConf.set("fs.s3a.secret.key", "password");
            hadoopConf.set("fs.s3a.endpoint", "http://minio:9000");
           hadoopConf.set("fs.s3a.path.style.access", "true");
   
            CatalogLoader catalogLoader = CatalogLoader.custom(
                    "bank_transactions_catalog",  // Name of the catalog
                    catalogProperties,  // Catalog properties (e.g., S3 config)
                    hadoopConf,  // Hadoop configuration
                    "org.apache.iceberg.rest.RESTCatalog"  // Catalog implementation class (REST)
            );
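
            // The CREATE TABLE below goes through Flink's current (default)
            // catalog; the WITH options are what point it at the Iceberg table.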
   
            tableEnvironment.executeSql(
                    "CREATE TABLE transactions (" +
                            "transactionId STRING NOT NULL, " +
                            "sendingClientAccountNumber STRING NOT NULL, " +
                            "receivingClientAccountNumber STRING NOT NULL, " +
                            "amount FLOAT NOT NULL " +
                            ") " +
                            "WITH (" +
                            " 'connector' = 'iceberg', " +
                            " 'catalog-impl' = 'org.apache.iceberg.rest.RESTCatalog', " +
                            " 'catalog-name' = 'bank_transactions_catalog', " +
                            " 'database-name' = 'financial_transactions', " +
                            " 'table-name' = 'transactions', " +
                            " 'format' = 'parquet', " +
                            " 'warehouse' = 's3://warehouse/', " +
                            " 'io-impl' = 'org.apache.iceberg.aws.s3.S3FileIO', " +
                            " 'catalog-rest.endpoint' = 'http://localhost:8181', " +
                            " 's3.endpoint' = 'http://minio:9000', " +
                            " 's3.access-key-id' = 'admin', " +
                            " 's3.secret-access-key' = 'password', " +
                            " 's3.path-style-access' = 'true' " +
                            ")"
            );
   
            TableLoader tableLoader = TableLoader.fromCatalog(catalogLoader,
                    TableIdentifier.of("financial_transactions", "transactions"));
   
           DataStream<Row> rowDataStream = transactionDataStream.map(
                   transaction -> Row.of(transaction.getTransactionId(),
                           transaction.getSendingClientAccountNumber(),
                           transaction.getReceivingClientAccountNumber(),
                           transaction.getAmount())
           );
   
           FlinkSink.forRow(rowDataStream, tableSchema)
                   .tableLoader(tableLoader)
                   .overwrite(false)
                   .append();
   
            try {
                env.execute(ProcessTransactions.class.getName());
            } catch (Exception e) {
                e.printStackTrace();
            }
   
       }
   }
   
   ```
    If I run the application in debug mode and then execute `tableEnvironment.executeSql("SELECT * FROM transactions").collect()`, I get this error:
   ```
    Unable to create a source for reading table 'bank_transactions_catalog.financial_transactions.transactions'.
   
   Table options are:
   
   'catalog-impl'='org.apache.iceberg.rest.RESTCatalog'
   'catalog-name'='bank_transactions_catalog'
   'catalog-rest.endpoint'='http://localhost:8181'
   'connector'='iceberg'
   'database-name'='financial_transactions'
   'format'='parquet'
   'io-impl'='org.apache.iceberg.aws.s3.S3FileIO'
   's3.access-key-id'='admin'
   's3.endpoint'='http://minio:9000'
   's3.path-style-access'='true'
   's3.secret-access-key'='******'
   'table-name'='transactions'
   'warehouse'='s3://warehouse/'
   ```
    with the cause being `java.lang.NullPointerException: Invalid uri for http client: null`. However, if I run the program without debug mode, it gives me this error instead:
   ```
    Exception in thread "main" org.apache.iceberg.exceptions.NoSuchTableException: Table does not exist: financial_transactions.transactions
   ```
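
    To take the Flink SQL layer out of the picture, I can also try loading the table straight through the Iceberg catalog API. This is just a minimal sketch with the same settings as above (the class name is made up, and I'm assuming `s3.access-key-id`/`s3.secret-access-key` are the right S3FileIO property keys):
    ```
    import java.util.HashMap;
    import java.util.Map;

    import org.apache.iceberg.catalog.TableIdentifier;
    import org.apache.iceberg.rest.RESTCatalog;

    public class RestCatalogCheck {
        public static void main(String[] args) {
            Map<String, String> props = new HashMap<>();
            props.put("uri", "http://localhost:8181");
            props.put("warehouse", "s3://warehouse/");
            props.put("io-impl", "org.apache.iceberg.aws.s3.S3FileIO");
            props.put("s3.endpoint", "http://localhost:9000");
            props.put("s3.access-key-id", "admin");
            props.put("s3.secret-access-key", "password");
            props.put("s3.path-style-access", "true");

            try (RESTCatalog catalog = new RESTCatalog()) {
                catalog.initialize("bank_transactions_catalog", props);
                TableIdentifier id = TableIdentifier.of("financial_transactions", "transactions");
                // If this prints false, the table was never created in the REST
                // catalog, and the Flink-side errors are downstream of that.
                System.out.println("table exists: " + catalog.tableExists(id));
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }
    ```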
   
    This is probably a newbie problem, but I haven't been able to figure out how to debug it. Any help or general advice toward resolving this issue is welcome.

