This is an automated email from the ASF dual-hosted git repository. jiafengzheng pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris-flink-connector.git
The following commit(s) were added to refs/heads/master by this push: new 5ad474a fix debeziumSourcefunction ddl parse (#201) 5ad474a is described below commit 5ad474a0c15c097f0b09ab1a23f86048a7647082 Author: wudi <676366...@qq.com> AuthorDate: Thu Sep 28 10:11:57 2023 +0800 fix debeziumSourcefunction ddl parse (#201) --- .../sink/writer/JsonDebeziumSchemaSerializer.java | 26 ++++++++++++++-------- .../flink/tools/cdc/oracle/OracleDatabaseSync.java | 1 + 2 files changed, 18 insertions(+), 9 deletions(-) diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/JsonDebeziumSchemaSerializer.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/JsonDebeziumSchemaSerializer.java index 38ae229..b2d88c6 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/JsonDebeziumSchemaSerializer.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/JsonDebeziumSchemaSerializer.java @@ -49,6 +49,7 @@ import org.slf4j.LoggerFactory; import java.io.IOException; import java.nio.charset.StandardCharsets; +import java.util.ArrayList; import java.util.HashMap; import java.util.LinkedHashMap; import java.util.List; @@ -197,7 +198,7 @@ public class JsonDebeziumSchemaSerializer implements DorisRecordSerializer<Strin String ddlSql = ddlSqlList.get(i); boolean doSchemaChange = checkSchemaChange(ddlSchema); status = doSchemaChange && execSchemaChange(ddlSql); - LOG.info("schema change status:{}", status); + LOG.info("schema change status:{}, ddl:{}", status, ddlSql); } } catch (Exception ex) { LOG.warn("schema change error :", ex); @@ -220,12 +221,14 @@ public class JsonDebeziumSchemaSerializer implements DorisRecordSerializer<Strin @VisibleForTesting public List<String> extractDDLList(JsonNode record) throws JsonProcessingException { - JsonNode historyRecord = objectMapper.readTree(extractJsonNode(record, "historyRecord")); + JsonNode historyRecord = extractHistoryRecord(record); JsonNode tableChanges = historyRecord.get("tableChanges"); - JsonNode tableChange = tableChanges.get(0); String ddl = extractJsonNode(historyRecord, "ddl"); + if(Objects.isNull(tableChanges) || Objects.isNull(ddl)){ + return new ArrayList<>(); + } LOG.debug("received debezium ddl :{}", ddl); - + JsonNode tableChange = tableChanges.get(0); Matcher matcher = addDropDDLPattern.matcher(ddl); if (Objects.isNull(tableChange)|| !tableChange.get("type").asText().equals("ALTER") || !matcher.find()) { return null; @@ -388,12 +391,17 @@ public class JsonDebeziumSchemaSerializer implements DorisRecordSerializer<Strin return recordMap != null ? recordMap : new HashMap<>(); } - public String extractDDL(JsonNode record) throws JsonProcessingException { - String historyRecord = extractJsonNode(record, "historyRecord"); - if (Objects.isNull(historyRecord)) { - return null; + private JsonNode extractHistoryRecord(JsonNode record) throws JsonProcessingException { + if(record.has("historyRecord")){ + return objectMapper.readTree(record.get("historyRecord").asText()); } - String ddl = extractJsonNode(objectMapper.readTree(historyRecord), "ddl"); + //The ddl passed by some scenes will not be included in the historyRecord, such as DebeziumSourceFunction + return record; + } + + public String extractDDL(JsonNode record) throws JsonProcessingException { + JsonNode historyRecord = extractHistoryRecord(record); + String ddl = extractJsonNode(historyRecord, "ddl"); LOG.debug("received debezium ddl :{}", ddl); if (!Objects.isNull(ddl)) { //filter add/drop operation diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/oracle/OracleDatabaseSync.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/oracle/OracleDatabaseSync.java index bfd974f..0049579 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/oracle/OracleDatabaseSync.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/oracle/OracleDatabaseSync.java @@ -162,6 +162,7 @@ public class OracleDatabaseSync extends DatabaseSync { .tableList(schemaName + "." + tableName) .username(username) .password(password) + .includeSchemaChanges(true) .startupOptions(startupOptions) .deserializer(schema) .debeziumProperties(debeziumProperties) --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org