sheveg commented on issue #9958:
URL: https://github.com/apache/iceberg/issues/9958#issuecomment-1997175547

   We deploy the Flink cluster on Kubernetes via the Flink Kubernetes operator. This is the resource file for it:
   ```yaml
   ---
   # Source: flink-demo/templates/flink-session-cluster.yaml
   apiVersion: flink.apache.org/v1beta1
   kind: FlinkDeployment
   metadata:
     name: flink-session-cluster
   spec:
     image: ghcr.io/steadforce/applications/flink-steadforce:sha-796959d
     flinkVersion: v1_17
     flinkConfiguration:
       taskmanager.numberOfTaskSlots: "2"
     serviceAccount: flink
     podTemplate:
       spec:
         containers:
           - name: flink-main-container
             env:
               - name: AWS_ACCESS_KEY_ID
                 valueFrom:
                   secretKeyRef:
                     key: accesskey
                     name: minio-cred-secret-mirror
               - name: AWS_REGION
                 value: us-east-1
               - name: AWS_SECRET_ACCESS_KEY
                 valueFrom:
                   secretKeyRef:
                     key: secretkey
                     name: minio-cred-secret-mirror
               - name: KAFKA_BOOTSTRAP_SERVERS
                 value: kafka-kafka-bootstrap.kafka.svc.cluster.local:9092
               - name: KAFKA_CONSUMER_GROUP
                 value: demo-consumer-group
                - name: KAFKA_SCHEMA_REGISTRY_URL
                  value: http://kafka-schema-registry.kafka.svc.cluster.local:8081
               - name: KAFKA_TOPIC
                 value: steadforce.tools
               - name: MINIO_HOST
                 value: http://minio.svc.cluster.local:80
               - name: NESSIE_HOST
                 value: http://nessie.nessie.svc.cluster.local:19120/api/v1
                - name: WAREHOUSE
                  value: s3://steadops-playground-bucket/steadops-playground-bucket
     jobManager:
       resource:
         cpu: 1
         memory: 2048m
     taskManager:
       resource:
         cpu: 1
         memory: 2048m
   
   ```
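
   For reference, a job can be submitted to such a session cluster with a `FlinkSessionJob` resource; the following is only a minimal sketch, and the name and `jarURI` are placeholders rather than our actual manifest:

   ```yaml
   # Sketch only: placeholder name and jarURI, not our actual values
   apiVersion: flink.apache.org/v1beta1
   kind: FlinkSessionJob
   metadata:
     name: flink-demo-job
   spec:
     deploymentName: flink-session-cluster  # must match the FlinkDeployment above
     job:
       jarURI: https://example.com/flink-job.jar
       parallelism: 1
       upgradeMode: stateless
   ```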
   
   The `flink-conf.yaml`:
   
   ```
   blob.server.port: 6124
   kubernetes.jobmanager.annotations: flinkdeployment.flink.apache.org/generation:2
   taskmanager.memory.process.size: 2 gb
   kubernetes.internal.jobmanager.entrypoint.class: org.apache.flink.kubernetes.entrypoint.KubernetesSessionClusterEntrypoint
   kubernetes.jobmanager.replicas: 1
   jobmanager.rpc.address: flink-session-cluster.flink-demo
   kubernetes.pod-template-file.taskmanager: /tmp/flink_op_generated_podTemplate_679649916655451497.yaml
   web.cancel.enable: false
   execution.target: kubernetes-session
   jobmanager.memory.process.size: 2 gb
   kubernetes.service-account: flink
   kubernetes.cluster-id: flink-session-cluster
   taskmanager.rpc.port: 6122
   kubernetes.jobmanager.cpu.amount: 1.0
   kubernetes.taskmanager.cpu.amount: 1.0
   internal.cluster.execution-mode: NORMAL
   parallelism.default: 1
   kubernetes.namespace: flink-demo
   taskmanager.numberOfTaskSlots: 2
   kubernetes.rest-service.exposed.type: ClusterIP
   kubernetes.jobmanager.owner.reference: apiVersion:flink.apache.org/v1beta1,kind:FlinkDeployment,uid:9dc2c0e8-3de5-4627-bf6b-83f3f056f78b,name:flink-session-cluster,controller:false,blockOwnerDeletion:true
   $internal.flink.version: v1_17
   kubernetes.pod-template-file.jobmanager: /tmp/flink_op_generated_podTemplate_14004324014081248936.yaml
   kubernetes.container.image.ref: ghcr.io/steadforce/applications/flink-steadforce:sha-796959d
   ```
   
   We implement and execute the job with the Flink Java API:
   
   ```java
    public static void main(String[] args) throws Exception {
        ParameterTool parameter = ParameterTool.fromArgs(args);

        String kafkaBootstrapServers = System.getenv("KAFKA_BOOTSTRAP_SERVERS");
        String kafkaTopic = System.getenv("KAFKA_TOPIC");
        String kafkaSchemaRegistryUrl = System.getenv("KAFKA_SCHEMA_REGISTRY_URL");
        String kafkaConsumerGroup = System.getenv("KAFKA_CONSUMER_GROUP");

        String nessieHost = System.getenv("NESSIE_HOST");
        String warehouse = System.getenv("WAREHOUSE");
        String minioHost = System.getenv("MINIO_HOST");

        // set up the execution environment
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(5000);

        // set up the table environment
        final StreamTableEnvironment tableEnv = StreamTableEnvironment.create(
                env,
                EnvironmentSettings.newInstance().inStreamingMode().build());

        // Create the Kafka source
        KafkaSource<String> kafkaSource = KafkaSource.<String>builder()
                .setBootstrapServers(kafkaBootstrapServers)
                .setGroupId(kafkaConsumerGroup)
                .setTopics(kafkaTopic)
                .setStartingOffsets(OffsetsInitializer.earliest())
                .setValueOnlyDeserializer(new SimpleStringSchema())
                .build();

        // Create the Nessie catalog
        tableEnv.executeSql(
                String.format(
                        "CREATE CATALOG iceberg WITH ("
                                + "'type'='iceberg',"
                                + "'catalog-impl'='org.apache.iceberg.nessie.NessieCatalog',"
                                + "'io-impl'='org.apache.iceberg.aws.s3.S3FileIO',"
                                + "'uri'='%s',"
                                + "'authentication.type'='none',"
                                + "'ref'='main',"
                                + "'client.assume-role.region'='us-east-1',"
                                + "'warehouse'='%s',"
                                + "'s3.endpoint'='%s'"
                                + ")", nessieHost, warehouse, minioHost));

        // List all catalogs and print the result to standard out
        TableResult result = tableEnv.executeSql("SHOW CATALOGS");
        result.print();

        // Set the current catalog to the new catalog
        tableEnv.useCatalog("iceberg");

        // Create a database in the current catalog
        tableEnv.executeSql("CREATE DATABASE IF NOT EXISTS db");

        // Add the Kafka source as a data stream
        DataStream<String> kafkaStream = env.fromSource(kafkaSource, WatermarkStrategy.noWatermarks(), "Kafka Source");

        // Split the stream into manipulated and complete rows
        Random random = new Random();
        DataStream<String> manipulatedRowsStream = kafkaStream.filter(row -> isManipulatedRow(row, random));
        DataStream<String> completeRowsStream = kafkaStream.filter(row -> !isManipulatedRow(row, random));

        // Apply a map transformation to convert each String into a JobData object
        DataStream<JobData> mappedManipulatedStream = manipulatedRowsStream.map(new MapFunction<String, JobData>() {
            @Override
            public JobData map(String value) throws Exception {
                // mapping logic: wrap the raw value in a JobData with a random id
                return new JobData(random.nextLong(), value);
            }
        });

        DataStream<JobData> mappedCompleteStream = completeRowsStream.map(new MapFunction<String, JobData>() {
            @Override
            public JobData map(String value) throws Exception {
                // mapping logic: wrap the raw value in a JobData with a random id
                return new JobData(random.nextLong(), value);
            }
        });

        // Convert the DataStreams to Tables
        Table manipulated_table = tableEnv.fromDataStream(mappedManipulatedStream, $("id"), $("data"));
        Table complete_table = tableEnv.fromDataStream(mappedCompleteStream, $("id"), $("data"));

        // Register the Tables as temporary views
        tableEnv.createTemporaryView("my_complete_table", complete_table);
        tableEnv.createTemporaryView("my_manipulated_table", manipulated_table);

        // Create the Iceberg tables
        tableEnv.executeSql(
                "CREATE TABLE IF NOT EXISTS db.complete_table ("
                        + "id BIGINT COMMENT 'unique id',"
                        + "data STRING"
                        + ")");
        tableEnv.executeSql(
                "CREATE TABLE IF NOT EXISTS db.manipulated_table ("
                        + "id BIGINT COMMENT 'unique id',"
                        + "data STRING"
                        + ")");

        // Write the Tables to Iceberg
        manipulated_table.executeInsert("db.manipulated_table");
        complete_table.executeInsert("db.complete_table");

        // Execute the job
        env.execute("Flink Job");
    }
   ```
   
   Source: https://github.com/steadforce/flink-job/blob/3d40983fd6dc74d1004837b772c073d66f0bec4f/flink_job/src/main/java/com/steadforce/flink_job/App.java
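
   To check the write side independently of Trino, one thing we can do is read the tables back through the same catalog from Flink SQL. This is only a sketch and not part of the linked `App.java`; it reuses the `tableEnv` from the job above:

   ```java
   // Sketch only: read back through the same Nessie catalog from within the Flink job
   // to verify that the tables and the committed data are visible on the Flink side.
   tableEnv.useCatalog("iceberg");
   tableEnv.executeSql("SHOW TABLES IN db").print();
   tableEnv.executeSql("SELECT COUNT(*) FROM db.complete_table").print();
   ```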
   
   We get the above-mentioned error from the Trino CLI when deploying a pod manually and connecting to our Trino server. We use Trino version `440`, Flink version `1.17`, and Apache Iceberg version `1.4.3` with the Nessie catalog (Nessie image `0.77.1`). Which log entries do you mean exactly?
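
   For reference, a Nessie-backed Iceberg catalog on the Trino side would look roughly like this; this is only a sketch with placeholder values mirroring the environment above, not a copy of our actual catalog file:

   ```
   connector.name=iceberg
   iceberg.catalog.type=nessie
   # Nessie endpoint; the required API version (v1 vs v2) depends on the Trino/Nessie versions
   iceberg.nessie-catalog.uri=http://nessie.nessie.svc.cluster.local:19120/api/v1
   iceberg.nessie-catalog.ref=main
   iceberg.nessie-catalog.default-warehouse-dir=s3://steadops-playground-bucket/steadops-playground-bucket
   # MinIO access via Trino's native S3 file system
   fs.native-s3.enabled=true
   s3.endpoint=http://minio.svc.cluster.local:80
   s3.region=us-east-1
   s3.path-style-access=true
   s3.aws-access-key=<access key>
   s3.aws-secret-key=<secret key>
   ```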

