sheveg commented on issue #9958: URL: https://github.com/apache/iceberg/issues/9958#issuecomment-1997175547
We deploy the Flink cluster on Kubernetes via the operator. This is the resource file for it:

```yaml
---
# Source: flink-demo/templates/flink-session-cluster.yaml
apiVersion: flink.apache.org/v1beta1
kind: FlinkDeployment
metadata:
  name: flink-session-cluster
spec:
  image: ghcr.io/steadforce/applications/flink-steadforce:sha-796959d
  flinkVersion: v1_17
  flinkConfiguration:
    taskmanager.numberOfTaskSlots: "2"
  serviceAccount: flink
  podTemplate:
    spec:
      containers:
        - name: flink-main-container
          env:
            - name: AWS_ACCESS_KEY_ID
              valueFrom:
                secretKeyRef:
                  key: accesskey
                  name: minio-cred-secret-mirror
            - name: AWS_REGION
              value: us-east-1
            - name: AWS_SECRET_ACCESS_KEY
              valueFrom:
                secretKeyRef:
                  key: secretkey
                  name: minio-cred-secret-mirror
            - name: KAFKA_BOOTSTRAP_SERVERS
              value: kafka-kafka-bootstrap.kafka.svc.cluster.local:9092
            - name: KAFKA_CONSUMER_GROUP
              value: demo-consumer-group
            - name: KAFKA_SCHEMA_REGISTRY_URL
              value: http://kafka-schema-registry.kafka.svc.cluster.local:8081
            - name: KAFKA_TOPIC
              value: steadforce.tools
            - name: MINIO_HOST
              value: http://minio.svc.cluster.local:80
            - name: NESSIE_HOST
              value: http://nessie.nessie.svc.cluster.local:19120/api/v1
            - name: WAREHOUSE
              value: s3://steadops-playground-bucket/steadops-playground-bucket
  jobManager:
    resource:
      cpu: 1
      memory: 2048m
  taskManager:
    resource:
      cpu: 1
      memory: 2048m
```

The `flink-conf.yaml`:

```
blob.server.port: 6124
kubernetes.jobmanager.annotations: flinkdeployment.flink.apache.org/generation:2
taskmanager.memory.process.size: 2 gb
kubernetes.internal.jobmanager.entrypoint.class: org.apache.flink.kubernetes.entrypoint.KubernetesSessionClusterEntrypoint
kubernetes.jobmanager.replicas: 1
jobmanager.rpc.address: flink-session-cluster.flink-demo
kubernetes.pod-template-file.taskmanager: /tmp/flink_op_generated_podTemplate_679649916655451497.yaml
web.cancel.enable: false
execution.target: kubernetes-session
jobmanager.memory.process.size: 2 gb
kubernetes.service-account: flink
kubernetes.cluster-id: flink-session-cluster
taskmanager.rpc.port: 6122
kubernetes.jobmanager.cpu.amount: 1.0
kubernetes.taskmanager.cpu.amount: 1.0
internal.cluster.execution-mode: NORMAL
parallelism.default: 1
kubernetes.namespace: flink-demo
taskmanager.numberOfTaskSlots: 2
kubernetes.rest-service.exposed.type: ClusterIP
kubernetes.jobmanager.owner.reference: apiVersion:flink.apache.org/v1beta1,kind:FlinkDeployment,uid:9dc2c0e8-3de5-4627-bf6b-83f3f056f78b,name:flink-session-cluster,controller:false,blockOwnerDeletion:true
$internal.flink.version: v1_17
kubernetes.pod-template-file.jobmanager: /tmp/flink_op_generated_podTemplate_14004324014081248936.yaml
kubernetes.container.image.ref: ghcr.io/steadforce/applications/flink-steadforce:sha-796959d
```

We execute the job using the Java API:

```java
public static void main(String[] args) throws Exception {
    ParameterTool parameter = ParameterTool.fromArgs(args);

    String kafkaBootstrapServers = System.getenv("KAFKA_BOOTSTRAP_SERVERS");
    String kafkaTopic = System.getenv("KAFKA_TOPIC");
    String kafkaSchemaRegistryUrl = System.getenv("KAFKA_SCHEMA_REGISTRY_URL");
    String kafkaConsumerGroup = System.getenv("KAFKA_CONSUMER_GROUP");
    String nessieHost = System.getenv("NESSIE_HOST");
    String warehouse = System.getenv("WAREHOUSE");
    String minioHost = System.getenv("MINIO_HOST");

    // set up the execution environment
    final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.enableCheckpointing(5000);

    // set up the table environment
    final StreamTableEnvironment tableEnv = StreamTableEnvironment.create(
        env,
        EnvironmentSettings.newInstance().inStreamingMode().build());

    // Create KafkaSource builder
    KafkaSource<String> kafkaSource = KafkaSource.<String>builder()
        .setBootstrapServers(kafkaBootstrapServers)
        .setGroupId(kafkaConsumerGroup)
        .setTopics(kafkaTopic)
        .setStartingOffsets(OffsetsInitializer.earliest())
        .setValueOnlyDeserializer(new SimpleStringSchema())
        .build();

    // create the Nessie catalog
    tableEnv.executeSql(
        String.format(
            "CREATE CATALOG iceberg WITH ("
                + "'type'='iceberg',"
                + "'catalog-impl'='org.apache.iceberg.nessie.NessieCatalog',"
                + "'io-impl'='org.apache.iceberg.aws.s3.S3FileIO',"
                + "'uri'='%s',"
                + "'authentication.type'='none',"
                + "'ref'='main',"
                + "'client.assume-role.region'='us-east-1',"
                + "'warehouse'='%s',"
                + "'s3.endpoint'='%s'"
                + ")",
            nessieHost, warehouse, minioHost));

    // List all catalogs
    TableResult result = tableEnv.executeSql("SHOW CATALOGS");

    // Print the result to standard out
    result.print();

    // Set the current catalog to the new catalog
    tableEnv.useCatalog("iceberg");

    // Create a database in the current catalog
    tableEnv.executeSql("CREATE DATABASE IF NOT EXISTS db");

    // Add Kafka source as a data source
    DataStream<String> kafkaStream =
        env.fromSource(kafkaSource, WatermarkStrategy.noWatermarks(), "Kafka Source");

    // Filter manipulated and complete rows
    Random random = new Random();
    DataStream<String> manipulatedRowsStream = kafkaStream.filter(row -> isManipulatedRow(row, random));
    DataStream<String> completeRowsStream = kafkaStream.filter(row -> !isManipulatedRow(row, random));

    // apply a map transformation to convert each String to a JobData object
    DataStream<JobData> mappedManipulatedStream =
        manipulatedRowsStream.map(new MapFunction<String, JobData>() {
            @Override
            public JobData map(String value) throws Exception {
                // perform your mapping logic here and return a JobData instance
                // for example:
                return new JobData(random.nextLong(), value);
            }
        });

    DataStream<JobData> mappedCompleteStream =
        completeRowsStream.map(new MapFunction<String, JobData>() {
            @Override
            public JobData map(String value) throws Exception {
                // perform your mapping logic here and return a JobData instance
                // for example:
                return new JobData(random.nextLong(), value);
            }
        });

    // Convert the DataStreams to Tables
    Table manipulated_table = tableEnv.fromDataStream(mappedManipulatedStream, $("id"), $("data"));
    Table complete_table = tableEnv.fromDataStream(mappedCompleteStream, $("id"), $("data"));

    // Register the Tables as temporary views
    tableEnv.createTemporaryView("my_complete_table", complete_table);
    tableEnv.createTemporaryView("my_manipulated_table", manipulated_table);

    // Create the tables
    tableEnv.executeSql(
        "CREATE TABLE IF NOT EXISTS db.complete_table ("
            + "id BIGINT COMMENT 'unique id',"
            + "data STRING"
            + ")");
    tableEnv.executeSql(
        "CREATE TABLE IF NOT EXISTS db.manipulated_table ("
            + "id BIGINT COMMENT 'unique id',"
            + "data STRING"
            + ")");

    // Write Tables to Iceberg
    manipulated_table.executeInsert("db.manipulated_table");
    complete_table.executeInsert("db.complete_table");

    // Execute the job
    env.execute("Flink Job");
}
```

Source: https://github.com/steadforce/flink-job/blob/3d40983fd6dc74d1004837b772c073d66f0bec4f/flink_job/src/main/java/com/steadforce/flink_job/App.java

We get the above-mentioned error from the Trino CLI when deploying a pod manually and connecting to our Trino server. We use Trino version `440`, Flink version `1.17`, and Apache Iceberg version `1.4.3` with the Nessie catalog, image `0.77.1`.

Which log entries do you mean exactly?
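As a postscript on the Trino side: a minimal sketch of what the corresponding `etc/catalog/iceberg.properties` would look like. The property names follow the Trino 440 Iceberg connector documentation for Nessie catalogs; the URIs, region, and warehouse path below are placeholders mirroring the Flink environment variables above, and the use of Trino's native S3 filesystem (rather than the legacy Hive S3 properties) is an assumption:

```properties
# Iceberg connector backed by the Nessie catalog
# (values are placeholders mirroring the Flink environment above)
connector.name=iceberg
iceberg.catalog.type=nessie
iceberg.nessie-catalog.uri=http://nessie.nessie.svc.cluster.local:19120/api/v1
iceberg.nessie-catalog.ref=main
iceberg.nessie-catalog.default-warehouse-dir=s3://steadops-playground-bucket/steadops-playground-bucket

# Assumption: Trino's native S3 filesystem pointed at MinIO
fs.native-s3.enabled=true
s3.endpoint=http://minio.svc.cluster.local:80
s3.region=us-east-1
s3.path-style-access=true
s3.aws-access-key=<minio access key>
s3.aws-secret-key=<minio secret key>
```

With a catalog like this, the failing session is simply `trino --server http://<trino-host>:8080 --catalog iceberg --schema db` followed by a query on the tables created by the Flink job.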