LilMonk opened a new issue, #6734:
URL: https://github.com/apache/iceberg/issues/6734

   ### Query engine
   
   Flink
   
   ### Question
   
   I'm trying to build a CDC pipeline from MySQL to Iceberg (S3). I have written 
   example code that reads the CDC data and prints it to the console, but it does 
   not push the data into MinIO: it only creates the **metadata** folder with a 
   single JSON file.
   
   This is the code repo: https://github.com/LilMonk/flink-iceberg-cdc
   
   ```scala
   import org.apache.flink.streaming.api.CheckpointingMode
   import org.apache.flink.streaming.api.scala._
   import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment
   import org.apache.flink.table.api.{EnvironmentSettings, FieldExpression}
   import org.apache.hadoop.conf.Configuration
   import org.apache.iceberg.catalog.TableIdentifier
   import org.apache.iceberg.flink.sink.FlinkSink
   import org.apache.iceberg.flink.{CatalogLoader, FlinkSchemaUtil, TableLoader}
   import org.apache.iceberg.types.Types
   import org.apache.iceberg.{DistributionMode, PartitionSpec, Schema}
   
   import java.util
   
   object DataStreamCDC {
     def main(args: Array[String]): Unit = {
       val settings = EnvironmentSettings.newInstance().inStreamingMode().build()
       val env = StreamExecutionEnvironment.getExecutionEnvironment
       // Checkpointing is required: the Iceberg sink only commits data files on checkpoints.
       env.enableCheckpointing(1000L, CheckpointingMode.EXACTLY_ONCE)
       //    env.setStateBackend(new HashMapStateBackend())
       //    env.getCheckpointConfig.setCheckpointStorage("file:///tmp/flink-checkpoint")
       val tableEnv = StreamTableEnvironment.create(env, settings)
   
   
       val customersSQL =
         """
           |CREATE TABLE customers (
           |    database_name STRING METADATA VIRTUAL,
           |    table_name STRING METADATA VIRTUAL,
           |    `id` DECIMAL(20, 0) NOT NULL,
           |    first_name STRING,
           |    last_name STRING,
           |    email STRING,
           |    PRIMARY KEY (`id`) NOT ENFORCED
           |  ) WITH (
           |    'connector' = 'mysql-cdc',
           |    'hostname' = 'localhost',
           |    'port' = '3306',
           |    'username' = 'root',
           |    'password' = 'root_pass',
           |    'database-name' = 'inventory',
           |    'table-name' = 'customers'
           |  );
           |""".stripMargin
       tableEnv.executeSql(customersSQL)
   
   
       val customersSinkSQL =
         """
           |CREATE TABLE customers_sink (
           |    database_name STRING,
           |    table_name STRING,
           |    `id` DECIMAL(20, 0) NOT NULL,
           |    first_name STRING,
           |    last_name STRING,
           |    email STRING,
           |    PRIMARY KEY (`id`) NOT ENFORCED
           |  ) WITH (
           |    'connector'='iceberg',
           |    'catalog-name'='iceberg_catalog',
           |    'catalog-impl'='org.apache.iceberg.rest.RESTCatalog',
           |    'uri'='http://localhost:8181',
           |    'io-impl'='org.apache.iceberg.aws.s3.S3FileIO',
           |    's3-endpoint'='http://localhost:9000',
           |    'warehouse'='s3://warehouse/wh/'
           |  );
           |""".stripMargin
       tableEnv.executeSql(customersSinkSQL)
   
   
       val customersTable = tableEnv.from("customers").select($"*")
   
   
       // You can either print the CDC data to the console or push it to the sink:
       // customersTable.execute().print()
   
   
       val hadoopConf = new Configuration()
       val catalogProperties = new util.HashMap[String, String]()
       catalogProperties.put("uri", "http://localhost:8181";)
       catalogProperties.put("io-impl", "org.apache.iceberg.aws.s3.S3FileIO")
       catalogProperties.put("warehouse", "s3://warehouse/wh/")
       catalogProperties.put("s3.endpoint", "http://localhost:9000";)
       val catalogLoader = CatalogLoader.custom("demo", catalogProperties, 
hadoopConf, "org.apache.iceberg.rest.RESTCatalog")
       val schema = new Schema(
         Types.NestedField.required(1, "id", Types.DecimalType.of(20, 0)),
         Types.NestedField.optional(2, "first_name", Types.StringType.get),
         Types.NestedField.required(3, "last_name", Types.StringType.get),
         Types.NestedField.required(4, "email", Types.StringType.get),
         Types.NestedField.required(5, "database_name", Types.StringType.get),
         Types.NestedField.required(6, "table_name", Types.StringType.get)
       )
       val catalog = catalogLoader.loadCatalog
       val databaseName = "default"
       val tableName = "customers_sink"
       val outputTable = TableIdentifier.of(databaseName, tableName)
       if (!catalog.tableExists(outputTable)) {
         catalog.createTable(outputTable, schema, PartitionSpec.unpartitioned)
       }
   
   
       // Convert the Table API changelog back to a Java DataStream[Row] for the Iceberg sink.
       val customerDS = tableEnv.toChangelogStream(customersTable).javaStream
       FlinkSink.forRow(customerDS, FlinkSchemaUtil.toSchema(schema))
         .tableLoader(TableLoader.fromCatalog(catalogLoader, outputTable))
         .distributionMode(DistributionMode.HASH)
         .writeParallelism(2)
         .append()
   
   
       // execute the program
       env.execute("Flink DataStream CDC POC in Scala")
     }
   }
   ```
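
   For what it's worth, a check along these lines (a minimal sketch, reusing 
   `catalogLoader` and `outputTable` from the code above) should show whether the 
   sink ever committed a snapshot; if `currentSnapshot()` returns null, no data 
   files were ever committed, which would match the metadata-only folder I see:
   
   ```scala
   // Minimal sketch: verify whether the Iceberg sink ever committed a snapshot.
   // Reuses catalogLoader and outputTable from the code above.
   val table = catalogLoader.loadCatalog.loadTable(outputTable)
   val snapshot = table.currentSnapshot()
   if (snapshot == null) {
     println("No snapshots committed -- the sink never wrote data files.")
   } else {
     println(s"Current snapshot ${snapshot.snapshotId()} committed at ${snapshot.timestampMillis()}")
   }
   ```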
   
   I could use some help debugging this issue.
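   
   One thing I'm not sure about is whether a plain `append()` is the right sink 
   mode for changelog input that contains updates and deletes. This is the upsert 
   variant I'm considering (a sketch only; it assumes the table is created with 
   `'format-version'='2'` and that `id` works as the equality key):
   
   ```scala
   // Sketch only: upsert variant for the changelog (CDC) input.
   // Assumes the Iceberg table was created with 'format-version'='2',
   // which equality deletes require; 'id' as the equality key is an assumption.
   FlinkSink.forRow(customerDS, FlinkSchemaUtil.toSchema(schema))
     .tableLoader(TableLoader.fromCatalog(catalogLoader, outputTable))
     .equalityFieldColumns(java.util.Collections.singletonList("id"))
     .upsert(true)
     .writeParallelism(2)
     .append()
   ```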

