This is an automated email from the ASF dual-hosted git repository.

diwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris-flink-connector.git


The following commit(s) were added to refs/heads/master by this push:
     new dbed8562 [Feature] support auto create table for shard table (#564)
dbed8562 is described below

commit dbed856246f2d035df544381c4d6208516810d0d
Author: wudi <676366...@qq.com>
AuthorDate: Wed Feb 26 10:13:08 2025 +0800

    [Feature] support auto create table for shard table (#564)
---
 .../flink/catalog/doris/DorisSchemaFactory.java    |  6 +-
 .../serializer/JsonDebeziumSchemaSerializer.java   | 18 +++++-
 .../jsondebezium/JsonDebeziumChangeContext.java    | 43 +++++++++++++
 .../jsondebezium/JsonDebeziumSchemaChange.java     | 10 ++-
 .../JsonDebeziumSchemaChangeImplV2.java            |  1 +
 .../jsondebezium/SQLParserSchemaChange.java        |  1 +
 .../apache/doris/flink/tools/cdc/DatabaseSync.java | 50 +--------------
 .../flink/tools/cdc/ParsingProcessFunction.java    |  5 +-
 .../tools/cdc/converter/TableNameConverter.java    | 73 ++++++++++++++++++++++
 .../cdc/mongodb/MongoParsingProcessFunction.java   |  2 +-
 .../TestJsonDebeziumSchemaChangeImplV2.java        | 60 ++++++++++++++++++
 11 files changed, 212 insertions(+), 57 deletions(-)

diff --git 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/catalog/doris/DorisSchemaFactory.java
 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/catalog/doris/DorisSchemaFactory.java
index dd42f803..07b05e5e 100644
--- 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/catalog/doris/DorisSchemaFactory.java
+++ 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/catalog/doris/DorisSchemaFactory.java
@@ -255,10 +255,12 @@ public class DorisSchemaFactory {
     }
 
     public static String quoteDefaultValue(String defaultValue) {
-        // DEFAULT current_timestamp not need quote
-        if (defaultValue.equalsIgnoreCase("current_timestamp")) {
+        // DEFAULT current_timestamp or null not need quote
+        if (defaultValue.equalsIgnoreCase("current_timestamp")
+                || defaultValue.equalsIgnoreCase("null")) {
             return defaultValue;
         }
+
         return "'" + defaultValue + "'";
     }
 
diff --git 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/JsonDebeziumSchemaSerializer.java
 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/JsonDebeziumSchemaSerializer.java
index 9c89fce3..93b7efcd 100644
--- 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/JsonDebeziumSchemaSerializer.java
+++ 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/JsonDebeziumSchemaSerializer.java
@@ -38,6 +38,7 @@ import 
org.apache.doris.flink.sink.writer.serializer.jsondebezium.JsonDebeziumSc
 import 
org.apache.doris.flink.sink.writer.serializer.jsondebezium.JsonDebeziumSchemaChangeImplV2;
 import 
org.apache.doris.flink.sink.writer.serializer.jsondebezium.SQLParserSchemaChange;
 import org.apache.doris.flink.tools.cdc.DorisTableConfig;
+import org.apache.doris.flink.tools.cdc.converter.TableNameConverter;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -81,6 +82,7 @@ public class JsonDebeziumSchemaSerializer implements 
DorisRecordSerializer<Strin
     private JsonDebeziumDataChange dataChange;
     private JsonDebeziumSchemaChange schemaChange;
     private SchemaChangeMode schemaChangeMode;
+    private TableNameConverter tableNameConverter;
     private final Set<String> initTableSet = new HashSet<>();
 
     public JsonDebeziumSchemaSerializer(
@@ -127,7 +129,8 @@ public class JsonDebeziumSchemaSerializer implements 
DorisRecordSerializer<Strin
             String targetDatabase,
             String targetTablePrefix,
             String targetTableSuffix,
-            SchemaChangeMode schemaChangeMode) {
+            SchemaChangeMode schemaChangeMode,
+            TableNameConverter tableNameConverter) {
         this(dorisOptions, pattern, sourceTableName, newSchemaChange, 
executionOptions);
         this.tableMapping = tableMapping;
         this.targetDatabase = targetDatabase;
@@ -135,6 +138,7 @@ public class JsonDebeziumSchemaSerializer implements 
DorisRecordSerializer<Strin
         this.targetTableSuffix = targetTableSuffix;
         this.schemaChangeMode = schemaChangeMode;
         this.dorisTableConfig = dorisTableConfig;
+        this.tableNameConverter = tableNameConverter;
         init();
     }
 
@@ -152,7 +156,8 @@ public class JsonDebeziumSchemaSerializer implements 
DorisRecordSerializer<Strin
                         ignoreUpdateBefore,
                         targetTablePrefix,
                         targetTableSuffix,
-                        enableDelete);
+                        enableDelete,
+                        tableNameConverter);
         initSchemaChangeInstance(changeContext);
         this.dataChange = new JsonDebeziumDataChange(changeContext);
     }
