This is an automated email from the ASF dual-hosted git repository.

diwu pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris-flink-connector.git
The following commit(s) were added to refs/heads/master by this push: new 43f9e014 [Fix](cdc) fix enable-delete option not work (#455) 43f9e014 is described below commit 43f9e014c4742949e9625106252a375a1e51b82c Author: Petrichor <31833513+vinle...@users.noreply.github.com> AuthorDate: Mon Aug 5 18:19:53 2024 +0800 [Fix](cdc) fix enable-delete option not work (#455) --- .../serializer/JsonDebeziumSchemaSerializer.java | 5 +- .../jsondebezium/JsonDebeziumChangeContext.java | 9 +- .../jsondebezium/JsonDebeziumDataChange.java | 4 +- .../MongoDBJsonDebeziumSchemaSerializer.java | 5 +- .../serializer/MongoJsonDebeziumDataChange.java | 4 +- .../jsondebezium/TestJsonDebeziumDataChange.java | 6 +- .../TestJsonDebeziumSchemaChangeImpl.java | 3 +- .../TestJsonDebeziumSchemaChangeImplV2.java | 3 +- .../jsondebezium/TestSQLParserSchemaChange.java | 3 +- .../doris/flink/tools/cdc/MySQLDorisE2ECase.java | 116 +++++++++++++++++++++ 10 files changed, 148 insertions(+), 10 deletions(-) diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/JsonDebeziumSchemaSerializer.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/JsonDebeziumSchemaSerializer.java index 83dec58b..9c89fce3 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/JsonDebeziumSchemaSerializer.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/JsonDebeziumSchemaSerializer.java @@ -70,6 +70,7 @@ public class JsonDebeziumSchemaSerializer implements DorisRecordSerializer<Strin private final boolean newSchemaChange; private String lineDelimiter = LINE_DELIMITER_DEFAULT; private boolean ignoreUpdateBefore = true; + private boolean enableDelete = true; // <cdc db.schema.table, doris db.table> private Map<String, String> tableMapping; // create table properties @@ -111,6 +112,7 @@ public class JsonDebeziumSchemaSerializer implements DorisRecordSerializer<Strin 
.getStreamLoadProp() .getProperty(LINE_DELIMITER_KEY, LINE_DELIMITER_DEFAULT); this.ignoreUpdateBefore = executionOptions.getIgnoreUpdateBefore(); + this.enableDelete = executionOptions.getDeletable(); } } @@ -149,7 +151,8 @@ public class JsonDebeziumSchemaSerializer implements DorisRecordSerializer<Strin lineDelimiter, ignoreUpdateBefore, targetTablePrefix, - targetTableSuffix); + targetTableSuffix, + enableDelete); initSchemaChangeInstance(changeContext); this.dataChange = new JsonDebeziumDataChange(changeContext); } diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/JsonDebeziumChangeContext.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/JsonDebeziumChangeContext.java index d1326c72..2f7764f3 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/JsonDebeziumChangeContext.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/JsonDebeziumChangeContext.java @@ -41,6 +41,7 @@ public class JsonDebeziumChangeContext implements Serializable { private final Pattern pattern; private final String lineDelimiter; private final boolean ignoreUpdateBefore; + private final boolean enableDelete; private final String targetTablePrefix; private final String targetTableSuffix; @@ -55,7 +56,8 @@ public class JsonDebeziumChangeContext implements Serializable { String lineDelimiter, boolean ignoreUpdateBefore, String targetTablePrefix, - String targetTableSuffix) { + String targetTableSuffix, + boolean enableDelete) { this.dorisOptions = dorisOptions; this.tableMapping = tableMapping; this.sourceTableName = sourceTableName; @@ -65,6 +67,7 @@ public class JsonDebeziumChangeContext implements Serializable { this.pattern = pattern; this.lineDelimiter = lineDelimiter; this.ignoreUpdateBefore = ignoreUpdateBefore; + this.enableDelete = enableDelete; this.targetTablePrefix = 
targetTablePrefix; this.targetTableSuffix = targetTableSuffix; } @@ -116,6 +119,10 @@ public class JsonDebeziumChangeContext implements Serializable { return targetTableSuffix; } + public boolean enableDelete() { + return enableDelete; + } + public DorisTableConfig getDorisTableConf() { return dorisTableConfig; } diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/JsonDebeziumDataChange.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/JsonDebeziumDataChange.java index 5075adf8..298cfb95 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/JsonDebeziumDataChange.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/JsonDebeziumDataChange.java @@ -50,6 +50,7 @@ public class JsonDebeziumDataChange extends CdcDataChange { private final ObjectMapper objectMapper; private final DorisOptions dorisOptions; private final boolean ignoreUpdateBefore; + private final boolean enableDelete; private final String lineDelimiter; private final JsonDebeziumChangeContext changeContext; @@ -59,6 +60,7 @@ public class JsonDebeziumDataChange extends CdcDataChange { this.objectMapper = changeContext.getObjectMapper(); this.ignoreUpdateBefore = changeContext.isIgnoreUpdateBefore(); this.lineDelimiter = changeContext.getLineDelimiter(); + this.enableDelete = changeContext.enableDelete(); } public DorisRecord serialize(String record, JsonNode recordRoot, String op) throws IOException { @@ -87,7 +89,7 @@ public class JsonDebeziumDataChange extends CdcDataChange { return DorisRecord.of(dorisTableIdentifier, extractUpdate(recordRoot)); case OP_DELETE: valueMap = extractBeforeRow(recordRoot); - addDeleteSign(valueMap, true); + addDeleteSign(valueMap, enableDelete); break; default: LOG.error("parse record fail, unknown op {} in {}", op, record); diff --git 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mongodb/serializer/MongoDBJsonDebeziumSchemaSerializer.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mongodb/serializer/MongoDBJsonDebeziumSchemaSerializer.java index b7faec2f..d4a87ff8 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mongodb/serializer/MongoDBJsonDebeziumSchemaSerializer.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mongodb/serializer/MongoDBJsonDebeziumSchemaSerializer.java @@ -50,6 +50,7 @@ public class MongoDBJsonDebeziumSchemaSerializer implements DorisRecordSerialize private final String sourceTableName; private String lineDelimiter = LINE_DELIMITER_DEFAULT; private boolean ignoreUpdateBefore = true; + private boolean enableDelete = true; // <cdc db.schema.table, doris db.table> private Map<String, String> tableMapping; // create table properties @@ -90,6 +91,7 @@ public class MongoDBJsonDebeziumSchemaSerializer implements DorisRecordSerialize .getStreamLoadProp() .getProperty(LINE_DELIMITER_KEY, LINE_DELIMITER_DEFAULT); this.ignoreUpdateBefore = executionOptions.getIgnoreUpdateBefore(); + this.enableDelete = executionOptions.getDeletable(); } init(); } @@ -107,7 +109,8 @@ public class MongoDBJsonDebeziumSchemaSerializer implements DorisRecordSerialize lineDelimiter, ignoreUpdateBefore, targetTablePrefix, - targetTableSuffix); + targetTableSuffix, + enableDelete); this.dataChange = new MongoJsonDebeziumDataChange(changeContext); this.schemaChange = new MongoJsonDebeziumSchemaChange(changeContext); } diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mongodb/serializer/MongoJsonDebeziumDataChange.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mongodb/serializer/MongoJsonDebeziumDataChange.java index 8048e38a..9dbe7ffe 100644 --- 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mongodb/serializer/MongoJsonDebeziumDataChange.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mongodb/serializer/MongoJsonDebeziumDataChange.java @@ -61,6 +61,7 @@ public class MongoJsonDebeziumDataChange extends CdcDataChange implements Change public JsonDebeziumChangeContext changeContext; public ObjectMapper objectMapper; public Map<String, String> tableMapping; + private final boolean enableDelete; public MongoJsonDebeziumDataChange(JsonDebeziumChangeContext changeContext) { this.changeContext = changeContext; @@ -68,6 +69,7 @@ public class MongoJsonDebeziumDataChange extends CdcDataChange implements Change this.objectMapper = changeContext.getObjectMapper(); this.lineDelimiter = changeContext.getLineDelimiter(); this.tableMapping = changeContext.getTableMapping(); + this.enableDelete = changeContext.enableDelete(); } @Override @@ -93,7 +95,7 @@ public class MongoJsonDebeziumDataChange extends CdcDataChange implements Change break; case OP_DELETE: valueMap = extractDeleteRow(recordRoot); - addDeleteSign(valueMap, true); + addDeleteSign(valueMap, enableDelete); break; default: LOG.error("parse record fail, unknown op {} in {}", op, record); diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/TestJsonDebeziumDataChange.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/TestJsonDebeziumDataChange.java index 4891d820..f8098ccc 100644 --- a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/TestJsonDebeziumDataChange.java +++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/TestJsonDebeziumDataChange.java @@ -50,7 +50,8 @@ public class TestJsonDebeziumDataChange extends TestJsonDebeziumChangeBase { lineDelimiter, ignoreUpdateBefore, "", - ""); + "", + true); dataChange 
= new JsonDebeziumDataChange(changeContext); } @@ -113,7 +114,8 @@ public class TestJsonDebeziumDataChange extends TestJsonDebeziumChangeBase { lineDelimiter, false, "", - ""); + "", + true); dataChange = new JsonDebeziumDataChange(changeContext); // update t1 set name='doris-update' WHERE id =1; diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/TestJsonDebeziumSchemaChangeImpl.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/TestJsonDebeziumSchemaChangeImpl.java index caf5542c..e66ecaab 100644 --- a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/TestJsonDebeziumSchemaChangeImpl.java +++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/TestJsonDebeziumSchemaChangeImpl.java @@ -63,7 +63,8 @@ public class TestJsonDebeziumSchemaChangeImpl extends TestJsonDebeziumChangeBase lineDelimiter, ignoreUpdateBefore, "", - ""); + "", + true); schemaChange = new JsonDebeziumSchemaChangeImpl(changeContext); } diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/TestJsonDebeziumSchemaChangeImplV2.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/TestJsonDebeziumSchemaChangeImplV2.java index 68239618..3eec0fb6 100644 --- a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/TestJsonDebeziumSchemaChangeImplV2.java +++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/TestJsonDebeziumSchemaChangeImplV2.java @@ -83,7 +83,8 @@ public class TestJsonDebeziumSchemaChangeImplV2 extends TestJsonDebeziumChangeBa lineDelimiter, ignoreUpdateBefore, "", - ""); + "", + true); schemaChange = new JsonDebeziumSchemaChangeImplV2(changeContext); } diff --git 
a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/TestSQLParserSchemaChange.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/TestSQLParserSchemaChange.java index d3194f81..a7958b70 100644 --- a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/TestSQLParserSchemaChange.java +++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/TestSQLParserSchemaChange.java @@ -46,7 +46,8 @@ public class TestSQLParserSchemaChange extends TestJsonDebeziumChangeBase { lineDelimiter, ignoreUpdateBefore, "", - ""); + "", + true); schemaChange = new SQLParserSchemaChange(changeContext); } diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/MySQLDorisE2ECase.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/MySQLDorisE2ECase.java index 6a613841..aeb17c29 100644 --- a/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/MySQLDorisE2ECase.java +++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/MySQLDorisE2ECase.java @@ -503,6 +503,122 @@ public class MySQLDorisE2ECase extends DorisTestBase { jobClient.cancel().get(); } + @Test + public void testMySQL2DorisEnableDelete() throws Exception { + printClusterStatus(); + initializeMySQLTable(); + initializeDorisTable(); + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + env.setRestartStrategy(RestartStrategies.noRestart()); + Map<String, String> flinkMap = new HashMap<>(); + flinkMap.put("execution.checkpointing.interval", "10s"); + flinkMap.put("pipeline.operator-chaining", "false"); + flinkMap.put("parallelism.default", "1"); + + Configuration configuration = Configuration.fromMap(flinkMap); + env.configure(configuration); + + String database = DATABASE; + Map<String, String> mysqlConfig = new HashMap<>(); + 
mysqlConfig.put("database-name", DATABASE); + mysqlConfig.put("hostname", MYSQL_CONTAINER.getHost()); + mysqlConfig.put("port", MYSQL_CONTAINER.getMappedPort(3306) + ""); + mysqlConfig.put("username", MYSQL_USER); + mysqlConfig.put("password", MYSQL_PASSWD); + mysqlConfig.put("server-time-zone", "Asia/Shanghai"); + Configuration config = Configuration.fromMap(mysqlConfig); + + Map<String, String> sinkConfig = new HashMap<>(); + sinkConfig.put("fenodes", getFenodes()); + sinkConfig.put("username", USERNAME); + sinkConfig.put("password", PASSWORD); + sinkConfig.put("jdbc-url", String.format(DorisTestBase.URL, DORIS_CONTAINER.getHost())); + sinkConfig.put("sink.label-prefix", UUID.randomUUID().toString()); + sinkConfig.put("sink.check-interval", "5000"); + sinkConfig.put("sink.enable-delete", "false"); + Configuration sinkConf = Configuration.fromMap(sinkConfig); + + Map<String, String> tableConfig = new HashMap<>(); + tableConfig.put("replication_num", "1"); + + String includingTables = "tbl1|tbl2|tbl3|tbl5"; + String excludingTables = ""; + DatabaseSync databaseSync = new MysqlDatabaseSync(); + databaseSync + .setEnv(env) + .setDatabase(database) + .setConfig(config) + .setIncludingTables(includingTables) + .setExcludingTables(excludingTables) + .setIgnoreDefaultValue(false) + .setSinkConfig(sinkConf) + .setTableConfig(tableConfig) + .setCreateTableOnly(false) + .setNewSchemaChange(true) + // no single sink + .setSingleSink(false) + .create(); + databaseSync.build(); + JobClient jobClient = env.executeAsync(); + waitForJobStatus( + jobClient, + Collections.singletonList(RUNNING), + Deadline.fromNow(Duration.ofSeconds(10))); + + // wait 2 times checkpoint + Thread.sleep(20000); + List<String> expected = Arrays.asList("doris_1,1", "doris_2,2", "doris_3,3", "doris_5,5"); + String sql = + "select * from ( select * from %s.%s union all select * from %s.%s union all select * from %s.%s union all select * from %s.%s) res order by 1"; + String query1 = + String.format( + 
sql, DATABASE, TABLE_1, DATABASE, TABLE_2, DATABASE, TABLE_3, DATABASE, + TABLE_5); + checkResult(expected, query1, 2); + + // add incremental data + try (Connection connection = + DriverManager.getConnection( + MYSQL_CONTAINER.getJdbcUrl(), MYSQL_USER, MYSQL_PASSWD); + Statement statement = connection.createStatement()) { + statement.execute( + String.format("insert into %s.%s values ('doris_1_1',10)", DATABASE, TABLE_1)); + statement.execute( + String.format("insert into %s.%s values ('doris_2_1',11)", DATABASE, TABLE_2)); + statement.execute( + String.format("insert into %s.%s values ('doris_3_1',12)", DATABASE, TABLE_3)); + + statement.execute( + String.format( + "update %s.%s set age=18 where name='doris_1'", DATABASE, TABLE_1)); + statement.execute( + String.format("delete from %s.%s where name='doris_2'", DATABASE, TABLE_2)); + statement.execute( + String.format("delete from %s.%s where name='doris_3'", DATABASE, TABLE_3)); + statement.execute( + String.format("delete from %s.%s where name='doris_5'", DATABASE, TABLE_5)); + } + + Thread.sleep(20000); + List<String> expected2 = + Arrays.asList( + "doris_1,18", + "doris_1_1,10", + "doris_2,2", + "doris_2_1,11", + "doris_3,3", + "doris_3_1,12", + "doris_5,5"); + sql = + "select * from ( select * from %s.%s union all select * from %s.%s union all select * from %s.%s union all select * from %s.%s) res order by 1"; + String query2 = + String.format( + sql, DATABASE, TABLE_1, DATABASE, TABLE_2, DATABASE, TABLE_3, DATABASE, + TABLE_5); + checkResult(expected2, query2, 2); + jobClient.cancel().get(); + } + private void initializeDorisTable() throws Exception { try (Connection connection = DriverManager.getConnection( --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org