This is an automated email from the ASF dual-hosted git repository.

diwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris-flink-connector.git


The following commit(s) were added to refs/heads/master by this push:
     new be72511f [improve] auto create Doris table for new MongoDB collections in Data (#573)
be72511f is described below

commit be72511f15f1fcfe2ec9fb110c76d0920dab7cee
Author: kwonder0926 <76644839+kwonder0...@users.noreply.github.com>
AuthorDate: Thu Mar 27 18:57:18 2025 +0800

    [improve] auto create Doris table for new MongoDB collections in Data (#573)
---
 .../apache/doris/flink/tools/cdc/DatabaseSync.java |  46 ++------
 .../tools/cdc/mongodb/MongoDBDatabaseSync.java     |   3 +
 .../flink/tools/cdc/mongodb/MongoDBSchema.java     |  23 ++++
 .../MongoDBJsonDebeziumSchemaSerializer.java       |  29 +++++-
 .../serializer/MongoJsonDebeziumDataChange.java    |  19 +---
 .../serializer/MongoJsonDebeziumSchemaChange.java  |  56 +++++++++-
 .../flink/tools/cdc/utils/DorisTableUtil.java      | 109 +++++++++++++++++++
 .../flink/tools/cdc/utils/JsonNodeExtractUtil.java |  62 +++++++++++
 .../tools/cdc/mongodb/MongoDBCreateTableTest.java  | 116 +++++++++++++++++++++
 9 files changed, 404 insertions(+), 59 deletions(-)

diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/DatabaseSync.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/DatabaseSync.java
index 06e49e24..326b4e87 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/DatabaseSync.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/DatabaseSync.java
@@ -28,14 +28,12 @@ import org.apache.flink.util.OutputTag;
 import org.apache.flink.util.Preconditions;
 import org.apache.flink.util.StringUtils;
 
-import org.apache.doris.flink.catalog.doris.DorisSchemaFactory;
 import org.apache.doris.flink.catalog.doris.DorisSystem;
 import org.apache.doris.flink.catalog.doris.TableSchema;
 import org.apache.doris.flink.cfg.DorisConnectionOptions;
 import org.apache.doris.flink.cfg.DorisExecutionOptions;
 import org.apache.doris.flink.cfg.DorisOptions;
 import org.apache.doris.flink.cfg.DorisReadOptions;
-import org.apache.doris.flink.exception.DorisSystemException;
 import org.apache.doris.flink.sink.DorisSink;
 import org.apache.doris.flink.sink.schema.SchemaChangeMode;
 import org.apache.doris.flink.sink.writer.WriteMode;
@@ -44,12 +42,12 @@ import 
org.apache.doris.flink.sink.writer.serializer.JsonDebeziumSchemaSerialize
 import org.apache.doris.flink.table.DorisConfigOptions;
 import org.apache.doris.flink.tools.cdc.converter.TableNameConverter;
 import org.apache.doris.flink.tools.cdc.oracle.OracleDatabaseSync;
+import org.apache.doris.flink.tools.cdc.utils.DorisTableUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.sql.Connection;
 import java.sql.SQLException;
-import java.sql.SQLSyntaxErrorException;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -150,7 +148,13 @@ public abstract class DatabaseSync {
             // Calculate the mapping relationship between upstream and 
downstream tables
             tableMapping.put(
                     schema.getTableIdentifier(), String.format("%s.%s", 
targetDb, dorisTable));
-            tryCreateTableIfAbsent(dorisSystem, targetDb, dorisTable, schema);
+            DorisTableUtil.tryCreateTableIfAbsent(
+                    dorisSystem,
+                    targetDb,
+                    dorisTable,
+                    schema,
+                    dorisTableConfig,
+                    ignoreIncompatible);
 
             if (!dorisTables.contains(Tuple2.of(targetDb, dorisTable))) {
                 dorisTables.add(Tuple2.of(targetDb, dorisTable));
@@ -470,40 +474,6 @@ public abstract class DatabaseSync {
         }
     }
 
-    private void tryCreateTableIfAbsent(
-            DorisSystem dorisSystem, String targetDb, String dorisTable, 
SourceSchema schema) {
-        if (!dorisSystem.tableExists(targetDb, dorisTable)) {
-            if (dorisTableConfig.isConvertUniqToPk()
-                    && CollectionUtil.isNullOrEmpty(schema.primaryKeys)
-                    && !CollectionUtil.isNullOrEmpty(schema.uniqueIndexs)) {
-                schema.primaryKeys = new ArrayList<>(schema.uniqueIndexs);
-            }
-            TableSchema dorisSchema =
-                    DorisSchemaFactory.createTableSchema(
-                            targetDb,
-                            dorisTable,
-                            schema.getFields(),
-                            schema.getPrimaryKeys(),
-                            dorisTableConfig,
-                            schema.getTableComment());
-            try {
-                dorisSystem.createTable(dorisSchema);
-            } catch (Exception ex) {
-                handleTableCreationFailure(ex);
-            }
-        }
-    }
-
-    private void handleTableCreationFailure(Exception ex) throws 
DorisSystemException {
-        if (ignoreIncompatible && ex.getCause() instanceof 
SQLSyntaxErrorException) {
-            LOG.warn(
-                    "Doris schema and source table schema are not compatible. 
Error: {} ",
-                    ex.getCause().toString());
-        } else {
-            throw new DorisSystemException("Failed to create table due to: ", 
ex);
-        }
-    }
-
     protected Properties getJdbcProperties() {
         Properties jdbcProps = new Properties();
         for (Map.Entry<String, String> entry : config.toMap().entrySet()) {
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mongodb/MongoDBDatabaseSync.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mongodb/MongoDBDatabaseSync.java
index ca034ac3..b77ec0e3 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mongodb/MongoDBDatabaseSync.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mongodb/MongoDBDatabaseSync.java
@@ -240,6 +240,9 @@ public class MongoDBDatabaseSync extends DatabaseSync {
                 .setTableMapping(tableMapping)
                 .setTableConf(dorisTableConfig)
                 .setTargetDatabase(database)
+                .setTargetTablePrefix(tablePrefix)
+                .setTargetTableSuffix(tableSuffix)
+                .setTableNameConverter(converter)
                 .build();
     }
 
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mongodb/MongoDBSchema.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mongodb/MongoDBSchema.java
index 984419bc..2e2788de 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mongodb/MongoDBSchema.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mongodb/MongoDBSchema.java
@@ -20,6 +20,7 @@ package org.apache.doris.flink.tools.cdc.mongodb;
 import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.java.tuple.Tuple2;
 
+import com.fasterxml.jackson.databind.JsonNode;
 import org.apache.doris.flink.catalog.doris.DorisType;
 import org.apache.doris.flink.catalog.doris.FieldSchema;
 import org.apache.doris.flink.tools.cdc.SourceSchema;
@@ -64,6 +65,28 @@ public class MongoDBSchema extends SourceSchema {
         primaryKeys.add("_id");
     }
 
+    public MongoDBSchema(
+            JsonNode jsonData, String databaseName, String tableName, String 
tableComment)
+            throws Exception {
+        super(databaseName, null, tableName, tableComment);
+        fields = new LinkedHashMap<>();
+        processSampleData(jsonData);
+
+        primaryKeys = new ArrayList<>();
+        primaryKeys.add("_id");
+    }
+
+    @VisibleForTesting
+    protected void processSampleData(JsonNode data) {
+        data.fieldNames()
+                .forEachRemaining(
+                        fieldName -> {
+                            JsonNode value = data.get(fieldName);
+                            String dorisType = 
MongoDBType.jsonNodeToDorisType(value);
+                            fields.put(fieldName, new FieldSchema(fieldName, 
dorisType, null));
+                        });
+    }
+
     @VisibleForTesting
     protected void processSampleData(Document sampleData) {
         for (Map.Entry<String, Object> entry : sampleData.entrySet()) {
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mongodb/serializer/MongoDBJsonDebeziumSchemaSerializer.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mongodb/serializer/MongoDBJsonDebeziumSchemaSerializer.java
index 296a3727..dec06e91 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mongodb/serializer/MongoDBJsonDebeziumSchemaSerializer.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mongodb/serializer/MongoDBJsonDebeziumSchemaSerializer.java
@@ -29,6 +29,7 @@ import 
org.apache.doris.flink.sink.writer.serializer.jsondebezium.CdcDataChange;
 import 
org.apache.doris.flink.sink.writer.serializer.jsondebezium.CdcSchemaChange;
 import 
org.apache.doris.flink.sink.writer.serializer.jsondebezium.JsonDebeziumChangeContext;
 import org.apache.doris.flink.tools.cdc.DorisTableConfig;
+import org.apache.doris.flink.tools.cdc.converter.TableNameConverter;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -62,6 +63,7 @@ public class MongoDBJsonDebeziumSchemaSerializer implements 
DorisRecordSerialize
 
     private String targetTablePrefix;
     private String targetTableSuffix;
+    private TableNameConverter tableNameConverter;
 
     public MongoDBJsonDebeziumSchemaSerializer(
             DorisOptions dorisOptions,
@@ -72,7 +74,8 @@ public class MongoDBJsonDebeziumSchemaSerializer implements 
DorisRecordSerialize
             DorisTableConfig dorisTableConfig,
             String targetDatabase,
             String targetTablePrefix,
-            String targetTableSuffix) {
+            String targetTableSuffix,
+            TableNameConverter tableNameConverter) {
         this.dorisOptions = dorisOptions;
         this.pattern = pattern;
         this.sourceTableName = sourceTableName;
@@ -85,6 +88,7 @@ public class MongoDBJsonDebeziumSchemaSerializer implements 
DorisRecordSerialize
         this.targetDatabase = targetDatabase;
         this.targetTablePrefix = targetTablePrefix;
         this.targetTableSuffix = targetTableSuffix;
+        this.tableNameConverter = tableNameConverter;
         if (executionOptions != null) {
             this.lineDelimiter =
                     executionOptions
@@ -111,6 +115,7 @@ public class MongoDBJsonDebeziumSchemaSerializer implements 
DorisRecordSerialize
                         targetTablePrefix,
                         targetTableSuffix,
                         enableDelete);
+        changeContext.setTableNameConverter(tableNameConverter);
         this.dataChange = new MongoJsonDebeziumDataChange(changeContext);
         this.schemaChange = new MongoJsonDebeziumSchemaChange(changeContext);
     }
@@ -143,6 +148,7 @@ public class MongoDBJsonDebeziumSchemaSerializer implements 
DorisRecordSerialize
         private String targetDatabase;
         private String targetTablePrefix = "";
         private String targetTableSuffix = "";
+        private TableNameConverter tableNameConverter;
 
         public MongoDBJsonDebeziumSchemaSerializer.Builder setDorisOptions(
                 DorisOptions dorisOptions) {
@@ -192,6 +198,24 @@ public class MongoDBJsonDebeziumSchemaSerializer 
implements DorisRecordSerialize
             return this;
         }
 
+        public MongoDBJsonDebeziumSchemaSerializer.Builder 
setTargetTablePrefix(
+                String targetTablePrefix) {
+            this.targetTablePrefix = targetTablePrefix;
+            return this;
+        }
+
+        public MongoDBJsonDebeziumSchemaSerializer.Builder 
setTargetTableSuffix(
+                String targetTableSuffix) {
+            this.targetTableSuffix = targetTableSuffix;
+            return this;
+        }
+
+        public MongoDBJsonDebeziumSchemaSerializer.Builder 
setTableNameConverter(
+                TableNameConverter converter) {
+            this.tableNameConverter = converter;
+            return this;
+        }
+
         public MongoDBJsonDebeziumSchemaSerializer build() {
             return new MongoDBJsonDebeziumSchemaSerializer(
                     dorisOptions,
@@ -202,7 +226,8 @@ public class MongoDBJsonDebeziumSchemaSerializer implements 
DorisRecordSerialize
                     dorisTableConfig,
                     targetDatabase,
                     targetTablePrefix,
-                    targetTableSuffix);
+                    targetTableSuffix,
+                    tableNameConverter);
         }
     }
 }
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mongodb/serializer/MongoJsonDebeziumDataChange.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mongodb/serializer/MongoJsonDebeziumDataChange.java
index 9dbe7ffe..12fa8581 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mongodb/serializer/MongoJsonDebeziumDataChange.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mongodb/serializer/MongoJsonDebeziumDataChange.java
@@ -20,7 +20,6 @@ package org.apache.doris.flink.tools.cdc.mongodb.serializer;
 import org.apache.flink.util.StringUtils;
 
 import com.fasterxml.jackson.core.JsonProcessingException;
-import com.fasterxml.jackson.core.type.TypeReference;
 import com.fasterxml.jackson.databind.JsonNode;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.fasterxml.jackson.databind.node.NullNode;
@@ -30,6 +29,7 @@ import 
org.apache.doris.flink.sink.writer.serializer.DorisRecord;
 import 
org.apache.doris.flink.sink.writer.serializer.jsondebezium.CdcDataChange;
 import 
org.apache.doris.flink.sink.writer.serializer.jsondebezium.JsonDebeziumChangeContext;
 import org.apache.doris.flink.tools.cdc.SourceSchema;
+import org.apache.doris.flink.tools.cdc.utils.JsonNodeExtractUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -127,16 +127,7 @@ public class MongoJsonDebeziumDataChange extends 
CdcDataChange implements Change
     @Override
     public Map<String, Object> extractAfterRow(JsonNode recordRoot) {
         JsonNode dataNode = recordRoot.get(FIELD_DATA);
-        Map<String, Object> rowMap = extractRow(dataNode);
-        String objectId;
-        // if user specifies the `_id` field manually, the $oid field may not 
exist
-        if (rowMap.get(ID_FIELD) instanceof Map<?, ?>) {
-            objectId = ((Map<?, ?>) 
rowMap.get(ID_FIELD)).get(OID_FIELD).toString();
-        } else {
-            objectId = rowMap.get(ID_FIELD).toString();
-        }
-        rowMap.put(ID_FIELD, objectId);
-        return rowMap;
+        return JsonNodeExtractUtil.extractAfterRow(dataNode, objectMapper);
     }
 
     private Map<String, Object> extractDeleteRow(JsonNode recordRoot)
@@ -154,10 +145,4 @@ public class MongoJsonDebeziumDataChange extends 
CdcDataChange implements Change
         row.put(ID_FIELD, objectId);
         return row;
     }
-
-    private Map<String, Object> extractRow(JsonNode recordRow) {
-        Map<String, Object> recordMap =
-                objectMapper.convertValue(recordRow, new 
TypeReference<Map<String, Object>>() {});
-        return recordMap != null ? recordMap : new HashMap<>();
-    }
 }
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mongodb/serializer/MongoJsonDebeziumSchemaChange.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mongodb/serializer/MongoJsonDebeziumSchemaChange.java
index c67e856e..cfcf53f2 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mongodb/serializer/MongoJsonDebeziumSchemaChange.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mongodb/serializer/MongoJsonDebeziumSchemaChange.java
@@ -17,10 +17,14 @@
 
 package org.apache.doris.flink.tools.cdc.mongodb.serializer;
 
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.util.StringUtils;
+
 import com.fasterxml.jackson.databind.JsonNode;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.fasterxml.jackson.databind.node.NullNode;
 import com.fasterxml.jackson.databind.node.ObjectNode;
+import org.apache.doris.flink.catalog.doris.DataModel;
 import org.apache.doris.flink.catalog.doris.DorisSystem;
 import org.apache.doris.flink.catalog.doris.FieldSchema;
 import org.apache.doris.flink.cfg.DorisOptions;
@@ -30,8 +34,11 @@ import 
org.apache.doris.flink.sink.schema.SchemaChangeManager;
 import 
org.apache.doris.flink.sink.writer.serializer.jsondebezium.CdcSchemaChange;
 import 
org.apache.doris.flink.sink.writer.serializer.jsondebezium.JsonDebeziumChangeContext;
 import org.apache.doris.flink.tools.cdc.SourceSchema;
+import org.apache.doris.flink.tools.cdc.mongodb.MongoDBSchema;
 import org.apache.doris.flink.tools.cdc.mongodb.MongoDBType;
 import org.apache.doris.flink.tools.cdc.mongodb.MongoDateConverter;
+import org.apache.doris.flink.tools.cdc.utils.DorisTableUtil;
+import org.apache.doris.flink.tools.cdc.utils.JsonNodeExtractUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -60,17 +67,19 @@ public class MongoJsonDebeziumSchemaChange extends 
CdcSchemaChange {
 
     private final Map<String, Map<String, String>> tableFields;
 
-    private final SchemaChangeManager schemaChangeManager;
+    private SchemaChangeManager schemaChangeManager;
 
-    private final DorisSystem dorisSystem;
+    private DorisSystem dorisSystem;
 
     public Map<String, String> tableMapping;
     private final DorisOptions dorisOptions;
+    public JsonDebeziumChangeContext changeContext;
 
     private final Set<String> specialFields =
             new HashSet<>(Arrays.asList(DATE_FIELD, TIMESTAMP_FIELD, 
DECIMAL_FIELD, LONG_FIELD));
 
     public MongoJsonDebeziumSchemaChange(JsonDebeziumChangeContext 
changeContext) {
+        this.changeContext = changeContext;
         this.objectMapper = changeContext.getObjectMapper();
         this.dorisOptions = changeContext.getDorisOptions();
         this.tableFields = new HashMap<>();
@@ -96,6 +105,35 @@ public class MongoJsonDebeziumSchemaChange extends 
CdcSchemaChange {
             String cdcTableIdentifier = getCdcTableIdentifier(recordRoot);
             String dorisTableIdentifier =
                     getDorisTableIdentifier(cdcTableIdentifier, dorisOptions, 
tableMapping);
+
+            // if table dorisTableIdentifier is null, create table
+            if (StringUtils.isNullOrWhitespaceOnly(dorisTableIdentifier)) {
+                String[] split = cdcTableIdentifier.split("\\.");
+                String targetDb = changeContext.getTargetDatabase();
+                String sourceTable = split[1];
+                String dorisTable = 
changeContext.getTableNameConverter().convert(sourceTable);
+                LOG.info(
+                        "The table [{}.{}] does not exist. Attempting to 
create a new table named: {}.{}",
+                        targetDb,
+                        sourceTable,
+                        targetDb,
+                        dorisTable);
+                tableMapping.put(cdcTableIdentifier, String.format("%s.%s", 
targetDb, dorisTable));
+                dorisTableIdentifier = tableMapping.get(cdcTableIdentifier);
+                Map<String, Object> stringObjectMap = extractAfterRow(logData);
+                JsonNode jsonNode = objectMapper.valueToTree(stringObjectMap);
+
+                MongoDBSchema mongoSchema = new MongoDBSchema(jsonNode, 
targetDb, dorisTable, "");
+
+                mongoSchema.setModel(DataModel.UNIQUE);
+                DorisTableUtil.tryCreateTableIfAbsent(
+                        dorisSystem,
+                        targetDb,
+                        dorisTable,
+                        mongoSchema,
+                        changeContext.getDorisTableConf());
+            }
+
             String[] tableInfo = dorisTableIdentifier.split("\\.");
             if (tableInfo.length != 2) {
                 throw new DorisRuntimeException();
@@ -163,6 +201,10 @@ public class MongoJsonDebeziumSchemaChange extends 
CdcSchemaChange {
         }
     }
 
+    public Map<String, Object> extractAfterRow(JsonNode recordRoot) {
+        return JsonNodeExtractUtil.extractAfterRow(recordRoot, objectMapper);
+    }
+
     private void checkAndUpdateSchemaChange(
             JsonNode logData, String dorisTableIdentifier, String database, 
String table) {
         Map<String, String> tableFieldMap = 
tableFields.get(dorisTableIdentifier);
@@ -207,4 +249,14 @@ public class MongoJsonDebeziumSchemaChange extends 
CdcSchemaChange {
         String db = nameSpace.get(FIELD_DATABASE).asText();
         return SourceSchema.getString(db, null, table);
     }
+
+    @VisibleForTesting
+    public void setDorisSystem(DorisSystem dorisSystem) {
+        this.dorisSystem = dorisSystem;
+    }
+
+    @VisibleForTesting
+    public void setSchemaChangeManager(SchemaChangeManager 
schemaChangeManager) {
+        this.schemaChangeManager = schemaChangeManager;
+    }
 }
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/utils/DorisTableUtil.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/utils/DorisTableUtil.java
new file mode 100644
index 00000000..114bf151
--- /dev/null
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/utils/DorisTableUtil.java
@@ -0,0 +1,109 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.flink.tools.cdc.utils;
+
+import org.apache.flink.util.CollectionUtil;
+
+import org.apache.doris.flink.catalog.doris.DorisSchemaFactory;
+import org.apache.doris.flink.catalog.doris.DorisSystem;
+import org.apache.doris.flink.catalog.doris.TableSchema;
+import org.apache.doris.flink.exception.DorisSystemException;
+import org.apache.doris.flink.tools.cdc.DorisTableConfig;
+import org.apache.doris.flink.tools.cdc.SourceSchema;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.sql.SQLSyntaxErrorException;
+import java.util.ArrayList;
+
+/** Utility class for Doris table operations. */
+public class DorisTableUtil {
+    private static final Logger LOG = 
LoggerFactory.getLogger(DorisTableUtil.class);
+
+    /**
+     * Try to create a table in doris if it doesn't exist.
+     *
+     * @param dorisSystem Doris system instance
+     * @param targetDb Doris database name
+     * @param dorisTable Doris table name
+     * @param schema Doris table schema
+     * @param tableConfig Table configuration
+     * @param ignoreIncompatible Whether to ignore incompatible schema errors
+     * @throws DorisSystemException if table creation fails
+     */
+    public static void tryCreateTableIfAbsent(
+            DorisSystem dorisSystem,
+            String targetDb,
+            String dorisTable,
+            SourceSchema schema,
+            DorisTableConfig tableConfig,
+            boolean ignoreIncompatible)
+            throws DorisSystemException {
+
+        if (!dorisSystem.tableExists(targetDb, dorisTable)) {
+            if (tableConfig.isConvertUniqToPk()
+                    && CollectionUtil.isNullOrEmpty(schema.primaryKeys)
+                    && !CollectionUtil.isNullOrEmpty(schema.uniqueIndexs)) {
+                schema.primaryKeys = new ArrayList<>(schema.uniqueIndexs);
+            }
+
+            TableSchema dorisSchema =
+                    DorisSchemaFactory.createTableSchema(
+                            targetDb,
+                            dorisTable,
+                            schema.getFields(),
+                            schema.getPrimaryKeys(),
+                            tableConfig,
+                            schema.getTableComment());
+            try {
+                dorisSystem.createTable(dorisSchema);
+            } catch (Exception ex) {
+                handleTableCreationFailure(ex, ignoreIncompatible);
+            }
+        }
+    }
+
+    /** Overloaded method without ignoreIncompatible parameter. */
+    public static void tryCreateTableIfAbsent(
+            DorisSystem dorisSystem,
+            String targetDb,
+            String dorisTable,
+            SourceSchema schema,
+            DorisTableConfig tableConfig)
+            throws DorisSystemException {
+        tryCreateTableIfAbsent(dorisSystem, targetDb, dorisTable, schema, 
tableConfig, false);
+    }
+
+    /**
+     * Handle table creation failure.
+     *
+     * @param ex Exception that occurred during table creation
+     * @param ignoreIncompatible Whether to ignore incompatible schema errors
+     * @throws DorisSystemException if table creation fails and errors should 
not be ignored
+     */
+    private static void handleTableCreationFailure(Exception ex, boolean 
ignoreIncompatible)
+            throws DorisSystemException {
+        if (ignoreIncompatible && ex.getCause() instanceof 
SQLSyntaxErrorException) {
+            LOG.warn(
+                    "Doris schema and source table schema are not compatible. 
Error: {} ",
+                    ex.getCause().toString());
+        } else {
+            throw new DorisSystemException("Failed to create table due to: ", 
ex);
+        }
+    }
+}
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/utils/JsonNodeExtractUtil.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/utils/JsonNodeExtractUtil.java
new file mode 100644
index 00000000..93aeca0b
--- /dev/null
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/utils/JsonNodeExtractUtil.java
@@ -0,0 +1,62 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.flink.tools.cdc.utils;
+
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import static 
org.apache.doris.flink.tools.cdc.mongodb.ChangeStreamConstant.ID_FIELD;
+import static 
org.apache.doris.flink.tools.cdc.mongodb.ChangeStreamConstant.OID_FIELD;
+
+/** Utility class for extracting data from JSON nodes */
+public class JsonNodeExtractUtil {
+    private static final Logger LOG = 
LoggerFactory.getLogger(JsonNodeExtractUtil.class);
+
+    /**
+     * Extract row data from JsonNode and convert to Map
+     *
+     * @param recordRow JsonNode containing row data
+     * @param objectMapper ObjectMapper instance for JSON conversion
+     * @return Map containing extracted row data
+     */
+    public static Map<String, Object> extractRow(JsonNode recordRow, 
ObjectMapper objectMapper) {
+        Map<String, Object> recordMap =
+                objectMapper.convertValue(recordRow, new 
TypeReference<Map<String, Object>>() {});
+        return recordMap != null ? recordMap : new HashMap<>();
+    }
+
+    public static Map<String, Object> extractAfterRow(
+            JsonNode recordRoot, ObjectMapper objectMapper) {
+        Map<String, Object> rowMap = 
JsonNodeExtractUtil.extractRow(recordRoot, objectMapper);
+        String objectId;
+        // if user specifies the `_id` field manually, the $oid field may not 
exist
+        if (rowMap.get(ID_FIELD) instanceof Map<?, ?>) {
+            objectId = ((Map<?, ?>) 
rowMap.get(ID_FIELD)).get(OID_FIELD).toString();
+        } else {
+            objectId = rowMap.get(ID_FIELD).toString();
+        }
+        rowMap.put(ID_FIELD, objectId);
+        return rowMap;
+    }
+}
diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/mongodb/MongoDBCreateTableTest.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/mongodb/MongoDBCreateTableTest.java
new file mode 100644
index 00000000..b395d3a7
--- /dev/null
+++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/mongodb/MongoDBCreateTableTest.java
@@ -0,0 +1,116 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.flink.tools.cdc.mongodb;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import org.apache.doris.flink.catalog.doris.DorisSystem;
+import org.apache.doris.flink.cfg.DorisOptions;
+import org.apache.doris.flink.sink.schema.SchemaChangeManager;
+import 
org.apache.doris.flink.sink.writer.serializer.jsondebezium.JsonDebeziumChangeContext;
+import 
org.apache.doris.flink.sink.writer.serializer.jsondebezium.TestJsonDebeziumChangeBase;
+import org.apache.doris.flink.tools.cdc.DorisTableConfig;
+import org.apache.doris.flink.tools.cdc.converter.TableNameConverter;
+import 
org.apache.doris.flink.tools.cdc.mongodb.serializer.MongoJsonDebeziumSchemaChange;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.mockito.Mock;
+import org.mockito.junit.MockitoJUnit;
+import org.mockito.junit.MockitoRule;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+
+public class MongoDBCreateTableTest extends TestJsonDebeziumChangeBase {
+
+    private MongoJsonDebeziumSchemaChange schemaChange;
+
+    @Rule public MockitoRule mockitoRule = MockitoJUnit.rule();
+
+    @Mock private DorisSystem mockDorisSystem;
+
+    @Mock private DorisOptions mockDorisOptions;
+
+    @Mock private SchemaChangeManager mockSchemaManager;
+
+    private final String dbName = "test_db";
+    private final String prefix = "ods_";
+    private final String suffix = "_dt";
+
+    @Before
+    public void setUp() {
+        super.setUp();
+
+        Map<String, String> tableConfig = new HashMap<>();
+        tableConfig.put(DorisTableConfig.REPLICATION_NUM, "1");
+        tableConfig.put(DorisTableConfig.TABLE_BUCKETS, ".*:1");
+
+        JsonDebeziumChangeContext changeContext =
+                new JsonDebeziumChangeContext(
+                        mockDorisOptions,
+                        tableMapping,
+                        null,
+                        dbName,
+                        new DorisTableConfig(tableConfig),
+                        objectMapper,
+                        null,
+                        lineDelimiter,
+                        ignoreUpdateBefore,
+                        prefix,
+                        suffix,
+                        true,
+                        new TableNameConverter(prefix, suffix));
+        schemaChange = new MongoJsonDebeziumSchemaChange(changeContext);
+    }
+
+    @Test
+    public void testAutoCreateTable() throws IOException {
+        String newTableName = "test_table";
+        String record =
+                "{"
+                        + "\"_id\":\"{\\\"_id\\\": {\\\"_id\\\": 
{\\\"$oid\\\": \\\"67d2d13807fe0c4336070cfd\\\"}}}\","
+                        + "\"operationType\":\"insert\","
+                        + "\"fullDocument\":\"{\\\"_id\\\": {\\\"$oid\\\": 
\\\"67d2d13807fe0c4336070cfd\\\"}, \\\"name\\\": \\\"John Doe\\\", \\\"age\\\": 
30, \\\"city\\\": \\\"New York\\\"}\","
+                        + "\"fullDocumentBeforeChange\":null,"
+                        + 
"\"source\":{\"ts_ms\":1741869368000,\"snapshot\":\"false\"},"
+                        + "\"ts_ms\":1741869368365,"
+                        + "\"ns\":{\"db\":\"testDB\",\"coll\":\""
+                        + newTableName
+                        + "\"},"
+                        + "\"to\":null,"
+                        + "\"documentKey\":\"{\\\"_id\\\": {\\\"$oid\\\": 
\\\"67d2d13807fe0c4336070cfd\\\"}}\","
+                        + "\"updateDescription\":null,"
+                        + "\"clusterTime\":\"{\\\"$timestamp\\\": {\\\"t\\\": 
1741869368, \\\"i\\\": 2}}\","
+                        + "\"txnNumber\":null,"
+                        + "\"lsid\":null"
+                        + "}";
+        schemaChange.setSchemaChangeManager(mockSchemaManager);
+        schemaChange.setDorisSystem(mockDorisSystem);
+
+        JsonNode recordRoot = objectMapper.readTree(record);
+        boolean result = schemaChange.schemaChange(recordRoot);
+        Assert.assertTrue(
+                tableMapping.containsValue(
+                        dbName
+                                + "."
+                                + new TableNameConverter(prefix, 
suffix).convert(newTableName)));
+        Assert.assertTrue(result);
+    }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org


Reply via email to