@@ -233,6 +238,7 @@ public class JsonDebeziumSchemaSerializer implements 
DorisRecordSerializer<Strin
         private String targetDatabase;
         private String targetTablePrefix = "";
         private String targetTableSuffix = "";
+        private TableNameConverter tableNameConverter;
 
         public JsonDebeziumSchemaSerializer.Builder 
setDorisOptions(DorisOptions dorisOptions) {
             this.dorisOptions = dorisOptions;
@@ -302,6 +308,11 @@ public class JsonDebeziumSchemaSerializer implements 
DorisRecordSerializer<Strin
             return this;
         }
 
+        public Builder setTableNameConverter(TableNameConverter 
tableNameConverter) {
+            this.tableNameConverter = tableNameConverter;
+            return this;
+        }
+
         public JsonDebeziumSchemaSerializer build() {
             return new JsonDebeziumSchemaSerializer(
                     dorisOptions,
@@ -314,7 +325,8 @@ public class JsonDebeziumSchemaSerializer implements 
DorisRecordSerializer<Strin
                     targetDatabase,
                     targetTablePrefix,
                     targetTableSuffix,
-                    schemaChangeMode);
+                    schemaChangeMode,
+                    tableNameConverter);
         }
     }
 }
diff --git 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/JsonDebeziumChangeContext.java
 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/JsonDebeziumChangeContext.java
index 2f7764f3..c9811189 100644
--- 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/JsonDebeziumChangeContext.java
+++ 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/JsonDebeziumChangeContext.java
@@ -17,9 +17,12 @@
 
 package org.apache.doris.flink.sink.writer.serializer.jsondebezium;
 
+import org.apache.flink.annotation.VisibleForTesting;
+
 import com.fasterxml.jackson.databind.ObjectMapper;
 import org.apache.doris.flink.cfg.DorisOptions;
 import org.apache.doris.flink.tools.cdc.DorisTableConfig;
+import org.apache.doris.flink.tools.cdc.converter.TableNameConverter;
 
 import java.io.Serializable;
 import java.util.HashMap;
