This is an automated email from the ASF dual-hosted git repository.

diwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris-flink-connector.git


The following commit(s) were added to refs/heads/master by this push:
     new ed4a4c39 [Fix](cdc) Ignore when mongodb schema change fails (#537)
ed4a4c39 is described below

commit ed4a4c39a6fcd6e287df91c593681c733609765d
Author: wudi <676366...@qq.com>
AuthorDate: Fri Jan 10 12:08:31 2025 +0800

    [Fix](cdc) Ignore when mongodb schema change fails (#537)
---
 .../serializer/jsondebezium/CdcSchemaChange.java   |  5 +--
 .../MongoDBJsonDebeziumSchemaSerializer.java       |  7 +---
 .../serializer/MongoJsonDebeziumSchemaChange.java  | 44 ++++++++++++----------
 3 files changed, 29 insertions(+), 27 deletions(-)

diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/CdcSchemaChange.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/CdcSchemaChange.java
index 858a5eff..ac00017d 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/CdcSchemaChange.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/CdcSchemaChange.java
@@ -20,8 +20,6 @@ package org.apache.doris.flink.sink.writer.serializer.jsondebezium;
 import com.fasterxml.jackson.databind.JsonNode;
 import org.apache.doris.flink.sink.writer.ChangeEvent;
 
-import java.io.IOException;
-
 /**
  * When cdc connector captures data changes about source database schema changes, you need to
  * inherit this class to complete the synchronized changes to Doris schema. Supports data messages
@@ -33,7 +31,8 @@ public abstract class CdcSchemaChange implements ChangeEvent {
 
     protected abstract String extractTable(JsonNode record);
 
-    public abstract boolean schemaChange(JsonNode recordRoot) throws IOException;
+    /** Schema change */
+    public abstract boolean schemaChange(JsonNode recordRoot);
 
     protected abstract String getCdcTableIdentifier(JsonNode record);
 }
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mongodb/serializer/MongoDBJsonDebeziumSchemaSerializer.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mongodb/serializer/MongoDBJsonDebeziumSchemaSerializer.java
index d4a87ff8..296a3727 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mongodb/serializer/MongoDBJsonDebeziumSchemaSerializer.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mongodb/serializer/MongoDBJsonDebeziumSchemaSerializer.java
@@ -120,11 +120,8 @@ public class MongoDBJsonDebeziumSchemaSerializer implements DorisRecordSerialize
         LOG.debug("received debezium json data {} :", record);
         JsonNode recordRoot = objectMapper.readValue(record, JsonNode.class);
         String op = getOperateType(recordRoot);
-        try {
-            schemaChange.schemaChange(recordRoot);
-        } catch (Exception e) {
-            throw new RuntimeException(e);
-        }
+        schemaChange.schemaChange(recordRoot);
+
         return dataChange.serialize(record, recordRoot, op);
     }
 
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mongodb/serializer/MongoJsonDebeziumSchemaChange.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mongodb/serializer/MongoJsonDebeziumSchemaChange.java
index 01eebd45..c3a4a7d8 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mongodb/serializer/MongoJsonDebeziumSchemaChange.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mongodb/serializer/MongoJsonDebeziumSchemaChange.java
@@ -89,26 +89,32 @@ public class MongoJsonDebeziumSchemaChange extends CdcSchemaChange {
     }
 
     @Override
-    public boolean schemaChange(JsonNode recordRoot) throws IOException {
-        JsonNode logData = getFullDocument(recordRoot);
-        String cdcTableIdentifier = getCdcTableIdentifier(recordRoot);
-        String dorisTableIdentifier =
-                getDorisTableIdentifier(cdcTableIdentifier, dorisOptions, tableMapping);
-        String[] tableInfo = dorisTableIdentifier.split("\\.");
-        if (tableInfo.length != 2) {
-            throw new DorisRuntimeException();
+    public boolean schemaChange(JsonNode recordRoot) {
+        try {
+            JsonNode logData = getFullDocument(recordRoot);
+            String cdcTableIdentifier = getCdcTableIdentifier(recordRoot);
+            String dorisTableIdentifier =
+                    getDorisTableIdentifier(cdcTableIdentifier, dorisOptions, tableMapping);
+            String[] tableInfo = dorisTableIdentifier.split("\\.");
+            if (tableInfo.length != 2) {
+                throw new DorisRuntimeException();
+            }
+            String dataBase = tableInfo[0];
+            String table = tableInfo[1];
+            // build table fields mapping for all record
+            buildDorisTableFieldsMapping(dataBase, table);
+
+            // Determine whether change stream log and tableField are exactly the same, if not,
+            // perform
+            // schema change
+            checkAndUpdateSchemaChange(logData, dorisTableIdentifier, dataBase, table);
+            formatSpecialFieldData(logData);
+            ((ObjectNode) recordRoot).set(FIELD_DATA, logData);
+            return true;
+        } catch (Exception ex) {
+            LOG.warn("schema change error : ", ex);
+            return false;
         }
-        String dataBase = tableInfo[0];
-        String table = tableInfo[1];
-        // build table fields mapping for all record
-        buildDorisTableFieldsMapping(dataBase, table);
-
-        // Determine whether change stream log and tableField are exactly the same, if not, perform
-        // schema change
-        checkAndUpdateSchemaChange(logData, dorisTableIdentifier, dataBase, table);
-        formatSpecialFieldData(logData);
-        ((ObjectNode) recordRoot).set(FIELD_DATA, logData);
-        return true;
     }
 
     private void formatSpecialFieldData(JsonNode logData) {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to