This is an automated email from the ASF dual-hosted git repository.
diwu pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris-flink-connector.git
The following commit(s) were added to refs/heads/master by this push: new 994fcb19 [cdc] fix create doris table failed when load data from DB2 (#451) 994fcb19 is described below commit 994fcb19acc8c940441fe99c2ef4c17168a8d36e Author: Petrichor <31833513+vinle...@users.noreply.github.com> AuthorDate: Tue Aug 6 19:18:51 2024 +0800 [cdc] fix create doris table failed when load data from DB2 (#451) --- .../doris/flink/tools/cdc/db2/Db2Schema.java | 10 ++++++ .../flink/tools/cdc/CdcDb2SyncDatabaseCase.java | 42 +++++++++------------- 2 files changed, 27 insertions(+), 25 deletions(-) diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/db2/Db2Schema.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/db2/Db2Schema.java index 5aaf8cea..c36777f3 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/db2/Db2Schema.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/db2/Db2Schema.java @@ -17,9 +17,12 @@ package org.apache.doris.flink.tools.cdc.db2; +import org.apache.doris.flink.catalog.doris.FieldSchema; import org.apache.doris.flink.tools.cdc.JdbcSourceSchema; import java.sql.DatabaseMetaData; +import java.sql.SQLException; +import java.util.LinkedHashMap; public class Db2Schema extends JdbcSourceSchema { public Db2Schema( @@ -41,4 +44,11 @@ public class Db2Schema extends JdbcSourceSchema { public String getCdcTableName() { return schemaName + "\\." 
+ tableName; } + + @Override + public LinkedHashMap<String, FieldSchema> getColumnInfo( + DatabaseMetaData metaData, String databaseName, String schemaName, String tableName) + throws SQLException { + return super.getColumnInfo(metaData, null, schemaName, tableName); + } } diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/CdcDb2SyncDatabaseCase.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/CdcDb2SyncDatabaseCase.java index 77b8931d..0327079a 100644 --- a/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/CdcDb2SyncDatabaseCase.java +++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/CdcDb2SyncDatabaseCase.java @@ -17,9 +17,12 @@ package org.apache.doris.flink.tools.cdc; +import org.apache.flink.cdc.connectors.base.options.JdbcSourceOptions; +import org.apache.flink.cdc.connectors.base.options.SourceOptions; import org.apache.flink.configuration.Configuration; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.doris.flink.table.DorisConfigOptions; import org.apache.doris.flink.tools.cdc.db2.Db2DatabaseSync; import java.util.HashMap; @@ -35,42 +38,31 @@ public class CdcDb2SyncDatabaseCase { env.disableOperatorChaining(); env.enableCheckpointing(10000); - // Map<String,String> flinkMap = new HashMap<>(); - // flinkMap.put("execution.checkpointing.interval","10s"); - // flinkMap.put("pipeline.operator-chaining","false"); - // flinkMap.put("parallelism.default","1"); - - // Configuration configuration = Configuration.fromMap(flinkMap); - // env.configure(configuration); - String database = "db2_test"; String tablePrefix = ""; String tableSuffix = ""; Map<String, String> sourceConfig = new HashMap<>(); - sourceConfig.put("database-name", "testdb"); - sourceConfig.put("schema-name", "DB2INST1"); - sourceConfig.put("hostname", "127.0.0.1"); - sourceConfig.put("port", "50000"); - sourceConfig.put("username", "db2inst1"); - 
sourceConfig.put("password", "=doris123456"); - // sourceConfig.put("debezium.database.tablename.case.insensitive","false"); - sourceConfig.put("scan.incremental.snapshot.enabled", "true"); - // sourceConfig.put("debezium.include.schema.changes","false"); + sourceConfig.put(JdbcSourceOptions.DATABASE_NAME.key(), "testdb"); + sourceConfig.put(JdbcSourceOptions.SCHEMA_NAME.key(), "DB2INST1"); + sourceConfig.put(JdbcSourceOptions.HOSTNAME.key(), "127.0.0.1"); + sourceConfig.put(Db2DatabaseSync.PORT.key(), "50000"); + sourceConfig.put(JdbcSourceOptions.USERNAME.key(), "db2inst1"); + sourceConfig.put(JdbcSourceOptions.PASSWORD.key(), "=doris123456"); + sourceConfig.put(SourceOptions.SCAN_INCREMENTAL_SNAPSHOT_ENABLED.key(), "true"); Configuration config = Configuration.fromMap(sourceConfig); Map<String, String> sinkConfig = new HashMap<>(); - sinkConfig.put("fenodes", "127.0.0.1:8030"); - // sinkConfig.put("benodes","10.20.30.1:8040, 10.20.30.2:8040, 10.20.30.3:8040"); - sinkConfig.put("username", "root"); - sinkConfig.put("password", "123456"); - sinkConfig.put("jdbc-url", "jdbc:mysql://127.0.0.1:9030"); - sinkConfig.put("sink.label-prefix", UUID.randomUUID().toString()); + sinkConfig.put(DorisConfigOptions.FENODES.key(), "127.0.0.1:8030"); + sinkConfig.put(DorisConfigOptions.USERNAME.key(), "root"); + sinkConfig.put(DorisConfigOptions.PASSWORD.key(), "123456"); + sinkConfig.put(DorisConfigOptions.JDBC_URL.key(), "jdbc:mysql://127.0.0.1:9030"); + sinkConfig.put(DorisConfigOptions.SINK_LABEL_PREFIX.key(), UUID.randomUUID().toString()); Configuration sinkConf = Configuration.fromMap(sinkConfig); Map<String, String> tableConfig = new HashMap<>(); - tableConfig.put("replication_num", "1"); - // tableConfig.put("table-buckets", "tbl1:10,tbl2:20,a.*:30,b.*:40,.*:50"); + tableConfig.put(DatabaseSyncConfig.REPLICATION_NUM, "1"); + tableConfig.put(DatabaseSyncConfig.TABLE_BUCKETS, "tbl1:10,tbl2:20,a.*:30,b.*:40,.*:50"); String includingTables = "FULL_TYPES"; String 
excludingTables = null; String multiToOneOrigin = null; --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org