This is an automated email from the ASF dual-hosted git repository. diwu pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris-flink-connector.git
The following commit(s) were added to refs/heads/master by this push: new ed4a4c39 [Fix](cdc) Ignore when mongodb schema change fails (#537) ed4a4c39 is described below commit ed4a4c39a6fcd6e287df91c593681c733609765d Author: wudi <676366...@qq.com> AuthorDate: Fri Jan 10 12:08:31 2025 +0800 [Fix](cdc) Ignore when mongodb schema change fails (#537) --- .../serializer/jsondebezium/CdcSchemaChange.java | 5 +-- .../MongoDBJsonDebeziumSchemaSerializer.java | 7 +--- .../serializer/MongoJsonDebeziumSchemaChange.java | 44 ++++++++++++---------- 3 files changed, 29 insertions(+), 27 deletions(-) diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/CdcSchemaChange.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/CdcSchemaChange.java index 858a5eff..ac00017d 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/CdcSchemaChange.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/CdcSchemaChange.java @@ -20,8 +20,6 @@ package org.apache.doris.flink.sink.writer.serializer.jsondebezium; import com.fasterxml.jackson.databind.JsonNode; import org.apache.doris.flink.sink.writer.ChangeEvent; -import java.io.IOException; - /** * When cdc connector captures data changes about source database schema changes, you need to * inherit this class to complete the synchronized changes to Doris schema. Supports data messages @@ -33,7 +31,8 @@ public abstract class CdcSchemaChange implements ChangeEvent { protected abstract String extractTable(JsonNode record); - public abstract boolean schemaChange(JsonNode recordRoot) throws IOException; + /** Schema change */ + public abstract boolean schemaChange(JsonNode recordRoot); protected abstract String getCdcTableIdentifier(JsonNode record); } diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mongodb/serializer/MongoDBJsonDebeziumSchemaSerializer.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mongodb/serializer/MongoDBJsonDebeziumSchemaSerializer.java index d4a87ff8..296a3727 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mongodb/serializer/MongoDBJsonDebeziumSchemaSerializer.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mongodb/serializer/MongoDBJsonDebeziumSchemaSerializer.java @@ -120,11 +120,8 @@ public class MongoDBJsonDebeziumSchemaSerializer implements DorisRecordSerialize LOG.debug("received debezium json data {} :", record); JsonNode recordRoot = objectMapper.readValue(record, JsonNode.class); String op = getOperateType(recordRoot); - try { - schemaChange.schemaChange(recordRoot); - } catch (Exception e) { - throw new RuntimeException(e); - } + schemaChange.schemaChange(recordRoot); + return dataChange.serialize(record, recordRoot, op); } diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mongodb/serializer/MongoJsonDebeziumSchemaChange.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mongodb/serializer/MongoJsonDebeziumSchemaChange.java index 01eebd45..c3a4a7d8 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mongodb/serializer/MongoJsonDebeziumSchemaChange.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mongodb/serializer/MongoJsonDebeziumSchemaChange.java @@ -89,26 +89,32 @@ public class MongoJsonDebeziumSchemaChange extends CdcSchemaChange { } @Override - public boolean schemaChange(JsonNode recordRoot) throws IOException { - JsonNode logData = getFullDocument(recordRoot); - String cdcTableIdentifier = getCdcTableIdentifier(recordRoot); - String dorisTableIdentifier = - getDorisTableIdentifier(cdcTableIdentifier, dorisOptions, tableMapping); - String[] tableInfo = dorisTableIdentifier.split("\\."); - if (tableInfo.length != 2) { - throw new DorisRuntimeException(); + public boolean schemaChange(JsonNode recordRoot) { + try { + JsonNode logData = getFullDocument(recordRoot); + String cdcTableIdentifier = getCdcTableIdentifier(recordRoot); + String dorisTableIdentifier = + getDorisTableIdentifier(cdcTableIdentifier, dorisOptions, tableMapping); + String[] tableInfo = dorisTableIdentifier.split("\\."); + if (tableInfo.length != 2) { + throw new DorisRuntimeException(); + } + String dataBase = tableInfo[0]; + String table = tableInfo[1]; + // build table fields mapping for all record + buildDorisTableFieldsMapping(dataBase, table); + + // Determine whether change stream log and tableField are exactly the same, if not, + // perform + // schema change + checkAndUpdateSchemaChange(logData, dorisTableIdentifier, dataBase, table); + formatSpecialFieldData(logData); + ((ObjectNode) recordRoot).set(FIELD_DATA, logData); + return true; + } catch (Exception ex) { + LOG.warn("schema change error : ", ex); + return false; } - String dataBase = tableInfo[0]; - String table = tableInfo[1]; - // build table fields mapping for all record - buildDorisTableFieldsMapping(dataBase, table); - - // Determine whether change stream log and tableField are exactly the same, if not, perform - // schema change - checkAndUpdateSchemaChange(logData, dorisTableIdentifier, dataBase, table); - formatSpecialFieldData(logData); - ((ObjectNode) recordRoot).set(FIELD_DATA, logData); - return true; } private void formatSpecialFieldData(JsonNode logData) { --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org