This is an automated email from the ASF dual-hosted git repository. diwu pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris-flink-connector.git
The following commit(s) were added to refs/heads/master by this push: new 4e417520 [Improve] add test case for cdc sync (#580) 4e417520 is described below commit 4e417520885e3d507094bd2c84b61fa626ad6596 Author: wudi <676366...@qq.com> AuthorDate: Wed Mar 19 10:02:28 2025 +0800 [Improve] add test case for cdc sync (#580) --- .../jsondebezium/JsonDebeziumSchemaChange.java | 5 ++- .../JsonDebeziumSchemaChangeImplV2.java | 4 +- .../jsondebezium/SQLParserSchemaChange.java | 1 + .../flink/container/e2e/Mysql2DorisE2ECase.java | 45 ++++++++++++++++++---- .../doris/flink/lookup/DorisLookupTableITCase.java | 6 ++- .../container/e2e/mysql2doris/testMySQL2Doris.txt | 6 ++- .../e2e/mysql2doris/testMySQL2Doris_init.sql | 2 +- 7 files changed, 56 insertions(+), 13 deletions(-) diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/JsonDebeziumSchemaChange.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/JsonDebeziumSchemaChange.java index 16757d27..ff47880d 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/JsonDebeziumSchemaChange.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/JsonDebeziumSchemaChange.java @@ -46,12 +46,13 @@ import java.util.regex.Pattern; /** * Synchronize the schema change in the upstream data source to the doris database table. * - * <p>There are two schema change modes:<br> + * <p>There are three schema change modes:<br> * 1. {@link JsonDebeziumSchemaChangeImpl} only supports table column name and column type changes, * and this mode is used by default. <br> * 2. {@link JsonDebeziumSchemaChangeImplV2} supports table column name, column type, default, * comment synchronization, supports multi-column changes, and supports column name rename. Need to - * be enabled by configuring use-new-schema-change. + * be enabled by configuring use-new-schema-change. <br> + * 3. {@link SQLParserSchemaChange} Schema Change by parsing upstream DDL. */ public abstract class JsonDebeziumSchemaChange extends CdcSchemaChange { private static final Logger LOG = LoggerFactory.getLogger(JsonDebeziumSchemaChange.class); diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/JsonDebeziumSchemaChangeImplV2.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/JsonDebeziumSchemaChangeImplV2.java index abd6f55b..63559e52 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/JsonDebeziumSchemaChangeImplV2.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/JsonDebeziumSchemaChangeImplV2.java @@ -55,8 +55,9 @@ import java.util.regex.Pattern; /** * Extract the columns that need to be changed based on the change records of the upstream data - * source. + * source. Recommended use SQLParserSchemaChange. */ +@Deprecated public class JsonDebeziumSchemaChangeImplV2 extends JsonDebeziumSchemaChange { private static final Logger LOG = LoggerFactory.getLogger(JsonDebeziumSchemaChangeImplV2.class); private static final Pattern renameDDLPattern = @@ -183,6 +184,7 @@ public class JsonDebeziumSchemaChangeImplV2 extends JsonDebeziumSchemaChange { // remove backtick ddl = ddl.replace("`", ""); // rename ddl + // It is better to use sql_parser mode for rename Matcher renameDdlMatcher = renameDDLPattern.matcher(ddl); if (renameDdlMatcher.find()) { String oldColumnName = renameDdlMatcher.group(2); diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/SQLParserSchemaChange.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/SQLParserSchemaChange.java index eda223d6..1f145da4 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/SQLParserSchemaChange.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/SQLParserSchemaChange.java @@ -32,6 +32,7 @@ import org.slf4j.LoggerFactory; import java.io.IOException; import java.util.List; +/** Schema changes are made by parsing upstream DDL statements. */ public class SQLParserSchemaChange extends JsonDebeziumSchemaChange { private static final Logger LOG = LoggerFactory.getLogger(SQLParserSchemaChange.class); private final SQLParserSchemaManager sqlParserSchemaManager; diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/container/e2e/Mysql2DorisE2ECase.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/container/e2e/Mysql2DorisE2ECase.java index cb7d83ad..e1c8c807 100644 --- a/flink-doris-connector/src/test/java/org/apache/doris/flink/container/e2e/Mysql2DorisE2ECase.java +++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/container/e2e/Mysql2DorisE2ECase.java @@ -99,10 +99,22 @@ public class Mysql2DorisE2ECase extends AbstractE2EService { // wait 2 times checkpoint Thread.sleep(20000); + LOG.info("Start to verify create table result."); + String tblQuery = + String.format( + "SELECT TABLE_NAME \n" + + "FROM INFORMATION_SCHEMA.TABLES \n" + + "WHERE TABLE_SCHEMA = '%s'", + DATABASE); + List<String> expectedTables = + Arrays.asList("ods_tbl1_incr", "ods_tbl2_incr", "ods_tbl3_incr", "ods_tbl5_incr"); + ContainerUtils.checkResult( + getDorisQueryConnection(), LOG, expectedTables, tblQuery, 1, false); + LOG.info("Start to verify init result."); List<String> expected = Arrays.asList("doris_1,1", "doris_2,2", "doris_3,3", "doris_5,5"); String sql1 = - "select * from ( select * from test_e2e_mysql.tbl1 union all select * from test_e2e_mysql.tbl2 union all select * from test_e2e_mysql.tbl3 union all select * from test_e2e_mysql.tbl5) res order by 1"; + "select * from ( select * from test_e2e_mysql.ods_tbl1_incr union all select * from test_e2e_mysql.ods_tbl2_incr union all select * from test_e2e_mysql.ods_tbl3_incr union all select * from test_e2e_mysql.ods_tbl5_incr) res order by 1"; ContainerUtils.checkResult(getDorisQueryConnection(), LOG, expected, sql1, 2); // add incremental data @@ -121,28 +133,47 @@ public class Mysql2DorisE2ECase extends AbstractE2EService { Arrays.asList( "doris_1,18", "doris_1_1,10", "doris_2_1,11", "doris_3,3", "doris_3_1,12"); String sql2 = - "select * from ( select * from test_e2e_mysql.tbl1 union all select * from test_e2e_mysql.tbl2 union all select * from test_e2e_mysql.tbl3 ) res order by 1"; + "select * from ( select * from test_e2e_mysql.ods_tbl1_incr union all select * from test_e2e_mysql.ods_tbl2_incr union all select * from test_e2e_mysql.ods_tbl3_incr ) res order by 1"; ContainerUtils.checkResult(getDorisQueryConnection(), LOG, expected2, sql2, 2); - // mock schema change + // mock schema change ALTER TABLE table_name RENAME COLUMN old_column_name TO + // new_column_name LOG.info("start to schema change in mysql."); ContainerUtils.executeSQLStatement( getMySQLQueryConnection(), LOG, "alter table test_e2e_mysql.tbl1 add column c1 varchar(128)", - "alter table test_e2e_mysql.tbl1 drop column age"); + "alter table test_e2e_mysql.tbl1 drop column age", + "alter table test_e2e_mysql.tbl2 rename column age to age_new_1", + "alter table test_e2e_mysql.tbl3 change column age age_new_2 int"); Thread.sleep(10000); ContainerUtils.executeSQLStatement( getMySQLQueryConnection(), LOG, - "insert into test_e2e_mysql.tbl1 values ('doris_1_1_1','c1_val')"); + "insert into test_e2e_mysql.tbl1 values ('doris_1_1_1','c1_val')", + "insert into test_e2e_mysql.tbl2 values ('doris_tbl2_rename_test',18)", + "insert into test_e2e_mysql.tbl3 values ('doris_tbl3_rename_test',38)"); Thread.sleep(20000); - LOG.info("verify tal1 schema change."); + LOG.info("verify tbl1 schema change."); List<String> schemaChangeExpected = Arrays.asList("doris_1,null", "doris_1_1,null", "doris_1_1_1,c1_val"); - String schemaChangeSql = "select * from test_e2e_mysql.tbl1 order by 1"; + String schemaChangeSql = "select name,c1 from test_e2e_mysql.ods_tbl1_incr order by 1"; + ContainerUtils.checkResult( + getDorisQueryConnection(), LOG, schemaChangeExpected, schemaChangeSql, 2); + + LOG.info("verify tbl2 schema change."); + schemaChangeExpected = Arrays.asList("doris_2_1,11", "doris_tbl2_rename_test,18"); + schemaChangeSql = "select name,age_new_1 from test_e2e_mysql.ods_tbl2_incr order by 1"; ContainerUtils.checkResult( getDorisQueryConnection(), LOG, schemaChangeExpected, schemaChangeSql, 2); + + LOG.info("verify tbl3 schema change."); + schemaChangeExpected = + Arrays.asList("doris_3,3", "doris_3_1,12", "doris_tbl3_rename_test,38"); + schemaChangeSql = "select name,age_new_2 from test_e2e_mysql.ods_tbl3_incr order by 1"; + ContainerUtils.checkResult( + getDorisQueryConnection(), LOG, schemaChangeExpected, schemaChangeSql, 2); + cancelE2EJob(jobName); } diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/lookup/DorisLookupTableITCase.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/lookup/DorisLookupTableITCase.java index 8ecb87da..d14666a6 100644 --- a/flink-doris-connector/src/test/java/org/apache/doris/flink/lookup/DorisLookupTableITCase.java +++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/lookup/DorisLookupTableITCase.java @@ -353,7 +353,11 @@ public class DorisLookupTableITCase extends AbstractITCaseService { + "'password' = '%s'," + "'lookup.jdbc.async' = '%s'," + "'lookup.cache.ttl' = '10m'," - + "'lookup.cache.max-rows' = '3'" + + "'lookup.cache.max-rows' = '3'," + + "'lookup.max-retries' = '1'," + + "'lookup.jdbc.read.batch.size' = '2'," + + "'lookup.jdbc.read.batch.queue-size' = '25'," + + "'lookup.jdbc.read.thread-size' = '5'" + ")", getFenodes(), getDorisQueryUrl(), diff --git a/flink-doris-connector/src/test/resources/container/e2e/mysql2doris/testMySQL2Doris.txt b/flink-doris-connector/src/test/resources/container/e2e/mysql2doris/testMySQL2Doris.txt index 90a0eddc..6e248d19 100644 --- a/flink-doris-connector/src/test/resources/container/e2e/mysql2doris/testMySQL2Doris.txt +++ b/flink-doris-connector/src/test/resources/container/e2e/mysql2doris/testMySQL2Doris.txt @@ -1,8 +1,12 @@ mysql-sync-database --database test_e2e_mysql --mysql-conf database-name=test_e2e_mysql + --table-prefix ods_ + --table-suffix _incr --including-tables "tbl.*" + --excluding-tables "tbl4" --sink-conf sink.ignore.update-before=false --table-conf replication_num=1 --single-sink true - --ignore-default-value false \ No newline at end of file + --ignore-default-value false + --schema-change-mode sql_parser \ No newline at end of file diff --git a/flink-doris-connector/src/test/resources/container/e2e/mysql2doris/testMySQL2Doris_init.sql b/flink-doris-connector/src/test/resources/container/e2e/mysql2doris/testMySQL2Doris_init.sql index f1042491..8bdde210 100644 --- a/flink-doris-connector/src/test/resources/container/e2e/mysql2doris/testMySQL2Doris_init.sql +++ b/flink-doris-connector/src/test/resources/container/e2e/mysql2doris/testMySQL2Doris_init.sql @@ -29,7 +29,7 @@ CREATE TABLE test_e2e_mysql.tbl4 ( `name` varchar(256) primary key, `age` int ); - +insert into test_e2e_mysql.tbl4 values ('doris_4',4); DROP TABLE IF EXISTS test_e2e_mysql.tbl5; CREATE TABLE test_e2e_mysql.tbl5 ( --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org