@@ -44,6 +47,7 @@ public class JsonDebeziumChangeContext implements 
Serializable {
     private final boolean enableDelete;
     private final String targetTablePrefix;
     private final String targetTableSuffix;
+    private TableNameConverter tableNameConverter;
 
     public JsonDebeziumChangeContext(
             DorisOptions dorisOptions,
@@ -72,6 +76,36 @@ public class JsonDebeziumChangeContext implements 
Serializable {
         this.targetTableSuffix = targetTableSuffix;
     }
 
+    public JsonDebeziumChangeContext(
+            DorisOptions dorisOptions,
+            Map<String, String> tableMapping,
+            String sourceTableName,
+            String targetDatabase,
+            DorisTableConfig dorisTableConfig,
+            ObjectMapper objectMapper,
+            Pattern pattern,
+            String lineDelimiter,
+            boolean ignoreUpdateBefore,
+            String targetTablePrefix,
+            String targetTableSuffix,
+            boolean enableDelete,
+            TableNameConverter tableNameConverter) {
+        this(
+                dorisOptions,
+                tableMapping,
+                sourceTableName,
+                targetDatabase,
+                dorisTableConfig,
+                objectMapper,
+                pattern,
+                lineDelimiter,
+                ignoreUpdateBefore,
+                targetTablePrefix,
+                targetTableSuffix,
+                enableDelete);
+        this.tableNameConverter = tableNameConverter;
+    }
+
     public DorisOptions getDorisOptions() {
         return dorisOptions;
     }
@@ -119,6 +153,10 @@ public class JsonDebeziumChangeContext implements 
Serializable {
         return targetTableSuffix;
     }
 
+    public TableNameConverter getTableNameConverter() {
+        return tableNameConverter;
+    }
+
     public boolean enableDelete() {
         return enableDelete;
     }
@@ -126,4 +164,9 @@ public class JsonDebeziumChangeContext implements 
Serializable {
     public DorisTableConfig getDorisTableConf() {
         return dorisTableConfig;
     }
+
+    @VisibleForTesting
+    public void setTableNameConverter(TableNameConverter tableNameConverter) {
+        this.tableNameConverter = tableNameConverter;
+    }
 }
diff --git 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/JsonDebeziumSchemaChange.java
 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/JsonDebeziumSchemaChange.java
index 91be21fa..16757d27 100644
--- 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/JsonDebeziumSchemaChange.java
+++ 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/JsonDebeziumSchemaChange.java
@@ -33,6 +33,7 @@ import org.apache.doris.flink.sink.writer.EventType;
 import org.apache.doris.flink.tools.cdc.DorisTableConfig;
 import org.apache.doris.flink.tools.cdc.SourceConnector;
 import org.apache.doris.flink.tools.cdc.SourceSchema;
+import org.apache.doris.flink.tools.cdc.converter.TableNameConverter;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -71,6 +72,7 @@ public abstract class JsonDebeziumSchemaChange extends 
CdcSchemaChange {
     protected String targetTablePrefix;
     protected String targetTableSuffix;
     protected DorisTableConfig dorisTableConfig;
+    protected TableNameConverter tableNameConverter;
 
     public abstract boolean schemaChange(JsonNode recordRoot);
 
@@ -196,7 +198,13 @@ public abstract class JsonDebeziumSchemaChange extends 
CdcSchemaChange {
 
     protected String getCreateTableIdentifier(JsonNode record) {
         String table = extractJsonNode(record.get("source"), "table");
-        return targetDatabase + "." + targetTablePrefix + table + 
targetTableSuffix;
+        String createTblName;
+        if (tableNameConverter != null) {
+            createTblName = tableNameConverter.convert(table);
+        } else {
+            createTblName = targetTablePrefix + table + targetTableSuffix;
+        }
+        return targetDatabase + "." + createTblName;
     }
 
     public Map<String, String> getTableMapping() {
diff --git 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/JsonDebeziumSchemaChangeImplV2.java
 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/JsonDebeziumSchemaChangeImplV2.java
index 0b9172e8..abd6f55b 100644
--- 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/JsonDebeziumSchemaChangeImplV2.java
+++ 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/JsonDebeziumSchemaChangeImplV2.java
@@ -88,6 +88,7 @@ public class JsonDebeziumSchemaChangeImplV2 extends 
JsonDebeziumSchemaChange {
                 changeContext.getTargetTableSuffix() == null
                         ? ""
                         : changeContext.getTargetTableSuffix();
+        this.tableNameConverter = changeContext.getTableNameConverter();
     }
 
     @Override
diff --git 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/SQLParserSchemaChange.java
 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/SQLParserSchemaChange.java
index fd10ba53..eda223d6 100644
--- 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/SQLParserSchemaChange.java
+++ 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/SQLParserSchemaChange.java
@@ -53,6 +53,7 @@ public class SQLParserSchemaChange extends 
JsonDebeziumSchemaChange {
                 changeContext.getTargetTableSuffix() == null
                         ? ""
                         : changeContext.getTargetTableSuffix();
+        this.tableNameConverter = changeContext.getTableNameConverter();
     }
 
     @Override
diff --git 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/DatabaseSync.java
 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/DatabaseSync.java
index d6c69c0b..06e49e24 100644
--- 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/DatabaseSync.java
+++ 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/DatabaseSync.java
@@ -42,11 +42,11 @@ import org.apache.doris.flink.sink.writer.WriteMode;
 import org.apache.doris.flink.sink.writer.serializer.DorisRecordSerializer;
 import 
org.apache.doris.flink.sink.writer.serializer.JsonDebeziumSchemaSerializer;
 import org.apache.doris.flink.table.DorisConfigOptions;
+import org.apache.doris.flink.tools.cdc.converter.TableNameConverter;
 import org.apache.doris.flink.tools.cdc.oracle.OracleDatabaseSync;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.Serializable;
 import java.sql.Connection;
 import java.sql.SQLException;
 import java.sql.SQLSyntaxErrorException;
@@ -343,6 +343,7 @@ public abstract class DatabaseSync {
                 .setTargetDatabase(database)
                 .setTargetTablePrefix(tablePrefix)
                 .setTargetTableSuffix(tableSuffix)
+                .setTableNameConverter(converter)
                 .build();
     }
 
@@ -618,51 +619,4 @@ public abstract class DatabaseSync {
         this.tableSuffix = tableSuffix;
         return this;
     }
-
-    public static class TableNameConverter implements Serializable {
-        private static final long serialVersionUID = 1L;
-        private final String prefix;
-        private final String suffix;
-        private Map<Pattern, String> multiToOneRulesPattern;
-
-        TableNameConverter() {
-            this("", "");
-        }
-
-        TableNameConverter(String prefix, String suffix) {
-            this.prefix = prefix == null ? "" : prefix;
-            this.suffix = suffix == null ? "" : suffix;
-        }
-
-        TableNameConverter(
-                String prefix, String suffix, Map<Pattern, String> 
multiToOneRulesPattern) {
-            this.prefix = prefix == null ? "" : prefix;
-            this.suffix = suffix == null ? "" : suffix;
-            this.multiToOneRulesPattern = multiToOneRulesPattern;
-        }
-
-        public String convert(String tableName) {
-            if (multiToOneRulesPattern == null) {
-                return prefix + tableName + suffix;
-            }
-
-            String target = null;
-
-            for (Map.Entry<Pattern, String> patternStringEntry :
-                    multiToOneRulesPattern.entrySet()) {
-                if (patternStringEntry.getKey().matcher(tableName).matches()) {
-                    target = patternStringEntry.getValue();
-                }
-            }
-            /**
-             * If multiToOneRulesPattern is not null and target is not 
assigned, then the
-             * synchronization task contains both multi to one and one to one 
, prefixes and
-             * suffixes are added to common one-to-one mapping tables
-             */
-            if (target == null) {
-                return prefix + tableName + suffix;
-            }
-            return target;
-        }
-    }
 }
diff --git 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/ParsingProcessFunction.java
 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/ParsingProcessFunction.java
index 22e2b9bc..8315f2da 100644
--- 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/ParsingProcessFunction.java
+++ 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/ParsingProcessFunction.java
@@ -26,6 +26,7 @@ import org.apache.flink.util.StringUtils;
 import com.fasterxml.jackson.core.JsonProcessingException;
 import com.fasterxml.jackson.databind.JsonNode;
 import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.doris.flink.tools.cdc.converter.TableNameConverter;
 
 import java.util.HashMap;
 import java.util.Map;
@@ -33,10 +34,10 @@ import java.util.Map;
 public class ParsingProcessFunction extends ProcessFunction<String, Void> {
     protected ObjectMapper objectMapper = new ObjectMapper();
     private transient Map<String, OutputTag<String>> recordOutputTags;
-    private DatabaseSync.TableNameConverter converter;
+    private TableNameConverter converter;
     private String database;
 
-    public ParsingProcessFunction(String database, 
DatabaseSync.TableNameConverter converter) {
+    public ParsingProcessFunction(String database, TableNameConverter 
converter) {
         this.database = database;
         this.converter = converter;
     }
diff --git 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/converter/TableNameConverter.java
 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/converter/TableNameConverter.java
new file mode 100644
index 00000000..a9df452c
--- /dev/null
+++ 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/converter/TableNameConverter.java
@@ -0,0 +1,73 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.flink.tools.cdc.converter;
+
+import java.io.Serializable;
+import java.util.Map;
+import java.util.regex.Pattern;
+
+/*
+ * Convert the table name of the upstream data source to the table name of the 
doris database.
+ * */
+public class TableNameConverter implements Serializable {
+
+    private static final long serialVersionUID = 1L;
+    private final String prefix;
+    private final String suffix;
+
+    // tbl_.*, tbl
+    private Map<Pattern, String> routeRules;
+
+    public TableNameConverter() {
+        this("", "");
+    }
+
+    public TableNameConverter(String prefix, String suffix) {
+        this.prefix = prefix == null ? "" : prefix;
+        this.suffix = suffix == null ? "" : suffix;
+    }
+
+    public TableNameConverter(String prefix, String suffix, Map<Pattern, 
String> routeRules) {
+        this.prefix = prefix == null ? "" : prefix;
+        this.suffix = suffix == null ? "" : suffix;
+        this.routeRules = routeRules;
+    }
+
+    public String convert(String tableName) {
+        if (routeRules == null) {
+            return prefix + tableName + suffix;
+        }
+
+        String target = null;
+
+        for (Map.Entry<Pattern, String> patternStringEntry : 
routeRules.entrySet()) {
+            if (patternStringEntry.getKey().matcher(tableName).matches()) {
+                target = patternStringEntry.getValue();
+            }
+        }
+        /**
+         * If routeRules is not null and target is not assigned, then the 
synchronization task
+         * contains both multi to one and one to one , prefixes and suffixes 
are added to common
+         * one-to-one mapping tables
+         */
+        if (target == null) {
+            return prefix + tableName + suffix;
+        }
+        return target;
+    }
+}
diff --git 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mongodb/MongoParsingProcessFunction.java
 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mongodb/MongoParsingProcessFunction.java
index 72c61567..704af781 100644
--- 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mongodb/MongoParsingProcessFunction.java
+++ 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mongodb/MongoParsingProcessFunction.java
@@ -19,8 +19,8 @@ package org.apache.doris.flink.tools.cdc.mongodb;
 
 import com.fasterxml.jackson.databind.JsonNode;
 import com.fasterxml.jackson.databind.node.NullNode;
-import org.apache.doris.flink.tools.cdc.DatabaseSync.TableNameConverter;
 import org.apache.doris.flink.tools.cdc.ParsingProcessFunction;
+import org.apache.doris.flink.tools.cdc.converter.TableNameConverter;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
diff --git 
a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/TestJsonDebeziumSchemaChangeImplV2.java
 
b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/TestJsonDebeziumSchemaChangeImplV2.java
index 39241f94..fc3f6ffb 100644
--- 
a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/TestJsonDebeziumSchemaChangeImplV2.java
+++ 
b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/TestJsonDebeziumSchemaChangeImplV2.java
@@ -28,6 +28,7 @@ import org.apache.doris.flink.rest.RestService;
 import org.apache.doris.flink.rest.models.Schema;
 import org.apache.doris.flink.tools.cdc.DorisTableConfig;
 import org.apache.doris.flink.tools.cdc.SourceConnector;
+import org.apache.doris.flink.tools.cdc.converter.TableNameConverter;
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
@@ -42,6 +43,7 @@ import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
+import java.util.regex.Pattern;
 
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.Mockito.mockStatic;
@@ -50,7 +52,10 @@ import static org.mockito.Mockito.mockStatic;
 public class TestJsonDebeziumSchemaChangeImplV2 extends 
TestJsonDebeziumChangeBase {
 
     private JsonDebeziumSchemaChangeImplV2 schemaChange;
+    private JsonDebeziumSchemaChangeImplV2 schemaChangeWithConvert;
     private JsonDebeziumChangeContext changeContext;
+    private JsonDebeziumChangeContext convertContext;
+
     private MockedStatic<RestService> mockRestService;
 
     @Before
@@ -86,6 +91,23 @@ public class TestJsonDebeziumSchemaChangeImplV2 extends 
TestJsonDebeziumChangeBa
                         "",
                         true);
         schemaChange = new JsonDebeziumSchemaChangeImplV2(changeContext);
+
+        convertContext =
+                new JsonDebeziumChangeContext(
+                        dorisOptions,
+                        tableMapping,
+                        sourceTableName,
+                        targetDatabase,
+                        new DorisTableConfig(tableProperties),
+                        objectMapper,
+                        null,
+                        lineDelimiter,
+                        ignoreUpdateBefore,
+                        "ods_",
+                        "_dt",
+                        true,
+                        new TableNameConverter("ods_", "_dt"));
+        schemaChangeWithConvert = new 
JsonDebeziumSchemaChangeImplV2(convertContext);
     }
 
     @Test
@@ -489,6 +511,44 @@ public class TestJsonDebeziumSchemaChangeImplV2 extends 
TestJsonDebeziumChangeBa
         schemaChange.setSourceConnector(SourceConnector.MYSQL.connectorName);
     }
 
+    @Test
+    public void testAutoCreateTableWithConvert() throws Exception {
+        String record =
+                "{    \"source\":{        \"version\":\"1.9.7.Final\",        
\"connector\":\"oracle\",        \"name\":\"oracle_logminer\",        
\"ts_ms\":1696945825065,        \"snapshot\":\"true\",        
\"db\":\"TESTDB\",        \"sequence\":null,        \"schema\":\"ADMIN\",       
 \"table\":\"PERSONS\",        \"txId\":null,        \"scn\":\"1199617\",       
 \"commit_scn\":null,        \"lcr_position\":null,        \"rs_id\":null,      
  \"ssn\":0,        \"redo_thread\":null   [...]
+        JsonNode recordRoot = objectMapper.readTree(record);
+        
schemaChangeWithConvert.setSourceConnector(SourceConnector.ORACLE.connectorName);
+        TableSchema tableSchema = 
schemaChangeWithConvert.extractCreateTableSchema(recordRoot);
+        Assert.assertEquals("TESTDB", tableSchema.getDatabase());
+        Assert.assertEquals("ods_PERSONS_dt", tableSchema.getTable());
+        Assert.assertArrayEquals(new String[] {"ID"}, 
tableSchema.getKeys().toArray());
+        Assert.assertEquals(3, tableSchema.getFields().size());
+        Assert.assertEquals("ID", tableSchema.getFields().get("ID").getName());
+        Assert.assertEquals("NAME4", 
tableSchema.getFields().get("NAME4").getName());
+        Assert.assertEquals("age4", 
tableSchema.getFields().get("age4").getName());
+
+        // match table
+        Map<Pattern, String> tableConvert = new HashMap<>();
+        tableConvert.put(Pattern.compile("PER.*"), "PERSONS_RES");
+
+        convertContext.setTableNameConverter(
+                new TableNameConverter("prefix_", "_suffix", tableConvert));
+        schemaChangeWithConvert = new 
JsonDebeziumSchemaChangeImplV2(convertContext);
+        TableSchema tableSchema2 = 
schemaChangeWithConvert.extractCreateTableSchema(recordRoot);
+        Assert.assertEquals("TESTDB", tableSchema2.getDatabase());
+        Assert.assertEquals("PERSONS_RES", tableSchema2.getTable());
+
+        // no match table
+        Map<Pattern, String> tableConvert2 = new HashMap<>();
+        tableConvert2.put(Pattern.compile("NOMATCH.*"), "PERSONS_RES");
+        convertContext.setTableNameConverter(new TableNameConverter("pre_", 
"_suf", tableConvert2));
+        schemaChangeWithConvert = new 
JsonDebeziumSchemaChangeImplV2(convertContext);
+        TableSchema tableSchema3 = 
schemaChangeWithConvert.extractCreateTableSchema(recordRoot);
+        Assert.assertEquals("TESTDB", tableSchema3.getDatabase());
+        Assert.assertEquals("pre_PERSONS_suf", tableSchema3.getTable());
+
+        
schemaChangeWithConvert.setSourceConnector(SourceConnector.MYSQL.connectorName);
+    }
+
     @Test
     public void testDateTimeFullOrigin() throws JsonProcessingException {
         Map<String, Map<String, FieldSchema>> originFieldSchemaMap = new 
LinkedHashMap<>();


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to