JuiceFV opened a new issue, #9097: URL: https://github.com/apache/iceberg/issues/9097
### Query engine

Flink 1.16

### Question

I'm trying to sink data from a PostgreSQL table to Iceberg using Flink. I'm currently using Flink 1.16.2 (PyFlink), as mentioned [here](https://iceberg.apache.org/docs/latest/flink/#preparation-when-using-flink-sql-client).

<details><summary>Here is the `pom.xml`</summary>
<p>

```
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>com.diginetica.etl</groupId>
    <version>1.0-SNAPSHOT</version>
    <artifactId>streaming</artifactId>
    <packaging>jar</packaging>

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
        <!-- Dependency versions -->
        <postgresql.version>42.6.0</postgresql.version>
        <awssdk.version>2.21.23</awssdk.version>
        <guava.version>32.1.3-jre</guava.version>
        <hadoop.version>3.3.1</hadoop.version>
        <flink.version>1.16.2</flink.version>
        <flink-cdc-connector.version>2.4.1</flink-cdc-connector.version>
        <iceberg.version>1.4.1</iceberg.version>
        <!-- Dependency scopes -->
        <guava.deps.scope>provided</guava.deps.scope>
        <!-- Binary versions -->
        <flink.binary.version>1.16</flink.binary.version>
        <scala.binary.version>2.12</scala.binary.version>
    </properties>

    <dependencies>
        <!-- Flink dependencies -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-hadoop-compatibility_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <!-- Flink CDC connector for PostgreSQL -->
        <dependency>
            <groupId>com.ververica</groupId>
            <artifactId>flink-connector-postgres-cdc</artifactId>
            <version>${flink-cdc-connector.version}</version>
        </dependency>
        <!-- Apache Iceberg dependency -->
        <dependency>
            <groupId>org.apache.iceberg</groupId>
            <artifactId>iceberg-flink-runtime-${flink.binary.version}</artifactId>
            <version>${iceberg.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-common</artifactId>
            <version>${hadoop.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>${hadoop.version}</version>
        </dependency>
        <!-- PostgreSQL dependencies -->
        <dependency>
            <groupId>org.postgresql</groupId>
            <artifactId>postgresql</artifactId>
            <version>${postgresql.version}</version>
        </dependency>
        <!-- AWS SDK dependencies: it may be worth using com.amazonaws, as it is more optimized -->
        <dependency>
            <groupId>software.amazon.awssdk</groupId>
            <artifactId>bundle</artifactId>
            <version>${awssdk.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-aws</artifactId>
            <version>${hadoop.version}</version>
        </dependency>
        <!-- Guava: use scope "provided" if it is provided by the runtime -->
        <dependency>
            <groupId>com.google.guava</groupId>
            <artifactId>guava</artifactId>
            <version>${guava.version}</version>
            <scope>${guava.deps.scope}</scope>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <!-- Maven Shade Plugin: configured to create an uber JAR -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>3.1.1</version>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <transformers>
                                <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer"/>
                                <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
                            </transformers>
                            <artifactSet>
                                <excludes>
                                    <exclude>com.google.code.findbugs:jsr305</exclude>
                                </excludes>
                            </artifactSet>
                            <filters>
                                <filter>
                                    <!-- Do not copy the signatures in the META-INF folder.
                                         Otherwise, this might cause SecurityExceptions when using the JAR. -->
                                    <artifact>*:*</artifact>
                                    <excludes>
                                        <exclude>module-info.class</exclude>
                                        <exclude>META-INF/*.SF</exclude>
                                        <exclude>META-INF/*.DSA</exclude>
                                        <exclude>META-INF/*.RSA</exclude>
                                    </excludes>
                                </filter>
                            </filters>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>
```

</p>
</details>

And here is a simplified version of the PyFlink code:

```python
from os.path import dirname, join

from pyflink.table import EnvironmentSettings, StreamTableEnvironment

FLINK_JAR_FROM_ROOT_DIR = "etl/streaming/target/streaming-1.0-SNAPSHOT.jar"
FLINK_JAR = join(dirname(dirname(package.__file__)), FLINK_JAR_FROM_ROOT_DIR)

env_settings = EnvironmentSettings.new_instance().in_streaming_mode().build()
t_env = StreamTableEnvironment.create(environment_settings=env_settings)
t_env.get_config().set("pipeline.jars", f"file://{FLINK_JAR}")
t_env.get_config().set("pipeline.classpaths", f"file://{FLINK_JAR}")

catalog_ddl = """
    CREATE CATALOG my_catalog WITH (
        'type' = 'iceberg',
        'catalog-impl' = 'org.apache.iceberg.jdbc.JdbcCatalog',
        'uri' = 'jdbc:postgresql://host:port/db',
        'jdbc.user' = 'user',
        'jdbc.password' = 'password',
        'io-impl' = 'org.apache.iceberg.aws.s3.S3FileIO',
        'warehouse' = 's3://bucket/iceberg/warehouse',
        's3.secret.key' = '*********************************************',
        's3.access.key' = '*************',
        's3.endpoint' = '***********************'
    );
"""
t_res = t_env.execute_sql(catalog_ddl)
t_res = t_env.execute_sql("USE CATALOG my_catalog;")

sink_ddl = """
    CREATE TABLE IF NOT EXISTS my_table (
        id INT,
        data STRING
    );
"""
t_res = t_env.execute_sql(sink_ddl)
t_res = t_env.execute_sql("INSERT INTO my_table VALUES (1, 'a'), (2, 'b'), (3, 'c');")
t_env.execute_sql("SELECT * FROM my_table;").print()
```
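As a side note on the jar wiring above: `pipeline.jars` accepts a semicolon-separated list of jar URLs, so the PostgreSQL driver jar could in principle be listed explicitly next to the shaded jar. A minimal sketch, where the driver jar path is only an assumption and not part of the original setup:

```python
# Sketch only: pass the PostgreSQL driver jar alongside the shaded jar.
# The driver path below is an assumed/illustrative location.
POSTGRES_DRIVER_JAR = "/opt/flink/lib/postgresql-42.6.0.jar"

t_env.get_config().set(
    "pipeline.jars",
    f"file://{FLINK_JAR};file://{POSTGRES_DRIVER_JAR}",
)
```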
<details><summary>The error message I get in the last line</summary>
<p>

```
TableException: org.apache.flink.table.api.TableException: Failed to execute sql
    at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeQueryOperation(TableEnvironmentImpl.java:903)
    at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:1382)
    at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:730)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.base/java.lang.reflect.Method.invoke(Method.java:566)
    at org.apache.flink.api.python.shaded.py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
    at org.apache.flink.api.python.shaded.py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
    at org.apache.flink.api.python.shaded.py4j.Gateway.invoke(Gateway.java:282)
    at org.apache.flink.api.python.shaded.py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at org.apache.flink.api.python.shaded.py4j.commands.CallCommand.execute(CallCommand.java:79)
    at org.apache.flink.api.python.shaded.py4j.GatewayConnection.run(GatewayConnection.java:238)
    at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: org.apache.flink.util.FlinkException: Failed to execute job 'collect'.
    at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.executeAsync(StreamExecutionEnvironment.java:2203)
    at org.apache.flink.table.planner.delegation.DefaultExecutor.executeAsync(DefaultExecutor.java:95)
    at org.apache.flink.table.executor.python.ChainingOptimizingExecutor.executeAsync(ChainingOptimizingExecutor.java:73)
    at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeQueryOperation(TableEnvironmentImpl.java:884)
    ... 13 more
Caused by: java.lang.RuntimeException: org.apache.flink.runtime.client.JobInitializationException: Could not start the JobMaster.
    at org.apache.flink.util.ExceptionUtils.rethrow(ExceptionUtils.java:321)
    at org.apache.flink.util.function.FunctionUtils.lambda$uncheckedFunction$2(FunctionUtils.java:75)
    at java.base/java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:642)
    at java.base/java.util.concurrent.CompletableFuture$Completion.exec(CompletableFuture.java:479)
    at java.base/java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:290)
    at java.base/java.util.concurrent.ForkJoinPool$WorkQueue.topLevelExec(ForkJoinPool.java:1020)
    at java.base/java.util.concurrent.ForkJoinPool.scan(ForkJoinPool.java:1656)
    at java.base/java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1594)
    at java.base/java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:183)
Caused by: org.apache.flink.runtime.client.JobInitializationException: Could not start the JobMaster.
    at org.apache.flink.runtime.jobmaster.DefaultJobMasterServiceProcess.lambda$new$0(DefaultJobMasterServiceProcess.java:97)
    at java.base/java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:859)
    at java.base/java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:837)
    at java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506)
    at java.base/java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1705)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: java.util.concurrent.CompletionException: java.lang.RuntimeException: org.apache.flink.runtime.JobException: Creating the input splits caused an error: Failed to connect: jdbc:postgresql://host:port/db
    at java.base/java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:314)
    at java.base/java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:319)
    at java.base/java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1702)
    ... 3 more
Caused by: java.lang.RuntimeException: org.apache.flink.runtime.JobException: Creating the input splits caused an error: Failed to connect: jdbc:postgresql://host:port/db
    at org.apache.flink.util.ExceptionUtils.rethrow(ExceptionUtils.java:321)
    at org.apache.flink.util.function.FunctionUtils.lambda$uncheckedSupplier$4(FunctionUtils.java:114)
    at java.base/java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1700)
    ... 3 more
Caused by: org.apache.flink.runtime.JobException: Creating the input splits caused an error: Failed to connect: jdbc:postgresql://host:port/db
    at org.apache.flink.runtime.executiongraph.ExecutionJobVertex.initialize(ExecutionJobVertex.java:259)
    at org.apache.flink.runtime.executiongraph.DefaultExecutionGraph.initializeJobVertex(DefaultExecutionGraph.java:901)
    at org.apache.flink.runtime.executiongraph.DefaultExecutionGraph.initializeJobVertices(DefaultExecutionGraph.java:891)
    at org.apache.flink.runtime.executiongraph.DefaultExecutionGraph.attachJobGraph(DefaultExecutionGraph.java:848)
    at org.apache.flink.runtime.executiongraph.DefaultExecutionGraph.attachJobGraph(DefaultExecutionGraph.java:830)
    at org.apache.flink.runtime.executiongraph.DefaultExecutionGraphBuilder.buildGraph(DefaultExecutionGraphBuilder.java:203)
    at org.apache.flink.runtime.scheduler.DefaultExecutionGraphFactory.createAndRestoreExecutionGraph(DefaultExecutionGraphFactory.java:156)
    at org.apache.flink.runtime.scheduler.SchedulerBase.createAndRestoreExecutionGraph(SchedulerBase.java:361)
    at org.apache.flink.runtime.scheduler.SchedulerBase.<init>(SchedulerBase.java:206)
    at org.apache.flink.runtime.scheduler.DefaultScheduler.<init>(DefaultScheduler.java:134)
    at org.apache.flink.runtime.scheduler.DefaultSchedulerFactory.createInstance(DefaultSchedulerFactory.java:152)
    at org.apache.flink.runtime.jobmaster.DefaultSlotPoolServiceSchedulerFactory.createScheduler(DefaultSlotPoolServiceSchedulerFactory.java:119)
    at org.apache.flink.runtime.jobmaster.JobMaster.createScheduler(JobMaster.java:369)
    at org.apache.flink.runtime.jobmaster.JobMaster.<init>(JobMaster.java:346)
    at org.apache.flink.runtime.jobmaster.factories.DefaultJobMasterServiceFactory.internalCreateJobMasterService(DefaultJobMasterServiceFactory.java:123)
    at org.apache.flink.runtime.jobmaster.factories.DefaultJobMasterServiceFactory.lambda$createJobMasterService$0(DefaultJobMasterServiceFactory.java:95)
    at org.apache.flink.util.function.FunctionUtils.lambda$uncheckedSupplier$4(FunctionUtils.java:112)
    ... 4 more
Caused by: org.apache.iceberg.jdbc.UncheckedSQLException: Failed to connect: jdbc:postgresql://host:port/db
    at org.apache.iceberg.jdbc.JdbcClientPool.newClient(JdbcClientPool.java:57)
    at org.apache.iceberg.jdbc.JdbcClientPool.newClient(JdbcClientPool.java:30)
    at org.apache.iceberg.ClientPoolImpl.get(ClientPoolImpl.java:125)
    at org.apache.iceberg.ClientPoolImpl.run(ClientPoolImpl.java:56)
    at org.apache.iceberg.ClientPoolImpl.run(ClientPoolImpl.java:51)
    at org.apache.iceberg.jdbc.JdbcCatalog.initializeCatalogTables(JdbcCatalog.java:146)
    at org.apache.iceberg.jdbc.JdbcCatalog.initialize(JdbcCatalog.java:130)
    at org.apache.iceberg.CatalogUtil.loadCatalog(CatalogUtil.java:239)
    at org.apache.iceberg.flink.CatalogLoader$CustomCatalogLoader.loadCatalog(CatalogLoader.java:201)
    at org.apache.iceberg.flink.TableLoader$CatalogTableLoader.open(TableLoader.java:122)
    at org.apache.iceberg.flink.source.FlinkInputFormat.createInputSplits(FlinkInputFormat.java:94)
    at org.apache.iceberg.flink.source.FlinkInputFormat.createInputSplits(FlinkInputFormat.java:41)
    at org.apache.flink.runtime.executiongraph.ExecutionJobVertex.initialize(ExecutionJobVertex.java:246)
    ... 20 more
Caused by: java.sql.SQLException: No suitable driver found for jdbc:postgresql://host:port/db
    at java.sql/java.sql.DriverManager.getConnection(DriverManager.java:702)
    at java.sql/java.sql.DriverManager.getConnection(DriverManager.java:189)
    at org.apache.iceberg.jdbc.JdbcClientPool.newClient(JdbcClientPool.java:55)
```

</p>
</details>
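For what it's worth, a rough way to check whether `org.postgresql.Driver` is even loadable is to ask the PyFlink client JVM for it via py4j. This is only a sketch and only exercises the client process; the error above is raised in the JobMaster, so it may succeed locally while the cluster-side classpath is still missing the driver:

```python
# Sketch only: checks whether org.postgresql.Driver is visible to the *client*
# JVM started by PyFlink; it does not prove the JobMaster can see the driver.
from pyflink.java_gateway import get_gateway

gateway = get_gateway()
try:
    gateway.jvm.java.lang.Class.forName("org.postgresql.Driver")
    print("org.postgresql.Driver is visible to the client JVM")
except Exception as exc:  # py4j surfaces the ClassNotFoundException here
    print(f"driver not visible to the client JVM: {exc}")
```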
My attempts:

1. I've double-checked the URI; it's correct (for example, postgres-cdc successfully prints a table). Moreover, the job writes metadata to the PostgreSQL database and to S3, but no actual data:

   |   | catalog_name (character varying) | table_namespace (character varying) | table_name (character varying) | metadata_location (character varying) | previous_metadata_location (character varying) |
   |:-:|:-:|:-:|:-:|:-:|:-:|
   | 1 | my_catalog | default | my_table | s3://bucket/iceberg/warehouse/default/my_table/metadata/00000-26120b43-09af-46ab-b9d6-9c8015884eae.metadata.json | null |

2. The Postgres driver is placed in `${FLINK_HOME}/lib`, even though it's already a dependency in the shaded jar.
3. I've tried to explicitly specify `'jdbc.driver' = 'org.postgresql.Driver'` in the catalog options.

I'm pretty new to all of this, so I've probably just made a mistake with the dependency versions or something else. I would appreciate any advice :)

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org
For additional commands, e-mail: issues-h...@iceberg.apache.org