This is an automated email from the ASF dual-hosted git repository. diwu pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris-flink-connector.git
The following commit(s) were added to refs/heads/master by this push: new dbed8562 [Feature] support auto create table for shard table (#564) dbed8562 is described below commit dbed856246f2d035df544381c4d6208516810d0d Author: wudi <676366...@qq.com> AuthorDate: Wed Feb 26 10:13:08 2025 +0800 [Feature] support auto create table for shard table (#564) --- .../flink/catalog/doris/DorisSchemaFactory.java | 6 +- .../serializer/JsonDebeziumSchemaSerializer.java | 18 +++++- .../jsondebezium/JsonDebeziumChangeContext.java | 43 +++++++++++++ .../jsondebezium/JsonDebeziumSchemaChange.java | 10 ++- .../JsonDebeziumSchemaChangeImplV2.java | 1 + .../jsondebezium/SQLParserSchemaChange.java | 1 + .../apache/doris/flink/tools/cdc/DatabaseSync.java | 50 +-------------- .../flink/tools/cdc/ParsingProcessFunction.java | 5 +- .../tools/cdc/converter/TableNameConverter.java | 73 ++++++++++++++++++++++ .../cdc/mongodb/MongoParsingProcessFunction.java | 2 +- .../TestJsonDebeziumSchemaChangeImplV2.java | 60 ++++++++++++++++++ 11 files changed, 212 insertions(+), 57 deletions(-) diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/catalog/doris/DorisSchemaFactory.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/catalog/doris/DorisSchemaFactory.java index dd42f803..07b05e5e 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/catalog/doris/DorisSchemaFactory.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/catalog/doris/DorisSchemaFactory.java @@ -255,10 +255,12 @@ public class DorisSchemaFactory { } public static String quoteDefaultValue(String defaultValue) { - // DEFAULT current_timestamp not need quote - if (defaultValue.equalsIgnoreCase("current_timestamp")) { + // DEFAULT current_timestamp or null not need quote + if (defaultValue.equalsIgnoreCase("current_timestamp") + || defaultValue.equalsIgnoreCase("null")) { return defaultValue; } + return "'" + defaultValue + "'"; } diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/JsonDebeziumSchemaSerializer.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/JsonDebeziumSchemaSerializer.java index 9c89fce3..93b7efcd 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/JsonDebeziumSchemaSerializer.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/JsonDebeziumSchemaSerializer.java @@ -38,6 +38,7 @@ import org.apache.doris.flink.sink.writer.serializer.jsondebezium.JsonDebeziumSc import org.apache.doris.flink.sink.writer.serializer.jsondebezium.JsonDebeziumSchemaChangeImplV2; import org.apache.doris.flink.sink.writer.serializer.jsondebezium.SQLParserSchemaChange; import org.apache.doris.flink.tools.cdc.DorisTableConfig; +import org.apache.doris.flink.tools.cdc.converter.TableNameConverter; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -81,6 +82,7 @@ public class JsonDebeziumSchemaSerializer implements DorisRecordSerializer<Strin private JsonDebeziumDataChange dataChange; private JsonDebeziumSchemaChange schemaChange; private SchemaChangeMode schemaChangeMode; + private TableNameConverter tableNameConverter; private final Set<String> initTableSet = new HashSet<>(); public JsonDebeziumSchemaSerializer( @@ -127,7 +129,8 @@ public class JsonDebeziumSchemaSerializer implements DorisRecordSerializer<Strin String targetDatabase, String targetTablePrefix, String targetTableSuffix, - SchemaChangeMode schemaChangeMode) { + SchemaChangeMode schemaChangeMode, + TableNameConverter tableNameConverter) { this(dorisOptions, pattern, sourceTableName, newSchemaChange, executionOptions); this.tableMapping = tableMapping; this.targetDatabase = targetDatabase; @@ -135,6 +138,7 @@ public class JsonDebeziumSchemaSerializer implements DorisRecordSerializer<Strin this.targetTableSuffix = targetTableSuffix; this.schemaChangeMode = schemaChangeMode; this.dorisTableConfig = dorisTableConfig; + this.tableNameConverter = tableNameConverter; init(); } @@ -152,7 +156,8 @@ public class JsonDebeziumSchemaSerializer implements DorisRecordSerializer<Strin ignoreUpdateBefore, targetTablePrefix, targetTableSuffix, - enableDelete); + enableDelete, + tableNameConverter); initSchemaChangeInstance(changeContext); this.dataChange = new JsonDebeziumDataChange(changeContext); } @@ -233,6 +238,7 @@ public class JsonDebeziumSchemaSerializer implements DorisRecordSerializer<Strin private String targetDatabase; private String targetTablePrefix = ""; private String targetTableSuffix = ""; + private TableNameConverter tableNameConverter; public JsonDebeziumSchemaSerializer.Builder setDorisOptions(DorisOptions dorisOptions) { this.dorisOptions = dorisOptions; @@ -302,6 +308,11 @@ public class JsonDebeziumSchemaSerializer implements DorisRecordSerializer<Strin return this; } + public Builder setTableNameConverter(TableNameConverter tableNameConverter) { + this.tableNameConverter = tableNameConverter; + return this; + } + public JsonDebeziumSchemaSerializer build() { return new JsonDebeziumSchemaSerializer( dorisOptions, @@ -314,7 +325,8 @@ public class JsonDebeziumSchemaSerializer implements DorisRecordSerializer<Strin targetDatabase, targetTablePrefix, targetTableSuffix, - schemaChangeMode); + schemaChangeMode, + tableNameConverter); } } } diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/JsonDebeziumChangeContext.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/JsonDebeziumChangeContext.java index 2f7764f3..c9811189 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/JsonDebeziumChangeContext.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/JsonDebeziumChangeContext.java @@ -17,9 +17,12 @@ package org.apache.doris.flink.sink.writer.serializer.jsondebezium; +import org.apache.flink.annotation.VisibleForTesting; + import com.fasterxml.jackson.databind.ObjectMapper; import org.apache.doris.flink.cfg.DorisOptions; import org.apache.doris.flink.tools.cdc.DorisTableConfig; +import org.apache.doris.flink.tools.cdc.converter.TableNameConverter; import java.io.Serializable; import java.util.HashMap; @@ -44,6 +47,7 @@ public class JsonDebeziumChangeContext implements Serializable { private final boolean enableDelete; private final String targetTablePrefix; private final String targetTableSuffix; + private TableNameConverter tableNameConverter; public JsonDebeziumChangeContext( DorisOptions dorisOptions, @@ -72,6 +76,36 @@ public class JsonDebeziumChangeContext implements Serializable { this.targetTableSuffix = targetTableSuffix; } + public JsonDebeziumChangeContext( + DorisOptions dorisOptions, + Map<String, String> tableMapping, + String sourceTableName, + String targetDatabase, + DorisTableConfig dorisTableConfig, + ObjectMapper objectMapper, + Pattern pattern, + String lineDelimiter, + boolean ignoreUpdateBefore, + String targetTablePrefix, + String targetTableSuffix, + boolean enableDelete, + TableNameConverter tableNameConverter) { + this( + dorisOptions, + tableMapping, + sourceTableName, + targetDatabase, + dorisTableConfig, + objectMapper, + pattern, + lineDelimiter, + ignoreUpdateBefore, + targetTablePrefix, + targetTableSuffix, + enableDelete); + this.tableNameConverter = tableNameConverter; + } + public DorisOptions getDorisOptions() { return dorisOptions; } @@ -119,6 +153,10 @@ public class JsonDebeziumChangeContext implements Serializable { return targetTableSuffix; } + public TableNameConverter getTableNameConverter() { + return tableNameConverter; + } + public boolean enableDelete() { return enableDelete; } @@ -126,4 +164,9 @@ public class JsonDebeziumChangeContext implements Serializable { public DorisTableConfig getDorisTableConf() { return dorisTableConfig; } + + @VisibleForTesting + public void setTableNameConverter(TableNameConverter tableNameConverter) { + this.tableNameConverter = tableNameConverter; + } } diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/JsonDebeziumSchemaChange.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/JsonDebeziumSchemaChange.java index 91be21fa..16757d27 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/JsonDebeziumSchemaChange.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/JsonDebeziumSchemaChange.java @@ -33,6 +33,7 @@ import org.apache.doris.flink.sink.writer.EventType; import org.apache.doris.flink.tools.cdc.DorisTableConfig; import org.apache.doris.flink.tools.cdc.SourceConnector; import org.apache.doris.flink.tools.cdc.SourceSchema; +import org.apache.doris.flink.tools.cdc.converter.TableNameConverter; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -71,6 +72,7 @@ public abstract class JsonDebeziumSchemaChange extends CdcSchemaChange { protected String targetTablePrefix; protected String targetTableSuffix; protected DorisTableConfig dorisTableConfig; + protected TableNameConverter tableNameConverter; public abstract boolean schemaChange(JsonNode recordRoot); @@ -196,7 +198,13 @@ public abstract class JsonDebeziumSchemaChange extends CdcSchemaChange { protected String getCreateTableIdentifier(JsonNode record) { String table = extractJsonNode(record.get("source"), "table"); - return targetDatabase + "." + targetTablePrefix + table + targetTableSuffix; + String createTblName; + if (tableNameConverter != null) { + createTblName = tableNameConverter.convert(table); + } else { + createTblName = targetTablePrefix + table + targetTableSuffix; + } + return targetDatabase + "." + createTblName; } public Map<String, String> getTableMapping() { diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/JsonDebeziumSchemaChangeImplV2.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/JsonDebeziumSchemaChangeImplV2.java index 0b9172e8..abd6f55b 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/JsonDebeziumSchemaChangeImplV2.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/JsonDebeziumSchemaChangeImplV2.java @@ -88,6 +88,7 @@ public class JsonDebeziumSchemaChangeImplV2 extends JsonDebeziumSchemaChange { changeContext.getTargetTableSuffix() == null ? "" : changeContext.getTargetTableSuffix(); + this.tableNameConverter = changeContext.getTableNameConverter(); } @Override diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/SQLParserSchemaChange.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/SQLParserSchemaChange.java index fd10ba53..eda223d6 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/SQLParserSchemaChange.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/SQLParserSchemaChange.java @@ -53,6 +53,7 @@ public class SQLParserSchemaChange extends JsonDebeziumSchemaChange { changeContext.getTargetTableSuffix() == null ? "" : changeContext.getTargetTableSuffix(); + this.tableNameConverter = changeContext.getTableNameConverter(); } @Override diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/DatabaseSync.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/DatabaseSync.java index d6c69c0b..06e49e24 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/DatabaseSync.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/DatabaseSync.java @@ -42,11 +42,11 @@ import org.apache.doris.flink.sink.writer.WriteMode; import org.apache.doris.flink.sink.writer.serializer.DorisRecordSerializer; import org.apache.doris.flink.sink.writer.serializer.JsonDebeziumSchemaSerializer; import org.apache.doris.flink.table.DorisConfigOptions; +import org.apache.doris.flink.tools.cdc.converter.TableNameConverter; import org.apache.doris.flink.tools.cdc.oracle.OracleDatabaseSync; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.io.Serializable; import java.sql.Connection; import java.sql.SQLException; import java.sql.SQLSyntaxErrorException; @@ -343,6 +343,7 @@ public abstract class DatabaseSync { .setTargetDatabase(database) .setTargetTablePrefix(tablePrefix) .setTargetTableSuffix(tableSuffix) + .setTableNameConverter(converter) .build(); } @@ -618,51 +619,4 @@ public abstract class DatabaseSync { this.tableSuffix = tableSuffix; return this; } - - public static class TableNameConverter implements Serializable { - private static final long serialVersionUID = 1L; - private final String prefix; - private final String suffix; - private Map<Pattern, String> multiToOneRulesPattern; - - TableNameConverter() { - this("", ""); - } - - TableNameConverter(String prefix, String suffix) { - this.prefix = prefix == null ? "" : prefix; - this.suffix = suffix == null ? "" : suffix; - } - - TableNameConverter( - String prefix, String suffix, Map<Pattern, String> multiToOneRulesPattern) { - this.prefix = prefix == null ? "" : prefix; - this.suffix = suffix == null ? "" : suffix; - this.multiToOneRulesPattern = multiToOneRulesPattern; - } - - public String convert(String tableName) { - if (multiToOneRulesPattern == null) { - return prefix + tableName + suffix; - } - - String target = null; - - for (Map.Entry<Pattern, String> patternStringEntry : - multiToOneRulesPattern.entrySet()) { - if (patternStringEntry.getKey().matcher(tableName).matches()) { - target = patternStringEntry.getValue(); - } - } - /** - * If multiToOneRulesPattern is not null and target is not assigned, then the - * synchronization task contains both multi to one and one to one , prefixes and - * suffixes are added to common one-to-one mapping tables - */ - if (target == null) { - return prefix + tableName + suffix; - } - return target; - } - } } diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/ParsingProcessFunction.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/ParsingProcessFunction.java index 22e2b9bc..8315f2da 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/ParsingProcessFunction.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/ParsingProcessFunction.java @@ -26,6 +26,7 @@ import org.apache.flink.util.StringUtils; import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.ObjectMapper; +import org.apache.doris.flink.tools.cdc.converter.TableNameConverter; import java.util.HashMap; import java.util.Map; @@ -33,10 +34,10 @@ import java.util.Map; public class ParsingProcessFunction extends ProcessFunction<String, Void> { protected ObjectMapper objectMapper = new ObjectMapper(); private transient Map<String, OutputTag<String>> recordOutputTags; - private DatabaseSync.TableNameConverter converter; + private TableNameConverter converter; private String database; - public ParsingProcessFunction(String database, DatabaseSync.TableNameConverter converter) { + public ParsingProcessFunction(String database, TableNameConverter converter) { this.database = database; this.converter = converter; } diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/converter/TableNameConverter.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/converter/TableNameConverter.java new file mode 100644 index 00000000..a9df452c --- /dev/null +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/converter/TableNameConverter.java @@ -0,0 +1,73 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.flink.tools.cdc.converter; + +import java.io.Serializable; +import java.util.Map; +import java.util.regex.Pattern; + +/* + * Convert the table name of the upstream data source to the table name of the doris database. + * */ +public class TableNameConverter implements Serializable { + + private static final long serialVersionUID = 1L; + private final String prefix; + private final String suffix; + + // tbl_.*, tbl + private Map<Pattern, String> routeRules; + + public TableNameConverter() { + this("", ""); + } + + public TableNameConverter(String prefix, String suffix) { + this.prefix = prefix == null ? "" : prefix; + this.suffix = suffix == null ? "" : suffix; + } + + public TableNameConverter(String prefix, String suffix, Map<Pattern, String> routeRules) { + this.prefix = prefix == null ? "" : prefix; + this.suffix = suffix == null ? "" : suffix; + this.routeRules = routeRules; + } + + public String convert(String tableName) { + if (routeRules == null) { + return prefix + tableName + suffix; + } + + String target = null; + + for (Map.Entry<Pattern, String> patternStringEntry : routeRules.entrySet()) { + if (patternStringEntry.getKey().matcher(tableName).matches()) { + target = patternStringEntry.getValue(); + } + } + /** + * If routeRules is not null and target is not assigned, then the synchronization task + * contains both multi to one and one to one , prefixes and suffixes are added to common + * one-to-one mapping tables + */ + if (target == null) { + return prefix + tableName + suffix; + } + return target; + } +} diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mongodb/MongoParsingProcessFunction.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mongodb/MongoParsingProcessFunction.java index 72c61567..704af781 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mongodb/MongoParsingProcessFunction.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mongodb/MongoParsingProcessFunction.java @@ -19,8 +19,8 @@ package org.apache.doris.flink.tools.cdc.mongodb; import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.node.NullNode; -import org.apache.doris.flink.tools.cdc.DatabaseSync.TableNameConverter; import org.apache.doris.flink.tools.cdc.ParsingProcessFunction; +import org.apache.doris.flink.tools.cdc.converter.TableNameConverter; import org.slf4j.Logger; import org.slf4j.LoggerFactory; diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/TestJsonDebeziumSchemaChangeImplV2.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/TestJsonDebeziumSchemaChangeImplV2.java index 39241f94..fc3f6ffb 100644 --- a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/TestJsonDebeziumSchemaChangeImplV2.java +++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/TestJsonDebeziumSchemaChangeImplV2.java @@ -28,6 +28,7 @@ import org.apache.doris.flink.rest.RestService; import org.apache.doris.flink.rest.models.Schema; import org.apache.doris.flink.tools.cdc.DorisTableConfig; import org.apache.doris.flink.tools.cdc.SourceConnector; +import org.apache.doris.flink.tools.cdc.converter.TableNameConverter; import org.junit.After; import org.junit.Assert; import org.junit.Before; @@ -42,6 +43,7 @@ import java.util.LinkedHashMap; import java.util.List; import java.util.Map; import java.util.Map.Entry; +import java.util.regex.Pattern; import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.mockStatic; @@ -50,7 +52,10 @@ import static org.mockito.Mockito.mockStatic; public class TestJsonDebeziumSchemaChangeImplV2 extends TestJsonDebeziumChangeBase { private JsonDebeziumSchemaChangeImplV2 schemaChange; + private JsonDebeziumSchemaChangeImplV2 schemaChangeWithConvert; private JsonDebeziumChangeContext changeContext; + private JsonDebeziumChangeContext convertContext; + private MockedStatic<RestService> mockRestService; @Before @@ -86,6 +91,23 @@ public class TestJsonDebeziumSchemaChangeImplV2 extends TestJsonDebeziumChangeBa "", true); schemaChange = new JsonDebeziumSchemaChangeImplV2(changeContext); + + convertContext = + new JsonDebeziumChangeContext( + dorisOptions, + tableMapping, + sourceTableName, + targetDatabase, + new DorisTableConfig(tableProperties), + objectMapper, + null, + lineDelimiter, + ignoreUpdateBefore, + "ods_", + "_dt", + true, + new TableNameConverter("ods_", "_dt")); + schemaChangeWithConvert = new JsonDebeziumSchemaChangeImplV2(convertContext); } @Test @@ -489,6 +511,44 @@ public class TestJsonDebeziumSchemaChangeImplV2 extends TestJsonDebeziumChangeBa schemaChange.setSourceConnector(SourceConnector.MYSQL.connectorName); } + @Test + public void testAutoCreateTableWithConvert() throws Exception { + String record = + "{ \"source\":{ \"version\":\"1.9.7.Final\", \"connector\":\"oracle\", \"name\":\"oracle_logminer\", \"ts_ms\":1696945825065, \"snapshot\":\"true\", \"db\":\"TESTDB\", \"sequence\":null, \"schema\":\"ADMIN\", \"table\":\"PERSONS\", \"txId\":null, \"scn\":\"1199617\", \"commit_scn\":null, \"lcr_position\":null, \"rs_id\":null, \"ssn\":0, \"redo_thread\":null [...] + JsonNode recordRoot = objectMapper.readTree(record); + schemaChangeWithConvert.setSourceConnector(SourceConnector.ORACLE.connectorName); + TableSchema tableSchema = schemaChangeWithConvert.extractCreateTableSchema(recordRoot); + Assert.assertEquals("TESTDB", tableSchema.getDatabase()); + Assert.assertEquals("ods_PERSONS_dt", tableSchema.getTable()); + Assert.assertArrayEquals(new String[] {"ID"}, tableSchema.getKeys().toArray()); + Assert.assertEquals(3, tableSchema.getFields().size()); + Assert.assertEquals("ID", tableSchema.getFields().get("ID").getName()); + Assert.assertEquals("NAME4", tableSchema.getFields().get("NAME4").getName()); + Assert.assertEquals("age4", tableSchema.getFields().get("age4").getName()); + + // match table + Map<Pattern, String> tableConvert = new HashMap<>(); + tableConvert.put(Pattern.compile("PER.*"), "PERSONS_RES"); + + convertContext.setTableNameConverter( + new TableNameConverter("prefix_", "_suffix", tableConvert)); + schemaChangeWithConvert = new JsonDebeziumSchemaChangeImplV2(convertContext); + TableSchema tableSchema2 = schemaChangeWithConvert.extractCreateTableSchema(recordRoot); + Assert.assertEquals("TESTDB", tableSchema2.getDatabase()); + Assert.assertEquals("PERSONS_RES", tableSchema2.getTable()); + + // no match table + Map<Pattern, String> tableConvert2 = new HashMap<>(); + tableConvert2.put(Pattern.compile("NOMATCH.*"), "PERSONS_RES"); + convertContext.setTableNameConverter(new TableNameConverter("pre_", "_suf", tableConvert2)); + schemaChangeWithConvert = new JsonDebeziumSchemaChangeImplV2(convertContext); + TableSchema tableSchema3 = schemaChangeWithConvert.extractCreateTableSchema(recordRoot); + Assert.assertEquals("TESTDB", tableSchema3.getDatabase()); + Assert.assertEquals("pre_PERSONS_suf", tableSchema3.getTable()); + + schemaChangeWithConvert.setSourceConnector(SourceConnector.MYSQL.connectorName); + } + @Test public void testDateTimeFullOrigin() throws JsonProcessingException { Map<String, Map<String, FieldSchema>> originFieldSchemaMap = new LinkedHashMap<>(); --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org