This is an automated email from the ASF dual-hosted git repository.

diwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris-flink-connector.git


The following commit(s) were added to refs/heads/master by this push:
     new 4e417520 [Improve] add test case for cdc sync (#580)
4e417520 is described below

commit 4e417520885e3d507094bd2c84b61fa626ad6596
Author: wudi <676366...@qq.com>
AuthorDate: Wed Mar 19 10:02:28 2025 +0800

    [Improve] add test case for cdc sync (#580)
---
 .../jsondebezium/JsonDebeziumSchemaChange.java     |  5 ++-
 .../JsonDebeziumSchemaChangeImplV2.java            |  4 +-
 .../jsondebezium/SQLParserSchemaChange.java        |  1 +
 .../flink/container/e2e/Mysql2DorisE2ECase.java    | 45 ++++++++++++++++++----
 .../doris/flink/lookup/DorisLookupTableITCase.java |  6 ++-
 .../container/e2e/mysql2doris/testMySQL2Doris.txt  |  6 ++-
 .../e2e/mysql2doris/testMySQL2Doris_init.sql       |  2 +-
 7 files changed, 56 insertions(+), 13 deletions(-)

diff --git 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/JsonDebeziumSchemaChange.java
 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/JsonDebeziumSchemaChange.java
index 16757d27..ff47880d 100644
--- 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/JsonDebeziumSchemaChange.java
+++ 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/JsonDebeziumSchemaChange.java
@@ -46,12 +46,13 @@ import java.util.regex.Pattern;
 /**
  * Synchronize the schema change in the upstream data source to the doris 
database table.
  *
- * <p>There are two schema change modes:<br>
+ * <p>There are three schema change modes:<br>
  * 1. {@link JsonDebeziumSchemaChangeImpl} only supports table column name and 
column type changes,
  * and this mode is used by default. <br>
  * 2. {@link JsonDebeziumSchemaChangeImplV2} supports table column name, 
column type, default,
  * comment synchronization, supports multi-column changes, and supports column 
name rename. Need to
- * be enabled by configuring use-new-schema-change.
+ * be enabled by configuring use-new-schema-change. <br>
+ * 3. {@link SQLParserSchemaChange} performs schema changes by parsing the upstream DDL statements.
  */
 public abstract class JsonDebeziumSchemaChange extends CdcSchemaChange {
     private static final Logger LOG = 
LoggerFactory.getLogger(JsonDebeziumSchemaChange.class);
diff --git 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/JsonDebeziumSchemaChangeImplV2.java
 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/JsonDebeziumSchemaChangeImplV2.java
index abd6f55b..63559e52 100644
--- 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/JsonDebeziumSchemaChangeImplV2.java
+++ 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/JsonDebeziumSchemaChangeImplV2.java
@@ -55,8 +55,9 @@ import java.util.regex.Pattern;
 
 /**
  * Extract the columns that need to be changed based on the change records of 
the upstream data
- * source.
+ * source. Using SQLParserSchemaChange instead is recommended.
  */
+@Deprecated
 public class JsonDebeziumSchemaChangeImplV2 extends JsonDebeziumSchemaChange {
     private static final Logger LOG = 
LoggerFactory.getLogger(JsonDebeziumSchemaChangeImplV2.class);
     private static final Pattern renameDDLPattern =
@@ -183,6 +184,7 @@ public class JsonDebeziumSchemaChangeImplV2 extends 
JsonDebeziumSchemaChange {
         // remove backtick
         ddl = ddl.replace("`", "");
         // rename ddl
+        // It is better to use sql_parser mode for rename
         Matcher renameDdlMatcher = renameDDLPattern.matcher(ddl);
         if (renameDdlMatcher.find()) {
             String oldColumnName = renameDdlMatcher.group(2);
diff --git 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/SQLParserSchemaChange.java
 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/SQLParserSchemaChange.java
index eda223d6..1f145da4 100644
--- 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/SQLParserSchemaChange.java
+++ 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/SQLParserSchemaChange.java
@@ -32,6 +32,7 @@ import org.slf4j.LoggerFactory;
 import java.io.IOException;
 import java.util.List;
 
+/** Schema changes are made by parsing upstream DDL statements. */
 public class SQLParserSchemaChange extends JsonDebeziumSchemaChange {
     private static final Logger LOG = 
LoggerFactory.getLogger(SQLParserSchemaChange.class);
     private final SQLParserSchemaManager sqlParserSchemaManager;
diff --git 
a/flink-doris-connector/src/test/java/org/apache/doris/flink/container/e2e/Mysql2DorisE2ECase.java
 
b/flink-doris-connector/src/test/java/org/apache/doris/flink/container/e2e/Mysql2DorisE2ECase.java
index cb7d83ad..e1c8c807 100644
--- 
a/flink-doris-connector/src/test/java/org/apache/doris/flink/container/e2e/Mysql2DorisE2ECase.java
+++ 
b/flink-doris-connector/src/test/java/org/apache/doris/flink/container/e2e/Mysql2DorisE2ECase.java
@@ -99,10 +99,22 @@ public class Mysql2DorisE2ECase extends AbstractE2EService {
 
         // wait 2 times checkpoint
         Thread.sleep(20000);
+        LOG.info("Start to verify create table result.");
+        String tblQuery =
+                String.format(
+                        "SELECT TABLE_NAME \n"
+                                + "FROM INFORMATION_SCHEMA.TABLES \n"
+                                + "WHERE TABLE_SCHEMA = '%s'",
+                        DATABASE);
+        List<String> expectedTables =
+                Arrays.asList("ods_tbl1_incr", "ods_tbl2_incr", 
"ods_tbl3_incr", "ods_tbl5_incr");
+        ContainerUtils.checkResult(
+                getDorisQueryConnection(), LOG, expectedTables, tblQuery, 1, 
false);
+
         LOG.info("Start to verify init result.");
         List<String> expected = Arrays.asList("doris_1,1", "doris_2,2", 
"doris_3,3", "doris_5,5");
         String sql1 =
-                "select * from ( select * from test_e2e_mysql.tbl1 union all 
select * from test_e2e_mysql.tbl2 union all select * from test_e2e_mysql.tbl3 
union all select * from test_e2e_mysql.tbl5) res order by 1";
+                "select * from ( select * from test_e2e_mysql.ods_tbl1_incr 
union all select * from test_e2e_mysql.ods_tbl2_incr union all select * from 
test_e2e_mysql.ods_tbl3_incr union all select * from 
test_e2e_mysql.ods_tbl5_incr) res order by 1";
         ContainerUtils.checkResult(getDorisQueryConnection(), LOG, expected, 
sql1, 2);
 
         // add incremental data
@@ -121,28 +133,47 @@ public class Mysql2DorisE2ECase extends 
AbstractE2EService {
                 Arrays.asList(
                         "doris_1,18", "doris_1_1,10", "doris_2_1,11", 
"doris_3,3", "doris_3_1,12");
         String sql2 =
-                "select * from ( select * from test_e2e_mysql.tbl1 union all 
select * from test_e2e_mysql.tbl2 union all select * from test_e2e_mysql.tbl3 ) 
res order by 1";
+                "select * from ( select * from test_e2e_mysql.ods_tbl1_incr 
union all select * from test_e2e_mysql.ods_tbl2_incr union all select * from 
test_e2e_mysql.ods_tbl3_incr ) res order by 1";
         ContainerUtils.checkResult(getDorisQueryConnection(), LOG, expected2, 
sql2, 2);
 
-        // mock schema change
+        // mock schema change ALTER TABLE table_name RENAME COLUMN 
old_column_name TO
+        // new_column_name
         LOG.info("start to schema change in mysql.");
         ContainerUtils.executeSQLStatement(
                 getMySQLQueryConnection(),
                 LOG,
                 "alter table test_e2e_mysql.tbl1 add column c1 varchar(128)",
-                "alter table test_e2e_mysql.tbl1 drop column age");
+                "alter table test_e2e_mysql.tbl1 drop column age",
+                "alter table test_e2e_mysql.tbl2 rename column age to 
age_new_1",
+                "alter table test_e2e_mysql.tbl3 change column age age_new_2 
int");
         Thread.sleep(10000);
         ContainerUtils.executeSQLStatement(
                 getMySQLQueryConnection(),
                 LOG,
-                "insert into test_e2e_mysql.tbl1  values 
('doris_1_1_1','c1_val')");
+                "insert into test_e2e_mysql.tbl1  values 
('doris_1_1_1','c1_val')",
+                "insert into test_e2e_mysql.tbl2  values 
('doris_tbl2_rename_test',18)",
+                "insert into test_e2e_mysql.tbl3  values 
('doris_tbl3_rename_test',38)");
         Thread.sleep(20000);
-        LOG.info("verify tal1 schema change.");
+        LOG.info("verify tbl1 schema change.");
         List<String> schemaChangeExpected =
                 Arrays.asList("doris_1,null", "doris_1_1,null", 
"doris_1_1_1,c1_val");
-        String schemaChangeSql = "select * from test_e2e_mysql.tbl1 order by 
1";
+        String schemaChangeSql = "select name,c1 from 
test_e2e_mysql.ods_tbl1_incr order by 1";
+        ContainerUtils.checkResult(
+                getDorisQueryConnection(), LOG, schemaChangeExpected, 
schemaChangeSql, 2);
+
+        LOG.info("verify tbl2 schema change.");
+        schemaChangeExpected = Arrays.asList("doris_2_1,11", 
"doris_tbl2_rename_test,18");
+        schemaChangeSql = "select name,age_new_1 from 
test_e2e_mysql.ods_tbl2_incr order by 1";
         ContainerUtils.checkResult(
                 getDorisQueryConnection(), LOG, schemaChangeExpected, 
schemaChangeSql, 2);
+
+        LOG.info("verify tbl3 schema change.");
+        schemaChangeExpected =
+                Arrays.asList("doris_3,3", "doris_3_1,12", 
"doris_tbl3_rename_test,38");
+        schemaChangeSql = "select name,age_new_2 from 
test_e2e_mysql.ods_tbl3_incr order by 1";
+        ContainerUtils.checkResult(
+                getDorisQueryConnection(), LOG, schemaChangeExpected, 
schemaChangeSql, 2);
+
         cancelE2EJob(jobName);
     }
 
diff --git 
a/flink-doris-connector/src/test/java/org/apache/doris/flink/lookup/DorisLookupTableITCase.java
 
b/flink-doris-connector/src/test/java/org/apache/doris/flink/lookup/DorisLookupTableITCase.java
index 8ecb87da..d14666a6 100644
--- 
a/flink-doris-connector/src/test/java/org/apache/doris/flink/lookup/DorisLookupTableITCase.java
+++ 
b/flink-doris-connector/src/test/java/org/apache/doris/flink/lookup/DorisLookupTableITCase.java
@@ -353,7 +353,11 @@ public class DorisLookupTableITCase extends 
AbstractITCaseService {
                                 + "'password' = '%s',"
                                 + "'lookup.jdbc.async' = '%s',"
                                 + "'lookup.cache.ttl' = '10m',"
-                                + "'lookup.cache.max-rows' = '3'"
+                                + "'lookup.cache.max-rows' = '3',"
+                                + "'lookup.max-retries' = '1',"
+                                + "'lookup.jdbc.read.batch.size' = '2',"
+                                + "'lookup.jdbc.read.batch.queue-size' = '25',"
+                                + "'lookup.jdbc.read.thread-size' = '5'"
                                 + ")",
                         getFenodes(),
                         getDorisQueryUrl(),
diff --git 
a/flink-doris-connector/src/test/resources/container/e2e/mysql2doris/testMySQL2Doris.txt
 
b/flink-doris-connector/src/test/resources/container/e2e/mysql2doris/testMySQL2Doris.txt
index 90a0eddc..6e248d19 100644
--- 
a/flink-doris-connector/src/test/resources/container/e2e/mysql2doris/testMySQL2Doris.txt
+++ 
b/flink-doris-connector/src/test/resources/container/e2e/mysql2doris/testMySQL2Doris.txt
@@ -1,8 +1,12 @@
 mysql-sync-database
     --database test_e2e_mysql
     --mysql-conf database-name=test_e2e_mysql
+    --table-prefix ods_
+    --table-suffix _incr
     --including-tables "tbl.*"
+    --excluding-tables "tbl4"
     --sink-conf sink.ignore.update-before=false
     --table-conf replication_num=1
     --single-sink true
-    --ignore-default-value false
\ No newline at end of file
+    --ignore-default-value false
+    --schema-change-mode sql_parser
\ No newline at end of file
diff --git 
a/flink-doris-connector/src/test/resources/container/e2e/mysql2doris/testMySQL2Doris_init.sql
 
b/flink-doris-connector/src/test/resources/container/e2e/mysql2doris/testMySQL2Doris_init.sql
index f1042491..8bdde210 100644
--- 
a/flink-doris-connector/src/test/resources/container/e2e/mysql2doris/testMySQL2Doris_init.sql
+++ 
b/flink-doris-connector/src/test/resources/container/e2e/mysql2doris/testMySQL2Doris_init.sql
@@ -29,7 +29,7 @@ CREATE TABLE test_e2e_mysql.tbl4 (
     `name` varchar(256) primary key,
     `age` int
 );
-
+insert into test_e2e_mysql.tbl4 values ('doris_4',4);
 
 DROP TABLE IF EXISTS  test_e2e_mysql.tbl5;
 CREATE TABLE test_e2e_mysql.tbl5 (


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to