This is an automated email from the ASF dual-hosted git repository.

diwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris-flink-connector.git
The following commit(s) were added to refs/heads/master by this push: new ca358a8 [Feature](cdc) add MongoDB cdc (#343) ca358a8 is described below commit ca358a8ac56f009f8bb6ef00eaefcca6eb212baa Author: bingquanzhao <bingquan_z...@icloud.com> AuthorDate: Wed May 8 10:07:52 2024 +0800 [Feature](cdc) add MongoDB cdc (#343) --- flink-doris-connector/pom.xml | 13 ++ .../doris/flink/catalog/doris/DorisSystem.java | 27 +++ .../MongoDBJsonDebeziumSchemaSerializer.java | 200 +++++++++++++++++++ .../serializer/jsondebezium/CdcDataChange.java | 2 +- .../jsondebezium/MongoJsonDebeziumDataChange.java | 147 ++++++++++++++ .../MongoJsonDebeziumSchemaChange.java | 196 ++++++++++++++++++ .../org/apache/doris/flink/tools/cdc/CdcTools.java | 14 ++ .../apache/doris/flink/tools/cdc/DatabaseSync.java | 2 +- .../tools/cdc/mongodb/ChangeStreamConstant.java | 42 ++++ .../tools/cdc/mongodb/MongoDBDatabaseSync.java | 220 +++++++++++++++++++++ .../flink/tools/cdc/mongodb/MongoDBSchema.java | 98 +++++++++ .../doris/flink/tools/cdc/mongodb/MongoDBType.java | 126 ++++++++++++ .../tools/cdc/mongodb/MongoDateConverter.java | 35 ++++ .../cdc/mongodb/MongoParsingProcessFunction.java | 44 +++++ .../flink/tools/cdc/CdcMongoSyncDatabaseCase.java | 102 ++++++++++ 15 files changed, 1266 insertions(+), 2 deletions(-) diff --git a/flink-doris-connector/pom.xml b/flink-doris-connector/pom.xml index beaa5b9..23145e4 100644 --- a/flink-doris-connector/pom.xml +++ b/flink-doris-connector/pom.xml @@ -284,6 +284,19 @@ under the License. </exclusion> </exclusions> </dependency> + <dependency> + <groupId>com.ververica</groupId> + <artifactId>flink-sql-connector-mongodb-cdc</artifactId> + <!-- the dependency is available only for stable releases. --> + <version>${flink.sql.cdc.version}</version> + <scope>provided</scope> + <exclusions> + <exclusion> + <artifactId>flink-shaded-guava</artifactId> + <groupId>org.apache.flink</groupId> + </exclusion> + </exclusions> + </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-runtime-web</artifactId> diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/catalog/doris/DorisSystem.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/catalog/doris/DorisSystem.java index 86745fa..3608e95 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/catalog/doris/DorisSystem.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/catalog/doris/DorisSystem.java @@ -36,6 +36,7 @@ import java.sql.PreparedStatement; import java.sql.ResultSet; import java.sql.Statement; import java.util.Collections; +import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Objects; @@ -241,6 +242,32 @@ public class DorisSystem implements Serializable { return sb.toString(); } + public Map<String, String> getTableFieldNames(String databaseName, String tableName) { + if (!databaseExists(databaseName)) { + throw new DorisRuntimeException("database" + databaseName + " is not exists"); + } + String sql = + String.format( + "SELECT COLUMN_NAME,DATA_TYPE " + + "FROM `information_schema`.`COLUMNS` WHERE `TABLE_SCHEMA`= '%s' AND `TABLE_NAME`= '%s'", + databaseName, tableName); + + Map<String, String> columnValues = new HashMap<>(); + try (PreparedStatement ps = + jdbcConnectionProvider.getOrEstablishConnection().prepareStatement(sql)) { + ResultSet rs = ps.executeQuery(); + while (rs.next()) { + String filedName = rs.getString(1); + String datatype = rs.getString(2); + columnValues.put(filedName, datatype); + } + return 
columnValues; + } catch (Exception e) { + throw new DorisSystemException( + String.format("The following SQL query could not be executed: %s", sql), e); + } + } + private static void buildColumn(StringBuilder sql, FieldSchema field, boolean isKey) { String fieldType = field.getTypeString(); if (isKey && DorisType.STRING.equals(fieldType)) { diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/MongoDBJsonDebeziumSchemaSerializer.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/MongoDBJsonDebeziumSchemaSerializer.java new file mode 100644 index 0000000..b2ff598 --- /dev/null +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/MongoDBJsonDebeziumSchemaSerializer.java @@ -0,0 +1,200 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.flink.sink.writer.serializer; + +import com.fasterxml.jackson.databind.DeserializationFeature; +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.node.JsonNodeFactory; +import org.apache.doris.flink.cfg.DorisExecutionOptions; +import org.apache.doris.flink.cfg.DorisOptions; +import org.apache.doris.flink.sink.writer.serializer.jsondebezium.CdcDataChange; +import org.apache.doris.flink.sink.writer.serializer.jsondebezium.CdcSchemaChange; +import org.apache.doris.flink.sink.writer.serializer.jsondebezium.JsonDebeziumChangeContext; +import org.apache.doris.flink.sink.writer.serializer.jsondebezium.MongoJsonDebeziumDataChange; +import org.apache.doris.flink.sink.writer.serializer.jsondebezium.MongoJsonDebeziumSchemaChange; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.Map; +import java.util.regex.Pattern; + +import static org.apache.doris.flink.sink.writer.LoadConstants.LINE_DELIMITER_DEFAULT; +import static org.apache.doris.flink.sink.writer.LoadConstants.LINE_DELIMITER_KEY; + +public class MongoDBJsonDebeziumSchemaSerializer implements DorisRecordSerializer<String> { + + private static final Logger LOG = + LoggerFactory.getLogger(MongoDBJsonDebeziumSchemaSerializer.class); + private final Pattern pattern; + private final DorisOptions dorisOptions; + private final ObjectMapper objectMapper = new ObjectMapper(); + // table name of the cdc upstream, format is db.tbl + private final String sourceTableName; + private String lineDelimiter = LINE_DELIMITER_DEFAULT; + private boolean ignoreUpdateBefore = true; + // <cdc db.schema.table, doris db.table> + private Map<String, String> tableMapping; + // create table properties + private Map<String, String> tableProperties; + private String targetDatabase; + + private CdcDataChange 
dataChange; + private CdcSchemaChange schemaChange; + + private String targetTablePrefix; + private String targetTableSuffix; + + public MongoDBJsonDebeziumSchemaSerializer( + DorisOptions dorisOptions, + Pattern pattern, + String sourceTableName, + DorisExecutionOptions executionOptions, + Map<String, String> tableMapping, + Map<String, String> tableProperties, + String targetDatabase, + String targetTablePrefix, + String targetTableSuffix) { + this.dorisOptions = dorisOptions; + this.pattern = pattern; + this.sourceTableName = sourceTableName; + // Prevent loss of decimal data precision + this.objectMapper.enable(DeserializationFeature.USE_BIG_DECIMAL_FOR_FLOATS); + JsonNodeFactory jsonNodeFactory = JsonNodeFactory.withExactBigDecimals(true); + this.objectMapper.setNodeFactory(jsonNodeFactory); + this.tableMapping = tableMapping; + this.tableProperties = tableProperties; + this.targetDatabase = targetDatabase; + this.targetTablePrefix = targetTablePrefix; + this.targetTableSuffix = targetTableSuffix; + if (executionOptions != null) { + this.lineDelimiter = + executionOptions + .getStreamLoadProp() + .getProperty(LINE_DELIMITER_KEY, LINE_DELIMITER_DEFAULT); + this.ignoreUpdateBefore = executionOptions.getIgnoreUpdateBefore(); + } + init(); + } + + private void init() { + JsonDebeziumChangeContext changeContext = + new JsonDebeziumChangeContext( + dorisOptions, + tableMapping, + sourceTableName, + targetDatabase, + tableProperties, + objectMapper, + pattern, + lineDelimiter, + ignoreUpdateBefore, + targetTablePrefix, + targetTableSuffix); + this.dataChange = new MongoJsonDebeziumDataChange(changeContext); + this.schemaChange = new MongoJsonDebeziumSchemaChange(changeContext); + } + + @Override + public DorisRecord serialize(String record) throws IOException { + LOG.debug("received debezium json data {} :", record); + JsonNode recordRoot = objectMapper.readValue(record, JsonNode.class); + String op = getOperateType(recordRoot); + try { + schemaChange.schemaChange(recordRoot); + } catch (Exception e) { + throw new RuntimeException(e); + } + return dataChange.serialize(record, recordRoot, op); + } + + private String getOperateType(JsonNode recordRoot) { + return recordRoot.get("operationType").asText(); + } + + public static MongoDBJsonDebeziumSchemaSerializer.Builder builder() { + return new MongoDBJsonDebeziumSchemaSerializer.Builder(); + } + + public static class Builder { + private DorisOptions dorisOptions; + private Pattern addDropDDLPattern; + private String sourceTableName; + private DorisExecutionOptions executionOptions; + private Map<String, String> tableMapping; + private Map<String, String> tableProperties; + private String targetDatabase; + private String targetTablePrefix = ""; + private String targetTableSuffix = ""; + + public MongoDBJsonDebeziumSchemaSerializer.Builder setDorisOptions( + DorisOptions dorisOptions) { + this.dorisOptions = dorisOptions; + return this; + } + + public MongoDBJsonDebeziumSchemaSerializer.Builder setPattern(Pattern addDropDDLPattern) { + this.addDropDDLPattern = addDropDDLPattern; + return this; + } + + public MongoDBJsonDebeziumSchemaSerializer.Builder setSourceTableName( + String sourceTableName) { + this.sourceTableName = sourceTableName; + return this; + } + + public MongoDBJsonDebeziumSchemaSerializer.Builder setExecutionOptions( + DorisExecutionOptions executionOptions) { + this.executionOptions = executionOptions; + return this; + } + + public MongoDBJsonDebeziumSchemaSerializer.Builder setTableMapping( + Map<String, String> tableMapping) { 
+ this.tableMapping = tableMapping; + return this; + } + + public MongoDBJsonDebeziumSchemaSerializer.Builder setTableProperties( + Map<String, String> tableProperties) { + this.tableProperties = tableProperties; + return this; + } + + public MongoDBJsonDebeziumSchemaSerializer.Builder setTargetDatabase( + String targetDatabase) { + this.targetDatabase = targetDatabase; + return this; + } + + public MongoDBJsonDebeziumSchemaSerializer build() { + return new MongoDBJsonDebeziumSchemaSerializer( + dorisOptions, + addDropDDLPattern, + sourceTableName, + executionOptions, + tableMapping, + tableProperties, + targetDatabase, + targetTablePrefix, + targetTableSuffix); + } + } +} diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/CdcDataChange.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/CdcDataChange.java index c344aae..cba431c 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/CdcDataChange.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/CdcDataChange.java @@ -31,7 +31,7 @@ import java.util.Map; */ public abstract class CdcDataChange implements ChangeEvent { - protected abstract DorisRecord serialize(String record, JsonNode recordRoot, String op) + public abstract DorisRecord serialize(String record, JsonNode recordRoot, String op) throws IOException; protected abstract Map<String, Object> extractBeforeRow(JsonNode record); diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/MongoJsonDebeziumDataChange.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/MongoJsonDebeziumDataChange.java new file mode 100644 index 0000000..407a7e7 --- /dev/null +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/MongoJsonDebeziumDataChange.java @@ -0,0 +1,147 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +package org.apache.doris.flink.sink.writer.serializer.jsondebezium; + +import org.apache.flink.util.StringUtils; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.core.type.TypeReference; +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.node.NullNode; +import org.apache.doris.flink.cfg.DorisOptions; +import org.apache.doris.flink.sink.writer.ChangeEvent; +import org.apache.doris.flink.sink.writer.serializer.DorisRecord; +import org.apache.doris.flink.tools.cdc.SourceSchema; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.util.HashMap; +import java.util.Map; + +import static org.apache.doris.flink.sink.util.DeleteOperation.addDeleteSign; +import static org.apache.doris.flink.sink.writer.serializer.jsondebezium.JsonDebeziumChangeUtils.extractJsonNode; +import static org.apache.doris.flink.sink.writer.serializer.jsondebezium.JsonDebeziumChangeUtils.getDorisTableIdentifier; +import static org.apache.doris.flink.tools.cdc.mongodb.ChangeStreamConstant.FIELD_DATA; +import static org.apache.doris.flink.tools.cdc.mongodb.ChangeStreamConstant.FIELD_DATABASE; +import static org.apache.doris.flink.tools.cdc.mongodb.ChangeStreamConstant.FIELD_DOCUMENT_KEY; +import static org.apache.doris.flink.tools.cdc.mongodb.ChangeStreamConstant.FIELD_NAMESPACE; +import static org.apache.doris.flink.tools.cdc.mongodb.ChangeStreamConstant.FIELD_TABLE; +import static org.apache.doris.flink.tools.cdc.mongodb.ChangeStreamConstant.ID_FIELD; +import static org.apache.doris.flink.tools.cdc.mongodb.ChangeStreamConstant.OID_FIELD; +import static org.apache.doris.flink.tools.cdc.mongodb.ChangeStreamConstant.OP_DELETE; +import static org.apache.doris.flink.tools.cdc.mongodb.ChangeStreamConstant.OP_INSERT; +import static org.apache.doris.flink.tools.cdc.mongodb.ChangeStreamConstant.OP_REPLACE; +import static org.apache.doris.flink.tools.cdc.mongodb.ChangeStreamConstant.OP_UPDATE; + +public class MongoJsonDebeziumDataChange extends CdcDataChange implements ChangeEvent { + private static final Logger LOG = LoggerFactory.getLogger(MongoJsonDebeziumDataChange.class); + + public DorisOptions dorisOptions; + public String lineDelimiter; + public JsonDebeziumChangeContext changeContext; + public ObjectMapper objectMapper; + public Map<String, String> tableMapping; + + public MongoJsonDebeziumDataChange(JsonDebeziumChangeContext changeContext) { + this.changeContext = changeContext; + this.dorisOptions = changeContext.getDorisOptions(); + this.objectMapper = changeContext.getObjectMapper(); + this.lineDelimiter = changeContext.getLineDelimiter(); + this.tableMapping = changeContext.getTableMapping(); + } + + @Override + public DorisRecord serialize(String record, JsonNode recordRoot, String op) throws IOException { + // Filter out table records that are not in tableMapping + String cdcTableIdentifier = getCdcTableIdentifier(recordRoot); + String dorisTableIdentifier = + getDorisTableIdentifier(cdcTableIdentifier, dorisOptions, tableMapping); + if (StringUtils.isNullOrWhitespaceOnly(dorisTableIdentifier)) { + LOG.warn( + "filter table {}, because it is not listened, record detail is {}", + cdcTableIdentifier, + record); + return null; + } + Map<String, Object> valueMap; + switch (op) { + case OP_INSERT: + case OP_UPDATE: + case OP_REPLACE: + valueMap = extractAfterRow(recordRoot); + addDeleteSign(valueMap, 
false); + break; + case OP_DELETE: + valueMap = extractDeleteRow(recordRoot); + addDeleteSign(valueMap, true); + break; + default: + LOG.error("parse record fail, unknown op {} in {}", op, record); + return null; + } + + return DorisRecord.of( + dorisTableIdentifier, + objectMapper.writeValueAsString(valueMap).getBytes(StandardCharsets.UTF_8)); + } + + public String getCdcTableIdentifier(JsonNode record) { + if (record.get(FIELD_NAMESPACE) == null + || record.get(FIELD_NAMESPACE) instanceof NullNode) { + LOG.error("Failed to get cdc namespace"); + throw new RuntimeException(); + } + JsonNode nameSpace = record.get(FIELD_NAMESPACE); + String db = extractJsonNode(nameSpace, FIELD_DATABASE); + String table = extractJsonNode(nameSpace, FIELD_TABLE); + return SourceSchema.getString(db, null, table); + } + + @Override + public Map<String, Object> extractBeforeRow(JsonNode record) { + return null; + } + + @Override + public Map<String, Object> extractAfterRow(JsonNode recordRoot) { + JsonNode dataNode = recordRoot.get(FIELD_DATA); + Map<String, Object> rowMap = extractRow(dataNode); + String objectId = ((Map<?, ?>) rowMap.get(ID_FIELD)).get(OID_FIELD).toString(); + rowMap.put(ID_FIELD, objectId); + return rowMap; + } + + private Map<String, Object> extractDeleteRow(JsonNode recordRoot) + throws JsonProcessingException { + String documentKey = extractJsonNode(recordRoot, FIELD_DOCUMENT_KEY); + JsonNode jsonNode = objectMapper.readTree(documentKey); + String objectId = extractJsonNode(jsonNode.get(ID_FIELD), OID_FIELD); + Map<String, Object> row = new HashMap<>(); + row.put(ID_FIELD, objectId); + return row; + } + + private Map<String, Object> extractRow(JsonNode recordRow) { + Map<String, Object> recordMap = + objectMapper.convertValue(recordRow, new TypeReference<Map<String, Object>>() {}); + return recordMap != null ? recordMap : new HashMap<>(); + } +} diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/MongoJsonDebeziumSchemaChange.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/MongoJsonDebeziumSchemaChange.java new file mode 100644 index 0000000..69dbf0b --- /dev/null +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/MongoJsonDebeziumSchemaChange.java @@ -0,0 +1,196 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +package org.apache.doris.flink.sink.writer.serializer.jsondebezium; + +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.node.NullNode; +import com.fasterxml.jackson.databind.node.ObjectNode; +import org.apache.doris.flink.catalog.doris.DorisSystem; +import org.apache.doris.flink.catalog.doris.FieldSchema; +import org.apache.doris.flink.cfg.DorisOptions; +import org.apache.doris.flink.exception.DorisRuntimeException; +import org.apache.doris.flink.exception.IllegalArgumentException; +import org.apache.doris.flink.sink.schema.SchemaChangeManager; +import org.apache.doris.flink.tools.cdc.SourceSchema; +import org.apache.doris.flink.tools.cdc.mongodb.MongoDBType; +import org.apache.doris.flink.tools.cdc.mongodb.MongoDateConverter; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.Arrays; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; + +import static org.apache.doris.flink.sink.writer.serializer.jsondebezium.JsonDebeziumChangeUtils.getDorisTableIdentifier; +import static org.apache.doris.flink.tools.cdc.mongodb.ChangeStreamConstant.DATE_FIELD; +import static org.apache.doris.flink.tools.cdc.mongodb.ChangeStreamConstant.DECIMAL_FIELD; +import static org.apache.doris.flink.tools.cdc.mongodb.ChangeStreamConstant.FIELD_DATA; +import static org.apache.doris.flink.tools.cdc.mongodb.ChangeStreamConstant.FIELD_DATABASE; +import static org.apache.doris.flink.tools.cdc.mongodb.ChangeStreamConstant.FIELD_NAMESPACE; +import static org.apache.doris.flink.tools.cdc.mongodb.ChangeStreamConstant.FIELD_TABLE; +import static org.apache.doris.flink.tools.cdc.mongodb.ChangeStreamConstant.LONG_FIELD; + +public class MongoJsonDebeziumSchemaChange extends CdcSchemaChange { + + private static final Logger LOG = LoggerFactory.getLogger(MongoJsonDebeziumSchemaChange.class); + + private final ObjectMapper objectMapper; + + private final Map<String, Map<String, String>> tableFields; + + private final SchemaChangeManager schemaChangeManager; + + private final DorisSystem dorisSystem; + + public Map<String, String> tableMapping; + private final DorisOptions dorisOptions; + + private final Set<String> specialFields = + new HashSet<>(Arrays.asList(DATE_FIELD, DECIMAL_FIELD, LONG_FIELD)); + + public MongoJsonDebeziumSchemaChange(JsonDebeziumChangeContext changeContext) { + this.objectMapper = changeContext.getObjectMapper(); + this.dorisOptions = changeContext.getDorisOptions(); + this.tableFields = new HashMap<>(); + this.schemaChangeManager = new SchemaChangeManager(dorisOptions); + this.dorisSystem = new DorisSystem(dorisOptions); + this.tableMapping = changeContext.getTableMapping(); + } + + @Override + public String extractDatabase(JsonNode record) { + return null; + } + + @Override + public String extractTable(JsonNode record) { + return null; + } + + @Override + public boolean schemaChange(JsonNode recordRoot) throws IOException { + JsonNode logData = getFullDocument(recordRoot); + String cdcTableIdentifier = getCdcTableIdentifier(recordRoot); + String dorisTableIdentifier = + getDorisTableIdentifier(cdcTableIdentifier, dorisOptions, tableMapping); + String[] tableInfo = dorisTableIdentifier.split("\\."); + if (tableInfo.length != 2) { + throw new DorisRuntimeException(); + } + String dataBase = tableInfo[0]; + String table = tableInfo[1]; + // build table fields mapping for all record + 
buildDorisTableFieldsMapping(dataBase, table); + + // Determine whether change stream log and tableField are exactly the same, if not, perform + // schema change + checkAndUpdateSchemaChange(logData, dorisTableIdentifier, dataBase, table); + formatSpecialFieldData(logData); + ((ObjectNode) recordRoot).set(FIELD_DATA, logData); + return true; + } + + private void formatSpecialFieldData(JsonNode logData) { + logData.fieldNames() + .forEachRemaining( + fieldName -> { + JsonNode fieldNode = logData.get(fieldName); + if (fieldNode.isObject() && fieldNode.size() == 1) { + String fieldKey = fieldNode.fieldNames().next(); + if (specialFields.contains(fieldKey)) { + switch (fieldKey) { + case DATE_FIELD: + long timestamp = fieldNode.get(DATE_FIELD).asLong(); + String formattedDate = + MongoDateConverter.convertTimestampToString( + timestamp); + ((ObjectNode) logData).put(fieldName, formattedDate); + break; + case DECIMAL_FIELD: + String numberDecimal = + fieldNode.get(DECIMAL_FIELD).asText(); + ((ObjectNode) logData).put(fieldName, numberDecimal); + break; + + case LONG_FIELD: + long longFiled = fieldNode.get(LONG_FIELD).asLong(); + ((ObjectNode) logData).put(fieldName, longFiled); + break; + } + } + } + }); + } + + private JsonNode getFullDocument(JsonNode recordRoot) { + try { + return objectMapper.readTree(recordRoot.get(FIELD_DATA).asText()); + } catch (IOException e) { + throw new DorisRuntimeException("Failed to parse fullDocument JSON", e); + } + } + + private void checkAndUpdateSchemaChange( + JsonNode logData, String dorisTableIdentifier, String database, String table) { + Map<String, String> tableFieldMap = tableFields.get(dorisTableIdentifier); + logData.fieldNames() + .forEachRemaining( + name -> { + try { + if (!tableFieldMap.containsKey(name)) { + doSchemaChange(name, logData, database, table); + } + } catch (Exception e) { + throw new RuntimeException("Error during schema change", e); + } + }); + } + + private void doSchemaChange( + String logFieldName, JsonNode logData, String database, String table) + throws IOException, IllegalArgumentException { + String dorisType = MongoDBType.jsonNodeToDorisType(logData.get(logFieldName)); + schemaChangeManager.addColumn( + database, table, new FieldSchema(logFieldName, dorisType, null)); + String identifier = database + "." + table; + tableFields.computeIfAbsent(identifier, k -> new HashMap<>()).put(logFieldName, dorisType); + } + + private void buildDorisTableFieldsMapping(String databaseName, String tableName) { + String identifier = databaseName + "." 
+ tableName; + tableFields.computeIfAbsent( + identifier, k -> dorisSystem.getTableFieldNames(databaseName, tableName)); + } + + @Override + public String getCdcTableIdentifier(JsonNode record) { + if (record.get(FIELD_NAMESPACE) == null + || record.get(FIELD_NAMESPACE) instanceof NullNode) { + LOG.error("Failed to get cdc namespace"); + throw new RuntimeException(); + } + JsonNode nameSpace = record.get(FIELD_NAMESPACE); + String table = nameSpace.get(FIELD_TABLE).asText(); + String db = nameSpace.get(FIELD_DATABASE).asText(); + return SourceSchema.getString(db, null, table); + } +} diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/CdcTools.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/CdcTools.java index e1a01ab..194ef87 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/CdcTools.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/CdcTools.java @@ -23,6 +23,7 @@ import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.util.Preconditions; import org.apache.flink.util.StringUtils; +import org.apache.doris.flink.tools.cdc.mongodb.MongoDBDatabaseSync; import org.apache.doris.flink.tools.cdc.mysql.MysqlDatabaseSync; import org.apache.doris.flink.tools.cdc.oracle.OracleDatabaseSync; import org.apache.doris.flink.tools.cdc.postgres.PostgresDatabaseSync; @@ -40,6 +41,7 @@ public class CdcTools { private static final String ORACLE_SYNC_DATABASE = "oracle-sync-database"; private static final String POSTGRES_SYNC_DATABASE = "postgres-sync-database"; private static final String SQLSERVER_SYNC_DATABASE = "sqlserver-sync-database"; + private static final String MONGODB_SYNC_DATABASE = "mongodb-sync-database"; private static final List<String> EMPTY_KEYS = Collections.singletonList("password"); public static void main(String[] args) throws Exception { @@ -59,6 +61,9 @@ public class CdcTools { case SQLSERVER_SYNC_DATABASE: createSqlServerSyncDatabase(opArgs); break; + case MONGODB_SYNC_DATABASE: + createMongoDBSyncDatabase(opArgs); + break; default: System.out.println("Unknown operation " + operation); System.exit(1); @@ -101,6 +106,15 @@ public class CdcTools { syncDatabase(params, databaseSync, postgresConfig, "SqlServer"); } + private static void createMongoDBSyncDatabase(String[] opArgs) throws Exception { + MultipleParameterTool params = MultipleParameterTool.fromArgs(opArgs); + Preconditions.checkArgument(params.has("mongodb-conf")); + Map<String, String> mongoMap = getConfigMap(params, "mongodb-conf"); + Configuration mongoConfig = Configuration.fromMap(mongoMap); + DatabaseSync databaseSync = new MongoDBDatabaseSync(); + syncDatabase(params, databaseSync, mongoConfig, "mongodb"); + } + private static void syncDatabase( MultipleParameterTool params, DatabaseSync databaseSync, diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/DatabaseSync.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/DatabaseSync.java index 14d3fbb..632edcc 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/DatabaseSync.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/DatabaseSync.java @@ -82,7 +82,7 @@ public abstract class DatabaseSync { protected String tablePrefix; protected String tableSuffix; protected boolean singleSink; - private final Map<String, String> tableMapping = new HashMap<>(); + protected final Map<String, String> tableMapping = 
new HashMap<>(); public abstract void registerDriver() throws SQLException; diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mongodb/ChangeStreamConstant.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mongodb/ChangeStreamConstant.java new file mode 100644 index 0000000..f8772c9 --- /dev/null +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mongodb/ChangeStreamConstant.java @@ -0,0 +1,42 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.flink.tools.cdc.mongodb; + +import java.io.Serializable; + +public class ChangeStreamConstant implements Serializable { + private static final long serialVersionUID = 2599456667907755804L; + public static final String ID_FIELD = "_id"; + public static final String OID_FIELD = "$oid"; + public static final String FIELD_TYPE = "operationType"; + public static final String FIELD_DATA = "fullDocument"; + public static final String OP_UPDATE = "update"; + public static final String OP_INSERT = "insert"; + public static final String OP_REPLACE = "replace"; + public static final String OP_DELETE = "delete"; + public static final String FIELD_DATABASE = "db"; + public static final String FIELD_TABLE = "coll"; + public static final String FIELD_NAMESPACE = "ns"; + public static final String FIELD_DOCUMENT_KEY = "documentKey"; + + public static final String DATE_FIELD = "$date"; + + public static final String DECIMAL_FIELD = "$numberDecimal"; + + public static final String LONG_FIELD = "$numberLong"; +} diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mongodb/MongoDBDatabaseSync.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mongodb/MongoDBDatabaseSync.java new file mode 100644 index 0000000..e2e0023 --- /dev/null +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mongodb/MongoDBDatabaseSync.java @@ -0,0 +1,220 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +package org.apache.doris.flink.tools.cdc.mongodb; + +import org.apache.flink.api.common.eventtime.WatermarkStrategy; +import org.apache.flink.configuration.ConfigOption; +import org.apache.flink.configuration.ConfigOptions; +import org.apache.flink.streaming.api.datastream.DataStreamSource; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; + +import com.mongodb.ConnectionString; +import com.mongodb.MongoClientSettings; +import com.mongodb.client.MongoClient; +import com.mongodb.client.MongoClients; +import com.mongodb.client.MongoCollection; +import com.mongodb.client.MongoDatabase; +import com.mongodb.client.MongoIterable; +import com.ververica.cdc.connectors.base.options.SourceOptions; +import com.ververica.cdc.connectors.base.options.StartupOptions; +import com.ververica.cdc.connectors.mongodb.source.MongoDBSource; +import com.ververica.cdc.connectors.mongodb.source.MongoDBSourceBuilder; +import com.ververica.cdc.connectors.mongodb.source.config.MongoDBSourceOptions; +import com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.json.JsonConverterConfig; +import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema; +import org.apache.commons.lang3.StringUtils; +import org.apache.doris.flink.catalog.doris.DataModel; +import org.apache.doris.flink.cfg.DorisExecutionOptions; +import org.apache.doris.flink.cfg.DorisOptions; +import org.apache.doris.flink.sink.writer.serializer.DorisRecordSerializer; +import org.apache.doris.flink.sink.writer.serializer.MongoDBJsonDebeziumSchemaSerializer; +import org.apache.doris.flink.tools.cdc.DatabaseSync; +import org.apache.doris.flink.tools.cdc.ParsingProcessFunction; +import org.apache.doris.flink.tools.cdc.SourceSchema; +import org.bson.Document; + +import javax.annotation.Nullable; + +import java.sql.Connection; +import java.sql.SQLException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import static com.ververica.cdc.connectors.mongodb.internal.MongoDBEnvelope.encodeValue; +import static org.apache.flink.util.Preconditions.checkNotNull; + +public class MongoDBDatabaseSync extends DatabaseSync { + + private static final String INITIAL_MODE = "initial"; + private static final String LATEST_OFFSET_MODE = "latest-offset"; + private static final String TIMESTAMP_MODE = "timestamp"; + public static final ConfigOption<Double> MONGO_CDC_CREATE_SAMPLE_PERCENT = + ConfigOptions.key("schema.sample-percent") + .doubleType() + .defaultValue(0.2) + .withDescription("mongo cdc sample percent"); + + public static final ConfigOption<String> TABLE_NAME = + ConfigOptions.key("table-name") + .stringType() + .noDefaultValue() + .withDescription("Table name of the Mongo database to monitor."); + + public MongoDBDatabaseSync() throws SQLException {} + + @Override + public void registerDriver() throws SQLException {} + + @Override + public Connection getConnection() throws SQLException { + return null; + } + + @Override + public List<SourceSchema> getSchemaList() throws Exception { + String databaseName = config.get(MongoDBSourceOptions.DATABASE); + List<SourceSchema> schemaList = new ArrayList<>(); + MongoClientSettings.Builder settingsBuilder = MongoClientSettings.builder(); + + settingsBuilder.applyConnectionString( + new ConnectionString( + buildConnectionString( + config.get(MongoDBSourceOptions.USERNAME), + config.get(MongoDBSourceOptions.PASSWORD), + config.get(MongoDBSourceOptions.SCHEME), + config.get(MongoDBSourceOptions.HOSTS), + 
config.get(MongoDBSourceOptions.CONNECTION_OPTIONS)))); + + MongoClientSettings settings = settingsBuilder.build(); + Double samplePercent = config.get(MONGO_CDC_CREATE_SAMPLE_PERCENT); + try (MongoClient mongoClient = MongoClients.create(settings)) { + MongoDatabase mongoDatabase = mongoClient.getDatabase(databaseName); + MongoIterable<String> collectionNames = mongoDatabase.listCollectionNames(); + for (String collectionName : collectionNames) { + if (!isSyncNeeded(collectionName)) { + continue; + } + MongoCollection<Document> collection = mongoDatabase.getCollection(collectionName); + Document firstDocument = collection.find().first(); + if (firstDocument == null) { + throw new IllegalStateException("No documents in collection to infer schema"); + } + + long totalDocuments = collection.countDocuments(); + long sampleSize = (long) Math.ceil(totalDocuments * samplePercent); + ArrayList<Document> documents = sampleData(collection, sampleSize); + MongoDBSchema mongoDBSchema = + new MongoDBSchema(documents, databaseName, collectionName, null); + mongoDBSchema.setModel(DataModel.UNIQUE); + schemaList.add(mongoDBSchema); + } + } + + return schemaList; + } + + private ArrayList<Document> sampleData(MongoCollection<Document> collection, Long sampleNum) { + ArrayList<Document> query = new ArrayList<>(); + query.add(new Document("$sample", new Document("size", sampleNum))); + return collection.aggregate(query).into(new ArrayList<>()); + } + + private static String buildConnectionString( + @Nullable String username, + @Nullable String password, + String scheme, + String hosts, + @Nullable String connectionOptions) { + StringBuilder sb = new StringBuilder(scheme).append("://"); + if (StringUtils.isNotEmpty(username) && StringUtils.isNotEmpty(password)) { + sb.append(encodeValue(username)).append(":").append(encodeValue(password)).append("@"); + } + sb.append(checkNotNull(hosts)); + if (StringUtils.isNotEmpty(connectionOptions)) { + sb.append("/?").append(connectionOptions); + } + return sb.toString(); + } + + @Override + public DataStreamSource<String> buildCdcSource(StreamExecutionEnvironment env) { + String hosts = config.get(MongoDBSourceOptions.HOSTS); + String username = config.get(MongoDBSourceOptions.USERNAME); + String password = config.get(MongoDBSourceOptions.PASSWORD); + String database = config.get(MongoDBSourceOptions.DATABASE); + String collection = config.get(MongoDBSourceOptions.COLLECTION); + if (StringUtils.isBlank(collection)) { + collection = config.get(TABLE_NAME); + } + MongoDBSourceBuilder<String> mongoDBSourceBuilder = MongoDBSource.builder(); + Map<String, Object> customConverterConfigs = new HashMap<>(); + customConverterConfigs.put(JsonConverterConfig.DECIMAL_FORMAT_CONFIG, "numeric"); + JsonDebeziumDeserializationSchema schema = + new JsonDebeziumDeserializationSchema(false, customConverterConfigs); + + mongoDBSourceBuilder + .hosts(hosts) + .username(username) + .password(password) + .databaseList(database) + .collectionList(collection); + + String startupMode = config.get(SourceOptions.SCAN_STARTUP_MODE); + switch (startupMode.toLowerCase()) { + case INITIAL_MODE: + mongoDBSourceBuilder.startupOptions(StartupOptions.initial()); + break; + case LATEST_OFFSET_MODE: + mongoDBSourceBuilder.startupOptions(StartupOptions.latest()); + break; + case TIMESTAMP_MODE: + mongoDBSourceBuilder.startupOptions( + StartupOptions.timestamp( + config.get(SourceOptions.SCAN_STARTUP_TIMESTAMP_MILLIS))); + break; + default: + throw new IllegalArgumentException("Unsupported startup mode: " + 
startupMode); + } + MongoDBSource<String> mongoDBSource = mongoDBSourceBuilder.deserializer(schema).build(); + return env.fromSource(mongoDBSource, WatermarkStrategy.noWatermarks(), "MongoDB Source"); + } + + @Override + public ParsingProcessFunction buildProcessFunction() { + return new MongoParsingProcessFunction(converter); + } + + @Override + public DorisRecordSerializer<String> buildSchemaSerializer( + DorisOptions.Builder dorisBuilder, DorisExecutionOptions executionOptions) { + return MongoDBJsonDebeziumSchemaSerializer.builder() + .setDorisOptions(dorisBuilder.build()) + .setExecutionOptions(executionOptions) + .setTableMapping(tableMapping) + .setTableProperties(tableConfig) + .setTargetDatabase(database) + .build(); + } + + @Override + public String getTableListPrefix() { + return config.get(MongoDBSourceOptions.DATABASE); + } +} diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mongodb/MongoDBSchema.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mongodb/MongoDBSchema.java new file mode 100644 index 0000000..2c2e1b4 --- /dev/null +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mongodb/MongoDBSchema.java @@ -0,0 +1,98 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +package org.apache.doris.flink.tools.cdc.mongodb; + +import org.apache.flink.api.java.tuple.Tuple2; + +import org.apache.doris.flink.catalog.doris.DorisType; +import org.apache.doris.flink.catalog.doris.FieldSchema; +import org.apache.doris.flink.tools.cdc.SourceSchema; +import org.bson.Document; + +import java.util.ArrayList; +import java.util.LinkedHashMap; +import java.util.Map; + +public class MongoDBSchema extends SourceSchema { + + public MongoDBSchema( + ArrayList<Document> sampleData, + String databaseName, + String tableName, + String tableComment) + throws Exception { + super(databaseName, null, tableName, tableComment); + fields = new LinkedHashMap<>(); + for (Document data : sampleData) { + processSampleData(data); + } + + primaryKeys = new ArrayList<>(); + primaryKeys.add("_id"); + } + + private void processSampleData(Document sampleData) { + for (Map.Entry<String, Object> entry : sampleData.entrySet()) { + String fieldName = entry.getKey(); + Object value = entry.getValue(); + String dorisType = MongoDBType.toDorisType(value); + if (isDecimalField(fieldName)) { + dorisType = replaceDecimalTypeIfNeeded(fieldName, dorisType); + } + fields.put(fieldName, new FieldSchema(fieldName, dorisType, null)); + } + } + + private boolean isDecimalField(String fieldName) { + FieldSchema existingField = fields.get(fieldName); + return existingField != null && existingField.getTypeString().startsWith(DorisType.DECIMAL); + } + + private String replaceDecimalTypeIfNeeded(String fieldName, String newDorisType) { + FieldSchema existingField = fields.get(fieldName); + if (existingField.getTypeString().startsWith(DorisType.DECIMAL)) { + Tuple2<Integer, Integer> existingPrecisionAndScale = + MongoDBType.getDecimalPrecisionAndScale(existingField.getTypeString()); + int existingPrecision = existingPrecisionAndScale.f0; + int existingScale = existingPrecisionAndScale.f1; + + Tuple2<Integer, Integer> currentPrecisionAndScale = + MongoDBType.getDecimalPrecisionAndScale(newDorisType); + int currentPrecision = currentPrecisionAndScale.f0; + int currentScale = currentPrecisionAndScale.f1; + + int newScale = Math.max(existingScale, currentScale); + int newIntegerPartSize = + Math.max(existingPrecision - existingScale, currentPrecision - currentScale); + int newPrecision = newIntegerPartSize + newScale; + + return DorisType.DECIMAL + "(" + newPrecision + "," + newScale + ")"; + } + return newDorisType; + } + + @Override + public String convertToDorisType(String fieldType, Integer precision, Integer scale) { + return null; + } + + @Override + public String getCdcTableName() { + return databaseName + "\\." + tableName; + } +} diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mongodb/MongoDBType.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mongodb/MongoDBType.java new file mode 100644 index 0000000..bee85ce --- /dev/null +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mongodb/MongoDBType.java @@ -0,0 +1,126 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.flink.tools.cdc.mongodb; + +import org.apache.flink.api.java.tuple.Tuple2; + +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.node.ArrayNode; +import com.fasterxml.jackson.databind.node.BooleanNode; +import com.fasterxml.jackson.databind.node.DecimalNode; +import com.fasterxml.jackson.databind.node.DoubleNode; +import com.fasterxml.jackson.databind.node.IntNode; +import com.fasterxml.jackson.databind.node.LongNode; +import com.fasterxml.jackson.databind.node.ObjectNode; +import com.fasterxml.jackson.databind.node.TextNode; +import org.apache.doris.flink.catalog.doris.DorisType; +import org.apache.doris.flink.exception.DorisRuntimeException; +import org.bson.BsonArray; +import org.bson.types.Decimal128; +import org.bson.types.ObjectId; + +import java.math.BigDecimal; +import java.util.Date; +import java.util.regex.Matcher; +import java.util.regex.Pattern; + +public class MongoDBType { + + public static final String DATE_TYPE = "$date"; + public static final String DECIMAL_TYPE = "$numberDecimal"; + public static final String LONG_TYPE = "$numberLong"; + + public static String toDorisType(Object value) { + if (value instanceof Integer) { + return DorisType.INT; + } else if (value instanceof Date) { + return DorisType.DATETIME_V2 + "(3)"; + } else if (value instanceof Long) { + return DorisType.BIGINT; + } else if (value instanceof Double) { + return checkAndRebuildBigDecimal(new BigDecimal(String.valueOf(value))); + } else if (value instanceof Boolean) { + return DorisType.BOOLEAN; + } else if (value instanceof String) { + return DorisType.STRING; + } else if (value instanceof ObjectId) { + return DorisType.VARCHAR + "(30)"; + } else if (value instanceof BsonArray) { + return DorisType.ARRAY; + } else if (value instanceof Decimal128) { + return checkAndRebuildBigDecimal(((Decimal128) value).bigDecimalValue()); + } else { + return DorisType.STRING; + } + } + + public static String jsonNodeToDorisType(JsonNode value) { + if (value instanceof IntNode) { + return DorisType.INT; + } else if (value instanceof TextNode) { + return DorisType.STRING; + } else if (value instanceof LongNode) { + return DorisType.BIGINT; + } else if (value instanceof DoubleNode) { + return DorisType.DOUBLE; + } else if (value instanceof BooleanNode) { + return DorisType.BOOLEAN; + } else if (value instanceof ArrayNode) { + return DorisType.ARRAY; + } else if (value instanceof DecimalNode) { + return checkAndRebuildBigDecimal(value.decimalValue()); + } else if (value instanceof ObjectNode) { + if (value.size() == 1 && value.get(DATE_TYPE) != null) { + return DorisType.DATETIME_V2 + "(3)"; + } else if (value.size() == 1 && value.get(DECIMAL_TYPE) != null) { + return checkAndRebuildBigDecimal(new BigDecimal(value.get(DECIMAL_TYPE).asText())); + } else if (value.size() == 1 && value.get(LONG_TYPE) != null) { + return DorisType.BIGINT; + } else { + return DorisType.STRING; + } + } else { + return DorisType.STRING; + } + } + + public static Tuple2<Integer, Integer> getDecimalPrecisionAndScale(String decimalString) { + // Simplified regular 
expression to match two numbers in brackets + String regex = "\\((\\d+),(\\d+)\\)"; + Pattern pattern = Pattern.compile(regex); + Matcher matcher = pattern.matcher(decimalString); + + if (matcher.find()) { + Integer precision = Integer.parseInt(matcher.group(1)); + Integer scale = Integer.parseInt(matcher.group(2)); + return new Tuple2<>(precision, scale); + } + throw new DorisRuntimeException("Get Decimal precision and Scale error !"); + } + + public static String checkAndRebuildBigDecimal(BigDecimal decimal) { + if (decimal.scale() < 0) { + decimal = new BigDecimal(decimal.toPlainString()); + } + return decimal.precision() <= 38 + ? String.format( + "%s(%s,%s)", + DorisType.DECIMAL_V3, decimal.precision(), Math.max(decimal.scale(), 0)) + : DorisType.STRING; + } +} diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mongodb/MongoDateConverter.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mongodb/MongoDateConverter.java new file mode 100644 index 0000000..b7a3ec7 --- /dev/null +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mongodb/MongoDateConverter.java @@ -0,0 +1,35 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.flink.tools.cdc.mongodb; + +import java.time.Instant; +import java.time.LocalDateTime; +import java.time.ZoneId; +import java.time.format.DateTimeFormatter; + +public class MongoDateConverter { + private static final ThreadLocal<DateTimeFormatter> dateFormatterThreadLocal = + ThreadLocal.withInitial( + () -> DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSSSSS")); + + public static String convertTimestampToString(long timestamp) { + Instant instant = Instant.ofEpochMilli(timestamp); + LocalDateTime localDateTime = LocalDateTime.ofInstant(instant, ZoneId.of("Asia/Shanghai")); + return dateFormatterThreadLocal.get().format(localDateTime); + } +} diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mongodb/MongoParsingProcessFunction.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mongodb/MongoParsingProcessFunction.java new file mode 100644 index 0000000..737617a --- /dev/null +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mongodb/MongoParsingProcessFunction.java @@ -0,0 +1,44 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.flink.tools.cdc.mongodb; + +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.node.NullNode; +import org.apache.doris.flink.tools.cdc.DatabaseSync.TableNameConverter; +import org.apache.doris.flink.tools.cdc.ParsingProcessFunction; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class MongoParsingProcessFunction extends ParsingProcessFunction { + private static final Logger LOG = LoggerFactory.getLogger(MongoParsingProcessFunction.class); + + public MongoParsingProcessFunction(TableNameConverter converter) { + super(converter); + } + + @Override + protected String getRecordTableName(String record) throws Exception { + JsonNode jsonNode = objectMapper.readValue(record, JsonNode.class); + if (jsonNode.get("ns") == null || jsonNode.get("ns") instanceof NullNode) { + LOG.error("Failed to get cdc namespace"); + throw new RuntimeException(); + } + JsonNode nameSpace = jsonNode.get("ns"); + return extractJsonNode(nameSpace, "coll"); + } +} diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/CdcMongoSyncDatabaseCase.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/CdcMongoSyncDatabaseCase.java new file mode 100644 index 0000000..ef66e88 --- /dev/null +++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/CdcMongoSyncDatabaseCase.java @@ -0,0 +1,102 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ package org.apache.doris.flink.tools.cdc; import org.apache.flink.configuration.Configuration; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.doris.flink.tools.cdc.mongodb.MongoDBDatabaseSync; import java.util.HashMap; import java.util.Map; import java.util.UUID; public class CdcMongoSyncDatabaseCase { public static void main(String[] args) throws Exception { Configuration conf = new Configuration(); // conf.setString(RestOptions.BIND_PORT, "8018"); // conf.setString("rest.flamegraph.enabled", "true"); StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); // StreamExecutionEnvironment env = // StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(conf); Map<String, String> flinkMap = new HashMap<>(); flinkMap.put("execution.checkpointing.interval", "10s"); flinkMap.put("pipeline.operator-chaining", "false"); flinkMap.put("parallelism.default", "8"); String database = "cdc_test"; String tablePrefix = ""; String tableSuffix = ""; Configuration configuration = Configuration.fromMap(flinkMap); env.configure(configuration); Map<String, String> mongoConfig = new HashMap<>(); mongoConfig.put("database", "test"); mongoConfig.put("hosts", "127.0.0.1:27017"); mongoConfig.put("username", "flinkuser"); // mysqlConfig.put("password",""); mongoConfig.put("password", "flinkpwd"); // mongoConfig.put("scan.startup.mode", "latest-offset"); mongoConfig.put("scan.startup.mode", "initial"); mongoConfig.put("schema.sample-percent", "1"); Configuration config = Configuration.fromMap(mongoConfig); Map<String, String> sinkConfig = new HashMap<>(); sinkConfig.put("fenodes", "127.0.0.1:8030"); // sinkConfig.put("benodes","10.20.30.1:8040, 10.20.30.2:8040, 10.20.30.3:8040"); sinkConfig.put("username", "root"); sinkConfig.put("password", ""); sinkConfig.put("jdbc-url", "jdbc:mysql://127.0.0.1:9030"); sinkConfig.put("sink.label-prefix", UUID.randomUUID().toString()); sinkConfig.put("auto-redirect", "false"); // sinkConfig.put("sink.enable.batch-mode","true"); // sinkConfig.put("sink.write-mode","stream_load_batch"); Configuration sinkConf = Configuration.fromMap(sinkConfig); Map<String, String> tableConfig = new HashMap<>(); tableConfig.put("replication_num", "1"); tableConfig.put("table-buckets", ".*:1"); String includingTables = "cdc_test"; // String includingTables = "a_.*|b_.*|c"; String excludingTables = ""; String multiToOneOrigin = "a_.*|b_.*"; String multiToOneTarget = "a|b"; boolean ignoreDefaultValue = false; // boolean useNewSchemaChange = false; DatabaseSync databaseSync = new MongoDBDatabaseSync(); databaseSync .setEnv(env) .setDatabase(database) .setConfig(config) .setTablePrefix(tablePrefix) .setTableSuffix(tableSuffix) .setIncludingTables(includingTables) .setExcludingTables(excludingTables) .setMultiToOneOrigin(multiToOneOrigin) .setMultiToOneTarget(multiToOneTarget) .setIgnoreDefaultValue(ignoreDefaultValue) .setSinkConfig(sinkConf) .setTableConfig(tableConfig) .setCreateTableOnly(false) // .setSingleSink(true) // .setNewSchemaChange(useNewSchemaChange) .create(); databaseSync.build(); env.execute(String.format("Mongo-Doris Database Sync: %s", database)); } }
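A note for readers new to the wire format handled by MongoJsonDebeziumDataChange and MongoJsonDebeziumSchemaChange: in the change-stream JSON this source emits, operationType and ns are top-level fields, while fullDocument and documentKey arrive as embedded JSON strings (hence the readTree(...asText()) calls in the code above). An illustrative insert event, with made-up values:

    {
      "operationType": "insert",
      "ns": {"db": "test", "coll": "cdc_test"},
      "documentKey": "{\"_id\": {\"$oid\": \"663ae2b5a0cfae3a4c1f5e8b\"}}",
      "fullDocument": "{\"_id\": {\"$oid\": \"663ae2b5a0cfae3a4c1f5e8b\"}, \"price\": {\"$numberDecimal\": \"19.99\"}, \"created\": {\"$date\": 1715126872000}}"
    }

The schema-change pass parses fullDocument, adds any column Doris has not seen yet (typed via MongoDBType.jsonNodeToDorisType), and rewrites the $date/$numberDecimal/$numberLong wrappers into plain scalars; the data pass then emits the row, reducing _id to its $oid string. Delete events carry only documentKey, so extractDeleteRow builds a row holding just _id, with the delete sign attached.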
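On schema inference: when sampled documents disagree about a decimal field, MongoDBSchema.replaceDecimalTypeIfNeeded widens the column instead of letting the last sample win, keeping the larger scale and the larger integer-digit count. For example, merging DECIMAL(5,2) (three integer digits) with DECIMAL(4,3) (one integer digit) gives scale max(2,3) = 3 and integer digits max(3,1) = 3, i.e. DECIMAL(6,3), which can hold every value seen under either shape.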
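For trying the new operation end to end: CdcTools now registers mongodb-sync-database and requires at least one --mongodb-conf entry, whose keys mirror MongoDBSourceOptions and the test case above. A minimal launch sketch, assuming the usual flink run invocation and the generic --database/--including-tables/--sink-conf/--table-conf flags shared by the existing *-sync-database operations (the jar name and those generic flags are assumptions, not taken from this diff):

    <FLINK_HOME>/bin/flink run \
        -c org.apache.doris.flink.tools.cdc.CdcTools \
        lib/flink-doris-connector-<version>.jar \
        mongodb-sync-database \
        --database cdc_test \
        --mongodb-conf hosts=127.0.0.1:27017 \
        --mongodb-conf username=flinkuser \
        --mongodb-conf password=flinkpwd \
        --mongodb-conf database=test \
        --mongodb-conf scan.startup.mode=initial \
        --mongodb-conf schema.sample-percent=0.2 \
        --including-tables "cdc_test" \
        --sink-conf fenodes=127.0.0.1:8030 \
        --sink-conf username=root \
        --sink-conf password= \
        --sink-conf jdbc-url=jdbc:mysql://127.0.0.1:9030 \
        --sink-conf sink.label-prefix=label1 \
        --table-conf replication_num=1

The set of captured collections can also be narrowed with --mongodb-conf table-name=<collection> (the TABLE_NAME option added in MongoDBDatabaseSync), and scan.startup.mode accepts initial, latest-offset, or timestamp per the switch in buildCdcSource.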