JuiceFV opened a new issue, #9097:
URL: https://github.com/apache/iceberg/issues/9097

   ### Query engine
   
   Flink 1.16
   
   ### Question
   
   I'm trying to sink data from a PostgreSQL table to Iceberg using Flink.
   
   I'm currently using Flink 1.16.2 (PyFlink), as mentioned [here](https://iceberg.apache.org/docs/latest/flink/#preparation-when-using-flink-sql-client).
   
   <details><summary>Here is the `pom.xml`</summary>
   <p>
   
   ```
    <project
        xmlns="http://maven.apache.org/POM/4.0.0"
        xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
        xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd"
        >
       <modelVersion>4.0.0</modelVersion>
       <groupId>com.diginetica.etl</groupId>
       <version>1.0-SNAPSHOT</version>
       <artifactId>streaming</artifactId>
       <packaging>jar</packaging>
   
       <properties>
            <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
            <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
           <!-- Dependencies versions -->
           <postgresql.version>42.6.0</postgresql.version>
           <awssdk.version>2.21.23</awssdk.version>
           <guava.version>32.1.3-jre</guava.version>
           <hadoop.version>3.3.1</hadoop.version>
           <flink.version>1.16.2</flink.version>
           <flink-cdc-connector.version>2.4.1</flink-cdc-connector.version>
           <iceberg.version>1.4.1</iceberg.version>
           <!-- Dependencies scopes -->
           <guava.deps.scope>provided</guava.deps.scope>
           <!-- Binaries versions -->
           <flink.binary.version>1.16</flink.binary.version>
           <scala.binary.version>2.12</scala.binary.version>
       </properties>
   
       <dependencies>
           <!-- Flink Dependencies -->
           <dependency>
               <groupId>org.apache.flink</groupId>
               <artifactId>flink-java</artifactId>
               <version>${flink.version}</version>
           </dependency>
           <dependency>
               <groupId>org.apache.flink</groupId>
               <artifactId>flink-streaming-java</artifactId>
               <version>${flink.version}</version>
           </dependency>
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-hadoop-compatibility_${scala.binary.version}</artifactId>
                <version>${flink.version}</version>
            </dependency>
   
           <!-- Flink CDC Connector for PostgreSQL -->
           <dependency>
               <groupId>com.ververica</groupId>
               <artifactId>flink-connector-postgres-cdc</artifactId>
               <version>${flink-cdc-connector.version}</version>
           </dependency>
   
           <!-- Apache Iceberg Dependency -->
            <dependency>
                <groupId>org.apache.iceberg</groupId>
                <artifactId>iceberg-flink-runtime-${flink.binary.version}</artifactId>
                <version>${iceberg.version}</version>
            </dependency>
   
           <dependency>
               <groupId>org.apache.hadoop</groupId>
               <artifactId>hadoop-common</artifactId>
               <version>${hadoop.version}</version>
           </dependency>
           <dependency>
               <groupId>org.apache.hadoop</groupId>
               <artifactId>hadoop-client</artifactId>
               <version>${hadoop.version}</version>
           </dependency>
   
            <!-- PostgreSQL dependencies -->
           <dependency>
               <groupId>org.postgresql</groupId>
               <artifactId>postgresql</artifactId>
               <version>${postgresql.version}</version>
           </dependency>
   
            <!-- AWS SDK dependencies: it may be worth using com.amazonaws instead; it's more optimized -->
            <dependency>
                <groupId>software.amazon.awssdk</groupId>
                <artifactId>bundle</artifactId>
                <version>${awssdk.version}</version>
            </dependency>
           <dependency>
               <groupId>org.apache.hadoop</groupId>
               <artifactId>hadoop-aws</artifactId>
               <version>${hadoop.version}</version>
           </dependency>
           <!-- Guava: Use scope provided if it's provided by the runtime -->
           <dependency>
               <groupId>com.google.guava</groupId>
               <artifactId>guava</artifactId>
               <version>${guava.version}</version>
               <scope>${guava.deps.scope}</scope>
           </dependency>
       </dependencies>
   
       <build>
           <plugins>
               <!-- Maven Shade Plugin: Configured for creating an uber JAR -->
               <plugin>
                   <groupId>org.apache.maven.plugins</groupId>
                   <artifactId>maven-shade-plugin</artifactId>
                   <version>3.1.1</version>
                   <executions>
                       <execution>
                           <phase>package</phase>
                           <goals>
                               <goal>shade</goal>
                           </goals>
                           <configuration>
                                <transformers>
                                    <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer"/>
                                    <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
                                </transformers>
                                <artifactSet>
                                    <excludes>
                                        <exclude>com.google.code.findbugs:jsr305</exclude>
                                    </excludes>
                                </artifactSet>
                               <filters>
                                   <filter>
                                        <!-- Do not copy the signatures in the META-INF folder.
                                             Otherwise, this might cause SecurityExceptions when using the JAR. -->
                                       <artifact>*:*</artifact>
                                       <excludes>
                                           <exclude>module-info.class</exclude>
                                           <exclude>META-INF/*.SF</exclude>
                                           <exclude>META-INF/*.DSA</exclude>
                                           <exclude>META-INF/*.RSA</exclude>
                                       </excludes>
                                   </filter>
                               </filters>
                           </configuration>
                       </execution>
                   </executions>
               </plugin>
           </plugins>
       </build>
   </project>
   ```
   
   </p>
   </details> 
   
   
   And here is a simplified version of the PyFlink code:
   
   ```python
   FLINK_JAR_FROM_ROOT_DIR = "etl/streaming/target/streaming-1.0-SNAPSHOT.jar"
   FLINK_JAR = join(dirname(dirname(package.__file__)), FLINK_JAR_FROM_ROOT_DIR)
   
   env_settings = EnvironmentSettings.new_instance().in_streaming_mode().build()
   t_env = StreamTableEnvironment.create(environment_settings=env_settings)
   t_env.get_config().set("pipeline.jars", f"file://{FLINK_JAR}")
   t_env.get_config().set("pipeline.classpaths", f"file://{FLINK_JAR}")
   
    catalog_ddl = """
   CREATE CATALOG my_catalog WITH (
       'type'='iceberg',
       'catalog-impl'='org.apache.iceberg.jdbc.JdbcCatalog',
       'uri'='jdbc:postgresql://host:port/db',
       'jdbc.user'='user',
       'jdbc.password'='password',
       'io-impl' = 'org.apache.iceberg.aws.s3.S3FileIO',
       'warehouse' = 's3://bucket/iceberg/warehouse',
       's3.secret.key' = '*********************************************',
       's3.access.key' = '*************',
       's3.endpoint' = '***********************'
   );
   """
    t_res = t_env.execute_sql(catalog_ddl)
   t_res = t_env.execute_sql("USE CATALOG my_catalog;")
   
   sink_ddl = """
   CREATE TABLE if not exists my_table (
       id INT,
       data STRING
   );
   """
   t_res = t_env.execute_sql(sink_ddl)
   
   t_res = t_env.execute_sql("INSERT INTO my_table VALUES (1, 'a'), (2, 'b'), 
(3, 'c');")
   
   t_env.execute_sql("SELECT * FROM my_table;").print()
   ```
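   
   One note on the jar wiring above: as far as I understand, `pipeline.jars` (and `pipeline.classpaths`) accept several URLs separated by `;`, so the standalone PostgreSQL driver could also be passed next to the shaded jar. A sketch of that variant (the driver path below is just a placeholder, not my actual setup):
   
    ```python
    # Hypothetical variant: pass the standalone PostgreSQL driver alongside the shaded jar.
    # POSTGRES_DRIVER_JAR is a placeholder path.
    POSTGRES_DRIVER_JAR = "/path/to/postgresql-42.6.0.jar"
    jars = ";".join(f"file://{jar}" for jar in (FLINK_JAR, POSTGRES_DRIVER_JAR))
    t_env.get_config().set("pipeline.jars", jars)
    t_env.get_config().set("pipeline.classpaths", jars)
    ```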
   
   <details><summary>The error message I get from the last `SELECT` statement</summary>
   <p>
   
   ```
   TableException: org.apache.flink.table.api.TableException: Failed to execute 
sql
        at 
org.apache.flink.table.api.internal.TableEnvironmentImpl.executeQueryOperation(TableEnvironmentImpl.java:903)
        at 
org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:1382)
        at 
org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:730)
        at 
java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at 
java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at 
java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.base/java.lang.reflect.Method.invoke(Method.java:566)
        at 
org.apache.flink.api.python.shaded.py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
        at 
org.apache.flink.api.python.shaded.py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
        at 
org.apache.flink.api.python.shaded.py4j.Gateway.invoke(Gateway.java:282)
        at 
org.apache.flink.api.python.shaded.py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
        at 
org.apache.flink.api.python.shaded.py4j.commands.CallCommand.execute(CallCommand.java:79)
        at 
org.apache.flink.api.python.shaded.py4j.GatewayConnection.run(GatewayConnection.java:238)
        at java.base/java.lang.Thread.run(Thread.java:829)
   Caused by: org.apache.flink.util.FlinkException: Failed to execute job 
'collect'.
        at 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.executeAsync(StreamExecutionEnvironment.java:2203)
        at 
org.apache.flink.table.planner.delegation.DefaultExecutor.executeAsync(DefaultExecutor.java:95)
        at 
org.apache.flink.table.executor.python.ChainingOptimizingExecutor.executeAsync(ChainingOptimizingExecutor.java:73)
        at 
org.apache.flink.table.api.internal.TableEnvironmentImpl.executeQueryOperation(TableEnvironmentImpl.java:884)
        ... 13 more
   Caused by: java.lang.RuntimeException: 
org.apache.flink.runtime.client.JobInitializationException: Could not start the 
JobMaster.
        at org.apache.flink.util.ExceptionUtils.rethrow(ExceptionUtils.java:321)
        at 
org.apache.flink.util.function.FunctionUtils.lambda$uncheckedFunction$2(FunctionUtils.java:75)
        at 
java.base/java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:642)
        at 
java.base/java.util.concurrent.CompletableFuture$Completion.exec(CompletableFuture.java:479)
        at 
java.base/java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:290)
        at 
java.base/java.util.concurrent.ForkJoinPool$WorkQueue.topLevelExec(ForkJoinPool.java:1020)
        at 
java.base/java.util.concurrent.ForkJoinPool.scan(ForkJoinPool.java:1656)
        at 
java.base/java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1594)
        at 
java.base/java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:183)
   Caused by: org.apache.flink.runtime.client.JobInitializationException: Could 
not start the JobMaster.
        at 
org.apache.flink.runtime.jobmaster.DefaultJobMasterServiceProcess.lambda$new$0(DefaultJobMasterServiceProcess.java:97)
        at 
java.base/java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:859)
        at 
java.base/java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:837)
        at 
java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506)
        at 
java.base/java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1705)
        at 
java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
        at 
java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
        at java.base/java.lang.Thread.run(Thread.java:829)
   Caused by: java.util.concurrent.CompletionException: 
java.lang.RuntimeException: org.apache.flink.runtime.JobException: Creating the 
input splits caused an error: Failed to connect:  jdbc:postgresql://host:port/db
        at 
java.base/java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:314)
        at 
java.base/java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:319)
        at 
java.base/java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1702)
        ... 3 more
   Caused by: java.lang.RuntimeException: 
org.apache.flink.runtime.JobException: Creating the input splits caused an 
error: Failed to connect: jdbc:postgresql://host:port/db
        at org.apache.flink.util.ExceptionUtils.rethrow(ExceptionUtils.java:321)
        at 
org.apache.flink.util.function.FunctionUtils.lambda$uncheckedSupplier$4(FunctionUtils.java:114)
        at 
java.base/java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1700)
        ... 3 more
   Caused by: org.apache.flink.runtime.JobException: Creating the input splits 
caused an error: Failed to connect: jdbc:postgresql://host:port/db
        at 
org.apache.flink.runtime.executiongraph.ExecutionJobVertex.initialize(ExecutionJobVertex.java:259)
        at 
org.apache.flink.runtime.executiongraph.DefaultExecutionGraph.initializeJobVertex(DefaultExecutionGraph.java:901)
        at 
org.apache.flink.runtime.executiongraph.DefaultExecutionGraph.initializeJobVertices(DefaultExecutionGraph.java:891)
        at 
org.apache.flink.runtime.executiongraph.DefaultExecutionGraph.attachJobGraph(DefaultExecutionGraph.java:848)
        at 
org.apache.flink.runtime.executiongraph.DefaultExecutionGraph.attachJobGraph(DefaultExecutionGraph.java:830)
        at 
org.apache.flink.runtime.executiongraph.DefaultExecutionGraphBuilder.buildGraph(DefaultExecutionGraphBuilder.java:203)
        at 
org.apache.flink.runtime.scheduler.DefaultExecutionGraphFactory.createAndRestoreExecutionGraph(DefaultExecutionGraphFactory.java:156)
        at 
org.apache.flink.runtime.scheduler.SchedulerBase.createAndRestoreExecutionGraph(SchedulerBase.java:361)
        at 
org.apache.flink.runtime.scheduler.SchedulerBase.<init>(SchedulerBase.java:206)
        at 
org.apache.flink.runtime.scheduler.DefaultScheduler.<init>(DefaultScheduler.java:134)
        at 
org.apache.flink.runtime.scheduler.DefaultSchedulerFactory.createInstance(DefaultSchedulerFactory.java:152)
        at 
org.apache.flink.runtime.jobmaster.DefaultSlotPoolServiceSchedulerFactory.createScheduler(DefaultSlotPoolServiceSchedulerFactory.java:119)
        at 
org.apache.flink.runtime.jobmaster.JobMaster.createScheduler(JobMaster.java:369)
        at 
org.apache.flink.runtime.jobmaster.JobMaster.<init>(JobMaster.java:346)
        at 
org.apache.flink.runtime.jobmaster.factories.DefaultJobMasterServiceFactory.internalCreateJobMasterService(DefaultJobMasterServiceFactory.java:123)
        at 
org.apache.flink.runtime.jobmaster.factories.DefaultJobMasterServiceFactory.lambda$createJobMasterService$0(DefaultJobMasterServiceFactory.java:95)
        at 
org.apache.flink.util.function.FunctionUtils.lambda$uncheckedSupplier$4(FunctionUtils.java:112)
        ... 4 more
   Caused by: org.apache.iceberg.jdbc.UncheckedSQLException: Failed to connect: 
jdbc:postgresql://host:port/db
        at 
org.apache.iceberg.jdbc.JdbcClientPool.newClient(JdbcClientPool.java:57)
        at 
org.apache.iceberg.jdbc.JdbcClientPool.newClient(JdbcClientPool.java:30)
        at org.apache.iceberg.ClientPoolImpl.get(ClientPoolImpl.java:125)
        at org.apache.iceberg.ClientPoolImpl.run(ClientPoolImpl.java:56)
        at org.apache.iceberg.ClientPoolImpl.run(ClientPoolImpl.java:51)
        at 
org.apache.iceberg.jdbc.JdbcCatalog.initializeCatalogTables(JdbcCatalog.java:146)
        at org.apache.iceberg.jdbc.JdbcCatalog.initialize(JdbcCatalog.java:130)
        at org.apache.iceberg.CatalogUtil.loadCatalog(CatalogUtil.java:239)
        at 
org.apache.iceberg.flink.CatalogLoader$CustomCatalogLoader.loadCatalog(CatalogLoader.java:201)
        at 
org.apache.iceberg.flink.TableLoader$CatalogTableLoader.open(TableLoader.java:122)
        at 
org.apache.iceberg.flink.source.FlinkInputFormat.createInputSplits(FlinkInputFormat.java:94)
        at 
org.apache.iceberg.flink.source.FlinkInputFormat.createInputSplits(FlinkInputFormat.java:41)
        at 
org.apache.flink.runtime.executiongraph.ExecutionJobVertex.initialize(ExecutionJobVertex.java:246)
        ... 20 more
   Caused by: java.sql.SQLException: No suitable driver found for 
jdbc:postgresql://host:port/db
        at java.sql/java.sql.DriverManager.getConnection(DriverManager.java:702)
        at java.sql/java.sql.DriverManager.getConnection(DriverManager.java:189)
        at 
org.apache.iceberg.jdbc.JdbcClientPool.newClient(JdbcClientPool.java:55)
   ```
   
   </p>
   </details>
   
   My attempts:
   
   1. I've double-checked the URI; it's correct (for example, the postgres-cdc connector successfully reads and prints the table). Moreover, the job writes metadata to the PostgreSQL db and to S3, but no actual data:
   
   | catalog_name | table_namespace | table_name | metadata_location | previous_metadata_location |
   |:------------:|:---------------:|:----------:|:------------------|:--------------------------:|
   | my_catalog   | default         | my_table   | s3://bucket/iceberg/warehouse/default/my_table/metadata/00000-26120b43-09af-46ab-b9d6-9c8015884eae.metadata.json | null |
   
   2. The Postgres driver is also placed in `${FLINK_HOME}/lib`, even though it's a dependency inside the shaded jar.
   3. I've tried to explicitly specify `'jdbc.driver' = 'org.postgresql.Driver'` in the catalog options (roughly as sketched below).
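   
   For reference, a rough sketch of that last attempt (connection details are placeholders, same as in the catalog DDL above):
   
    ```python
    # Sketch of the catalog DDL with the JDBC driver specified explicitly.
    # Connection values are placeholders; the other options match the catalog above.
    catalog_ddl_with_driver = """
    CREATE CATALOG my_catalog WITH (
        'type' = 'iceberg',
        'catalog-impl' = 'org.apache.iceberg.jdbc.JdbcCatalog',
        'uri' = 'jdbc:postgresql://host:port/db',
        'jdbc.user' = 'user',
        'jdbc.password' = 'password',
        'jdbc.driver' = 'org.postgresql.Driver',
        'io-impl' = 'org.apache.iceberg.aws.s3.S3FileIO',
        'warehouse' = 's3://bucket/iceberg/warehouse',
        's3.secret.key' = '***',
        's3.access.key' = '***',
        's3.endpoint' = '***'
    );
    """
    t_env.execute_sql(catalog_ddl_with_driver)
    ```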
   
   I'm pretty new to all of this, so I've probably just made a mistake in the dependency versions or somewhere else. I would appreciate any advice :)

