This is an automated email from the ASF dual-hosted git repository.

jiafengzheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris-flink-connector.git


The following commit(s) were added to refs/heads/master by this push:
     new 5ad474a  fix debeziumSourcefunction ddl parse (#201)
5ad474a is described below

commit 5ad474a0c15c097f0b09ab1a23f86048a7647082
Author: wudi <676366...@qq.com>
AuthorDate: Thu Sep 28 10:11:57 2023 +0800

    fix debeziumSourcefunction ddl parse (#201)
---
 .../sink/writer/JsonDebeziumSchemaSerializer.java  | 26 ++++++++++++++--------
 .../flink/tools/cdc/oracle/OracleDatabaseSync.java |  1 +
 2 files changed, 18 insertions(+), 9 deletions(-)

diff --git 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/JsonDebeziumSchemaSerializer.java
 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/JsonDebeziumSchemaSerializer.java
index 38ae229..b2d88c6 100644
--- 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/JsonDebeziumSchemaSerializer.java
+++ 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/JsonDebeziumSchemaSerializer.java
@@ -49,6 +49,7 @@ import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.LinkedHashMap;
 import java.util.List;
@@ -197,7 +198,7 @@ public class JsonDebeziumSchemaSerializer implements 
DorisRecordSerializer<Strin
                 String ddlSql = ddlSqlList.get(i);
                 boolean doSchemaChange = checkSchemaChange(ddlSchema);
                 status = doSchemaChange && execSchemaChange(ddlSql);
-                LOG.info("schema change status:{}", status);
+                LOG.info("schema change status:{}, ddl:{}", status, ddlSql);
             }
         } catch (Exception ex) {
             LOG.warn("schema change error :", ex);
@@ -220,12 +221,14 @@ public class JsonDebeziumSchemaSerializer implements 
DorisRecordSerializer<Strin
 
     @VisibleForTesting
     public List<String> extractDDLList(JsonNode record) throws 
JsonProcessingException {
-        JsonNode historyRecord = objectMapper.readTree(extractJsonNode(record, 
"historyRecord"));
+        JsonNode historyRecord = extractHistoryRecord(record);
         JsonNode tableChanges = historyRecord.get("tableChanges");
-        JsonNode tableChange = tableChanges.get(0);
         String ddl = extractJsonNode(historyRecord, "ddl");
+        if(Objects.isNull(tableChanges) || Objects.isNull(ddl)){
+            return new ArrayList<>();
+        }
         LOG.debug("received debezium ddl :{}", ddl);
-
+        JsonNode tableChange = tableChanges.get(0);
         Matcher matcher = addDropDDLPattern.matcher(ddl);
         if (Objects.isNull(tableChange)|| 
!tableChange.get("type").asText().equals("ALTER") || !matcher.find()) {
             return null;
@@ -388,12 +391,17 @@ public class JsonDebeziumSchemaSerializer implements 
DorisRecordSerializer<Strin
         return recordMap != null ? recordMap : new HashMap<>();
     }
 
-    public String extractDDL(JsonNode record) throws JsonProcessingException {
-        String historyRecord = extractJsonNode(record, "historyRecord");
-        if (Objects.isNull(historyRecord)) {
-            return null;
+    private JsonNode extractHistoryRecord(JsonNode record) throws 
JsonProcessingException {
+        if(record.has("historyRecord")){
+            return objectMapper.readTree(record.get("historyRecord").asText());
         }
-        String ddl = extractJsonNode(objectMapper.readTree(historyRecord), 
"ddl");
+        //The ddl passed by some scenes will not be included in the 
historyRecord, such as DebeziumSourceFunction
+        return record;
+    }
+
+    public String extractDDL(JsonNode record) throws JsonProcessingException {
+        JsonNode historyRecord = extractHistoryRecord(record);
+        String ddl = extractJsonNode(historyRecord, "ddl");
         LOG.debug("received debezium ddl :{}", ddl);
         if (!Objects.isNull(ddl)) {
             //filter add/drop operation
diff --git 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/oracle/OracleDatabaseSync.java
 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/oracle/OracleDatabaseSync.java
index bfd974f..0049579 100644
--- 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/oracle/OracleDatabaseSync.java
+++ 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/oracle/OracleDatabaseSync.java
@@ -162,6 +162,7 @@ public class OracleDatabaseSync extends DatabaseSync {
                     .tableList(schemaName + "." + tableName)
                     .username(username)
                     .password(password)
+                    .includeSchemaChanges(true)
                     .startupOptions(startupOptions)
                     .deserializer(schema)
                     .debeziumProperties(debeziumProperties)


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to