This is an automated email from the ASF dual-hosted git repository.

diwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris-flink-connector.git


The following commit(s) were added to refs/heads/master by this push:
     new a24898c5 [fix](cdc)fix mongodb sync cause ClassCastException when user specifies the `_id` field manually (#410)
a24898c5 is described below

commit a24898c5d1952dea50a9e6c1b2d2c41b8bc00b39
Author: North Lin <37775475+qg-...@users.noreply.github.com>
AuthorDate: Tue Jul 2 11:07:40 2024 +0800

    [fix](cdc)fix mongodb sync cause ClassCastException when user specifies the `_id` field manually (#410)
---
 .../flink/tools/cdc/mongodb/MongoDBDatabaseSync.java     |  5 ++++-
 .../mongodb/serializer/MongoJsonDebeziumDataChange.java  | 16 ++++++++++++++--
 2 files changed, 18 insertions(+), 3 deletions(-)

diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mongodb/MongoDBDatabaseSync.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mongodb/MongoDBDatabaseSync.java
index 7c0b6706..fe7f33d0 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mongodb/MongoDBDatabaseSync.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mongodb/MongoDBDatabaseSync.java
@@ -133,7 +133,8 @@ public class MongoDBDatabaseSync extends DatabaseSync {
     private ArrayList<Document> sampleData(MongoCollection<Document> collection, Long sampleNum) {
         ArrayList<Document> query = new ArrayList<>();
         query.add(new Document("$sample", new Document("size", sampleNum)));
-        return collection.aggregate(query).into(new ArrayList<>());
+        // allowDiskUse to avoid mongo 'Sort exceeded memory limit' error
+        return collection.aggregate(query).allowDiskUse(true).into(new ArrayList<>());
     }
 
     private static String buildConnectionString(
@@ -159,6 +160,8 @@ public class MongoDBDatabaseSync extends DatabaseSync {
         String username = config.get(MongoDBSourceOptions.USERNAME);
         String password = config.get(MongoDBSourceOptions.PASSWORD);
         String database = config.get(MongoDBSourceOptions.DATABASE);
+        // note: just to unify job name, no other use.
+        config.setString("database-name", database);
         String collection = config.get(MongoDBSourceOptions.COLLECTION);
         if (StringUtils.isBlank(collection)) {
             collection = config.get(TABLE_NAME);
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mongodb/serializer/MongoJsonDebeziumDataChange.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mongodb/serializer/MongoJsonDebeziumDataChange.java
index 4b20ebd6..8048e38a 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mongodb/serializer/MongoJsonDebeziumDataChange.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mongodb/serializer/MongoJsonDebeziumDataChange.java
@@ -126,7 +126,13 @@ public class MongoJsonDebeziumDataChange extends CdcDataChange implements Change
     public Map<String, Object> extractAfterRow(JsonNode recordRoot) {
         JsonNode dataNode = recordRoot.get(FIELD_DATA);
         Map<String, Object> rowMap = extractRow(dataNode);
-        String objectId = ((Map<?, ?>) rowMap.get(ID_FIELD)).get(OID_FIELD).toString();
+        String objectId;
+        // if user specifies the `_id` field manually, the $oid field may not exist
+        if (rowMap.get(ID_FIELD) instanceof Map<?, ?>) {
+            objectId = ((Map<?, ?>) rowMap.get(ID_FIELD)).get(OID_FIELD).toString();
+        } else {
+            objectId = rowMap.get(ID_FIELD).toString();
+        }
         rowMap.put(ID_FIELD, objectId);
         return rowMap;
     }
@@ -135,7 +141,13 @@ public class MongoJsonDebeziumDataChange extends CdcDataChange implements Change
             throws JsonProcessingException {
         String documentKey = extractJsonNode(recordRoot, FIELD_DOCUMENT_KEY);
         JsonNode jsonNode = objectMapper.readTree(documentKey);
-        String objectId = extractJsonNode(jsonNode.get(ID_FIELD), OID_FIELD);
+        String objectId;
+        // if user specifies the `_id` field manually, the $oid field may not exist
+        if (jsonNode.get(ID_FIELD).has(OID_FIELD)) {
+            objectId = extractJsonNode(jsonNode.get(ID_FIELD), OID_FIELD);
+        } else {
+            objectId = jsonNode.get(ID_FIELD).asText();
+        }
         Map<String, Object> row = new HashMap<>();
         row.put(ID_FIELD, objectId);
         return row;


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to