This is an automated email from the ASF dual-hosted git repository.

diwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris-flink-connector.git


The following commit(s) were added to refs/heads/master by this push:
     new 0eb3aa7  [Feature](CDC) Add auto create table  (#248)
0eb3aa7 is described below

commit 0eb3aa7075befc7478e014bbadd754af17deaf75
Author: wudi <676366...@qq.com>
AuthorDate: Thu Nov 30 17:53:00 2023 +0800

    [Feature](CDC) Add auto create table  (#248)
---
 .../apache/doris/flink/sink/writer/EventType.java} |  25 +---
 .../serializer/JsonDebeziumSchemaSerializer.java   | 147 ++++++++++++++++-----
 .../doris/flink/table/DorisConfigOptions.java      |   2 +-
 .../org/apache/doris/flink/tools/cdc/CdcTools.java |   1 +
 .../apache/doris/flink/tools/cdc/DatabaseSync.java |  27 +++-
 .../flink/tools/cdc/mysql/MysqlDatabaseSync.java   |  10 +-
 .../flink/tools/cdc/oracle/OracleDatabaseSync.java |  10 +-
 .../tools/cdc/postgres/PostgresDatabaseSync.java   |  10 +-
 .../tools/cdc/sqlserver/SqlServerDatabaseSync.java |  10 +-
 .../writer/TestJsonDebeziumSchemaSerializer.java   |  31 ++++-
 .../doris/flink/tools/cdc/DatabaseSyncTest.java    |  16 +++
 11 files changed, 225 insertions(+), 64 deletions(-)

diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/DatabaseSyncTest.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/EventType.java
similarity index 51%
copy from flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/DatabaseSyncTest.java
copy to flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/EventType.java
index daab90b..d26bf27 100644
--- a/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/DatabaseSyncTest.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/EventType.java
@@ -14,27 +14,10 @@
 // KIND, either express or implied.  See the License for the
 // specific language governing permissions and limitations
 // under the License.
-package org.apache.doris.flink.tools.cdc;
 
-import org.apache.doris.flink.tools.cdc.mysql.MysqlDatabaseSync;
-import org.junit.Test;
-import java.util.Arrays;
+package org.apache.doris.flink.sink.writer;
 
-/**
- * Unit tests for the {@link DatabaseSync}.
- **/
-public class DatabaseSyncTest {
-    @Test
-    public void multiToOneRulesParserTest() throws Exception{
-        String[][] testCase = {
-                {"a_.*|b_.*","a|b"} //  Normal condition
-//                ,{"a_.*|b_.*","a|b|c"} // Unequal length
-//                ,{"",""} // Null value
-//                ,{"***....","a"} // Abnormal regular expression
-        };
-        DatabaseSync databaseSync = new MysqlDatabaseSync();
-        Arrays.stream(testCase).forEach(arr->{
-            databaseSync.multiToOneRulesParser(arr[0], arr[1]);
-        });
-    }
+public enum EventType {
+    ALTER,
+    CREATE
 }
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/JsonDebeziumSchemaSerializer.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/JsonDebeziumSchemaSerializer.java
index 0c4309b..29d9cee 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/JsonDebeziumSchemaSerializer.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/JsonDebeziumSchemaSerializer.java
@@ -26,12 +26,14 @@ import com.fasterxml.jackson.databind.node.JsonNodeFactory;
 import com.fasterxml.jackson.databind.node.NullNode;
 import org.apache.commons.collections.CollectionUtils;
 import org.apache.doris.flink.catalog.doris.FieldSchema;
+import org.apache.doris.flink.catalog.doris.TableSchema;
 import org.apache.doris.flink.cfg.DorisExecutionOptions;
 import org.apache.doris.flink.cfg.DorisOptions;
 import org.apache.doris.flink.exception.IllegalArgumentException;
 import org.apache.doris.flink.sink.schema.SchemaChangeHelper;
 import org.apache.doris.flink.sink.schema.SchemaChangeHelper.DDLSchema;
 import org.apache.doris.flink.sink.schema.SchemaChangeManager;
+import org.apache.doris.flink.sink.writer.EventType;
 import org.apache.doris.flink.tools.cdc.SourceConnector;
 import org.apache.doris.flink.tools.cdc.SourceSchema;
 import org.apache.doris.flink.tools.cdc.mysql.MysqlType;
@@ -41,6 +43,7 @@ import org.apache.doris.flink.tools.cdc.sqlserver.SqlServerType;
 import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.util.CollectionUtil;
+import org.apache.flink.util.Preconditions;
 import org.apache.flink.util.StringUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -48,6 +51,7 @@ import org.slf4j.LoggerFactory;
 import java.io.IOException;
 import java.nio.charset.StandardCharsets;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.LinkedHashMap;
 import java.util.List;
@@ -67,7 +71,6 @@ public class JsonDebeziumSchemaSerializer implements DorisRecordSerializer<Strin
     private static final String OP_CREATE = "c"; // insert
     private static final String OP_UPDATE = "u"; // update
     private static final String OP_DELETE = "d"; // delete
-
     public static final String EXECUTE_DDL = "ALTER TABLE %s %s COLUMN %s %s"; // alter table tbl add cloumn aca int
     private static final String addDropDDLRegex
             = "ALTER\\s+TABLE\\s+[^\\s]+\\s+(ADD|DROP)\\s+(COLUMN\\s+)?([^\\s]+)(\\s+([^\\s]+))?.*";
@@ -214,25 +217,40 @@ public class JsonDebeziumSchemaSerializer implements DorisRecordSerializer<Strin
                 return false;
             }
 
-            // db,table
-            Tuple2<String, String> tuple = getDorisTableTuple(recordRoot);
-            if(tuple == null){
+            EventType eventType = extractEventType(recordRoot);
+            if(eventType == null){
                 return false;
             }
-
-            List<String> ddlSqlList = extractDDLList(recordRoot);
-            if (CollectionUtils.isEmpty(ddlSqlList)) {
-                LOG.info("ddl can not do schema change:{}", recordRoot);
-                return false;
-            }
-
-            List<DDLSchema> ddlSchemas = SchemaChangeHelper.getDdlSchemas();
-            for (int i = 0; i < ddlSqlList.size(); i++) {
-                DDLSchema ddlSchema = ddlSchemas.get(i);
-                String ddlSql = ddlSqlList.get(i);
-                boolean doSchemaChange = checkSchemaChange(tuple.f0, tuple.f1, ddlSchema);
-                status = doSchemaChange && schemaChangeManager.execute(ddlSql, tuple.f0);
-                LOG.info("schema change status:{}, ddl:{}", status, ddlSql);
+            if(eventType.equals(EventType.CREATE)){
+                TableSchema tableSchema = extractCreateTableSchema(recordRoot);
+                status = schemaChangeManager.createTable(tableSchema);
+                if(status){
+                    String cdcTbl = getCdcTableIdentifier(recordRoot);
+                    String dorisTbl = getCreateTableIdentifier(recordRoot);
+                    tableMapping.put(cdcTbl, dorisTbl);
+                    LOG.info("create table ddl status: {}", status);
+                }
+            } else if (eventType.equals(EventType.ALTER)){
+                // db,table
+                Tuple2<String, String> tuple = getDorisTableTuple(recordRoot);
+                if(tuple == null){
+                    return false;
+                }
+                List<String> ddlSqlList = extractDDLList(recordRoot);
+                if (CollectionUtils.isEmpty(ddlSqlList)) {
+                    LOG.info("ddl can not do schema change:{}", recordRoot);
+                    return false;
+                }
+                List<DDLSchema> ddlSchemas = SchemaChangeHelper.getDdlSchemas();
+                for (int i = 0; i < ddlSqlList.size(); i++) {
+                    DDLSchema ddlSchema = ddlSchemas.get(i);
+                    String ddlSql = ddlSqlList.get(i);
+                    boolean doSchemaChange = checkSchemaChange(tuple.f0, tuple.f1, ddlSchema);
+                    status = doSchemaChange && schemaChangeManager.execute(ddlSql, tuple.f0);
+                    LOG.info("schema change status:{}, ddl:{}", status, ddlSql);
+                }
+            } else{
+                LOG.info("Unsupported event type {}", eventType);
             }
         } catch (Exception ex) {
             LOG.warn("schema change error :", ex);
@@ -240,18 +258,26 @@ public class JsonDebeziumSchemaSerializer implements DorisRecordSerializer<Strin
         return status;
     }
 
+    protected JsonNode extractTableChange(JsonNode record) throws JsonProcessingException {
+        JsonNode historyRecord = extractHistoryRecord(record);
+        JsonNode tableChanges = historyRecord.get("tableChanges");
+        if(!Objects.isNull(tableChanges)){
+            JsonNode tableChange = tableChanges.get(0);
+            return tableChange;
+        }
+        return null;
+    }
+
+    /**
+     * Parse Alter Event
+     */
     @VisibleForTesting
-    public List<String> extractDDLList(JsonNode record) throws JsonProcessingException {
+    public List<String> extractDDLList(JsonNode record) throws IOException{
         String dorisTable = getDorisTableIdentifier(record);
         JsonNode historyRecord = extractHistoryRecord(record);
-        JsonNode tableChanges = historyRecord.get("tableChanges");
         String ddl = extractJsonNode(historyRecord, "ddl");
-        if (Objects.isNull(tableChanges) || Objects.isNull(ddl)) {
-            return new ArrayList<>();
-        }
-        LOG.debug("received debezium ddl :{}", ddl);
-        JsonNode tableChange = tableChanges.get(0);
-        if (Objects.isNull(tableChange) || !tableChange.get("type").asText().equals("ALTER")) {
+        JsonNode tableChange = extractTableChange(record);
+        if (Objects.isNull(tableChange) || Objects.isNull(ddl)) {
             return null;
         }
 
@@ -285,6 +311,47 @@ public class JsonDebeziumSchemaSerializer implements DorisRecordSerializer<Strin
         return SchemaChangeHelper.generateDDLSql(dorisTable);
     }
 
+    @VisibleForTesting
+    public TableSchema extractCreateTableSchema(JsonNode record) throws JsonProcessingException {
+        String dorisTable = getCreateTableIdentifier(record);
+        JsonNode tableChange =  extractTableChange(record);
+        JsonNode pkColumns = tableChange.get("table").get("primaryKeyColumnNames");
+        JsonNode columns = tableChange.get("table").get("columns");
+        String tblComment = tableChange.get("table").get("comment").asText();
+        Map<String, FieldSchema> field = new LinkedHashMap<>();
+        for (JsonNode column : columns) {
+            buildFieldSchema(field, column);
+        }
+        List<String> pkList = new ArrayList<>();
+        for(JsonNode column : pkColumns){
+            String fieldName = column.asText();
+            pkList.add(fieldName);
+        }
+
+        TableSchema tableSchema = new TableSchema();
+        tableSchema.setFields(field);
+        tableSchema.setKeys(pkList);
+        tableSchema.setDistributeKeys(buildDistributeKeys(pkList, field));
+        tableSchema.setTableComment(tblComment);
+
+        String[] split = dorisTable.split("\\.");
+        Preconditions.checkArgument(split.length == 2);
+        tableSchema.setDatabase(split[0]);
+        tableSchema.setTable(split[1]);
+        return tableSchema;
+    }
+
+    private List<String> buildDistributeKeys(List<String> primaryKeys, Map<String, FieldSchema> fields) {
+        if (!CollectionUtil.isNullOrEmpty(primaryKeys)) {
+            return primaryKeys;
+        }
+        if(!fields.isEmpty()){
+            Map.Entry<String, FieldSchema> firstField = fields.entrySet().iterator().next();
+            return Collections.singletonList(firstField.getKey());
+        }
+        return new ArrayList<>();
+    }
+
     @VisibleForTesting
     public void setOriginFieldSchemaMap(Map<String, FieldSchema> originFieldSchemaMap) {
         this.originFieldSchemaMap = originFieldSchemaMap;
@@ -334,6 +401,12 @@ public class JsonDebeziumSchemaSerializer implements DorisRecordSerializer<Strin
         return SourceSchema.getString(db, schema, table);
     }
 
+    public String getCreateTableIdentifier(JsonNode record){
+        String db = extractJsonNode(record.get("source"), "db");
+        String table = extractJsonNode(record.get("source"), "table");
+        return db + "." + table;
+    }
+
     public String getDorisTableIdentifier(String cdcTableIdentifier){
         if(!StringUtils.isNullOrWhitespaceOnly(dorisOptions.getTableIdentifier())){
             return dorisOptions.getTableIdentifier();
@@ -405,6 +478,23 @@ public class JsonDebeziumSchemaSerializer implements DorisRecordSerializer<Strin
         return extractJsonNode(record.get("source"), "table");
     }
 
+    /**
+     * Parse event type
+     */
+    protected EventType extractEventType(JsonNode record) throws JsonProcessingException {
+        JsonNode tableChange = extractTableChange(record);
+        if(tableChange == null || tableChange.get("type") == null){
+            return null;
+        }
+        String type = tableChange.get("type").asText();
+        if(EventType.ALTER.toString().equalsIgnoreCase(type)){
+            return EventType.ALTER;
+        }else if(EventType.CREATE.toString().equalsIgnoreCase(type)){
+            return EventType.CREATE;
+        }
+        return null;
+    }
+
     private String extractJsonNode(JsonNode record, String key) {
         return record != null && record.get(key) != null &&
                 !(record.get(key) instanceof NullNode) ? record.get(key).asText() : null;
@@ -425,7 +515,7 @@ public class JsonDebeziumSchemaSerializer implements DorisRecordSerializer<Strin
     }
 
     private JsonNode extractHistoryRecord(JsonNode record) throws JsonProcessingException {
-        if (record.has("historyRecord")) {
+        if (record != null && record.has("historyRecord")) {
             return objectMapper.readTree(record.get("historyRecord").asText());
         }
         // The ddl passed by some scenes will not be included in the historyRecord, such as DebeziumSourceFunction
@@ -452,8 +542,6 @@ public class JsonDebeziumSchemaSerializer implements DorisRecordSerializer<Strin
         return null;
     }
 
-
-
     @VisibleForTesting
     public void fillOriginSchema(JsonNode columns) {
         if (Objects.nonNull(originFieldSchemaMap)) {
@@ -623,5 +711,4 @@ public class JsonDebeziumSchemaSerializer implements DorisRecordSerializer<Strin
         return type;
 
     }
-
 }
\ No newline at end of file
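
For readers of this patch: a minimal, self-contained sketch (not part of the commit) of the Debezium history-record shape that the new extractTableChange/extractEventType helpers inspect. The JSON payload below is a simplified, hypothetical example.

    import com.fasterxml.jackson.databind.JsonNode;
    import com.fasterxml.jackson.databind.ObjectMapper;

    public class EventTypeDemo {
        public static void main(String[] args) throws Exception {
            ObjectMapper mapper = new ObjectMapper();
            // Hypothetical, trimmed-down historyRecord: tableChanges[0].type drives the dispatch.
            String historyRecord = "{\"ddl\":\"CREATE TABLE db.tbl (id int)\","
                    + "\"tableChanges\":[{\"type\":\"CREATE\"}]}";
            JsonNode tableChanges = mapper.readTree(historyRecord).get("tableChanges");
            JsonNode tableChange = tableChanges == null ? null : tableChanges.get(0);
            String type = tableChange == null ? null : tableChange.get("type").asText();
            // CREATE leads to extractCreateTableSchema + schemaChangeManager.createTable;
            // ALTER falls through to the existing extractDDLList schema-change path.
            System.out.println(type); // prints CREATE
        }
    }
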
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisConfigOptions.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisConfigOptions.java
index af01ea5..546dc6e 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisConfigOptions.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisConfigOptions.java
@@ -89,7 +89,7 @@ public class DorisConfigOptions {
             .defaultValue(DORIS_DESERIALIZE_ARROW_ASYNC_DEFAULT)
             .withDescription("");
     public static final ConfigOption<Integer> DORIS_DESERIALIZE_QUEUE_SIZE = ConfigOptions
-            .key("doris.request.retriesdoris.deserialize.queue.size")
+            .key("doris.deserialize.queue.size")
             .intType()
             .defaultValue(DORIS_DESERIALIZE_QUEUE_SIZE_DEFAULT)
             .withDescription("");
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/CdcTools.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/CdcTools.java
index d05fa15..fe35ee3 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/CdcTools.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/CdcTools.java
@@ -129,6 +129,7 @@ public class CdcTools {
                 .setTableConfig(tableMap)
                 .setCreateTableOnly(createTableOnly)
                 .setNewSchemaChange(useNewSchemaChange)
+                .setSingleSink(singleSink)
                 .create();
         databaseSync.build();
         if(StringUtils.isNullOrWhitespaceOnly(jobName)){
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/DatabaseSync.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/DatabaseSync.java
index bf26214..02ab034 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/DatabaseSync.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/DatabaseSync.java
@@ -45,6 +45,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Properties;
 import java.util.regex.Pattern;
+import java.util.stream.Collectors;
 
 public abstract class DatabaseSync {
     private static final Logger LOG = LoggerFactory.getLogger(DatabaseSync.class);
@@ -83,6 +84,11 @@ public abstract class DatabaseSync {
 
     public abstract DataStreamSource<String> buildCdcSource(StreamExecutionEnvironment env);
 
+    /**
+     * Get the prefix of a specific tableList, for example, mysql is database, oracle is schema
+     */
+    public abstract String getTableListPrefix();
+
     public DatabaseSync() throws SQLException {
         registerDriver();
     }
@@ -132,8 +138,7 @@ public abstract class DatabaseSync {
             System.out.println("Create table finished.");
             System.exit(0);
         }
-
-        config.setString(TABLE_NAME_OPTIONS, "(" + String.join("|", syncTables) + ")");
+        config.setString(TABLE_NAME_OPTIONS, getSyncTableList(syncTables));
         DataStreamSource<String> streamSource = buildCdcSource(env);
         if(singleSink){
             streamSource.sinkTo(buildDorisSink());
@@ -256,6 +261,24 @@ public abstract class DatabaseSync {
         LOG.debug("table {} is synchronized? {}", tableName, sync);
         return sync;
     }
+
+    protected String getSyncTableList(List<String> syncTables){
+        if(!singleSink){
+            return syncTables.stream()
+                    .map(v-> getTableListPrefix() + "\\." + v)
+                    .collect(Collectors.joining("|"));
+        }else{
+            // includingTablePattern and ^excludingPattern
+            String includingPattern = String.format("(%s)\\.(%s)", getTableListPrefix(), includingTables);
+            if (StringUtils.isNullOrWhitespaceOnly(excludingTables)) {
+                return includingPattern;
+            }else{
+                String excludingPattern = String.format("?!(%s\\.(%s))$", getTableListPrefix(), excludingTables);
+                return String.format("(%s)(%s)", includingPattern, excludingPattern);
+            }
+        }
+    }
+
     /**
      * Filter table that many tables merge to one
      */
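
As an illustration only (not part of the patch), here is how the table-name pattern produced by the new getSyncTableList behaves for a hypothetical MySQL database "db" with tables tbl_1 and tbl_2:

    import java.util.Arrays;
    import java.util.regex.Pattern;
    import java.util.stream.Collectors;

    public class TableListPatternDemo {
        public static void main(String[] args) {
            String prefix = "db"; // getTableListPrefix(): database name for MySQL, schema name for Oracle/Postgres/SqlServer
            // Multi-sink path: one escaped "prefix\.table" alternative per synced table.
            String multiSink = Arrays.asList("tbl_1", "tbl_2").stream()
                    .map(t -> prefix + "\\." + t)
                    .collect(Collectors.joining("|"));    // db\.tbl_1|db\.tbl_2
            // Single-sink path with no excluding tables: one grouped including pattern.
            String singleSink = String.format("(%s)\\.(%s)", prefix, "tbl_1|tbl_2");
            System.out.println(Pattern.matches(multiSink, "db.tbl_1"));  // true
            System.out.println(Pattern.matches(singleSink, "db.tbl_2")); // true
            System.out.println(Pattern.matches(singleSink, "db.tbl_3")); // false
        }
    }
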
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mysql/MysqlDatabaseSync.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mysql/MysqlDatabaseSync.java
index 22e49aa..a3e01d3 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mysql/MysqlDatabaseSync.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mysql/MysqlDatabaseSync.java
@@ -122,7 +122,9 @@ public class MysqlDatabaseSync extends DatabaseSync {
                 .username(config.get(MySqlSourceOptions.USERNAME))
                 .password(config.get(MySqlSourceOptions.PASSWORD))
                 .databaseList(databaseName)
-                .tableList(databaseName + "." + tableName);
+                .tableList(tableName)
+                //default open add newly table
+                .scanNewlyAddedTableEnabled(true);
 
         config.getOptional(MySqlSourceOptions.SERVER_ID).ifPresent(sourceBuilder::serverId);
         config
@@ -215,6 +217,12 @@ public class MysqlDatabaseSync extends DatabaseSync {
                 mySqlSource, WatermarkStrategy.noWatermarks(), "MySQL Source");
     }
 
+    @Override
+    public String getTableListPrefix() {
+        String databaseName = config.get(MySqlSourceOptions.DATABASE_NAME);
+        return databaseName;
+    }
+
     /**
      * set chunkkeyColumn,eg: db.table1:column1,db.table2:column2
      * @param sourceBuilder
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/oracle/OracleDatabaseSync.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/oracle/OracleDatabaseSync.java
index 6e27eb6..92c1f95 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/oracle/OracleDatabaseSync.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/oracle/OracleDatabaseSync.java
@@ -174,7 +174,7 @@ public class OracleDatabaseSync extends DatabaseSync {
                     .port(port)
                     .databaseList(databaseName)
                     .schemaList(schemaName)
-                    .tableList(schemaName + "." + tableName)
+                    .tableList(tableName)
                     .username(username)
                     .password(password)
                     .includeSchemaChanges(true)
@@ -199,7 +199,7 @@ public class OracleDatabaseSync extends DatabaseSync {
                     .password(password)
                     .database(databaseName)
                     .schemaList(schemaName)
-                    .tableList(schemaName + "." + tableName)
+                    .tableList(tableName)
                     .debeziumProperties(debeziumProperties)
                     .startupOptions(startupOptions)
                     .deserializer(schema)
@@ -207,4 +207,10 @@ public class OracleDatabaseSync extends DatabaseSync {
             return env.addSource(oracleSource, "Oracle Source");
         }
     }
+
+    @Override
+    public String getTableListPrefix() {
+        String schemaName = config.get(OracleSourceOptions.SCHEMA_NAME);
+        return schemaName;
+    }
 }
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/postgres/PostgresDatabaseSync.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/postgres/PostgresDatabaseSync.java
index b8c9ad1..04b4127 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/postgres/PostgresDatabaseSync.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/postgres/PostgresDatabaseSync.java
@@ -159,7 +159,7 @@ public class PostgresDatabaseSync extends DatabaseSync {
                     .port(port)
                     .database(databaseName)
                     .schemaList(schemaName)
-                    .tableList(schemaName + "." + tableName)
+                    .tableList(tableName)
                     .username(username)
                     .password(password)
                     .deserializer(schema)
@@ -185,7 +185,7 @@ public class PostgresDatabaseSync extends DatabaseSync {
                     .port(port)
                     .database(databaseName)
                     .schemaList(schemaName)
-                    .tableList(schemaName + "." + tableName)
+                    .tableList(tableName)
                     .username(username)
                     .password(password)
                     .debeziumProperties(debeziumProperties)
@@ -196,4 +196,10 @@ public class PostgresDatabaseSync extends DatabaseSync {
             return env.addSource(postgresSource, "Postgres Source");
         }
     }
+
+    @Override
+    public String getTableListPrefix() {
+        String schemaName = config.get(PostgresSourceOptions.SCHEMA_NAME);
+        return schemaName;
+    }
 }
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/sqlserver/SqlServerDatabaseSync.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/sqlserver/SqlServerDatabaseSync.java
index fb25212..10db3c1 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/sqlserver/SqlServerDatabaseSync.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/sqlserver/SqlServerDatabaseSync.java
@@ -153,7 +153,7 @@ public class SqlServerDatabaseSync extends DatabaseSync {
                     .hostname(hostname)
                     .port(port)
                     .databaseList(databaseName)
-                    .tableList(schemaName + "." + tableName)
+                    .tableList(tableName)
                     .username(username)
                     .password(password)
                     .startupOptions(startupOptions)
@@ -175,7 +175,7 @@ public class SqlServerDatabaseSync extends DatabaseSync {
                     .hostname(hostname)
                     .port(port)
                     .database(databaseName)
-                    .tableList(schemaName + "." + tableName)
+                    .tableList(tableName)
                     .username(username)
                     .password(password)
                     .debeziumProperties(debeziumProperties)
@@ -185,4 +185,10 @@ public class SqlServerDatabaseSync extends DatabaseSync {
             return env.addSource(sqlServerSource, "SqlServer Source");
         }
     }
+
+    @Override
+    public String getTableListPrefix() {
+        String schemaName = config.get(JdbcSourceOptions.SCHEMA_NAME);
+        return schemaName;
+    }
 }
diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/TestJsonDebeziumSchemaSerializer.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/TestJsonDebeziumSchemaSerializer.java
index de549ef..32aedab 100644
--- a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/TestJsonDebeziumSchemaSerializer.java
+++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/TestJsonDebeziumSchemaSerializer.java
@@ -20,8 +20,9 @@ package org.apache.doris.flink.sink.writer;
 import com.fasterxml.jackson.core.type.TypeReference;
 import com.fasterxml.jackson.databind.JsonNode;
 import com.fasterxml.jackson.databind.ObjectMapper;
-
+import org.apache.commons.collections.CollectionUtils;
 import org.apache.doris.flink.catalog.doris.FieldSchema;
+import org.apache.doris.flink.catalog.doris.TableSchema;
 import org.apache.doris.flink.cfg.DorisExecutionOptions;
 import org.apache.doris.flink.cfg.DorisOptions;
 import org.apache.doris.flink.cfg.DorisReadOptions;
@@ -29,9 +30,8 @@ import org.apache.doris.flink.exception.DorisException;
 import org.apache.doris.flink.rest.RestService;
 import org.apache.doris.flink.rest.models.Field;
 import org.apache.doris.flink.rest.models.Schema;
-
-import org.apache.commons.collections.CollectionUtils;
 import org.apache.doris.flink.sink.writer.serializer.JsonDebeziumSchemaSerializer;
+import org.apache.doris.flink.tools.cdc.SourceConnector;
 import org.apache.flink.shaded.guava30.com.google.common.collect.Maps;
 import org.junit.Assert;
 import org.junit.BeforeClass;
@@ -198,8 +198,10 @@ public class TestJsonDebeziumSchemaSerializer {
         String record
                 = "{\"source\":{\"version\":\"1.9.7.Final\",\"connector\":\"oracle\",\"name\":\"oracle_logminer\",\"ts_ms\":1696945825065,\"snapshot\":\"true\",\"db\":\"HELOWIN\",\"sequence\":null,\"schema\":\"ADMIN\",\"table\":\"PERSONS\",\"txId\":null,\"scn\":\"1199617\",\"commit_scn\":null,\"lcr_position\":null,\"rs_id\":null,\"ssn\":0,\"redo_thread\":null},\"databaseName\":\"HELOWIN\",\"schemaName\":\"ADMIN\",\"ddl\":\"\\n  CREATE TABLE \\\"ADMIN\\\".\\\"PERSONS\\\" \\n   (\\t\\\"ID\ [...]
         JsonNode recordRoot = objectMapper.readTree(record);
+        serializer.setSourceConnector("oracle");
         List<String> ddlSQLList = serializer.extractDDLList(recordRoot);
         Assert.assertTrue(CollectionUtils.isEmpty(ddlSQLList));
+        serializer.setSourceConnector("mysql");
     }
 
     @Test
@@ -419,4 +421,27 @@ public class TestJsonDebeziumSchemaSerializer {
         dorisOptions.setTableIdentifier(tmp);
     }
 
+    @Test
+    @Ignore
+    public void testAutoCreateTable() throws Exception {
+         String record
+                = "{    \"source\":{        \"version\":\"1.9.7.Final\",        \"connector\":\"oracle\",        \"name\":\"oracle_logminer\",        \"ts_ms\":1696945825065,        \"snapshot\":\"true\",        \"db\":\"TESTDB\",        \"sequence\":null,        \"schema\":\"ADMIN\",        \"table\":\"PERSONS\",        \"txId\":null,        \"scn\":\"1199617\",        \"commit_scn\":null,        \"lcr_position\":null,        \"rs_id\":null,        \"ssn\":0,        \"redo_thread\":null [...]
+        JsonNode recordRoot = objectMapper.readTree(record);
+        dorisOptions = DorisOptions.builder().setFenodes("127.0.0.1:8030")
+                .setTableIdentifier("")
+                .setUsername("root")
+                .setPassword("").build();
+        serializer = JsonDebeziumSchemaSerializer.builder().setDorisOptions(dorisOptions).build();
+        serializer.setSourceConnector(SourceConnector.ORACLE.connectorName);
+        TableSchema tableSchema = serializer.extractCreateTableSchema(recordRoot);
+        Assert.assertEquals("TESTDB", tableSchema.getDatabase());
+        Assert.assertEquals("PERSONS", tableSchema.getTable());
+        Assert.assertArrayEquals(new String[]{"ID"}, tableSchema.getKeys().toArray());
+        Assert.assertEquals(3, tableSchema.getFields().size());
+        Assert.assertEquals("ID", tableSchema.getFields().get("ID").getName());
+        Assert.assertEquals("NAME4", tableSchema.getFields().get("NAME4").getName());
+        Assert.assertEquals("age4", tableSchema.getFields().get("age4").getName());
+        serializer.setSourceConnector(SourceConnector.MYSQL.connectorName);
+    }
+
 }
diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/DatabaseSyncTest.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/DatabaseSyncTest.java
index daab90b..fb2c8d6 100644
--- a/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/DatabaseSyncTest.java
+++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/DatabaseSyncTest.java
@@ -17,7 +17,10 @@
 package org.apache.doris.flink.tools.cdc;
 
 import org.apache.doris.flink.tools.cdc.mysql.MysqlDatabaseSync;
+import org.apache.flink.configuration.Configuration;
+import org.junit.Assert;
 import org.junit.Test;
+
 import java.util.Arrays;
 
 /**
@@ -37,4 +40,17 @@ public class DatabaseSyncTest {
             databaseSync.multiToOneRulesParser(arr[0], arr[1]);
         });
     }
+
+    @Test
+    public void getSyncTableListTest() throws Exception{
+        DatabaseSync databaseSync = new MysqlDatabaseSync();
+        databaseSync.setSingleSink(false);
+        databaseSync.setIncludingTables("tbl_1|tbl_2");
+        Configuration config = new Configuration();
+        config.setString("database-name", "db");
+        config.setString("table-name", "tbl.*");
+        databaseSync.setConfig(config);
+        String syncTableList = databaseSync.getSyncTableList(Arrays.asList("tbl_1", "tbl_2"));
+        Assert.assertEquals("db\\.tbl_1|db\\.tbl_2", syncTableList);
+    }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org
