This is an automated email from the ASF dual-hosted git repository.

diwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris-flink-connector.git


The following commit(s) were added to refs/heads/master by this push:
     new 10232359 [Fix] fix multi database sync npe error (#534)
10232359 is described below

commit 1023235936c4df6d49a959ccff3f5261582c8c85
Author: wudi <676366...@qq.com>
AuthorDate: Fri Jan 3 15:21:38 2025 +0800

    [Fix] fix multi database sync npe error (#534)
---
 .../apache/doris/flink/tools/cdc/DatabaseSync.java |   6 +-
 .../flink/tools/cdc/ParsingProcessFunction.java    |  28 +++-
 .../tools/cdc/mongodb/MongoDBDatabaseSync.java     |   2 +-
 .../cdc/mongodb/MongoParsingProcessFunction.java   |   4 +-
 .../doris/flink/container/ContainerUtils.java      |  19 ++-
 .../flink/container/e2e/Mysql2DorisE2ECase.java    | 144 ++++++++++++++++++---
 .../mongodb/MongoParsingProcessFunctionTest.java   |   2 +-
 .../container/e2e/mysql2doris/testAutoAddTable.txt |   2 +
 .../e2e/mysql2doris/testAutoAddTable_init.sql      |   1 +
 .../container/e2e/mysql2doris/testMySQL2Doris.txt  |   2 +
 .../e2e/mysql2doris/testMySQL2DorisByDefault.txt   |   2 +
 .../mysql2doris/testMySQL2DorisByDefault_init.sql  |   1 +
 .../e2e/mysql2doris/testMySQL2DorisCreateTable.txt |   2 +
 .../testMySQL2DorisCreateTable_init.sql            |   1 +
 .../mysql2doris/testMySQL2DorisEnableDelete.txt    |   2 +
 .../testMySQL2DorisEnableDelete_init.sql           |   1 +
 .../e2e/mysql2doris/testMySQL2DorisMultiDb2One.txt |   7 +
 .../testMySQL2DorisMultiDb2One_init.sql            |  70 ++++++++++
 .../e2e/mysql2doris/testMySQL2DorisMultiDbSync.txt |   4 +
 .../testMySQL2DorisMultiDbSync_init.sql            |  51 ++++++++
 .../e2e/mysql2doris/testMySQL2DorisSQLParse.txt    |   2 +
 .../mysql2doris/testMySQL2DorisSQLParse_init.sql   |   1 +
 .../e2e/mysql2doris/testMySQL2Doris_init.sql       |   1 +
 23 files changed, 325 insertions(+), 30 deletions(-)

diff --git 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/DatabaseSync.java
 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/DatabaseSync.java
index 0c1b860a..d6c69c0b 100644
--- 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/DatabaseSync.java
+++ 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/DatabaseSync.java
@@ -170,7 +170,7 @@ public abstract class DatabaseSync {
                     streamSource.process(buildProcessFunction());
             for (Tuple2<String, String> dbTbl : dorisTables) {
                 OutputTag<String> recordOutputTag =
-                        ParsingProcessFunction.createRecordOutputTag(dbTbl.f1);
+                        ParsingProcessFunction.createRecordOutputTag(dbTbl.f0, 
dbTbl.f1);
                 DataStream<String> sideOutput = 
parsedStream.getSideOutput(recordOutputTag);
                 int sinkParallel =
                         sinkConfig.getInteger(
@@ -230,7 +230,7 @@ public abstract class DatabaseSync {
     }
 
     public ParsingProcessFunction buildProcessFunction() {
-        return new ParsingProcessFunction(converter);
+        return new ParsingProcessFunction(database, converter);
     }
 
     /** create doris sink. */
@@ -479,7 +479,7 @@ public abstract class DatabaseSync {
             }
             TableSchema dorisSchema =
                     DorisSchemaFactory.createTableSchema(
-                            database,
+                            targetDb,
                             dorisTable,
                             schema.getFields(),
                             schema.getPrimaryKeys(),
diff --git 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/ParsingProcessFunction.java
 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/ParsingProcessFunction.java
index 787d0ae1..22e2b9bc 100644
--- 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/ParsingProcessFunction.java
+++ 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/ParsingProcessFunction.java
@@ -21,7 +21,9 @@ import org.apache.flink.configuration.Configuration;
 import org.apache.flink.streaming.api.functions.ProcessFunction;
 import org.apache.flink.util.Collector;
 import org.apache.flink.util.OutputTag;
+import org.apache.flink.util.StringUtils;
 
+import com.fasterxml.jackson.core.JsonProcessingException;
 import com.fasterxml.jackson.databind.JsonNode;
 import com.fasterxml.jackson.databind.ObjectMapper;
 
@@ -32,8 +34,10 @@ public class ParsingProcessFunction extends 
ProcessFunction<String, Void> {
     protected ObjectMapper objectMapper = new ObjectMapper();
     private transient Map<String, OutputTag<String>> recordOutputTags;
     private DatabaseSync.TableNameConverter converter;
+    private String database;
 
-    public ParsingProcessFunction(DatabaseSync.TableNameConverter converter) {
+    public ParsingProcessFunction(String database, 
DatabaseSync.TableNameConverter converter) {
+        this.database = database;
         this.converter = converter;
     }
 
@@ -47,8 +51,17 @@ public class ParsingProcessFunction extends 
ProcessFunction<String, Void> {
             String record, ProcessFunction<String, Void>.Context context, 
Collector<Void> collector)
             throws Exception {
         String tableName = getRecordTableName(record);
-        String dorisName = converter.convert(tableName);
-        context.output(getRecordOutputTag(dorisName), record);
+        String dorisTableName = converter.convert(tableName);
+        String dorisDbName = database;
+        if (StringUtils.isNullOrWhitespaceOnly(database)) {
+            dorisDbName = getRecordDatabaseName(record);
+        }
+        context.output(getRecordOutputTag(dorisDbName, dorisTableName), 
record);
+    }
+
+    private String getRecordDatabaseName(String record) throws 
JsonProcessingException {
+        JsonNode recordRoot = objectMapper.readValue(record, JsonNode.class);
+        return extractJsonNode(recordRoot.get("source"), "db");
     }
 
     protected String getRecordTableName(String record) throws Exception {
@@ -60,12 +73,13 @@ public class ParsingProcessFunction extends 
ProcessFunction<String, Void> {
         return record != null && record.get(key) != null ? 
record.get(key).asText() : null;
     }
 
-    private OutputTag<String> getRecordOutputTag(String tableName) {
+    private OutputTag<String> getRecordOutputTag(String databaseName, String 
tableName) {
+        String tableIdentifier = databaseName + "." + tableName;
         return recordOutputTags.computeIfAbsent(
-                tableName, ParsingProcessFunction::createRecordOutputTag);
+                tableIdentifier, k -> createRecordOutputTag(databaseName, 
tableName));
     }
 
-    public static OutputTag<String> createRecordOutputTag(String tableName) {
-        return new OutputTag<String>("record-" + tableName) {};
+    public static OutputTag<String> createRecordOutputTag(String databaseName, 
String tableName) {
+        return new OutputTag<String>(String.format("record-%s-%s", 
databaseName, tableName)) {};
     }
 }
diff --git 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mongodb/MongoDBDatabaseSync.java
 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mongodb/MongoDBDatabaseSync.java
index 79c261d0..3526c075 100644
--- 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mongodb/MongoDBDatabaseSync.java
+++ 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mongodb/MongoDBDatabaseSync.java
@@ -198,7 +198,7 @@ public class MongoDBDatabaseSync extends DatabaseSync {
 
     @Override
     public ParsingProcessFunction buildProcessFunction() {
-        return new MongoParsingProcessFunction(converter);
+        return new MongoParsingProcessFunction(database, converter);
     }
 
     @Override
diff --git 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mongodb/MongoParsingProcessFunction.java
 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mongodb/MongoParsingProcessFunction.java
index 737617a0..72c61567 100644
--- 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mongodb/MongoParsingProcessFunction.java
+++ 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mongodb/MongoParsingProcessFunction.java
@@ -27,8 +27,8 @@ import org.slf4j.LoggerFactory;
 public class MongoParsingProcessFunction extends ParsingProcessFunction {
     private static final Logger LOG = 
LoggerFactory.getLogger(MongoParsingProcessFunction.class);
 
-    public MongoParsingProcessFunction(TableNameConverter converter) {
-        super(converter);
+    public MongoParsingProcessFunction(String databaseName, TableNameConverter 
converter) {
+        super(databaseName, converter);
     }
 
     @Override
diff --git 
a/flink-doris-connector/src/test/java/org/apache/doris/flink/container/ContainerUtils.java
 
b/flink-doris-connector/src/test/java/org/apache/doris/flink/container/ContainerUtils.java
index e4c99d5a..f87e498c 100644
--- 
a/flink-doris-connector/src/test/java/org/apache/doris/flink/container/ContainerUtils.java
+++ 
b/flink-doris-connector/src/test/java/org/apache/doris/flink/container/ContainerUtils.java
@@ -104,6 +104,16 @@ public class ContainerUtils {
             List<String> expected,
             String query,
             int columnSize) {
+        checkResult(connection, logger, expected, query, columnSize, true);
+    }
+
+    public static void checkResult(
+            Connection connection,
+            Logger logger,
+            List<String> expected,
+            String query,
+            int columnSize,
+            boolean ordered) {
         List<String> actual = new ArrayList<>();
         try (Statement statement = connection.createStatement()) {
             ResultSet sinkResultSet = statement.executeQuery(query);
@@ -131,6 +141,13 @@ public class ContainerUtils {
                 "checking test result. expected={}, actual={}",
                 String.join(",", expected),
                 String.join(",", actual));
-        Assert.assertArrayEquals(expected.toArray(), actual.toArray());
+        if (ordered) {
+            Assert.assertArrayEquals(expected.toArray(), actual.toArray());
+        } else {
+            Assert.assertEquals(expected.size(), actual.size());
+            Assert.assertArrayEquals(
+                    expected.stream().sorted().toArray(Object[]::new),
+                    actual.stream().sorted().toArray(Object[]::new));
+        }
     }
 }
diff --git 
a/flink-doris-connector/src/test/java/org/apache/doris/flink/container/e2e/Mysql2DorisE2ECase.java
 
b/flink-doris-connector/src/test/java/org/apache/doris/flink/container/e2e/Mysql2DorisE2ECase.java
index fe715f62..cb7d83ad 100644
--- 
a/flink-doris-connector/src/test/java/org/apache/doris/flink/container/e2e/Mysql2DorisE2ECase.java
+++ 
b/flink-doris-connector/src/test/java/org/apache/doris/flink/container/e2e/Mysql2DorisE2ECase.java
@@ -36,6 +36,7 @@ public class Mysql2DorisE2ECase extends AbstractE2EService {
     private static final Logger LOG = 
LoggerFactory.getLogger(Mysql2DorisE2ECase.class);
     private static final String DATABASE = "test_e2e_mysql";
     private static final String CREATE_DATABASE = "CREATE DATABASE IF NOT 
EXISTS " + DATABASE;
+    private static final String DROP_DATABASE = "DROP DATABASE IF EXISTS " + 
DATABASE;
     private static final String MYSQL_CONF = "--" + 
DatabaseSyncConfig.MYSQL_CONF;
 
     @Before
@@ -56,13 +57,8 @@ public class Mysql2DorisE2ECase extends AbstractE2EService {
         argList.add(MYSQL_CONF);
         argList.add(PASSWORD + "=" + getMySQLPassword());
         argList.add(MYSQL_CONF);
-        argList.add(DATABASE_NAME + "=" + DATABASE);
-        // argList.add(MYSQL_CONF);
-        // argList.add("server-time-zone=UTC");
+        argList.add("server-time-zone=UTC");
 
-        // set doris database
-        argList.add(DORIS_DATABASE);
-        argList.add(DATABASE);
         setSinkConfDefaultConfig(argList);
         return argList;
     }
@@ -82,15 +78,7 @@ public class Mysql2DorisE2ECase extends AbstractE2EService {
 
     private void initDorisEnvironment() {
         LOG.info("Initializing Doris environment.");
-        ContainerUtils.executeSQLStatement(getDorisQueryConnection(), LOG, 
CREATE_DATABASE);
-        ContainerUtils.executeSQLStatement(
-                getDorisQueryConnection(),
-                LOG,
-                "DROP TABLE IF EXISTS test_e2e_mysql.tbl1",
-                "DROP TABLE IF EXISTS test_e2e_mysql.tbl2",
-                "DROP TABLE IF EXISTS test_e2e_mysql.tbl3",
-                "DROP TABLE IF EXISTS test_e2e_mysql.tbl4",
-                "DROP TABLE IF EXISTS test_e2e_mysql.tbl5");
+        ContainerUtils.executeSQLStatement(getDorisQueryConnection(), LOG, 
DROP_DATABASE);
     }
 
     private void initEnvironment(String jobName, String mysqlSourcePath) {
@@ -436,6 +424,132 @@ public class Mysql2DorisE2ECase extends 
AbstractE2EService {
         throw new RuntimeException("Table not exist " + table);
     }
 
+    @Test
+    public void testMySQL2DorisMultiDatabaseSync() throws Exception {
+        String jobName = "testMySQL2DorisMultiDatabaseSync";
+        ContainerUtils.executeSQLStatement(
+                getDorisQueryConnection(), LOG, "DROP DATABASE IF EXISTS 
test_e2e_mysql_db1");
+        ContainerUtils.executeSQLStatement(
+                getDorisQueryConnection(), LOG, "DROP DATABASE IF EXISTS 
test_e2e_mysql_db2");
+        initEnvironment(jobName, 
"container/e2e/mysql2doris/testMySQL2DorisMultiDbSync_init.sql");
+        startMysql2DorisJob(jobName, 
"container/e2e/mysql2doris/testMySQL2DorisMultiDbSync.txt");
+
+        // wait 3 times checkpoint
+        Thread.sleep(30000);
+        LOG.info("Start to verify init result.");
+        List<String> initExpected1 = Arrays.asList("1,db1_tb1,18");
+        String sql1 = "SELECT * FROM test_e2e_mysql_db1.tbl1";
+        ContainerUtils.checkResult(getDorisQueryConnection(), LOG, 
initExpected1, sql1, 3, false);
+
+        List<String> initExpected2 = Arrays.asList("1,db1_tb2,19");
+        String sql2 = "SELECT * FROM test_e2e_mysql_db1.tbl2";
+        ContainerUtils.checkResult(getDorisQueryConnection(), LOG, 
initExpected2, sql2, 3, false);
+
+        List<String> initExpected3 = Arrays.asList("1,db2_tb1,20");
+        String sql3 = "SELECT * FROM test_e2e_mysql_db2.tbl1";
+        ContainerUtils.checkResult(getDorisQueryConnection(), LOG, 
initExpected3, sql3, 3, false);
+
+        List<String> initExpected4 = Arrays.asList("1,db2_tb2,21");
+        String sql4 = "SELECT * FROM test_e2e_mysql_db2.tbl2";
+        ContainerUtils.checkResult(getDorisQueryConnection(), LOG, 
initExpected4, sql4, 3, false);
+
+        List<String> initExpected5 = Arrays.asList("1,db2_tb3,22");
+        String sql5 = "SELECT * FROM test_e2e_mysql_db2.tbl3";
+        ContainerUtils.checkResult(getDorisQueryConnection(), LOG, 
initExpected5, sql5, 3, false);
+
+        // add incremental data
+        ContainerUtils.executeSQLStatement(
+                getMySQLQueryConnection(),
+                LOG,
+                "insert into test_e2e_mysql_db1.tbl1 values (2,'db1_tb1',180)",
+                "insert into test_e2e_mysql_db1.tbl2 values (2,'db1_tb2',190)",
+                "insert into test_e2e_mysql_db2.tbl1 values (2,'db2_tb1',200)",
+                "insert into test_e2e_mysql_db2.tbl2 values (2,'db2_tb2',210)",
+                "insert into test_e2e_mysql_db2.tbl3 values 
(2,'db2_tb3',220)");
+
+        Thread.sleep(20000);
+        List<String> incrExpected1 = Arrays.asList("1,db1_tb1,18", 
"2,db1_tb1,180");
+        ContainerUtils.checkResult(getDorisQueryConnection(), LOG, 
incrExpected1, sql1, 3, false);
+
+        List<String> incrExpected2 = Arrays.asList("1,db1_tb2,19", 
"2,db1_tb2,190");
+        ContainerUtils.checkResult(getDorisQueryConnection(), LOG, 
incrExpected2, sql2, 3, false);
+
+        List<String> incrExpected3 = Arrays.asList("1,db2_tb1,20", 
"2,db2_tb1,200");
+        ContainerUtils.checkResult(getDorisQueryConnection(), LOG, 
incrExpected3, sql3, 3, false);
+
+        List<String> incrExpected4 = Arrays.asList("1,db2_tb2,21", 
"2,db2_tb2,210");
+        ContainerUtils.checkResult(getDorisQueryConnection(), LOG, 
incrExpected4, sql4, 3, false);
+
+        List<String> incrExpected5 = Arrays.asList("1,db2_tb3,22", 
"2,db2_tb3,220");
+        ContainerUtils.checkResult(getDorisQueryConnection(), LOG, 
incrExpected5, sql5, 3, false);
+
+        cancelE2EJob(jobName);
+    }
+
+    /**
+     * Separate databases and tables to write to the same database and table
+     *
+     * @throws Exception
+     */
+    @Test
+    public void testMySQL2DorisMultiDatabase2OneSync() throws Exception {
+        String jobName = "testMySQL2DorisMultiDatabase2OneSync";
+        initEnvironment(jobName, 
"container/e2e/mysql2doris/testMySQL2DorisMultiDb2One_init.sql");
+        startMysql2DorisJob(jobName, 
"container/e2e/mysql2doris/testMySQL2DorisMultiDb2One.txt");
+
+        // wait 3 times checkpoint
+        Thread.sleep(30000);
+        LOG.info("Start to verify init result.");
+        List<String> initExpected = Arrays.asList("1,db1_tb1,18", 
"2,db2_tb1,20");
+        String sql1 = "SELECT * FROM test_e2e_mysql.tbl1";
+        ContainerUtils.checkResult(getDorisQueryConnection(), LOG, 
initExpected, sql1, 3, false);
+
+        List<String> initExpected2 =
+                Arrays.asList(
+                        "1,db1_tb2_1,19", "2,db1_tb2_2,191", "3,db2_tb2_2,21", 
"4,db2_tbl2_2,211");
+        String sql2 = "SELECT * FROM test_e2e_mysql.tbl2_merge";
+        ContainerUtils.checkResult(getDorisQueryConnection(), LOG, 
initExpected2, sql2, 3, false);
+
+        List<String> initExpected3 = Arrays.asList("1,db2_tb3,22");
+        String sql3 = "SELECT * FROM test_e2e_mysql.tbl3";
+        ContainerUtils.checkResult(getDorisQueryConnection(), LOG, 
initExpected3, sql3, 3, false);
+
+        // add incremental data
+        ContainerUtils.executeSQLStatement(
+                getMySQLQueryConnection(),
+                LOG,
+                "insert into test_e2e_mysql_db1.tbl1 values (3,'db1_tb1',180)",
+                "insert into test_e2e_mysql_db2.tbl1 values (4,'db2_tb1',200)",
+                "insert into test_e2e_mysql_db1.tbl2_1 values 
(5,'db1_tb2_1',1901)",
+                "insert into test_e2e_mysql_db1.tbl2_2 values 
(6,'db1_tb2_2',1902)",
+                "insert into test_e2e_mysql_db2.tbl2_1 values 
(7,'db2_tb2_1',2101)",
+                "insert into test_e2e_mysql_db2.tbl2_2 values 
(8,'db2_tb2_2',2102)",
+                "insert into test_e2e_mysql_db2.tbl3 values 
(3,'db2_tb3',220)");
+
+        Thread.sleep(20000);
+
+        List<String> incrExpected =
+                Arrays.asList("1,db1_tb1,18", "2,db2_tb1,20", "3,db1_tb1,180", 
"4,db2_tb1,200");
+        ContainerUtils.checkResult(getDorisQueryConnection(), LOG, 
incrExpected, sql1, 3, false);
+
+        List<String> incrExpected2 =
+                Arrays.asList(
+                        "1,db1_tb2_1,19",
+                        "2,db1_tb2_2,191",
+                        "3,db2_tb2_2,21",
+                        "4,db2_tbl2_2,211",
+                        "5,db1_tb2_1,1901",
+                        "6,db1_tb2_2,1902",
+                        "7,db2_tb2_1,2101",
+                        "8,db2_tb2_2,2102");
+        ContainerUtils.checkResult(getDorisQueryConnection(), LOG, 
incrExpected2, sql2, 3, false);
+
+        List<String> incrExpected3 = Arrays.asList("1,db2_tb3,22", 
"3,db2_tb3,220");
+        ContainerUtils.checkResult(getDorisQueryConnection(), LOG, 
incrExpected3, sql3, 3, false);
+
+        cancelE2EJob(jobName);
+    }
+
     @After
     public void close() {
         try {
diff --git 
a/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/mongodb/MongoParsingProcessFunctionTest.java
 
b/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/mongodb/MongoParsingProcessFunctionTest.java
index e0c09b0f..3fca7b83 100644
--- 
a/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/mongodb/MongoParsingProcessFunctionTest.java
+++ 
b/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/mongodb/MongoParsingProcessFunctionTest.java
@@ -28,7 +28,7 @@ public class MongoParsingProcessFunctionTest {
         String record =
                 "{\"_id\":\"{\\\"_id\\\": {\\\"$oid\\\": 
\\\"66583533791a67a6f8c5a339\\\"}}\",\"operationType\":\"insert\",\"fullDocument\":\"{\\\"_id\\\":
 {\\\"$oid\\\": \\\"66583533791a67a6f8c5a339\\\"}, \\\"key1\\\": 
\\\"value1\\\"}\",\"source\":{\"ts_ms\":0,\"snapshot\":\"true\"},\"ts_ms\":1717065582062,\"ns\":{\"db\":\"test\",\"coll\":\"cdc_test\"},\"to\":null,\"documentKey\":\"{\\\"_id\\\":
 {\\\"$oid\\\": 
\\\"66583533791a67a6f8c5a339\\\"}}\",\"updateDescription\":null,\"clusterTime 
[...]
         MongoParsingProcessFunction mongoParsingProcessFunction =
-                new MongoParsingProcessFunction(null);
+                new MongoParsingProcessFunction(null, null);
         String recordTableName = 
mongoParsingProcessFunction.getRecordTableName(record);
         assertEquals("cdc_test", recordTableName);
     }
diff --git 
a/flink-doris-connector/src/test/resources/container/e2e/mysql2doris/testAutoAddTable.txt
 
b/flink-doris-connector/src/test/resources/container/e2e/mysql2doris/testAutoAddTable.txt
index 88ec4541..0b9a6247 100644
--- 
a/flink-doris-connector/src/test/resources/container/e2e/mysql2doris/testAutoAddTable.txt
+++ 
b/flink-doris-connector/src/test/resources/container/e2e/mysql2doris/testAutoAddTable.txt
@@ -1,4 +1,6 @@
 mysql-sync-database
+    --database test_e2e_mysql
+    --mysql-conf database-name=test_e2e_mysql
     --including-tables "tbl.*|auto_add"
     --table-conf replication_num=1
     --single-sink true
diff --git 
a/flink-doris-connector/src/test/resources/container/e2e/mysql2doris/testAutoAddTable_init.sql
 
b/flink-doris-connector/src/test/resources/container/e2e/mysql2doris/testAutoAddTable_init.sql
index ec617f30..f1042491 100644
--- 
a/flink-doris-connector/src/test/resources/container/e2e/mysql2doris/testAutoAddTable_init.sql
+++ 
b/flink-doris-connector/src/test/resources/container/e2e/mysql2doris/testAutoAddTable_init.sql
@@ -1,3 +1,4 @@
+DROP DATABASE if EXISTS test_e2e_mysql;
 CREATE DATABASE if NOT EXISTS test_e2e_mysql;
 DROP TABLE IF EXISTS test_e2e_mysql.tbl1;
 CREATE TABLE test_e2e_mysql.tbl1 (
diff --git 
a/flink-doris-connector/src/test/resources/container/e2e/mysql2doris/testMySQL2Doris.txt
 
b/flink-doris-connector/src/test/resources/container/e2e/mysql2doris/testMySQL2Doris.txt
index 601d0831..d88b2088 100644
--- 
a/flink-doris-connector/src/test/resources/container/e2e/mysql2doris/testMySQL2Doris.txt
+++ 
b/flink-doris-connector/src/test/resources/container/e2e/mysql2doris/testMySQL2Doris.txt
@@ -1,4 +1,6 @@
 mysql-sync-database
+    --database test_e2e_mysql
+    --mysql-conf database-name=test_e2e_mysql
     --including-tables "tbl.*"
     --table-conf replication_num=1
     --single-sink true
diff --git 
a/flink-doris-connector/src/test/resources/container/e2e/mysql2doris/testMySQL2DorisByDefault.txt
 
b/flink-doris-connector/src/test/resources/container/e2e/mysql2doris/testMySQL2DorisByDefault.txt
index 6f69a75b..b8b2974e 100644
--- 
a/flink-doris-connector/src/test/resources/container/e2e/mysql2doris/testMySQL2DorisByDefault.txt
+++ 
b/flink-doris-connector/src/test/resources/container/e2e/mysql2doris/testMySQL2DorisByDefault.txt
@@ -1,3 +1,5 @@
 mysql-sync-database
+    --database test_e2e_mysql
+    --mysql-conf database-name=test_e2e_mysql
     --including-tables "tbl1|tbl2|tbl3|tbl5"
     --table-conf replication_num=1
\ No newline at end of file
diff --git 
a/flink-doris-connector/src/test/resources/container/e2e/mysql2doris/testMySQL2DorisByDefault_init.sql
 
b/flink-doris-connector/src/test/resources/container/e2e/mysql2doris/testMySQL2DorisByDefault_init.sql
index ec617f30..f1042491 100644
--- 
a/flink-doris-connector/src/test/resources/container/e2e/mysql2doris/testMySQL2DorisByDefault_init.sql
+++ 
b/flink-doris-connector/src/test/resources/container/e2e/mysql2doris/testMySQL2DorisByDefault_init.sql
@@ -1,3 +1,4 @@
+DROP DATABASE if EXISTS test_e2e_mysql;
 CREATE DATABASE if NOT EXISTS test_e2e_mysql;
 DROP TABLE IF EXISTS test_e2e_mysql.tbl1;
 CREATE TABLE test_e2e_mysql.tbl1 (
diff --git 
a/flink-doris-connector/src/test/resources/container/e2e/mysql2doris/testMySQL2DorisCreateTable.txt
 
b/flink-doris-connector/src/test/resources/container/e2e/mysql2doris/testMySQL2DorisCreateTable.txt
index 053dc9ef..60511be2 100644
--- 
a/flink-doris-connector/src/test/resources/container/e2e/mysql2doris/testMySQL2DorisCreateTable.txt
+++ 
b/flink-doris-connector/src/test/resources/container/e2e/mysql2doris/testMySQL2DorisCreateTable.txt
@@ -1,4 +1,6 @@
 mysql-sync-database
+    --database test_e2e_mysql
+    --mysql-conf database-name=test_e2e_mysql
     --including-tables "create_tbl_.*"
     --create-table-only
     --table-conf 
table-buckets=create_tbl_uniq:10,create_tbl_from_uniqindex.*:30
diff --git 
a/flink-doris-connector/src/test/resources/container/e2e/mysql2doris/testMySQL2DorisCreateTable_init.sql
 
b/flink-doris-connector/src/test/resources/container/e2e/mysql2doris/testMySQL2DorisCreateTable_init.sql
index cc3c16a6..4b65306d 100644
--- 
a/flink-doris-connector/src/test/resources/container/e2e/mysql2doris/testMySQL2DorisCreateTable_init.sql
+++ 
b/flink-doris-connector/src/test/resources/container/e2e/mysql2doris/testMySQL2DorisCreateTable_init.sql
@@ -1,3 +1,4 @@
+DROP DATABASE if EXISTS test_e2e_mysql;
 CREATE DATABASE if NOT EXISTS test_e2e_mysql;
 DROP TABLE IF EXISTS test_e2e_mysql.create_tbl_uniq;
 CREATE TABLE test_e2e_mysql.create_tbl_uniq (
diff --git 
a/flink-doris-connector/src/test/resources/container/e2e/mysql2doris/testMySQL2DorisEnableDelete.txt
 
b/flink-doris-connector/src/test/resources/container/e2e/mysql2doris/testMySQL2DorisEnableDelete.txt
index 1048916c..4f71f00c 100644
--- 
a/flink-doris-connector/src/test/resources/container/e2e/mysql2doris/testMySQL2DorisEnableDelete.txt
+++ 
b/flink-doris-connector/src/test/resources/container/e2e/mysql2doris/testMySQL2DorisEnableDelete.txt
@@ -1,4 +1,6 @@
 mysql-sync-database
+    --database test_e2e_mysql
+    --mysql-conf database-name=test_e2e_mysql
     --including-tables "tbl1|tbl2|tbl3|tbl5"
     --table-conf replication_num=1
     --sink-conf sink.enable-delete=false
diff --git 
a/flink-doris-connector/src/test/resources/container/e2e/mysql2doris/testMySQL2DorisEnableDelete_init.sql
 
b/flink-doris-connector/src/test/resources/container/e2e/mysql2doris/testMySQL2DorisEnableDelete_init.sql
index ec617f30..f1042491 100644
--- 
a/flink-doris-connector/src/test/resources/container/e2e/mysql2doris/testMySQL2DorisEnableDelete_init.sql
+++ 
b/flink-doris-connector/src/test/resources/container/e2e/mysql2doris/testMySQL2DorisEnableDelete_init.sql
@@ -1,3 +1,4 @@
+DROP DATABASE if EXISTS test_e2e_mysql;
 CREATE DATABASE if NOT EXISTS test_e2e_mysql;
 DROP TABLE IF EXISTS test_e2e_mysql.tbl1;
 CREATE TABLE test_e2e_mysql.tbl1 (
diff --git 
a/flink-doris-connector/src/test/resources/container/e2e/mysql2doris/testMySQL2DorisMultiDb2One.txt
 
b/flink-doris-connector/src/test/resources/container/e2e/mysql2doris/testMySQL2DorisMultiDb2One.txt
new file mode 100644
index 00000000..42e671e5
--- /dev/null
+++ 
b/flink-doris-connector/src/test/resources/container/e2e/mysql2doris/testMySQL2DorisMultiDb2One.txt
@@ -0,0 +1,7 @@
+mysql-sync-database
+    --database test_e2e_mysql
+    --mysql-conf database-name=test_e2e_mysql_db.*
+    --including-tables ".*"
+    --multi-to-one-origin "tbl2.*"
+    --multi-to-one-target "tbl2_merge"
+    --table-conf replication_num=1
\ No newline at end of file
diff --git 
a/flink-doris-connector/src/test/resources/container/e2e/mysql2doris/testMySQL2DorisMultiDb2One_init.sql
 
b/flink-doris-connector/src/test/resources/container/e2e/mysql2doris/testMySQL2DorisMultiDb2One_init.sql
new file mode 100644
index 00000000..81d659d1
--- /dev/null
+++ 
b/flink-doris-connector/src/test/resources/container/e2e/mysql2doris/testMySQL2DorisMultiDb2One_init.sql
@@ -0,0 +1,70 @@
+-- tbl1
+DROP DATABASE if EXISTS test_e2e_mysql_db1;
+CREATE DATABASE if NOT EXISTS test_e2e_mysql_db1;
+DROP TABLE IF EXISTS test_e2e_mysql_db1.tbl1;
+CREATE TABLE test_e2e_mysql_db1.tbl1 (
+`id` int NOT NULL,
+`name` varchar(255) DEFAULT NULL,
+`age` bigint DEFAULT NULL,
+PRIMARY KEY (`id`) USING BTREE
+);
+insert into test_e2e_mysql_db1.tbl1 values (1,'db1_tb1',18);
+
+DROP DATABASE if EXISTS test_e2e_mysql_db2;
+CREATE DATABASE if NOT EXISTS test_e2e_mysql_db2;
+DROP TABLE IF EXISTS test_e2e_mysql_db2.tbl1;
+CREATE TABLE test_e2e_mysql_db2.tbl1 (
+`id` int NOT NULL,
+`name` varchar(255) DEFAULT NULL,
+`age` bigint DEFAULT NULL,
+PRIMARY KEY (`id`) USING BTREE
+);
+insert into test_e2e_mysql_db2.tbl1 values (2,'db2_tb1',20);
+
+-- tbl2_1 tbl2_2
+DROP TABLE IF EXISTS test_e2e_mysql_db1.tbl2_1;
+CREATE TABLE test_e2e_mysql_db1.tbl2_1 (
+`id` int NOT NULL,
+`name` varchar(255) DEFAULT NULL,
+`age` bigint DEFAULT NULL,
+PRIMARY KEY (`id`) USING BTREE
+);
+insert into test_e2e_mysql_db1.tbl2_1 values (1,'db1_tb2_1',19);
+
+DROP TABLE IF EXISTS test_e2e_mysql_db1.tbl2_2;
+CREATE TABLE test_e2e_mysql_db1.tbl2_2 (
+`id` int NOT NULL,
+`name` varchar(255) DEFAULT NULL,
+`age` bigint DEFAULT NULL,
+PRIMARY KEY (`id`) USING BTREE
+);
+insert into test_e2e_mysql_db1.tbl2_2 values (2,'db1_tb2_2',191);
+
+-- db2
+DROP TABLE IF EXISTS test_e2e_mysql_db2.tbl2_1;
+CREATE TABLE test_e2e_mysql_db2.tbl2_1 (
+`id` int NOT NULL,
+`name` varchar(255) DEFAULT NULL,
+`age` bigint DEFAULT NULL,
+PRIMARY KEY (`id`) USING BTREE
+);
+insert into test_e2e_mysql_db2.tbl2_1 values (3,'db2_tb2_2',21);
+
+DROP TABLE IF EXISTS test_e2e_mysql_db2.tbl2_2;
+CREATE TABLE test_e2e_mysql_db2.tbl2_2 (
+`id` int NOT NULL,
+`name` varchar(255) DEFAULT NULL,
+`age` bigint DEFAULT NULL,
+PRIMARY KEY (`id`) USING BTREE
+);
+insert into test_e2e_mysql_db2.tbl2_2 values (4,'db2_tbl2_2',211);
+
+
+DROP TABLE IF EXISTS test_e2e_mysql_db2.tbl3;
+CREATE TABLE test_e2e_mysql_db2.tbl3 (
+`id` int NOT NULL,
+`name` varchar(255) DEFAULT NULL,
+`age` bigint DEFAULT NULL,
+PRIMARY KEY (`id`) USING BTREE
+);
+insert into test_e2e_mysql_db2.tbl3 values (1,'db2_tb3',22);
\ No newline at end of file
diff --git 
a/flink-doris-connector/src/test/resources/container/e2e/mysql2doris/testMySQL2DorisMultiDbSync.txt
 
b/flink-doris-connector/src/test/resources/container/e2e/mysql2doris/testMySQL2DorisMultiDbSync.txt
new file mode 100644
index 00000000..c12902e3
--- /dev/null
+++ 
b/flink-doris-connector/src/test/resources/container/e2e/mysql2doris/testMySQL2DorisMultiDbSync.txt
@@ -0,0 +1,4 @@
+mysql-sync-database
+    --mysql-conf database-name=test_e2e_mysql_db.*
+    --including-tables ".*"
+    --table-conf replication_num=1
\ No newline at end of file
diff --git 
a/flink-doris-connector/src/test/resources/container/e2e/mysql2doris/testMySQL2DorisMultiDbSync_init.sql
 
b/flink-doris-connector/src/test/resources/container/e2e/mysql2doris/testMySQL2DorisMultiDbSync_init.sql
new file mode 100644
index 00000000..60b022c5
--- /dev/null
+++ 
b/flink-doris-connector/src/test/resources/container/e2e/mysql2doris/testMySQL2DorisMultiDbSync_init.sql
@@ -0,0 +1,51 @@
+-- db1
+DROP DATABASE if EXISTS test_e2e_mysql_db1;
+CREATE DATABASE if NOT EXISTS test_e2e_mysql_db1;
+DROP TABLE IF EXISTS test_e2e_mysql_db1.tbl1;
+CREATE TABLE test_e2e_mysql_db1.tbl1 (
+`id` int NOT NULL,
+`name` varchar(255) DEFAULT NULL,
+`age` bigint DEFAULT NULL,
+PRIMARY KEY (`id`) USING BTREE
+);
+insert into test_e2e_mysql_db1.tbl1 values (1,'db1_tb1',18);
+
+
+DROP TABLE IF EXISTS test_e2e_mysql_db1.tbl2;
+CREATE TABLE test_e2e_mysql_db1.tbl2 (
+`id` int NOT NULL,
+`name` varchar(255) DEFAULT NULL,
+`age` bigint DEFAULT NULL,
+PRIMARY KEY (`id`) USING BTREE
+);
+insert into test_e2e_mysql_db1.tbl2 values (1,'db1_tb2',19);
+
+-- db2
+DROP DATABASE if EXISTS test_e2e_mysql_db2;
+CREATE DATABASE if NOT EXISTS test_e2e_mysql_db2;
+DROP TABLE IF EXISTS test_e2e_mysql_db2.tbl1;
+CREATE TABLE test_e2e_mysql_db2.tbl1 (
+`id` int NOT NULL,
+`name` varchar(255) DEFAULT NULL,
+`age` bigint DEFAULT NULL,
+PRIMARY KEY (`id`) USING BTREE
+);
+insert into test_e2e_mysql_db2.tbl1 values (1,'db2_tb1',20);
+
+DROP TABLE IF EXISTS test_e2e_mysql_db2.tbl2;
+CREATE TABLE test_e2e_mysql_db2.tbl2 (
+`id` int NOT NULL,
+`name` varchar(255) DEFAULT NULL,
+`age` bigint DEFAULT NULL,
+PRIMARY KEY (`id`) USING BTREE
+);
+insert into test_e2e_mysql_db2.tbl2 values (1,'db2_tb2',21);
+
+DROP TABLE IF EXISTS test_e2e_mysql_db2.tbl3;
+CREATE TABLE test_e2e_mysql_db2.tbl3 (
+`id` int NOT NULL,
+`name` varchar(255) DEFAULT NULL,
+`age` bigint DEFAULT NULL,
+PRIMARY KEY (`id`) USING BTREE
+);
+insert into test_e2e_mysql_db2.tbl3 values (1,'db2_tb3',22);
\ No newline at end of file
diff --git 
a/flink-doris-connector/src/test/resources/container/e2e/mysql2doris/testMySQL2DorisSQLParse.txt
 
b/flink-doris-connector/src/test/resources/container/e2e/mysql2doris/testMySQL2DorisSQLParse.txt
index d863ecfa..6876afae 100644
--- 
a/flink-doris-connector/src/test/resources/container/e2e/mysql2doris/testMySQL2DorisSQLParse.txt
+++ 
b/flink-doris-connector/src/test/resources/container/e2e/mysql2doris/testMySQL2DorisSQLParse.txt
@@ -1,4 +1,6 @@
 mysql-sync-database
+    --database test_e2e_mysql
+    --mysql-conf database-name=test_e2e_mysql
     --including-tables "tbl.*|add_tbl"
     --table-conf replication_num=1
     --schema-change-mode sql_parser
diff --git 
a/flink-doris-connector/src/test/resources/container/e2e/mysql2doris/testMySQL2DorisSQLParse_init.sql
 
b/flink-doris-connector/src/test/resources/container/e2e/mysql2doris/testMySQL2DorisSQLParse_init.sql
index ec617f30..f1042491 100644
--- 
a/flink-doris-connector/src/test/resources/container/e2e/mysql2doris/testMySQL2DorisSQLParse_init.sql
+++ 
b/flink-doris-connector/src/test/resources/container/e2e/mysql2doris/testMySQL2DorisSQLParse_init.sql
@@ -1,3 +1,4 @@
+DROP DATABASE if EXISTS test_e2e_mysql;
 CREATE DATABASE if NOT EXISTS test_e2e_mysql;
 DROP TABLE IF EXISTS test_e2e_mysql.tbl1;
 CREATE TABLE test_e2e_mysql.tbl1 (
diff --git 
a/flink-doris-connector/src/test/resources/container/e2e/mysql2doris/testMySQL2Doris_init.sql
 
b/flink-doris-connector/src/test/resources/container/e2e/mysql2doris/testMySQL2Doris_init.sql
index ec617f30..f1042491 100644
--- 
a/flink-doris-connector/src/test/resources/container/e2e/mysql2doris/testMySQL2Doris_init.sql
+++ 
b/flink-doris-connector/src/test/resources/container/e2e/mysql2doris/testMySQL2Doris_init.sql
@@ -1,3 +1,4 @@
+DROP DATABASE if EXISTS test_e2e_mysql;
 CREATE DATABASE if NOT EXISTS test_e2e_mysql;
 DROP TABLE IF EXISTS test_e2e_mysql.tbl1;
 CREATE TABLE test_e2e_mysql.tbl1 (


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org


Reply via email to