This is an automated email from the ASF dual-hosted git repository. diwu pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris-flink-connector.git
The following commit(s) were added to refs/heads/master by this push: new a24898c5 [fix](cdc)fix mongodb sync cause ClassCastException when user specifies the `_id` field manually (#410) a24898c5 is described below commit a24898c5d1952dea50a9e6c1b2d2c41b8bc00b39 Author: North Lin <37775475+qg-...@users.noreply.github.com> AuthorDate: Tue Jul 2 11:07:40 2024 +0800 [fix](cdc)fix mongodb sync cause ClassCastException when user specifies the `_id` field manually (#410) --- .../flink/tools/cdc/mongodb/MongoDBDatabaseSync.java | 5 ++++- .../mongodb/serializer/MongoJsonDebeziumDataChange.java | 16 ++++++++++++++-- 2 files changed, 18 insertions(+), 3 deletions(-) diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mongodb/MongoDBDatabaseSync.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mongodb/MongoDBDatabaseSync.java index 7c0b6706..fe7f33d0 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mongodb/MongoDBDatabaseSync.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mongodb/MongoDBDatabaseSync.java @@ -133,7 +133,8 @@ public class MongoDBDatabaseSync extends DatabaseSync { private ArrayList<Document> sampleData(MongoCollection<Document> collection, Long sampleNum) { ArrayList<Document> query = new ArrayList<>(); query.add(new Document("$sample", new Document("size", sampleNum))); - return collection.aggregate(query).into(new ArrayList<>()); + // allowDiskUse to avoid mongo 'Sort exceeded memory limit' error + return collection.aggregate(query).allowDiskUse(true).into(new ArrayList<>()); } private static String buildConnectionString( @@ -159,6 +160,8 @@ public class MongoDBDatabaseSync extends DatabaseSync { String username = config.get(MongoDBSourceOptions.USERNAME); String password = config.get(MongoDBSourceOptions.PASSWORD); String database = config.get(MongoDBSourceOptions.DATABASE); + // note: just to unify job name, no other use. + config.setString("database-name", database); String collection = config.get(MongoDBSourceOptions.COLLECTION); if (StringUtils.isBlank(collection)) { collection = config.get(TABLE_NAME); diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mongodb/serializer/MongoJsonDebeziumDataChange.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mongodb/serializer/MongoJsonDebeziumDataChange.java index 4b20ebd6..8048e38a 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mongodb/serializer/MongoJsonDebeziumDataChange.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mongodb/serializer/MongoJsonDebeziumDataChange.java @@ -126,7 +126,13 @@ public class MongoJsonDebeziumDataChange extends CdcDataChange implements Change public Map<String, Object> extractAfterRow(JsonNode recordRoot) { JsonNode dataNode = recordRoot.get(FIELD_DATA); Map<String, Object> rowMap = extractRow(dataNode); - String objectId = ((Map<?, ?>) rowMap.get(ID_FIELD)).get(OID_FIELD).toString(); + String objectId; + // if user specifies the `_id` field manually, the $oid field may not exist + if (rowMap.get(ID_FIELD) instanceof Map<?, ?>) { + objectId = ((Map<?, ?>) rowMap.get(ID_FIELD)).get(OID_FIELD).toString(); + } else { + objectId = rowMap.get(ID_FIELD).toString(); + } rowMap.put(ID_FIELD, objectId); return rowMap; } @@ -135,7 +141,13 @@ public class MongoJsonDebeziumDataChange extends CdcDataChange implements Change throws JsonProcessingException { String documentKey = extractJsonNode(recordRoot, FIELD_DOCUMENT_KEY); JsonNode jsonNode = objectMapper.readTree(documentKey); - String objectId = extractJsonNode(jsonNode.get(ID_FIELD), OID_FIELD); + String objectId; + // if user specifies the `_id` field manually, the $oid field may not exist + if (jsonNode.get(ID_FIELD).has(OID_FIELD)) { + objectId = extractJsonNode(jsonNode.get(ID_FIELD), OID_FIELD); + } else { + objectId = jsonNode.get(ID_FIELD).asText(); + } Map<String, Object> row = new HashMap<>(); row.put(ID_FIELD, objectId); return row; --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org