This is an automated email from the ASF dual-hosted git repository. zykkk pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris-flink-connector.git
The following commit(s) were added to refs/heads/master by this push: new c77f9d7 [fix](cdc) single sink add tableprefix and tablesuffix (#301) c77f9d7 is described below commit c77f9d7a23c55d2451cbff99f9c941491de35e78 Author: wudi <676366...@qq.com> AuthorDate: Tue Jan 23 14:41:28 2024 +0800 [fix](cdc) single sink add tableprefix and tablesuffix (#301) Currently, when single-sink is enabled and CDC automatically creates a table, it cannot automatically obtain the prefix and suffix. --- .../serializer/JsonDebeziumSchemaSerializer.java | 34 +++++++++++++++++++--- .../jsondebezium/JsonDebeziumChangeContext.java | 16 +++++++++- .../JsonDebeziumSchemaChangeImplV2.java | 12 +++++++- .../apache/doris/flink/tools/cdc/DatabaseSync.java | 2 ++ .../jsondebezium/TestJsonDebeziumDataChange.java | 8 +++-- .../TestJsonDebeziumSchemaChangeImpl.java | 4 ++- .../TestJsonDebeziumSchemaChangeImplV2.java | 4 ++- .../doris/flink/tools/cdc/MySQLDorisE2ECase.java | 2 +- 8 files changed, 71 insertions(+), 11 deletions(-) diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/JsonDebeziumSchemaSerializer.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/JsonDebeziumSchemaSerializer.java index 370bca7..5bea2d9 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/JsonDebeziumSchemaSerializer.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/JsonDebeziumSchemaSerializer.java @@ -18,6 +18,7 @@ package org.apache.doris.flink.sink.writer.serializer; import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.util.StringUtils; import com.fasterxml.jackson.databind.DeserializationFeature; import com.fasterxml.jackson.databind.JsonNode; @@ -66,6 +67,8 @@ public class JsonDebeziumSchemaSerializer implements DorisRecordSerializer<Strin // create table properties private Map<String, String> tableProperties; private String 
targetDatabase; + private String targetTablePrefix; + private String targetTableSuffix; private JsonDebeziumDataChange dataChange; private JsonDebeziumSchemaChange schemaChange; @@ -109,11 +112,15 @@ public class JsonDebeziumSchemaSerializer implements DorisRecordSerializer<Strin DorisExecutionOptions executionOptions, Map<String, String> tableMapping, Map<String, String> tableProperties, - String targetDatabase) { + String targetDatabase, + String targetTablePrefix, + String targetTableSuffix) { this(dorisOptions, pattern, sourceTableName, newSchemaChange, executionOptions); this.tableMapping = tableMapping; this.tableProperties = tableProperties; this.targetDatabase = targetDatabase; + this.targetTablePrefix = targetTablePrefix; + this.targetTableSuffix = targetTableSuffix; init(); } @@ -128,8 +135,9 @@ public class JsonDebeziumSchemaSerializer implements DorisRecordSerializer<Strin objectMapper, pattern, lineDelimiter, - ignoreUpdateBefore); - + ignoreUpdateBefore, + targetTablePrefix, + targetTableSuffix); this.schemaChange = newSchemaChange ? 
new JsonDebeziumSchemaChangeImplV2(changeContext) @@ -180,6 +188,8 @@ public class JsonDebeziumSchemaSerializer implements DorisRecordSerializer<Strin private Map<String, String> tableMapping; private Map<String, String> tableProperties; private String targetDatabase; + private String targetTablePrefix = ""; + private String targetTableSuffix = ""; public JsonDebeziumSchemaSerializer.Builder setDorisOptions(DorisOptions dorisOptions) { this.dorisOptions = dorisOptions; @@ -221,6 +231,20 @@ public class JsonDebeziumSchemaSerializer implements DorisRecordSerializer<Strin return this; } + public Builder setTargetTablePrefix(String tablePrefix) { + if (!StringUtils.isNullOrWhitespaceOnly(tablePrefix)) { + this.targetTablePrefix = tablePrefix; + } + return this; + } + + public Builder setTargetTableSuffix(String tableSuffix) { + if (!StringUtils.isNullOrWhitespaceOnly(tableSuffix)) { + this.targetTableSuffix = tableSuffix; + } + return this; + } + public JsonDebeziumSchemaSerializer build() { return new JsonDebeziumSchemaSerializer( dorisOptions, @@ -230,7 +254,9 @@ public class JsonDebeziumSchemaSerializer implements DorisRecordSerializer<Strin executionOptions, tableMapping, tableProperties, - targetDatabase); + targetDatabase, + targetTablePrefix, + targetTableSuffix); } } } diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/JsonDebeziumChangeContext.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/JsonDebeziumChangeContext.java index 9c59f14..a7253d2 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/JsonDebeziumChangeContext.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/JsonDebeziumChangeContext.java @@ -38,6 +38,8 @@ public class JsonDebeziumChangeContext implements Serializable { private final Pattern pattern; private final String 
lineDelimiter; private final boolean ignoreUpdateBefore; + private String targetTablePrefix; + private String targetTableSuffix; public JsonDebeziumChangeContext( DorisOptions dorisOptions, @@ -48,7 +50,9 @@ public class JsonDebeziumChangeContext implements Serializable { ObjectMapper objectMapper, Pattern pattern, String lineDelimiter, - boolean ignoreUpdateBefore) { + boolean ignoreUpdateBefore, + String targetTablePrefix, + String targetTableSuffix) { this.dorisOptions = dorisOptions; this.tableMapping = tableMapping; this.sourceTableName = sourceTableName; @@ -58,6 +62,8 @@ public class JsonDebeziumChangeContext implements Serializable { this.pattern = pattern; this.lineDelimiter = lineDelimiter; this.ignoreUpdateBefore = ignoreUpdateBefore; + this.targetTablePrefix = targetTablePrefix; + this.targetTableSuffix = targetTableSuffix; } public DorisOptions getDorisOptions() { @@ -95,4 +101,12 @@ public class JsonDebeziumChangeContext implements Serializable { public boolean isIgnoreUpdateBefore() { return ignoreUpdateBefore; } + + public String getTargetTablePrefix() { + return targetTablePrefix; + } + + public String getTargetTableSuffix() { + return targetTableSuffix; + } } diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/JsonDebeziumSchemaChangeImplV2.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/JsonDebeziumSchemaChangeImplV2.java index 5604da7..b3a90e6 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/JsonDebeziumSchemaChangeImplV2.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/JsonDebeziumSchemaChangeImplV2.java @@ -70,6 +70,8 @@ public class JsonDebeziumSchemaChangeImplV2 extends JsonDebeziumSchemaChange { // create table properties private final Map<String, String> tableProperties; private String targetDatabase; + private String 
targetTablePrefix; + private String targetTableSuffix; public JsonDebeziumSchemaChangeImplV2(JsonDebeziumChangeContext changeContext) { this.addDropDDLPattern = Pattern.compile(addDropDDLRegex, Pattern.CASE_INSENSITIVE); @@ -81,6 +83,14 @@ public class JsonDebeziumSchemaChangeImplV2 extends JsonDebeziumSchemaChange { this.tableProperties = changeContext.getTableProperties(); this.tableMapping = changeContext.getTableMapping(); this.objectMapper = changeContext.getObjectMapper(); + this.targetTablePrefix = + changeContext.getTargetTablePrefix() == null + ? "" + : changeContext.getTargetTablePrefix(); + this.targetTableSuffix = + changeContext.getTargetTableSuffix() == null + ? "" + : changeContext.getTargetTableSuffix(); } @Override @@ -253,7 +263,7 @@ public class JsonDebeziumSchemaChangeImplV2 extends JsonDebeziumSchemaChange { private String getCreateTableIdentifier(JsonNode record) { String table = extractJsonNode(record.get("source"), "table"); - return targetDatabase + "." + table; + return targetDatabase + "." 
+ targetTablePrefix + table + targetTableSuffix; } private boolean checkSchemaChange(String database, String table, DDLSchema ddlSchema) diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/DatabaseSync.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/DatabaseSync.java index b6bb5e5..e153039 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/DatabaseSync.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/DatabaseSync.java @@ -288,6 +288,8 @@ public abstract class DatabaseSync { .setTableMapping(tableMapping) .setTableProperties(tableConfig) .setTargetDatabase(database) + .setTargetTablePrefix(tablePrefix) + .setTargetTableSuffix(tableSuffix) .build()) .setDorisOptions(dorisBuilder.build()); return builder.build(); diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/TestJsonDebeziumDataChange.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/TestJsonDebeziumDataChange.java index d789807..8900339 100644 --- a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/TestJsonDebeziumDataChange.java +++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/TestJsonDebeziumDataChange.java @@ -48,7 +48,9 @@ public class TestJsonDebeziumDataChange extends TestJsonDebeziumChangeBase { objectMapper, null, lineDelimiter, - ignoreUpdateBefore); + ignoreUpdateBefore, + "", + ""); dataChange = new JsonDebeziumDataChange(changeContext); } @@ -109,7 +111,9 @@ public class TestJsonDebeziumDataChange extends TestJsonDebeziumChangeBase { objectMapper, null, lineDelimiter, - false); + false, + "", + ""); dataChange = new JsonDebeziumDataChange(changeContext); // update t1 set name='doris-update' WHERE id =1; diff --git 
a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/TestJsonDebeziumSchemaChangeImpl.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/TestJsonDebeziumSchemaChangeImpl.java index 9b003a8..c8997a1 100644 --- a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/TestJsonDebeziumSchemaChangeImpl.java +++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/TestJsonDebeziumSchemaChangeImpl.java @@ -55,7 +55,9 @@ public class TestJsonDebeziumSchemaChangeImpl extends TestJsonDebeziumChangeBase objectMapper, null, lineDelimiter, - ignoreUpdateBefore); + ignoreUpdateBefore, + "", + ""); schemaChange = new JsonDebeziumSchemaChangeImpl(changeContext); } diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/TestJsonDebeziumSchemaChangeImplV2.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/TestJsonDebeziumSchemaChangeImplV2.java index c63267d..0ce60d3 100644 --- a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/TestJsonDebeziumSchemaChangeImplV2.java +++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/TestJsonDebeziumSchemaChangeImplV2.java @@ -58,7 +58,9 @@ public class TestJsonDebeziumSchemaChangeImplV2 extends TestJsonDebeziumChangeBa objectMapper, null, lineDelimiter, - ignoreUpdateBefore); + ignoreUpdateBefore, + "", + ""); schemaChange = new JsonDebeziumSchemaChangeImplV2(changeContext); } diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/MySQLDorisE2ECase.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/MySQLDorisE2ECase.java index 242f93f..3390f75 100644 --- 
a/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/MySQLDorisE2ECase.java +++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/MySQLDorisE2ECase.java @@ -72,7 +72,7 @@ public class MySQLDorisE2ECase extends DorisTestBase { private static final String TABLE_4 = "tbl4"; private static final MySQLContainer MYSQL_CONTAINER = - new MySQLContainer("mysql") + new MySQLContainer("mysql:8.0") .withDatabaseName(DATABASE) .withUsername(MYSQL_USER) .withPassword(MYSQL_PASSWD); --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org