This is an automated email from the ASF dual-hosted git repository.

jiafengzheng pushed a commit to branch release-1.2.1
in repository https://gitbox.apache.org/repos/asf/doris-flink-connector.git


The following commit(s) were added to refs/heads/release-1.2.1 by this push:
     new 5e26635  [improve] Schema change does not require db and tbl consistency (#87) (#89)
5e26635 is described below

commit 5e2663502511dba55f3b761ef79eeff44f537762
Author: wudi <676366...@qq.com>
AuthorDate: Wed Nov 30 11:31:36 2022 +0800

    [improve] Schema change does not require db and tbl consistency (#87) (#89)
    
    * Optimizing schema changes
---
 .../sink/writer/JsonDebeziumSchemaSerializer.java  | 89 +++++++++++++++-------
 .../writer/TestJsonDebeziumSchemaSerializer.java   |  4 +-
 2 files changed, 65 insertions(+), 28 deletions(-)

diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/JsonDebeziumSchemaSerializer.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/JsonDebeziumSchemaSerializer.java
index c3fe987..bd685ef 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/JsonDebeziumSchemaSerializer.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/JsonDebeziumSchemaSerializer.java
@@ -25,6 +25,7 @@ import org.apache.commons.codec.binary.Base64;
 import org.apache.doris.flink.cfg.DorisOptions;
 import org.apache.doris.flink.sink.HttpGetWithEntity;
 import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.util.StringUtils;
 import org.apache.http.HttpHeaders;
 import org.apache.http.client.methods.CloseableHttpResponse;
 import org.apache.http.client.methods.HttpPost;
@@ -56,14 +57,23 @@ public class JsonDebeziumSchemaSerializer implements DorisRecordSerializer<Strin
     private static final String OP_UPDATE = "u"; // update
     private static final String OP_DELETE = "d"; // delete
 
-    private static final String addDropDDLRegex = 
"ALTER\\s+TABLE\\s+[^\\s]+\\s+(ADD|DROP)\\s+COLUMN\\s+([^\\s]+).*";
+    public static final String EXECUTE_DDL = "ALTER TABLE %s %s COLUMN %s %s"; 
//alter table tbl add cloumn aca int
+    private static final String addDropDDLRegex = 
"ALTER\\s+TABLE\\s+[^\\s]+\\s+(ADD|DROP)\\s+(COLUMN\\s+)?([^\\s]+)(\\s+([^\\s]+))?.*";
     private final Pattern addDropDDLPattern;
     private DorisOptions dorisOptions;
     private ObjectMapper objectMapper = new ObjectMapper();
+    private String database;
+    private String table;
+    //table name of the cdc upstream, format is db.tbl
+    private String sourceTableName;
 
-    public JsonDebeziumSchemaSerializer(DorisOptions dorisOptions, Pattern 
pattern) {
+    public JsonDebeziumSchemaSerializer(DorisOptions dorisOptions, Pattern 
pattern, String sourceTableName) {
         this.dorisOptions = dorisOptions;
         this.addDropDDLPattern = pattern == null ? 
Pattern.compile(addDropDDLRegex, Pattern.CASE_INSENSITIVE) : pattern;
+        String[] tableInfo = dorisOptions.getTableIdentifier().split("\\.");
+        this.database = tableInfo[0];
+        this.table = tableInfo[1];
+        this.sourceTableName = sourceTableName;
     }
 
     @Override
@@ -97,8 +107,16 @@ public class JsonDebeziumSchemaSerializer implements DorisRecordSerializer<Strin
     public boolean schemaChange(JsonNode recordRoot) {
         boolean status = false;
         try{
-            boolean doSchemaChange = checkSchemaChange(recordRoot);
-            status = doSchemaChange && execSchemaChange(recordRoot);
+            if(!StringUtils.isNullOrWhitespaceOnly(sourceTableName) && 
!checkTable(recordRoot)){
+                return false;
+            }
+            String ddl = extractDDL(recordRoot);
+            if(StringUtils.isNullOrWhitespaceOnly(ddl)){
+                LOG.info("ddl can not do schema change:{}", recordRoot);
+                return false;
+            }
+            boolean doSchemaChange = checkSchemaChange(ddl);
+            status = doSchemaChange && execSchemaChange(ddl);
             LOG.info("schema change status:{}", status);
         }catch (Exception ex){
             LOG.warn("schema change error :", ex);
@@ -106,6 +124,16 @@ public class JsonDebeziumSchemaSerializer implements DorisRecordSerializer<Strin
         return status;
     }
 
+    /**
+     * When cdc synchronizes multiple tables, it will capture multiple table 
schema changes
+     */
+    protected boolean checkTable(JsonNode recordRoot) {
+        String db = extractDatabase(recordRoot);
+        String tbl = extractTable(recordRoot);
+        String dbTbl = db + "." + tbl;
+        return sourceTableName.equals(dbTbl);
+    }
+
     private void addDeleteSign(Map<String, String> valueMap, boolean delete) {
         if(delete){
             valueMap.put(DORIS_DELETE_SIGN, "1");
@@ -114,11 +142,9 @@ public class JsonDebeziumSchemaSerializer implements DorisRecordSerializer<Strin
         }
     }
 
-    private boolean checkSchemaChange(JsonNode record) throws IOException {
-        String database = extractDatabase(record);
-        String table = extractTable(record);
+    private boolean checkSchemaChange(String ddl) throws IOException {
         String requestUrl = String.format(CHECK_SCHEMA_CHANGE_API, 
dorisOptions.getFenodes(), database, table);
-        Map<String,Object> param = buildRequestParam(record);
+        Map<String,Object> param = buildRequestParam(ddl);
         if(param.size() != 2){
             return false;
         }
@@ -139,27 +165,21 @@ public class JsonDebeziumSchemaSerializer implements DorisRecordSerializer<Strin
      * "columnName" : "column"
      * }
      */
-    private Map<String, Object> buildRequestParam(JsonNode record) throws 
JsonProcessingException {
+    protected Map<String, Object> buildRequestParam(String ddl) {
         Map<String,Object> params = new HashMap<>();
-        String ddl = extractDDL(record);
-        if(ddl == null){
-            return params;
-        }
         Matcher matcher = addDropDDLPattern.matcher(ddl);
         if(matcher.find()){
             String op = matcher.group(1);
-            String col = matcher.group(2);
+            String col = matcher.group(3);
             params.put("isDropColumn", op.equalsIgnoreCase("DROP"));
             params.put("columnName", col);
         }
         return params;
     }
 
-    private boolean execSchemaChange(JsonNode record) throws IOException {
-        String extractDDL = extractDDL(record);
+    private boolean execSchemaChange(String ddl) throws IOException {
         Map<String, String> param = new HashMap<>();
-        param.put("stmt", extractDDL);
-        String database = extractDatabase(record);
+        param.put("stmt", ddl);
         String requestUrl = String.format(SCHEMA_CHANGE_API, 
dorisOptions.getFenodes(), database);
         HttpPost httpPost = new HttpPost(requestUrl);
         httpPost.setHeader(HttpHeaders.AUTHORIZATION, authHeader());
@@ -169,15 +189,20 @@ public class JsonDebeziumSchemaSerializer implements DorisRecordSerializer<Strin
         return success;
     }
 
-    private String extractDatabase(JsonNode record) {
-        return extractJsonNode(record.get("source"), "db");
+    protected String extractDatabase(JsonNode record) {
+        if(record.get("source").has("schema")){
+            //compatible with schema
+            return extractJsonNode(record.get("source"), "schema");
+        }else{
+            return extractJsonNode(record.get("source"), "db");
+        }
     }
 
-    private String extractTable(JsonNode record) {
+    protected String extractTable(JsonNode record) {
         return extractJsonNode(record.get("source"), "table");
     }
 
-    private boolean handleResponse(HttpUriRequest request) throws IOException {
+    private boolean handleResponse(HttpUriRequest request) {
         try (CloseableHttpClient httpclient = HttpClients.createDefault()) {
             CloseableHttpResponse response = httpclient.execute(request);
             final int statusCode = response.getStatusLine().getStatusCode();
@@ -215,20 +240,26 @@ public class JsonDebeziumSchemaSerializer implements DorisRecordSerializer<Strin
         return recordMap != null ? recordMap : new HashMap<>();
     }
 
-    @VisibleForTesting
     public String extractDDL(JsonNode record) throws JsonProcessingException {
         String historyRecord = extractJsonNode(record, "historyRecord");
         if (Objects.isNull(historyRecord)) {
             return null;
         }
         String ddl = extractJsonNode(objectMapper.readTree(historyRecord), 
"ddl");
+        LOG.debug("received debezium ddl :{}", ddl);
         if (!Objects.isNull(ddl)) {
             //filter add/drop operation
-            if (addDropDDLPattern.matcher(ddl).matches()) {
+            Matcher matcher = addDropDDLPattern.matcher(ddl);
+            if(matcher.find()){
+                String op = matcher.group(1);
+                String col = matcher.group(3);
+                String type = matcher.group(5);
+                type = type == null ? "" : type;
+                ddl = String.format(EXECUTE_DDL, 
dorisOptions.getTableIdentifier(), op, col, type);
+                LOG.info("parse ddl:{}", ddl);
                 return ddl;
             }
         }
-        LOG.info("parse ddl:{}", ddl);
         return null;
     }
 
@@ -246,6 +277,7 @@ public class JsonDebeziumSchemaSerializer implements DorisRecordSerializer<Strin
     public static class Builder {
         private DorisOptions dorisOptions;
         private Pattern addDropDDLPattern;
+        private String sourceTableName;
 
         public JsonDebeziumSchemaSerializer.Builder 
setDorisOptions(DorisOptions dorisOptions) {
             this.dorisOptions = dorisOptions;
@@ -257,8 +289,13 @@ public class JsonDebeziumSchemaSerializer implements DorisRecordSerializer<Strin
             return this;
         }
 
+        public JsonDebeziumSchemaSerializer.Builder setSourceTableName(String 
sourceTableName) {
+            this.sourceTableName = sourceTableName;
+            return this;
+        }
+
         public JsonDebeziumSchemaSerializer build() {
-            return new JsonDebeziumSchemaSerializer(dorisOptions, 
addDropDDLPattern);
+            return new JsonDebeziumSchemaSerializer(dorisOptions, 
addDropDDLPattern, sourceTableName);
         }
     }
 }
diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/TestJsonDebeziumSchemaSerializer.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/TestJsonDebeziumSchemaSerializer.java
index 0d35ae3..ed5c37f 100644
--- a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/TestJsonDebeziumSchemaSerializer.java
+++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/TestJsonDebeziumSchemaSerializer.java
@@ -100,8 +100,8 @@ public class TestJsonDebeziumSchemaSerializer {
 
     @Test
     public void testExtractDDL() throws IOException {
-        String srcDDL = "alter table t1 add \n column  c_1 varchar(200)";
-        String record = 
"{\"source\":{\"version\":\"1.5.4.Final\",\"connector\":\"mysql\",\"name\":\"mysql_binlog_source\",\"ts_ms\":1663924503565,\"snapshot\":\"false\",\"db\":\"test\",\"sequence\":null,\"table\":\"t1\",\"server_id\":1,\"gtid\":null,\"file\":\"binlog.000006\",\"pos\":13088,\"row\":0,\"thread\":null,\"query\":null},\"historyRecord\":\"{\\\"source\\\":{\\\"file\\\":\\\"binlog.000006\\\",\\\"pos\\\":13088,\\\"server_id\\\":1},\\\"position\\\":{\\\"transaction_id\\\":null,\
 [...]
+        String srcDDL = "ALTER TABLE test.t1 add COLUMN c_1 varchar(200)";
+        String record = 
"{\"source\":{\"version\":\"1.5.4.Final\",\"connector\":\"mysql\",\"name\":\"mysql_binlog_source\",\"ts_ms\":1663924503565,\"snapshot\":\"false\",\"db\":\"test\",\"sequence\":null,\"table\":\"t1\",\"server_id\":1,\"gtid\":null,\"file\":\"binlog.000006\",\"pos\":13088,\"row\":0,\"thread\":null,\"query\":null},\"historyRecord\":\"{\\\"source\\\":{\\\"file\\\":\\\"binlog.000006\\\",\\\"pos\\\":13088,\\\"server_id\\\":1},\\\"position\\\":{\\\"transaction_id\\\":null,\
 [...]
         JsonNode recordRoot = objectMapper.readTree(record);
         String ddl = serializer.extractDDL(recordRoot);
         Assert.assertEquals(srcDDL, ddl);


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to