This is an automated email from the ASF dual-hosted git repository.

diwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris-flink-connector.git


The following commit(s) were added to refs/heads/master by this push:
     new 7f09d028 [Fix] Fix the issue with parsing MongoDB timestamp and array types (#547)
7f09d028 is described below

commit 7f09d02831e3c0f595ddb7742aa481e8060a06b5
Author: Qinghuang Xu <781240...@qq.com>
AuthorDate: Mon Feb 10 10:47:59 2025 +0800

    [Fix] Fix the issue with parsing MongoDB timestamp and array types (#547)
---
 .../tools/cdc/mongodb/ChangeStreamConstant.java    |  1 +
 .../doris/flink/tools/cdc/mongodb/MongoDBType.java | 32 ++++++++++++---------
 .../serializer/MongoJsonDebeziumSchemaChange.java  | 10 +++++--
 .../flink/tools/cdc/mongodb/MongoDBSchemaTest.java | 33 ++++++++++++++++++++++
 .../flink/tools/cdc/mongodb/MongoDBTypeTest.java   | 22 +++++++++++----
 5 files changed, 78 insertions(+), 20 deletions(-)

diff --git 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mongodb/ChangeStreamConstant.java
 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mongodb/ChangeStreamConstant.java
index f8772c9f..4e0d8e53 100644
--- 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mongodb/ChangeStreamConstant.java
+++ 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mongodb/ChangeStreamConstant.java
@@ -35,6 +35,7 @@ public class ChangeStreamConstant implements Serializable {
     public static final String FIELD_DOCUMENT_KEY = "documentKey";
 
     public static final String DATE_FIELD = "$date";
+    public static final String TIMESTAMP_FIELD = "$timestamp";
 
     public static final String DECIMAL_FIELD = "$numberDecimal";
 
diff --git 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mongodb/MongoDBType.java
 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mongodb/MongoDBType.java
index 578a407c..cb4896ab 100644
--- 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mongodb/MongoDBType.java
+++ 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mongodb/MongoDBType.java
@@ -30,26 +30,29 @@ import com.fasterxml.jackson.databind.node.ObjectNode;
 import com.fasterxml.jackson.databind.node.TextNode;
 import org.apache.doris.flink.catalog.doris.DorisType;
 import org.apache.doris.flink.exception.DorisRuntimeException;
-import org.bson.BsonArray;
+import org.bson.BsonTimestamp;
 import org.bson.types.Decimal128;
 import org.bson.types.ObjectId;
 
 import java.math.BigDecimal;
 import java.util.Date;
+import java.util.List;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 
-public class MongoDBType {
-
-    public static final String DATE_TYPE = "$date";
-    public static final String DECIMAL_TYPE = "$numberDecimal";
-    public static final String LONG_TYPE = "$numberLong";
+import static 
org.apache.doris.flink.tools.cdc.mongodb.ChangeStreamConstant.DATE_FIELD;
+import static 
org.apache.doris.flink.tools.cdc.mongodb.ChangeStreamConstant.DECIMAL_FIELD;
+import static 
org.apache.doris.flink.tools.cdc.mongodb.ChangeStreamConstant.LONG_FIELD;
+import static 
org.apache.doris.flink.tools.cdc.mongodb.ChangeStreamConstant.TIMESTAMP_FIELD;
 
+public class MongoDBType {
     public static String toDorisType(Object value) {
         if (value instanceof Integer) {
             return DorisType.INT;
         } else if (value instanceof Date) {
             return DorisType.DATETIME_V2 + "(3)";
+        } else if (value instanceof BsonTimestamp) {
+            return DorisType.DATETIME_V2 + "(0)";
         } else if (value instanceof Long) {
             return DorisType.BIGINT;
         } else if (value instanceof Double) {
@@ -60,8 +63,8 @@ public class MongoDBType {
             return DorisType.STRING;
         } else if (value instanceof ObjectId) {
             return DorisType.VARCHAR + "(30)";
-        } else if (value instanceof BsonArray) {
-            return DorisType.ARRAY;
+        } else if (value instanceof List) {
+            return DorisType.ARRAY + "<" + DorisType.STRING + ">";
         } else if (value instanceof Decimal128) {
             return checkAndRebuildBigDecimal(((Decimal128) 
value).bigDecimalValue());
         } else {
@@ -77,19 +80,22 @@ public class MongoDBType {
         } else if (value instanceof LongNode) {
             return DorisType.BIGINT;
         } else if (value instanceof DoubleNode) {
+            // When mongo double is in the JsonNode, it's actually a decimal 
type
             return DorisType.DOUBLE;
         } else if (value instanceof BooleanNode) {
             return DorisType.BOOLEAN;
         } else if (value instanceof ArrayNode) {
-            return DorisType.ARRAY;
+            return DorisType.ARRAY + "<" + DorisType.STRING + ">";
         } else if (value instanceof DecimalNode) {
             return checkAndRebuildBigDecimal(value.decimalValue());
         } else if (value instanceof ObjectNode) {
-            if (value.size() == 1 && value.get(DATE_TYPE) != null) {
+            if (value.size() == 1 && value.get(DATE_FIELD) != null) {
                 return DorisType.DATETIME_V2 + "(3)";
-            } else if (value.size() == 1 && value.get(DECIMAL_TYPE) != null) {
-                return checkAndRebuildBigDecimal(new 
BigDecimal(value.get(DECIMAL_TYPE).asText()));
-            } else if (value.size() == 1 && value.get(LONG_TYPE) != null) {
+            } else if (value.size() == 1 && value.get(TIMESTAMP_FIELD) != 
null) {
+                return DorisType.DATETIME_V2 + "(0)";
+            } else if (value.size() == 1 && value.get(DECIMAL_FIELD) != null) {
+                return checkAndRebuildBigDecimal(new 
BigDecimal(value.get(DECIMAL_FIELD).asText()));
+            } else if (value.size() == 1 && value.get(LONG_FIELD) != null) {
                 return DorisType.BIGINT;
             } else {
                 return DorisType.STRING;
diff --git 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mongodb/serializer/MongoJsonDebeziumSchemaChange.java
 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mongodb/serializer/MongoJsonDebeziumSchemaChange.java
index c3a4a7d8..c67e856e 100644
--- 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mongodb/serializer/MongoJsonDebeziumSchemaChange.java
+++ 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mongodb/serializer/MongoJsonDebeziumSchemaChange.java
@@ -50,6 +50,7 @@ import static 
org.apache.doris.flink.tools.cdc.mongodb.ChangeStreamConstant.FIEL
 import static 
org.apache.doris.flink.tools.cdc.mongodb.ChangeStreamConstant.FIELD_NAMESPACE;
 import static 
org.apache.doris.flink.tools.cdc.mongodb.ChangeStreamConstant.FIELD_TABLE;
 import static 
org.apache.doris.flink.tools.cdc.mongodb.ChangeStreamConstant.LONG_FIELD;
+import static 
org.apache.doris.flink.tools.cdc.mongodb.ChangeStreamConstant.TIMESTAMP_FIELD;
 
 public class MongoJsonDebeziumSchemaChange extends CdcSchemaChange {
 
@@ -67,7 +68,7 @@ public class MongoJsonDebeziumSchemaChange extends 
CdcSchemaChange {
     private final DorisOptions dorisOptions;
 
     private final Set<String> specialFields =
-            new HashSet<>(Arrays.asList(DATE_FIELD, DECIMAL_FIELD, 
LONG_FIELD));
+            new HashSet<>(Arrays.asList(DATE_FIELD, TIMESTAMP_FIELD, 
DECIMAL_FIELD, LONG_FIELD));
 
     public MongoJsonDebeziumSchemaChange(JsonDebeziumChangeContext 
changeContext) {
         this.objectMapper = changeContext.getObjectMapper();
@@ -127,7 +128,12 @@ public class MongoJsonDebeziumSchemaChange extends 
CdcSchemaChange {
                                 if (specialFields.contains(fieldKey)) {
                                     switch (fieldKey) {
                                         case DATE_FIELD:
-                                            long timestamp = 
fieldNode.get(DATE_FIELD).asLong();
+                                        case TIMESTAMP_FIELD:
+                                            JsonNode jsonNode = 
fieldNode.get(fieldKey);
+                                            long timestamp =
+                                                    
fieldKey.equals(TIMESTAMP_FIELD)
+                                                            ? 
jsonNode.get("t").asLong() * 1000L
+                                                            : 
jsonNode.asLong();
                                             String formattedDate =
                                                     
MongoDateConverter.convertTimestampToString(
                                                             timestamp);
diff --git 
a/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/mongodb/MongoDBSchemaTest.java
 
b/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/mongodb/MongoDBSchemaTest.java
index a9cc8bbc..2f095608 100644
--- 
a/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/mongodb/MongoDBSchemaTest.java
+++ 
b/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/mongodb/MongoDBSchemaTest.java
@@ -18,12 +18,16 @@
 package org.apache.doris.flink.tools.cdc.mongodb;
 
 import org.apache.doris.flink.catalog.doris.FieldSchema;
+import org.bson.BsonTimestamp;
 import org.bson.Document;
 import org.bson.types.Decimal128;
+import org.bson.types.ObjectId;
 import org.junit.Test;
 
 import java.math.BigDecimal;
 import java.util.ArrayList;
+import java.util.Date;
+import java.util.LinkedHashMap;
 import java.util.Map;
 
 import static org.junit.Assert.assertEquals;
@@ -40,6 +44,35 @@ public class MongoDBSchemaTest {
         assertEquals("db_TEST\\.test_table", mongoDBSchema.getCdcTableName());
     }
 
+    @Test
+    public void testMongoSampleDataFields() throws Exception {
+        ArrayList<Document> sampleData = new ArrayList<>();
+        sampleData.add(new Document("_id", new 
ObjectId("678643e649a4c9239b04297b")));
+        sampleData.add(new Document("c_string", "Hello, MongoDB!"));
+        sampleData.add(new Document("c_bool", true));
+        sampleData.add(new Document("c_int", 123456));
+        sampleData.add(new Document("c_long", 1234567890123456789L));
+        sampleData.add(new Document("c_double", 123.45));
+        sampleData.add(new Document("c_decimal", new 
Decimal128(BigDecimal.valueOf(12345.67))));
+        sampleData.add(new Document("c_date", new Date(1234567890)));
+        sampleData.add(new Document("c_timestamp", new 
BsonTimestamp(1334567890)));
+        Map<String, String> map = new LinkedHashMap<>();
+        map.put("key1", "value1");
+        map.put("key2", "value2");
+        sampleData.add(new Document("c_object", new Document(map)));
+        ArrayList<Object> array = new ArrayList<>();
+        array.add("str1");
+        array.add("str2");
+        array.add(789);
+        sampleData.add(new Document("c_array", array));
+
+        MongoDBSchema mongoDBSchema = new MongoDBSchema(sampleData, "db_TEST", 
"test_table", "");
+
+        assertEquals(
+                "{_id=FieldSchema{name='_id', typeString='VARCHAR(30)', 
defaultValue='null', comment='null'}, c_string=FieldSchema{name='c_string', 
typeString='STRING', defaultValue='null', comment='null'}, 
c_bool=FieldSchema{name='c_bool', typeString='BOOLEAN', defaultValue='null', 
comment='null'}, c_int=FieldSchema{name='c_int', typeString='INT', 
defaultValue='null', comment='null'}, c_long=FieldSchema{name='c_long', 
typeString='BIGINT', defaultValue='null', comment='null'}, c_double=F [...]
+                mongoDBSchema.getFields().toString());
+    }
+
     @Test
     public void replaceDecimalTypeIfNeededTest1() throws Exception {
         ArrayList<Document> documents = new ArrayList<>();
diff --git 
a/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/mongodb/MongoDBTypeTest.java
 
b/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/mongodb/MongoDBTypeTest.java
index ee511ce2..4e273ab8 100644
--- 
a/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/mongodb/MongoDBTypeTest.java
+++ 
b/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/mongodb/MongoDBTypeTest.java
@@ -29,6 +29,9 @@ import com.fasterxml.jackson.databind.node.ObjectNode;
 import com.fasterxml.jackson.databind.node.TextNode;
 import org.apache.doris.flink.catalog.doris.DorisType;
 import org.bson.BsonArray;
+import org.bson.BsonInt64;
+import org.bson.BsonString;
+import org.bson.BsonTimestamp;
 import org.bson.types.Decimal128;
 import org.bson.types.ObjectId;
 import org.junit.Test;
@@ -46,6 +49,7 @@ public class MongoDBTypeTest {
     public void toDorisType() {
         assertEquals(DorisType.INT, MongoDBType.toDorisType(new Integer(123)));
         assertEquals(DorisType.DATETIME_V2 + "(3)", 
MongoDBType.toDorisType(new Date()));
+        assertEquals(DorisType.DATETIME_V2 + "(0)", 
MongoDBType.toDorisType(new BsonTimestamp()));
         assertEquals(DorisType.BIGINT, MongoDBType.toDorisType(new 
Long(1234567891)));
         assertEquals("DECIMALV3(6,2)", MongoDBType.toDorisType(new 
Double("1234.56")));
         assertEquals(DorisType.BOOLEAN, MongoDBType.toDorisType(new 
Boolean(true)));
@@ -53,10 +57,13 @@ public class MongoDBTypeTest {
         assertEquals(
                 DorisType.VARCHAR + "(30)",
                 MongoDBType.toDorisType(new 
ObjectId("66583533791a67a6f8c5a339")));
-        assertEquals(DorisType.ARRAY, MongoDBType.toDorisType(new 
BsonArray()));
         assertEquals(
                 "DECIMALV3(10,5)",
                 MongoDBType.toDorisType(new Decimal128(new 
BigDecimal("12345.55555"))));
+        BsonArray bsonArray = new BsonArray();
+        bsonArray.add(new BsonString("string"));
+        bsonArray.add(new BsonInt64(123456789));
+        assertEquals(DorisType.ARRAY + "<STRING>", 
MongoDBType.toDorisType(bsonArray));
     }
 
     @Test
@@ -67,22 +74,27 @@ public class MongoDBTypeTest {
         assertEquals(DorisType.DOUBLE, MongoDBType.jsonNodeToDorisType(new 
DoubleNode(1234.23)));
         assertEquals(DorisType.BOOLEAN, 
MongoDBType.jsonNodeToDorisType(BooleanNode.TRUE));
         assertEquals(
-                DorisType.ARRAY,
+                DorisType.ARRAY + "<STRING>",
                 
MongoDBType.jsonNodeToDorisType(JsonNodeFactory.instance.arrayNode()));
         assertEquals(
                 "DECIMALV3(6,2)",
                 MongoDBType.jsonNodeToDorisType(new DecimalNode(new 
BigDecimal("1234.23"))));
 
         ObjectNode dateJsonNodes = JsonNodeFactory.instance.objectNode();
-        dateJsonNodes.put(MongoDBType.DATE_TYPE, "");
+        dateJsonNodes.put(ChangeStreamConstant.DATE_FIELD, "");
         assertEquals(DorisType.DATETIME_V2 + "(3)", 
MongoDBType.jsonNodeToDorisType(dateJsonNodes));
 
+        ObjectNode timestampJsonNodes = JsonNodeFactory.instance.objectNode();
+        timestampJsonNodes.put(ChangeStreamConstant.TIMESTAMP_FIELD, "");
+        assertEquals(
+                DorisType.DATETIME_V2 + "(0)", 
MongoDBType.jsonNodeToDorisType(timestampJsonNodes));
+
         ObjectNode decimalJsonNodes = JsonNodeFactory.instance.objectNode();
-        decimalJsonNodes.put(MongoDBType.DECIMAL_TYPE, "1234.23");
+        decimalJsonNodes.put(ChangeStreamConstant.DECIMAL_FIELD, "1234.23");
         assertEquals("DECIMALV3(6,2)", 
MongoDBType.jsonNodeToDorisType(decimalJsonNodes));
 
         ObjectNode longJsonNodes = JsonNodeFactory.instance.objectNode();
-        longJsonNodes.put(MongoDBType.LONG_TYPE, "1234234466");
+        longJsonNodes.put(ChangeStreamConstant.LONG_FIELD, "1234234466");
         assertEquals(DorisType.BIGINT, 
MongoDBType.jsonNodeToDorisType(longJsonNodes));
     }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to