This is an automated email from the ASF dual-hosted git repository.

diwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris-flink-connector.git


The following commit(s) were added to refs/heads/master by this push:
     new 994fcb19 [cdc] fix create doris table failed when load data from DB2 (#451)
994fcb19 is described below

commit 994fcb19acc8c940441fe99c2ef4c17168a8d36e
Author: Petrichor <31833513+vinle...@users.noreply.github.com>
AuthorDate: Tue Aug 6 19:18:51 2024 +0800

    [cdc] fix create doris table failed when load data from DB2 (#451)
---
 .../doris/flink/tools/cdc/db2/Db2Schema.java       | 10 ++++++
 .../flink/tools/cdc/CdcDb2SyncDatabaseCase.java    | 42 +++++++++-------------
 2 files changed, 27 insertions(+), 25 deletions(-)

diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/db2/Db2Schema.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/db2/Db2Schema.java
index 5aaf8cea..c36777f3 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/db2/Db2Schema.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/db2/Db2Schema.java
@@ -17,9 +17,12 @@
 
 package org.apache.doris.flink.tools.cdc.db2;
 
+import org.apache.doris.flink.catalog.doris.FieldSchema;
 import org.apache.doris.flink.tools.cdc.JdbcSourceSchema;
 
 import java.sql.DatabaseMetaData;
+import java.sql.SQLException;
+import java.util.LinkedHashMap;
 
 public class Db2Schema extends JdbcSourceSchema {
     public Db2Schema(
@@ -41,4 +44,11 @@ public class Db2Schema extends JdbcSourceSchema {
     public String getCdcTableName() {
         return schemaName + "\\." + tableName;
     }
+
+    @Override
+    public LinkedHashMap<String, FieldSchema> getColumnInfo(
+            DatabaseMetaData metaData, String databaseName, String schemaName, String tableName)
+            throws SQLException {
+        return super.getColumnInfo(metaData, null, schemaName, tableName);
+    }
 }
diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/CdcDb2SyncDatabaseCase.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/CdcDb2SyncDatabaseCase.java
index 77b8931d..0327079a 100644
--- a/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/CdcDb2SyncDatabaseCase.java
+++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/CdcDb2SyncDatabaseCase.java
@@ -17,9 +17,12 @@
 
 package org.apache.doris.flink.tools.cdc;
 
+import org.apache.flink.cdc.connectors.base.options.JdbcSourceOptions;
+import org.apache.flink.cdc.connectors.base.options.SourceOptions;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 
+import org.apache.doris.flink.table.DorisConfigOptions;
 import org.apache.doris.flink.tools.cdc.db2.Db2DatabaseSync;
 
 import java.util.HashMap;
@@ -35,42 +38,31 @@ public class CdcDb2SyncDatabaseCase {
         env.disableOperatorChaining();
         env.enableCheckpointing(10000);
 
-        //  Map<String,String> flinkMap = new HashMap<>();
-        //  flinkMap.put("execution.checkpointing.interval","10s");
-        //  flinkMap.put("pipeline.operator-chaining","false");
-        //  flinkMap.put("parallelism.default","1");
-
-        //  Configuration configuration = Configuration.fromMap(flinkMap);
-        //  env.configure(configuration);
-
         String database = "db2_test";
         String tablePrefix = "";
         String tableSuffix = "";
         Map<String, String> sourceConfig = new HashMap<>();
-        sourceConfig.put("database-name", "testdb");
-        sourceConfig.put("schema-name", "DB2INST1");
-        sourceConfig.put("hostname", "127.0.0.1");
-        sourceConfig.put("port", "50000");
-        sourceConfig.put("username", "db2inst1");
-        sourceConfig.put("password", "=doris123456");
-        // sourceConfig.put("debezium.database.tablename.case.insensitive","false");
-        sourceConfig.put("scan.incremental.snapshot.enabled", "true");
-        // sourceConfig.put("debezium.include.schema.changes","false");
+        sourceConfig.put(JdbcSourceOptions.DATABASE_NAME.key(), "testdb");
+        sourceConfig.put(JdbcSourceOptions.SCHEMA_NAME.key(), "DB2INST1");
+        sourceConfig.put(JdbcSourceOptions.HOSTNAME.key(), "127.0.0.1");
+        sourceConfig.put(Db2DatabaseSync.PORT.key(), "50000");
+        sourceConfig.put(JdbcSourceOptions.USERNAME.key(), "db2inst1");
+        sourceConfig.put(JdbcSourceOptions.PASSWORD.key(), "=doris123456");
+        sourceConfig.put(SourceOptions.SCAN_INCREMENTAL_SNAPSHOT_ENABLED.key(), "true");
 
         Configuration config = Configuration.fromMap(sourceConfig);
 
         Map<String, String> sinkConfig = new HashMap<>();
-        sinkConfig.put("fenodes", "127.0.0.1:8030");
-        // sinkConfig.put("benodes","10.20.30.1:8040, 10.20.30.2:8040, 10.20.30.3:8040");
-        sinkConfig.put("username", "root");
-        sinkConfig.put("password", "123456");
-        sinkConfig.put("jdbc-url", "jdbc:mysql://127.0.0.1:9030");
-        sinkConfig.put("sink.label-prefix", UUID.randomUUID().toString());
+        sinkConfig.put(DorisConfigOptions.FENODES.key(), "127.0.0.1:8030");
+        sinkConfig.put(DorisConfigOptions.USERNAME.key(), "root");
+        sinkConfig.put(DorisConfigOptions.PASSWORD.key(), "123456");
+        sinkConfig.put(DorisConfigOptions.JDBC_URL.key(), "jdbc:mysql://127.0.0.1:9030");
+        sinkConfig.put(DorisConfigOptions.SINK_LABEL_PREFIX.key(), UUID.randomUUID().toString());
         Configuration sinkConf = Configuration.fromMap(sinkConfig);
 
         Map<String, String> tableConfig = new HashMap<>();
-        tableConfig.put("replication_num", "1");
-        //        tableConfig.put("table-buckets", "tbl1:10,tbl2:20,a.*:30,b.*:40,.*:50");
+        tableConfig.put(DatabaseSyncConfig.REPLICATION_NUM, "1");
+        tableConfig.put(DatabaseSyncConfig.TABLE_BUCKETS, "tbl1:10,tbl2:20,a.*:30,b.*:40,.*:50");
         String includingTables = "FULL_TYPES";
         String excludingTables = null;
         String multiToOneOrigin = null;


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to