This is an automated email from the ASF dual-hosted git repository. diwu pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris-flink-connector.git
The following commit(s) were added to refs/heads/master by this push: new be72511f [improve] auto create Doris table for new MongoDB collections in Data (#573) be72511f is described below commit be72511f15f1fcfe2ec9fb110c76d0920dab7cee Author: kwonder0926 <76644839+kwonder0...@users.noreply.github.com> AuthorDate: Thu Mar 27 18:57:18 2025 +0800 [improve] auto create Doris table for new MongoDB collections in Data (#573) --- .../apache/doris/flink/tools/cdc/DatabaseSync.java | 46 ++------ .../tools/cdc/mongodb/MongoDBDatabaseSync.java | 3 + .../flink/tools/cdc/mongodb/MongoDBSchema.java | 23 ++++ .../MongoDBJsonDebeziumSchemaSerializer.java | 29 +++++- .../serializer/MongoJsonDebeziumDataChange.java | 19 +--- .../serializer/MongoJsonDebeziumSchemaChange.java | 56 +++++++++- .../flink/tools/cdc/utils/DorisTableUtil.java | 109 +++++++++++++++++++ .../flink/tools/cdc/utils/JsonNodeExtractUtil.java | 62 +++++++++++ .../tools/cdc/mongodb/MongoDBCreateTableTest.java | 116 +++++++++++++++++++++ 9 files changed, 404 insertions(+), 59 deletions(-) diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/DatabaseSync.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/DatabaseSync.java index 06e49e24..326b4e87 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/DatabaseSync.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/DatabaseSync.java @@ -28,14 +28,12 @@ import org.apache.flink.util.OutputTag; import org.apache.flink.util.Preconditions; import org.apache.flink.util.StringUtils; -import org.apache.doris.flink.catalog.doris.DorisSchemaFactory; import org.apache.doris.flink.catalog.doris.DorisSystem; import org.apache.doris.flink.catalog.doris.TableSchema; import org.apache.doris.flink.cfg.DorisConnectionOptions; import org.apache.doris.flink.cfg.DorisExecutionOptions; import org.apache.doris.flink.cfg.DorisOptions; import org.apache.doris.flink.cfg.DorisReadOptions; -import org.apache.doris.flink.exception.DorisSystemException; import org.apache.doris.flink.sink.DorisSink; import org.apache.doris.flink.sink.schema.SchemaChangeMode; import org.apache.doris.flink.sink.writer.WriteMode; @@ -44,12 +42,12 @@ import org.apache.doris.flink.sink.writer.serializer.JsonDebeziumSchemaSerialize import org.apache.doris.flink.table.DorisConfigOptions; import org.apache.doris.flink.tools.cdc.converter.TableNameConverter; import org.apache.doris.flink.tools.cdc.oracle.OracleDatabaseSync; +import org.apache.doris.flink.tools.cdc.utils.DorisTableUtil; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.sql.Connection; import java.sql.SQLException; -import java.sql.SQLSyntaxErrorException; import java.util.ArrayList; import java.util.HashMap; import java.util.HashSet; @@ -150,7 +148,13 @@ public abstract class DatabaseSync { // Calculate the mapping relationship between upstream and downstream tables tableMapping.put( schema.getTableIdentifier(), String.format("%s.%s", targetDb, dorisTable)); - tryCreateTableIfAbsent(dorisSystem, targetDb, dorisTable, schema); + DorisTableUtil.tryCreateTableIfAbsent( + dorisSystem, + targetDb, + dorisTable, + schema, + dorisTableConfig, + ignoreIncompatible); if (!dorisTables.contains(Tuple2.of(targetDb, dorisTable))) { dorisTables.add(Tuple2.of(targetDb, dorisTable)); @@ -470,40 +474,6 @@ public abstract class DatabaseSync { } } - private void tryCreateTableIfAbsent( - DorisSystem dorisSystem, String targetDb, String dorisTable, SourceSchema schema) { - if (!dorisSystem.tableExists(targetDb, dorisTable)) { - if (dorisTableConfig.isConvertUniqToPk() - && CollectionUtil.isNullOrEmpty(schema.primaryKeys) - && !CollectionUtil.isNullOrEmpty(schema.uniqueIndexs)) { - schema.primaryKeys = new ArrayList<>(schema.uniqueIndexs); - } - TableSchema dorisSchema = - DorisSchemaFactory.createTableSchema( - targetDb, - dorisTable, - schema.getFields(), - schema.getPrimaryKeys(), - dorisTableConfig, - schema.getTableComment()); - try { - dorisSystem.createTable(dorisSchema); - } catch (Exception ex) { - handleTableCreationFailure(ex); - } - } - } - - private void handleTableCreationFailure(Exception ex) throws DorisSystemException { - if (ignoreIncompatible && ex.getCause() instanceof SQLSyntaxErrorException) { - LOG.warn( - "Doris schema and source table schema are not compatible. Error: {} ", - ex.getCause().toString()); - } else { - throw new DorisSystemException("Failed to create table due to: ", ex); - } - } - protected Properties getJdbcProperties() { Properties jdbcProps = new Properties(); for (Map.Entry<String, String> entry : config.toMap().entrySet()) { diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mongodb/MongoDBDatabaseSync.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mongodb/MongoDBDatabaseSync.java index ca034ac3..b77ec0e3 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mongodb/MongoDBDatabaseSync.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mongodb/MongoDBDatabaseSync.java @@ -240,6 +240,9 @@ public class MongoDBDatabaseSync extends DatabaseSync { .setTableMapping(tableMapping) .setTableConf(dorisTableConfig) .setTargetDatabase(database) + .setTargetTablePrefix(tablePrefix) + .setTargetTableSuffix(tableSuffix) + .setTableNameConverter(converter) .build(); } diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mongodb/MongoDBSchema.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mongodb/MongoDBSchema.java index 984419bc..2e2788de 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mongodb/MongoDBSchema.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mongodb/MongoDBSchema.java @@ -20,6 +20,7 @@ package org.apache.doris.flink.tools.cdc.mongodb; import org.apache.flink.annotation.VisibleForTesting; import org.apache.flink.api.java.tuple.Tuple2; +import com.fasterxml.jackson.databind.JsonNode; import org.apache.doris.flink.catalog.doris.DorisType; import org.apache.doris.flink.catalog.doris.FieldSchema; import org.apache.doris.flink.tools.cdc.SourceSchema; @@ -64,6 +65,28 @@ public class MongoDBSchema extends SourceSchema { primaryKeys.add("_id"); } + public MongoDBSchema( + JsonNode jsonData, String databaseName, String tableName, String tableComment) + throws Exception { + super(databaseName, null, tableName, tableComment); + fields = new LinkedHashMap<>(); + processSampleData(jsonData); + + primaryKeys = new ArrayList<>(); + primaryKeys.add("_id"); + } + + @VisibleForTesting + protected void processSampleData(JsonNode data) { + data.fieldNames() + .forEachRemaining( + fieldName -> { + JsonNode value = data.get(fieldName); + String dorisType = MongoDBType.jsonNodeToDorisType(value); + fields.put(fieldName, new FieldSchema(fieldName, dorisType, null)); + }); + } + @VisibleForTesting protected void processSampleData(Document sampleData) { for (Map.Entry<String, Object> entry : sampleData.entrySet()) { diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mongodb/serializer/MongoDBJsonDebeziumSchemaSerializer.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mongodb/serializer/MongoDBJsonDebeziumSchemaSerializer.java index 296a3727..dec06e91 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mongodb/serializer/MongoDBJsonDebeziumSchemaSerializer.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mongodb/serializer/MongoDBJsonDebeziumSchemaSerializer.java @@ -29,6 +29,7 @@ import org.apache.doris.flink.sink.writer.serializer.jsondebezium.CdcDataChange; import org.apache.doris.flink.sink.writer.serializer.jsondebezium.CdcSchemaChange; import org.apache.doris.flink.sink.writer.serializer.jsondebezium.JsonDebeziumChangeContext; import org.apache.doris.flink.tools.cdc.DorisTableConfig; +import org.apache.doris.flink.tools.cdc.converter.TableNameConverter; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -62,6 +63,7 @@ public class MongoDBJsonDebeziumSchemaSerializer implements DorisRecordSerialize private String targetTablePrefix; private String targetTableSuffix; + private TableNameConverter tableNameConverter; public MongoDBJsonDebeziumSchemaSerializer( DorisOptions dorisOptions, @@ -72,7 +74,8 @@ public class MongoDBJsonDebeziumSchemaSerializer implements DorisRecordSerialize DorisTableConfig dorisTableConfig, String targetDatabase, String targetTablePrefix, - String targetTableSuffix) { + String targetTableSuffix, + TableNameConverter tableNameConverter) { this.dorisOptions = dorisOptions; this.pattern = pattern; this.sourceTableName = sourceTableName; @@ -85,6 +88,7 @@ public class MongoDBJsonDebeziumSchemaSerializer implements DorisRecordSerialize this.targetDatabase = targetDatabase; this.targetTablePrefix = targetTablePrefix; this.targetTableSuffix = targetTableSuffix; + this.tableNameConverter = tableNameConverter; if (executionOptions != null) { this.lineDelimiter = executionOptions @@ -111,6 +115,7 @@ public class MongoDBJsonDebeziumSchemaSerializer implements DorisRecordSerialize targetTablePrefix, targetTableSuffix, enableDelete); + changeContext.setTableNameConverter(tableNameConverter); this.dataChange = new MongoJsonDebeziumDataChange(changeContext); this.schemaChange = new MongoJsonDebeziumSchemaChange(changeContext); } @@ -143,6 +148,7 @@ public class MongoDBJsonDebeziumSchemaSerializer implements DorisRecordSerialize private String targetDatabase; private String targetTablePrefix = ""; private String targetTableSuffix = ""; + private TableNameConverter tableNameConverter; public MongoDBJsonDebeziumSchemaSerializer.Builder setDorisOptions( DorisOptions dorisOptions) { @@ -192,6 +198,24 @@ public class MongoDBJsonDebeziumSchemaSerializer implements DorisRecordSerialize return this; } + public MongoDBJsonDebeziumSchemaSerializer.Builder setTargetTablePrefix( + String targetTablePrefix) { + this.targetTablePrefix = targetTablePrefix; + return this; + } + + public MongoDBJsonDebeziumSchemaSerializer.Builder setTargetTableSuffix( + String targetTableSuffix) { + this.targetTableSuffix = targetTableSuffix; + return this; + } + + public MongoDBJsonDebeziumSchemaSerializer.Builder setTableNameConverter( + TableNameConverter converter) { + this.tableNameConverter = converter; + return this; + } + public MongoDBJsonDebeziumSchemaSerializer build() { return new MongoDBJsonDebeziumSchemaSerializer( dorisOptions, @@ -202,7 +226,8 @@ public class MongoDBJsonDebeziumSchemaSerializer implements DorisRecordSerialize dorisTableConfig, targetDatabase, targetTablePrefix, - targetTableSuffix); + targetTableSuffix, + tableNameConverter); } } } diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mongodb/serializer/MongoJsonDebeziumDataChange.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mongodb/serializer/MongoJsonDebeziumDataChange.java index 9dbe7ffe..12fa8581 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mongodb/serializer/MongoJsonDebeziumDataChange.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mongodb/serializer/MongoJsonDebeziumDataChange.java @@ -20,7 +20,6 @@ package org.apache.doris.flink.tools.cdc.mongodb.serializer; import org.apache.flink.util.StringUtils; import com.fasterxml.jackson.core.JsonProcessingException; -import com.fasterxml.jackson.core.type.TypeReference; import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.node.NullNode; @@ -30,6 +29,7 @@ import org.apache.doris.flink.sink.writer.serializer.DorisRecord; import org.apache.doris.flink.sink.writer.serializer.jsondebezium.CdcDataChange; import org.apache.doris.flink.sink.writer.serializer.jsondebezium.JsonDebeziumChangeContext; import org.apache.doris.flink.tools.cdc.SourceSchema; +import org.apache.doris.flink.tools.cdc.utils.JsonNodeExtractUtil; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -127,16 +127,7 @@ public class MongoJsonDebeziumDataChange extends CdcDataChange implements Change @Override public Map<String, Object> extractAfterRow(JsonNode recordRoot) { JsonNode dataNode = recordRoot.get(FIELD_DATA); - Map<String, Object> rowMap = extractRow(dataNode); - String objectId; - // if user specifies the `_id` field manually, the $oid field may not exist - if (rowMap.get(ID_FIELD) instanceof Map<?, ?>) { - objectId = ((Map<?, ?>) rowMap.get(ID_FIELD)).get(OID_FIELD).toString(); - } else { - objectId = rowMap.get(ID_FIELD).toString(); - } - rowMap.put(ID_FIELD, objectId); - return rowMap; + return JsonNodeExtractUtil.extractAfterRow(dataNode, objectMapper); } private Map<String, Object> extractDeleteRow(JsonNode recordRoot) @@ -154,10 +145,4 @@ public class MongoJsonDebeziumDataChange extends CdcDataChange implements Change row.put(ID_FIELD, objectId); return row; } - - private Map<String, Object> extractRow(JsonNode recordRow) { - Map<String, Object> recordMap = - objectMapper.convertValue(recordRow, new TypeReference<Map<String, Object>>() {}); - return recordMap != null ? recordMap : new HashMap<>(); - } } diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mongodb/serializer/MongoJsonDebeziumSchemaChange.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mongodb/serializer/MongoJsonDebeziumSchemaChange.java index c67e856e..cfcf53f2 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mongodb/serializer/MongoJsonDebeziumSchemaChange.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mongodb/serializer/MongoJsonDebeziumSchemaChange.java @@ -17,10 +17,14 @@ package org.apache.doris.flink.tools.cdc.mongodb.serializer; +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.util.StringUtils; + import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.node.NullNode; import com.fasterxml.jackson.databind.node.ObjectNode; +import org.apache.doris.flink.catalog.doris.DataModel; import org.apache.doris.flink.catalog.doris.DorisSystem; import org.apache.doris.flink.catalog.doris.FieldSchema; import org.apache.doris.flink.cfg.DorisOptions; @@ -30,8 +34,11 @@ import org.apache.doris.flink.sink.schema.SchemaChangeManager; import org.apache.doris.flink.sink.writer.serializer.jsondebezium.CdcSchemaChange; import org.apache.doris.flink.sink.writer.serializer.jsondebezium.JsonDebeziumChangeContext; import org.apache.doris.flink.tools.cdc.SourceSchema; +import org.apache.doris.flink.tools.cdc.mongodb.MongoDBSchema; import org.apache.doris.flink.tools.cdc.mongodb.MongoDBType; import org.apache.doris.flink.tools.cdc.mongodb.MongoDateConverter; +import org.apache.doris.flink.tools.cdc.utils.DorisTableUtil; +import org.apache.doris.flink.tools.cdc.utils.JsonNodeExtractUtil; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -60,17 +67,19 @@ public class MongoJsonDebeziumSchemaChange extends CdcSchemaChange { private final Map<String, Map<String, String>> tableFields; - private final SchemaChangeManager schemaChangeManager; + private SchemaChangeManager schemaChangeManager; - private final DorisSystem dorisSystem; + private DorisSystem dorisSystem; public Map<String, String> tableMapping; private final DorisOptions dorisOptions; + public JsonDebeziumChangeContext changeContext; private final Set<String> specialFields = new HashSet<>(Arrays.asList(DATE_FIELD, TIMESTAMP_FIELD, DECIMAL_FIELD, LONG_FIELD)); public MongoJsonDebeziumSchemaChange(JsonDebeziumChangeContext changeContext) { + this.changeContext = changeContext; this.objectMapper = changeContext.getObjectMapper(); this.dorisOptions = changeContext.getDorisOptions(); this.tableFields = new HashMap<>(); @@ -96,6 +105,35 @@ public class MongoJsonDebeziumSchemaChange extends CdcSchemaChange { String cdcTableIdentifier = getCdcTableIdentifier(recordRoot); String dorisTableIdentifier = getDorisTableIdentifier(cdcTableIdentifier, dorisOptions, tableMapping); + + // if table dorisTableIdentifier is null, create table + if (StringUtils.isNullOrWhitespaceOnly(dorisTableIdentifier)) { + String[] split = cdcTableIdentifier.split("\\."); + String targetDb = changeContext.getTargetDatabase(); + String sourceTable = split[1]; + String dorisTable = changeContext.getTableNameConverter().convert(sourceTable); + LOG.info( + "The table [{}.{}] does not exist. Attempting to create a new table named: {}.{}", + targetDb, + sourceTable, + targetDb, + dorisTable); + tableMapping.put(cdcTableIdentifier, String.format("%s.%s", targetDb, dorisTable)); + dorisTableIdentifier = tableMapping.get(cdcTableIdentifier); + Map<String, Object> stringObjectMap = extractAfterRow(logData); + JsonNode jsonNode = objectMapper.valueToTree(stringObjectMap); + + MongoDBSchema mongoSchema = new MongoDBSchema(jsonNode, targetDb, dorisTable, ""); + + mongoSchema.setModel(DataModel.UNIQUE); + DorisTableUtil.tryCreateTableIfAbsent( + dorisSystem, + targetDb, + dorisTable, + mongoSchema, + changeContext.getDorisTableConf()); + } + String[] tableInfo = dorisTableIdentifier.split("\\."); if (tableInfo.length != 2) { throw new DorisRuntimeException(); @@ -163,6 +201,10 @@ public class MongoJsonDebeziumSchemaChange extends CdcSchemaChange { } } + public Map<String, Object> extractAfterRow(JsonNode recordRoot) { + return JsonNodeExtractUtil.extractAfterRow(recordRoot, objectMapper); + } + private void checkAndUpdateSchemaChange( JsonNode logData, String dorisTableIdentifier, String database, String table) { Map<String, String> tableFieldMap = tableFields.get(dorisTableIdentifier); @@ -207,4 +249,14 @@ public class MongoJsonDebeziumSchemaChange extends CdcSchemaChange { String db = nameSpace.get(FIELD_DATABASE).asText(); return SourceSchema.getString(db, null, table); } + + @VisibleForTesting + public void setDorisSystem(DorisSystem dorisSystem) { + this.dorisSystem = dorisSystem; + } + + @VisibleForTesting + public void setSchemaChangeManager(SchemaChangeManager schemaChangeManager) { + this.schemaChangeManager = schemaChangeManager; + } } diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/utils/DorisTableUtil.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/utils/DorisTableUtil.java new file mode 100644 index 00000000..114bf151 --- /dev/null +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/utils/DorisTableUtil.java @@ -0,0 +1,109 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.flink.tools.cdc.utils; + +import org.apache.flink.util.CollectionUtil; + +import org.apache.doris.flink.catalog.doris.DorisSchemaFactory; +import org.apache.doris.flink.catalog.doris.DorisSystem; +import org.apache.doris.flink.catalog.doris.TableSchema; +import org.apache.doris.flink.exception.DorisSystemException; +import org.apache.doris.flink.tools.cdc.DorisTableConfig; +import org.apache.doris.flink.tools.cdc.SourceSchema; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.sql.SQLSyntaxErrorException; +import java.util.ArrayList; + +/** Utility class for Doris table operations. */ +public class DorisTableUtil { + private static final Logger LOG = LoggerFactory.getLogger(DorisTableUtil.class); + + /** + * Try to create a table in doris if it doesn't exist. + * + * @param dorisSystem Doris system instance + * @param targetDb Doris database name + * @param dorisTable Doris table name + * @param schema Doris table schema + * @param tableConfig Table configuration + * @param ignoreIncompatible Whether to ignore incompatible schema errors + * @throws DorisSystemException if table creation fails + */ + public static void tryCreateTableIfAbsent( + DorisSystem dorisSystem, + String targetDb, + String dorisTable, + SourceSchema schema, + DorisTableConfig tableConfig, + boolean ignoreIncompatible) + throws DorisSystemException { + + if (!dorisSystem.tableExists(targetDb, dorisTable)) { + if (tableConfig.isConvertUniqToPk() + && CollectionUtil.isNullOrEmpty(schema.primaryKeys) + && !CollectionUtil.isNullOrEmpty(schema.uniqueIndexs)) { + schema.primaryKeys = new ArrayList<>(schema.uniqueIndexs); + } + + TableSchema dorisSchema = + DorisSchemaFactory.createTableSchema( + targetDb, + dorisTable, + schema.getFields(), + schema.getPrimaryKeys(), + tableConfig, + schema.getTableComment()); + try { + dorisSystem.createTable(dorisSchema); + } catch (Exception ex) { + handleTableCreationFailure(ex, ignoreIncompatible); + } + } + } + + /** Overloaded method without ignoreIncompatible parameter. */ + public static void tryCreateTableIfAbsent( + DorisSystem dorisSystem, + String targetDb, + String dorisTable, + SourceSchema schema, + DorisTableConfig tableConfig) + throws DorisSystemException { + tryCreateTableIfAbsent(dorisSystem, targetDb, dorisTable, schema, tableConfig, false); + } + + /** + * Handle table creation failure. + * + * @param ex Exception that occurred during table creation + * @param ignoreIncompatible Whether to ignore incompatible schema errors + * @throws DorisSystemException if table creation fails and errors should not be ignored + */ + private static void handleTableCreationFailure(Exception ex, boolean ignoreIncompatible) + throws DorisSystemException { + if (ignoreIncompatible && ex.getCause() instanceof SQLSyntaxErrorException) { + LOG.warn( + "Doris schema and source table schema are not compatible. Error: {} ", + ex.getCause().toString()); + } else { + throw new DorisSystemException("Failed to create table due to: ", ex); + } + } +} diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/utils/JsonNodeExtractUtil.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/utils/JsonNodeExtractUtil.java new file mode 100644 index 00000000..93aeca0b --- /dev/null +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/utils/JsonNodeExtractUtil.java @@ -0,0 +1,62 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.flink.tools.cdc.utils; + +import com.fasterxml.jackson.core.type.TypeReference; +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.HashMap; +import java.util.Map; + +import static org.apache.doris.flink.tools.cdc.mongodb.ChangeStreamConstant.ID_FIELD; +import static org.apache.doris.flink.tools.cdc.mongodb.ChangeStreamConstant.OID_FIELD; + +/** Utility class for extracting data from JSON nodes */ +public class JsonNodeExtractUtil { + private static final Logger LOG = LoggerFactory.getLogger(JsonNodeExtractUtil.class); + + /** + * Extract row data from JsonNode and convert to Map + * + * @param recordRow JsonNode containing row data + * @param objectMapper ObjectMapper instance for JSON conversion + * @return Map containing extracted row data + */ + public static Map<String, Object> extractRow(JsonNode recordRow, ObjectMapper objectMapper) { + Map<String, Object> recordMap = + objectMapper.convertValue(recordRow, new TypeReference<Map<String, Object>>() {}); + return recordMap != null ? recordMap : new HashMap<>(); + } + + public static Map<String, Object> extractAfterRow( + JsonNode recordRoot, ObjectMapper objectMapper) { + Map<String, Object> rowMap = JsonNodeExtractUtil.extractRow(recordRoot, objectMapper); + String objectId; + // if user specifies the `_id` field manually, the $oid field may not exist + if (rowMap.get(ID_FIELD) instanceof Map<?, ?>) { + objectId = ((Map<?, ?>) rowMap.get(ID_FIELD)).get(OID_FIELD).toString(); + } else { + objectId = rowMap.get(ID_FIELD).toString(); + } + rowMap.put(ID_FIELD, objectId); + return rowMap; + } +} diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/mongodb/MongoDBCreateTableTest.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/mongodb/MongoDBCreateTableTest.java new file mode 100644 index 00000000..b395d3a7 --- /dev/null +++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/mongodb/MongoDBCreateTableTest.java @@ -0,0 +1,116 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.flink.tools.cdc.mongodb; + +import com.fasterxml.jackson.databind.JsonNode; +import org.apache.doris.flink.catalog.doris.DorisSystem; +import org.apache.doris.flink.cfg.DorisOptions; +import org.apache.doris.flink.sink.schema.SchemaChangeManager; +import org.apache.doris.flink.sink.writer.serializer.jsondebezium.JsonDebeziumChangeContext; +import org.apache.doris.flink.sink.writer.serializer.jsondebezium.TestJsonDebeziumChangeBase; +import org.apache.doris.flink.tools.cdc.DorisTableConfig; +import org.apache.doris.flink.tools.cdc.converter.TableNameConverter; +import org.apache.doris.flink.tools.cdc.mongodb.serializer.MongoJsonDebeziumSchemaChange; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.mockito.Mock; +import org.mockito.junit.MockitoJUnit; +import org.mockito.junit.MockitoRule; + +import java.io.IOException; +import java.util.HashMap; +import java.util.Map; + +public class MongoDBCreateTableTest extends TestJsonDebeziumChangeBase { + + private MongoJsonDebeziumSchemaChange schemaChange; + + @Rule public MockitoRule mockitoRule = MockitoJUnit.rule(); + + @Mock private DorisSystem mockDorisSystem; + + @Mock private DorisOptions mockDorisOptions; + + @Mock private SchemaChangeManager mockSchemaManager; + + private final String dbName = "test_db"; + private final String prefix = "ods_"; + private final String suffix = "_dt"; + + @Before + public void setUp() { + super.setUp(); + + Map<String, String> tableConfig = new HashMap<>(); + tableConfig.put(DorisTableConfig.REPLICATION_NUM, "1"); + tableConfig.put(DorisTableConfig.TABLE_BUCKETS, ".*:1"); + + JsonDebeziumChangeContext changeContext = + new JsonDebeziumChangeContext( + mockDorisOptions, + tableMapping, + null, + dbName, + new DorisTableConfig(tableConfig), + objectMapper, + null, + lineDelimiter, + ignoreUpdateBefore, + prefix, + suffix, + true, + new TableNameConverter(prefix, suffix)); + schemaChange = new MongoJsonDebeziumSchemaChange(changeContext); + } + + @Test + public void testAutoCreateTable() throws IOException { + String newTableName = "test_table"; + String record = + "{" + + "\"_id\":\"{\\\"_id\\\": {\\\"_id\\\": {\\\"$oid\\\": \\\"67d2d13807fe0c4336070cfd\\\"}}}\"," + + "\"operationType\":\"insert\"," + + "\"fullDocument\":\"{\\\"_id\\\": {\\\"$oid\\\": \\\"67d2d13807fe0c4336070cfd\\\"}, \\\"name\\\": \\\"John Doe\\\", \\\"age\\\": 30, \\\"city\\\": \\\"New York\\\"}\"," + + "\"fullDocumentBeforeChange\":null," + + "\"source\":{\"ts_ms\":1741869368000,\"snapshot\":\"false\"}," + + "\"ts_ms\":1741869368365," + + "\"ns\":{\"db\":\"testDB\",\"coll\":\"" + + newTableName + + "\"}," + + "\"to\":null," + + "\"documentKey\":\"{\\\"_id\\\": {\\\"$oid\\\": \\\"67d2d13807fe0c4336070cfd\\\"}}\"," + + "\"updateDescription\":null," + + "\"clusterTime\":\"{\\\"$timestamp\\\": {\\\"t\\\": 1741869368, \\\"i\\\": 2}}\"," + + "\"txnNumber\":null," + + "\"lsid\":null" + + "}"; + schemaChange.setSchemaChangeManager(mockSchemaManager); + schemaChange.setDorisSystem(mockDorisSystem); + + JsonNode recordRoot = objectMapper.readTree(record); + boolean result = schemaChange.schemaChange(recordRoot); + Assert.assertTrue( + tableMapping.containsValue( + dbName + + "." + + new TableNameConverter(prefix, suffix).convert(newTableName))); + Assert.assertTrue(result); + } +} --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org