This is an automated email from the ASF dual-hosted git repository.

diwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris-flink-connector.git


The following commit(s) were added to refs/heads/master by this push:
     new 14be15b  [Improve](schemaChange) schema change DDL supports
multi-column changes and synchronizes default values (#167)
14be15b is described below

commit 14be15bc140e18337fc927891946050e00230097
Author: DongLiang-0 <46414265+donglian...@users.noreply.github.com>
AuthorDate: Thu Aug 3 18:02:59 2023 +0800

    [Improve](schemaChange) schema change DDL supports multi-column changes
and synchronizes default values (#167)
---
 flink-doris-connector/pom.xml                      |   2 +-
 .../doris/flink/catalog/doris/FieldSchema.java     |  16 ++
 .../sink/writer/JsonDebeziumSchemaSerializer.java  | 181 ++++++++++++++++++++-
 .../flink/sink/writer/SchemaChangeHelper.java      | 109 +++++++++++++
 .../org/apache/doris/flink/tools/cdc/CdcTools.java |   3 +-
 .../apache/doris/flink/tools/cdc/DatabaseSync.java |   9 +-
 .../apache/doris/flink/CDCSchemaChangeExample.java |   2 +-
 .../writer/TestJsonDebeziumSchemaSerializer.java   |  66 ++++++++
 .../flink/tools/cdc/CdcMysqlSyncDatabaseCase.java  |   3 +-
 .../tools/cdc/CdcOraclelSyncDatabaseCase.java      |   3 +-
 10 files changed, 383 insertions(+), 11 deletions(-)

diff --git a/flink-doris-connector/pom.xml b/flink-doris-connector/pom.xml
index b637fef..e10ad27 100644
--- a/flink-doris-connector/pom.xml
+++ b/flink-doris-connector/pom.xml
@@ -245,7 +245,7 @@ under the License.
         <dependency>
             <groupId>com.ververica</groupId>
             <artifactId>flink-sql-connector-mysql-cdc</artifactId>
-            <version>2.3.0</version>
+            <version>2.4.1</version>
             <scope>provided</scope>
         </dependency>
         <dependency>
diff --git 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/catalog/doris/FieldSchema.java
 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/catalog/doris/FieldSchema.java
index 8255bd3..3c28b74 100644
--- 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/catalog/doris/FieldSchema.java
+++ 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/catalog/doris/FieldSchema.java
@@ -19,6 +19,7 @@ package org.apache.doris.flink.catalog.doris;
 public class FieldSchema {
     private String name;
     private String typeString;
+    private String defaultValue;
     private String comment;
 
     public FieldSchema() {
@@ -30,6 +31,13 @@ public class FieldSchema {
         this.comment = comment;
     }
 
+    public FieldSchema(String name, String typeString, String defaultValue, 
String comment) {
+        this.name = name;
+        this.typeString = typeString;
+        this.defaultValue = defaultValue;
+        this.comment = comment;
+    }
+
     public String getName() {
         return name;
     }
@@ -53,4 +61,12 @@ public class FieldSchema {
     public void setComment(String comment) {
         this.comment = comment;
     }
+
+    public String getDefaultValue() {
+        return defaultValue;
+    }
+
+    public void setDefaultValue(String defaultValue) {
+        this.defaultValue = defaultValue;
+    }
 }
diff --git 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/JsonDebeziumSchemaSerializer.java
 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/JsonDebeziumSchemaSerializer.java
index 3329b23..6b66e76 100644
--- 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/JsonDebeziumSchemaSerializer.java
+++ 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/JsonDebeziumSchemaSerializer.java
@@ -23,11 +23,18 @@ import 
com.fasterxml.jackson.databind.DeserializationFeature;
 import com.fasterxml.jackson.databind.JsonNode;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.fasterxml.jackson.databind.node.JsonNodeFactory;
+import com.fasterxml.jackson.databind.node.NullNode;
 import org.apache.commons.codec.binary.Base64;
+
+import org.apache.doris.flink.catalog.doris.FieldSchema;
 import org.apache.doris.flink.cfg.DorisOptions;
 import org.apache.doris.flink.exception.IllegalArgumentException;
 import org.apache.doris.flink.rest.RestService;
 import org.apache.doris.flink.sink.HttpGetWithEntity;
+import org.apache.doris.flink.sink.writer.SchemaChangeHelper.DDLSchema;
+import org.apache.doris.flink.tools.cdc.mysql.MysqlType;
+
+import org.apache.commons.collections.CollectionUtils;
 import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.util.StringUtils;
 import org.apache.http.HttpHeaders;
@@ -44,8 +51,11 @@ import org.slf4j.LoggerFactory;
 import java.io.IOException;
 import java.nio.charset.StandardCharsets;
 import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.List;
 import java.util.Map;
 import java.util.Objects;
+import java.util.Set;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 
@@ -70,8 +80,12 @@ public class JsonDebeziumSchemaSerializer implements 
DorisRecordSerializer<Strin
     private String table;
     //table name of the cdc upstream, format is db.tbl
     private String sourceTableName;
+    private boolean firstLoad;
+    private boolean firstSchemaChange;
+    private Map<String, FieldSchema> originFieldSchemaMap;
+    private final boolean newSchemaChange;
 
-    public JsonDebeziumSchemaSerializer(DorisOptions dorisOptions, Pattern 
pattern, String sourceTableName) {
+    public JsonDebeziumSchemaSerializer(DorisOptions dorisOptions, Pattern 
pattern, String sourceTableName, boolean newSchemaChange) {
         this.dorisOptions = dorisOptions;
         this.addDropDDLPattern = pattern == null ? 
Pattern.compile(addDropDDLRegex, Pattern.CASE_INSENSITIVE) : pattern;
         String[] tableInfo = dorisOptions.getTableIdentifier().split("\\.");
@@ -82,6 +96,9 @@ public class JsonDebeziumSchemaSerializer implements 
DorisRecordSerializer<Strin
         
this.objectMapper.enable(DeserializationFeature.USE_BIG_DECIMAL_FOR_FLOATS);
         JsonNodeFactory jsonNodeFactory = 
JsonNodeFactory.withExactBigDecimals(true);
         this.objectMapper.setNodeFactory(jsonNodeFactory);
+        this.newSchemaChange = newSchemaChange;
+        this.firstLoad = true;
+        this.firstSchemaChange = true;
     }
 
     @Override
@@ -91,9 +108,17 @@ public class JsonDebeziumSchemaSerializer implements 
DorisRecordSerializer<Strin
         String op = extractJsonNode(recordRoot, "op");
         if (Objects.isNull(op)) {
             //schema change ddl
-            schemaChange(recordRoot);
+            if (newSchemaChange) {
+                schemaChangeV2(recordRoot);
+            } else {
+                schemaChange(recordRoot);
+            }
             return null;
         }
+
+        if (newSchemaChange && firstLoad) {
+            initOriginFieldSchema(recordRoot);
+        }
         Map<String, String> valueMap;
         switch (op) {
             case OP_READ:
@@ -113,6 +138,70 @@ public class JsonDebeziumSchemaSerializer implements 
DorisRecordSerializer<Strin
         return 
objectMapper.writeValueAsString(valueMap).getBytes(StandardCharsets.UTF_8);
     }
 
+    public boolean schemaChangeV2(JsonNode recordRoot) {
+        boolean status = false;
+        try {
+            if (!StringUtils.isNullOrWhitespaceOnly(sourceTableName) && 
!checkTable(recordRoot)) {
+                return false;
+            }
+            List<String> ddlSqlList = extractDDLList(recordRoot);
+            if (CollectionUtils.isEmpty(ddlSqlList)) {
+                LOG.info("ddl can not do schema change:{}", recordRoot);
+                return false;
+            }
+
+            List<DDLSchema> ddlSchemas = SchemaChangeHelper.getDdlSchemas();
+            for (int i = 0; i < ddlSqlList.size(); i++) {
+                DDLSchema ddlSchema = ddlSchemas.get(i);
+                String ddlSql = ddlSqlList.get(i);
+                boolean doSchemaChange = checkSchemaChange(ddlSchema);
+                status = doSchemaChange && execSchemaChange(ddlSql);
+                LOG.info("schema change status:{}", status);
+            }
+        } catch (Exception ex) {
+            LOG.warn("schema change error :", ex);
+        }
+        return status;
+    }
+
+    private boolean checkSchemaChange(DDLSchema ddlSchema) throws IOException, 
IllegalArgumentException {
+        String requestUrl = String.format(CHECK_SCHEMA_CHANGE_API, 
RestService.randomEndpoint(dorisOptions.getFenodes(), LOG), database, table);
+        Map<String,Object> param = buildRequestParam(ddlSchema);
+        HttpGetWithEntity httpGet = new HttpGetWithEntity(requestUrl);
+        httpGet.setHeader(HttpHeaders.AUTHORIZATION, authHeader());
+        httpGet.setEntity(new 
StringEntity(objectMapper.writeValueAsString(param)));
+        boolean success = handleResponse(httpGet);
+        if (!success) {
+            LOG.warn("schema change can not do table {}.{}",database,table);
+        }
+        return success;
+    }
+
+    @VisibleForTesting
+    public List<String> extractDDLList(JsonNode record) throws 
JsonProcessingException {
+        JsonNode historyRecord = objectMapper.readTree(extractJsonNode(record, 
"historyRecord"));
+        JsonNode tableChanges = historyRecord.get("tableChanges");
+        JsonNode tableChange = tableChanges.get(0);
+        String ddl = extractJsonNode(historyRecord, "ddl");
+        LOG.debug("received debezium ddl :{}", ddl);
+
+        Matcher matcher = addDropDDLPattern.matcher(ddl);
+        if (Objects.isNull(tableChange)|| 
!tableChange.get("type").asText().equals("ALTER") || !matcher.find()) {
+            return null;
+        }
+
+        JsonNode columns = tableChange.get("table").get("columns");
+        if (firstSchemaChange) {
+            fillOriginSchema(columns);
+        }
+        Map<String, FieldSchema> updateFiledSchema = new LinkedHashMap<>();
+        for (JsonNode column : columns) {
+            buildFieldSchema(updateFiledSchema, column);
+        }
+        SchemaChangeHelper.compareSchema(updateFiledSchema, 
originFieldSchemaMap);
+        return 
SchemaChangeHelper.generateDDLSql(dorisOptions.getTableIdentifier());
+    }
+
     @VisibleForTesting
     public boolean schemaChange(JsonNode recordRoot) {
         boolean status = false;
@@ -168,6 +257,13 @@ public class JsonDebeziumSchemaSerializer implements 
DorisRecordSerializer<Strin
         return success;
     }
 
+    protected Map<String, Object> buildRequestParam(DDLSchema ddlSchema) {
+        Map<String, Object> params = new HashMap<>();
+        params.put("isDropColumn", ddlSchema.isDropColumn());
+        params.put("columnName", ddlSchema.getColumnName());
+        return params;
+    }
+
     /**
      * Build param
      * {
@@ -233,7 +329,8 @@ public class JsonDebeziumSchemaSerializer implements 
DorisRecordSerializer<Strin
     }
 
     private String extractJsonNode(JsonNode record, String key) {
-        return record != null && record.get(key) != null ? 
record.get(key).asText() : null;
+        return record != null && record.get(key) != null &&
+                !(record.get(key) instanceof NullNode) ? 
record.get(key).asText() : null;
     }
 
     private Map<String, String> extractBeforeRow(JsonNode record) {
@@ -277,6 +374,76 @@ public class JsonDebeziumSchemaSerializer implements 
DorisRecordSerializer<Strin
         return "Basic " + new 
String(Base64.encodeBase64((dorisOptions.getUsername() + ":" + 
dorisOptions.getPassword()).getBytes(StandardCharsets.UTF_8)));
     }
 
+    @VisibleForTesting
+    public void fillOriginSchema(JsonNode columns) {
+        if (Objects.nonNull(originFieldSchemaMap)) {
+            for (JsonNode column : columns) {
+                String fieldName = column.get("name").asText();
+                if (originFieldSchemaMap.containsKey(fieldName)) {
+                    JsonNode length = column.get("length");
+                    JsonNode scale = column.get("scale");
+                    String type = 
MysqlType.toDorisType(column.get("typeName").asText(),
+                            length == null ? 0 : length.asInt(),
+                            scale == null ? 0 : scale.asInt());
+                    String defaultValue = 
handleDefaultValue(extractJsonNode(column, "defaultValueExpression"));
+                    String comment = extractJsonNode(column, "comment");
+                    FieldSchema fieldSchema = 
originFieldSchemaMap.get(fieldName);
+                    fieldSchema.setName(fieldName);
+                    fieldSchema.setTypeString(type);
+                    fieldSchema.setComment(comment);
+                    fieldSchema.setDefaultValue(defaultValue);
+                }
+            }
+        } else {
+            originFieldSchemaMap = new LinkedHashMap<>();
+            columns.forEach(column -> buildFieldSchema(originFieldSchemaMap, 
column));
+        }
+        firstSchemaChange = false;
+        firstLoad = false;
+    }
+
+    private void buildFieldSchema(Map<String, FieldSchema> filedSchemaMap, 
JsonNode column) {
+        String fieldName = column.get("name").asText();
+        JsonNode length = column.get("length");
+        JsonNode scale = column.get("scale");
+        String type = MysqlType.toDorisType(column.get("typeName").asText(),
+                length == null ? 0 : length.asInt(), scale == null ? 0 : 
scale.asInt());
+        String defaultValue = handleDefaultValue(extractJsonNode(column, 
"defaultValueExpression"));
+        String comment = extractJsonNode(column, "comment");
+        filedSchemaMap.put(fieldName, new FieldSchema(fieldName, type, 
defaultValue, comment));
+    }
+
+    private String handleDefaultValue(String defaultValue) {
+        if (StringUtils.isNullOrWhitespaceOnly(defaultValue)) {
+            return null;
+        }
+        // Due to historical reasons, doris needs to add quotes to the default 
value of the new column
+        // For example in mysql: alter table add column c1 int default 100
+        // In Doris: alter table add column c1 int default '100'
+        if (Pattern.matches("['\"].*?['\"]", defaultValue)) {
+            return defaultValue;
+        } else if (defaultValue.equals("1970-01-01 00:00:00")) {
+            // TODO: The default value of setting the current time in CDC is 
1970-01-01 00:00:00
+            return "current_timestamp";
+        }
+        return "'" + defaultValue + "'";
+    }
+
+    private void initOriginFieldSchema(JsonNode recordRoot) {
+        originFieldSchemaMap = new LinkedHashMap<>();
+        Set<String> columnNameSet = extractAfterRow(recordRoot).keySet();
+        if (CollectionUtils.isEmpty(columnNameSet)) {
+            columnNameSet = extractBeforeRow(recordRoot).keySet();
+        }
+        columnNameSet.forEach(columnName -> 
originFieldSchemaMap.put(columnName, new FieldSchema()));
+        firstLoad = false;
+    }
+
+    @VisibleForTesting
+    public Map<String, FieldSchema> getOriginFieldSchemaMap() {
+        return originFieldSchemaMap;
+    }
+
     public static JsonDebeziumSchemaSerializer.Builder builder() {
         return new JsonDebeziumSchemaSerializer.Builder();
     }
@@ -288,12 +455,18 @@ public class JsonDebeziumSchemaSerializer implements 
DorisRecordSerializer<Strin
         private DorisOptions dorisOptions;
         private Pattern addDropDDLPattern;
         private String sourceTableName;
+        private boolean newSchemaChange;
 
         public JsonDebeziumSchemaSerializer.Builder 
setDorisOptions(DorisOptions dorisOptions) {
             this.dorisOptions = dorisOptions;
             return this;
         }
 
+        public JsonDebeziumSchemaSerializer.Builder setNewSchemaChange(boolean 
newSchemaChange) {
+            this.newSchemaChange = newSchemaChange;
+            return this;
+        }
+
         public JsonDebeziumSchemaSerializer.Builder setPattern(Pattern 
addDropDDLPattern) {
             this.addDropDDLPattern = addDropDDLPattern;
             return this;
@@ -305,7 +478,7 @@ public class JsonDebeziumSchemaSerializer implements 
DorisRecordSerializer<Strin
         }
 
         public JsonDebeziumSchemaSerializer build() {
-            return new JsonDebeziumSchemaSerializer(dorisOptions, 
addDropDDLPattern, sourceTableName);
+            return new JsonDebeziumSchemaSerializer(dorisOptions, 
addDropDDLPattern, sourceTableName, newSchemaChange);
         }
     }
 
diff --git 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/SchemaChangeHelper.java
 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/SchemaChangeHelper.java
new file mode 100644
index 0000000..302770c
--- /dev/null
+++ 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/SchemaChangeHelper.java
@@ -0,0 +1,109 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.flink.sink.writer;
+
+import org.apache.doris.flink.catalog.doris.FieldSchema;
+
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.commons.compress.utils.Lists;
+import org.apache.flink.util.StringUtils;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+
+public class SchemaChangeHelper {
+    private static final List<String> dropFieldSchemas = Lists.newArrayList();
+    private static final List<FieldSchema> addFieldSchemas = 
Lists.newArrayList();
+    // Used to determine whether the doris table supports ddl
+    private static final List<DDLSchema> ddlSchemas = Lists.newArrayList();
+    public static final String ADD_DDL = "ALTER TABLE %s ADD COLUMN %s %s";
+    public static final String DROP_DDL = "ALTER TABLE %s DROP COLUMN %s";
+
+    public static void compareSchema(Map<String, FieldSchema> 
updateFiledSchemaMap,
+            Map<String, FieldSchema> originFieldSchemaMap) {
+        for (Entry<String, FieldSchema> updateFieldSchema : 
updateFiledSchemaMap.entrySet()) {
+            String columName = updateFieldSchema.getKey();
+            if (!originFieldSchemaMap.containsKey(columName)) {
+                addFieldSchemas.add(updateFieldSchema.getValue());
+                originFieldSchemaMap.put(columName, 
updateFieldSchema.getValue());
+            }
+        }
+        for (Entry<String, FieldSchema> originFieldSchema : 
originFieldSchemaMap.entrySet()) {
+            String columName = originFieldSchema.getKey();
+            if (!updateFiledSchemaMap.containsKey(columName)) {
+                dropFieldSchemas.add(columName);
+            }
+        }
+        if (CollectionUtils.isNotEmpty(dropFieldSchemas)) {
+            dropFieldSchemas.forEach(originFieldSchemaMap::remove);
+        }
+    }
+
+    public static List<String> generateDDLSql(String table) {
+        ddlSchemas.clear();
+        List<String> ddlList = Lists.newArrayList();
+        for (FieldSchema fieldSchema : addFieldSchemas) {
+            String name = fieldSchema.getName();
+            String type = fieldSchema.getTypeString();
+            String defaultValue = fieldSchema.getDefaultValue();
+            String comment = fieldSchema.getComment();
+            String addDDL = String.format(ADD_DDL, table, name, type);
+            if (!StringUtils.isNullOrWhitespaceOnly(defaultValue)) {
+                addDDL = addDDL + " DEFAULT " + defaultValue;
+            }
+            if (!StringUtils.isNullOrWhitespaceOnly(comment)) {
+                addDDL = addDDL + " COMMENT " + comment;
+            }
+            ddlList.add(addDDL);
+            ddlSchemas.add(new DDLSchema(name, false));
+        }
+        for (String columName : dropFieldSchemas) {
+            String dropDDL = String.format(DROP_DDL, table, columName);
+            ddlList.add(dropDDL);
+            ddlSchemas.add(new DDLSchema(columName, true));
+        }
+
+        dropFieldSchemas.clear();
+        addFieldSchemas.clear();
+        return ddlList;
+    }
+
+    public static List<DDLSchema> getDdlSchemas() {
+        return ddlSchemas;
+    }
+
+    static class DDLSchema {
+        private final String columnName;
+        private final boolean isDropColumn;
+
+        public DDLSchema(String columnName, boolean isDropColumn) {
+            this.columnName = columnName;
+            this.isDropColumn = isDropColumn;
+        }
+
+        public String getColumnName() {
+            return columnName;
+        }
+
+        public boolean isDropColumn() {
+            return isDropColumn;
+        }
+    }
+
+}
diff --git 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/CdcTools.java
 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/CdcTools.java
index 4a44be9..6d10266 100644
--- 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/CdcTools.java
+++ 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/CdcTools.java
@@ -79,13 +79,14 @@ public class CdcTools {
         String excludingTables = params.get("excluding-tables");
         boolean createTableOnly = params.has("create-table-only");
         boolean ignoreDefaultValue = params.has("ignore-default-value");
+        boolean useNewSchemaChange = params.has("use-new-schema-change");
 
         Map<String, String> sinkMap = getConfigMap(params, "sink-conf");
         Map<String, String> tableMap = getConfigMap(params, "table-conf");
         Configuration sinkConfig = Configuration.fromMap(sinkMap);
 
         StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
-        databaseSync.create(env, database, config, tablePrefix, tableSuffix, 
includingTables, excludingTables, ignoreDefaultValue, sinkConfig, tableMap, 
createTableOnly);
+        databaseSync.create(env, database, config, tablePrefix, tableSuffix, 
includingTables, excludingTables, ignoreDefaultValue, sinkConfig, tableMap, 
createTableOnly, useNewSchemaChange);
         databaseSync.build();
         if(StringUtils.isNullOrWhitespaceOnly(jobName)){
             jobName = String.format("%s-Doris Sync Database: %s", type, 
config.getString("database-name","db"));
diff --git 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/DatabaseSync.java
 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/DatabaseSync.java
index 82424c1..9604dcd 100644
--- 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/DatabaseSync.java
+++ 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/DatabaseSync.java
@@ -60,6 +60,7 @@ public abstract class DatabaseSync {
     protected boolean ignoreDefaultValue;
     public StreamExecutionEnvironment env;
     private boolean createTableOnly = false;
+    private boolean newSchemaChange;
 
     public abstract Connection getConnection() throws SQLException;
 
@@ -70,7 +71,7 @@ public abstract class DatabaseSync {
     public void create(StreamExecutionEnvironment env, String database, 
Configuration config,
                        String tablePrefix, String tableSuffix, String 
includingTables,
                        String excludingTables, boolean ignoreDefaultValue, 
Configuration sinkConfig,
-            Map<String, String> tableConfig, boolean createTableOnly) {
+            Map<String, String> tableConfig, boolean createTableOnly, boolean 
useNewSchemaChange) {
         this.env = env;
         this.config = config;
         this.database = database;
@@ -85,6 +86,7 @@ public abstract class DatabaseSync {
             this.tableConfig.put(LIGHT_SCHEMA_CHANGE, "true");
         }
         this.createTableOnly = createTableOnly;
+        this.newSchemaChange = useNewSchemaChange;
     }
 
     public void build() throws Exception {
@@ -185,7 +187,10 @@ public abstract class DatabaseSync {
         }
         builder.setDorisReadOptions(DorisReadOptions.builder().build())
                 .setDorisExecutionOptions(executionBuilder.build())
-                
.setSerializer(JsonDebeziumSchemaSerializer.builder().setDorisOptions(dorisBuilder.build()).build())
+                .setSerializer(JsonDebeziumSchemaSerializer.builder()
+                        .setDorisOptions(dorisBuilder.build())
+                        .setNewSchemaChange(newSchemaChange)
+                        .build())
                 .setDorisOptions(dorisBuilder.build());
         return builder.build();
     }
diff --git 
a/flink-doris-connector/src/test/java/org/apache/doris/flink/CDCSchemaChangeExample.java
 
b/flink-doris-connector/src/test/java/org/apache/doris/flink/CDCSchemaChangeExample.java
index 3bad9be..62578d3 100644
--- 
a/flink-doris-connector/src/test/java/org/apache/doris/flink/CDCSchemaChangeExample.java
+++ 
b/flink-doris-connector/src/test/java/org/apache/doris/flink/CDCSchemaChangeExample.java
@@ -78,7 +78,7 @@ public class CDCSchemaChangeExample {
         builder.setDorisReadOptions(DorisReadOptions.builder().build())
                 .setDorisExecutionOptions(executionBuilder.build())
                 .setDorisOptions(dorisOptions)
-                
.setSerializer(JsonDebeziumSchemaSerializer.builder().setDorisOptions(dorisOptions).build());
+                
.setSerializer(JsonDebeziumSchemaSerializer.builder().setDorisOptions(dorisOptions).setNewSchemaChange(true).build());
 
         env.fromSource(mySqlSource, WatermarkStrategy.noWatermarks(), "MySQL 
Source")//.print();
                 .sinkTo(builder.build());
diff --git 
a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/TestJsonDebeziumSchemaSerializer.java
 
b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/TestJsonDebeziumSchemaSerializer.java
index de5a4de..5c2827c 100644
--- 
a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/TestJsonDebeziumSchemaSerializer.java
+++ 
b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/TestJsonDebeziumSchemaSerializer.java
@@ -20,12 +20,16 @@ package org.apache.doris.flink.sink.writer;
 import com.fasterxml.jackson.core.type.TypeReference;
 import com.fasterxml.jackson.databind.JsonNode;
 import com.fasterxml.jackson.databind.ObjectMapper;
+
+import org.apache.doris.flink.catalog.doris.FieldSchema;
 import org.apache.doris.flink.cfg.DorisOptions;
 import org.apache.doris.flink.cfg.DorisReadOptions;
 import org.apache.doris.flink.exception.DorisException;
 import org.apache.doris.flink.rest.RestService;
 import org.apache.doris.flink.rest.models.Field;
 import org.apache.doris.flink.rest.models.Schema;
+
+import org.apache.commons.collections.CollectionUtils;
 import org.junit.Assert;
 import org.junit.BeforeClass;
 import org.junit.Ignore;
@@ -35,8 +39,12 @@ import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.nio.charset.StandardCharsets;
+import java.util.Arrays;
+import java.util.Iterator;
+import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Map.Entry;
 
 /**
  * test for JsonDebeziumSchemaSerializer.
@@ -114,6 +122,64 @@ public class TestJsonDebeziumSchemaSerializer {
 
     }
 
+    @Test
+    public void testExtractDDLListMultipleColumns() throws IOException {
+        String sql0 = "ALTER TABLE test.t1 ADD COLUMN c2 INT";
+        String sql1 = "ALTER TABLE test.t1 ADD COLUMN c555 VARCHAR(300)";
+        String sql2 = "ALTER TABLE test.t1 ADD COLUMN c666 INT DEFAULT '100'";
+        String sql3 = "ALTER TABLE test.t1 ADD COLUMN c4 BIGINT DEFAULT '555'";
+        String sql4 = "ALTER TABLE test.t1 ADD COLUMN c199 INT";
+        String sql5 = "ALTER TABLE test.t1 ADD COLUMN c12 INT DEFAULT '100'";
+        String sql6 = "ALTER TABLE test.t1 DROP COLUMN name";
+        String sql7 = "ALTER TABLE test.t1 DROP COLUMN test_time";
+        String sql8 = "ALTER TABLE test.t1 DROP COLUMN c1";
+        String sql9 = "ALTER TABLE test.t1 DROP COLUMN cc";
+        List<String> srcSqlList = Arrays.asList(sql0, sql1, sql2, sql3, 
sql4,sql5,sql6,sql7,sql8,sql9);
+
+        String record = 
"{\"source\":{\"version\":\"1.9.7.Final\",\"connector\":\"mysql\",\"name\":\"mysql_binlog_source\",\"ts_ms\":1691033764674,\"snapshot\":\"false\",\"db\":\"test\",\"sequence\":null,\"table\":\"t1\",\"server_id\":1,\"gtid\":null,\"file\":\"binlog.000029\",\"pos\":23305,\"row\":0,\"thread\":null,\"query\":null},\"historyRecord\":\"{\\\"source\\\":{\\\"file\\\":\\\"binlog.000029\\\",\\\"pos\\\":23305,\\\"server_id\\\":1},\\\"position\\\":{\\\"transaction_id\\\":null,\
 [...]
+        JsonNode recordRoot = objectMapper.readTree(record);
+        List<String> ddlSQLList = serializer.extractDDLList(recordRoot);
+        for (int i = 0; i < ddlSQLList.size(); i++) {
+            String srcSQL = srcSqlList.get(i);
+            String targetSQL = ddlSQLList.get(i);
+            Assert.assertEquals(srcSQL, targetSQL);
+        }
+    }
+
+    @Test
+    public void testExtractDDLListRenameColumn() throws IOException {
+        String record = 
"{\"source\":{\"version\":\"1.9.7.Final\",\"connector\":\"mysql\",\"name\":\"mysql_binlog_source\",\"ts_ms\":1691034519226,\"snapshot\":\"false\",\"db\":\"test\",\"sequence\":null,\"table\":\"t1\",\"server_id\":1,\"gtid\":null,\"file\":\"binlog.000029\",\"pos\":23752,\"row\":0,\"thread\":null,\"query\":null},\"historyRecord\":\"{\\\"source\\\":{\\\"file\\\":\\\"binlog.000029\\\",\\\"pos\\\":23752,\\\"server_id\\\":1},\\\"position\\\":{\\\"transaction_id\\\":null,\
 [...]
+        JsonNode recordRoot = objectMapper.readTree(record);
+        List<String> ddlSQLList = serializer.extractDDLList(recordRoot);
+        Assert.assertTrue(CollectionUtils.isEmpty(ddlSQLList));
+    }
+
+    @Test
+    public void testFillOriginSchema() throws IOException {
+        Map<String, FieldSchema> srcFiledSchemaMap = new LinkedHashMap<>();
+        srcFiledSchemaMap.put("id", new FieldSchema("id", "INT", null, null));
+        srcFiledSchemaMap.put("name", new FieldSchema("name", "VARCHAR(150)", 
null, null));
+        srcFiledSchemaMap.put("test_time", new FieldSchema("test_time", 
"DATETIMEV2(0)", null, null));
+        srcFiledSchemaMap.put("c1", new FieldSchema("c1", "INT", "'100'", 
null));
+
+        String columnsString = 
"[{\"name\":\"id\",\"jdbcType\":4,\"typeName\":\"INT\",\"typeExpression\":\"INT\",\"charsetName\":null,\"position\":1,\"optional\":false,\"autoIncremented\":false,\"generated\":false,\"comment\":null,\"hasDefaultValue\":false,\"enumValues\":[]},{\"name\":\"name\",\"jdbcType\":12,\"typeName\":\"VARCHAR\",\"typeExpression\":\"VARCHAR\",\"charsetName\":\"utf8mb4\",\"length\":50,\"position\":2,\"optional\":true,\"autoIncremented\":false,\"generated\":false,\"co
 [...]
+        JsonNode columns = objectMapper.readTree(columnsString);
+        serializer.fillOriginSchema(columns);
+        Map<String, FieldSchema> originFieldSchemaMap = 
serializer.getOriginFieldSchemaMap();
+
+        Iterator<Entry<String, FieldSchema>> originFieldSchemaIterator = 
originFieldSchemaMap.entrySet().iterator();
+        for (Entry<String, FieldSchema> entry : srcFiledSchemaMap.entrySet()) {
+            FieldSchema srcFiledSchema = entry.getValue();
+            Entry<String, FieldSchema> originField = 
originFieldSchemaIterator.next();
+
+            Assert.assertEquals(entry.getKey(), originField.getKey());
+            Assert.assertEquals(srcFiledSchema.getName(), 
originField.getValue().getName());
+            Assert.assertEquals(srcFiledSchema.getTypeString(), 
originField.getValue().getTypeString());
+            Assert.assertEquals(srcFiledSchema.getDefaultValue(), 
originField.getValue().getDefaultValue());
+            Assert.assertEquals(srcFiledSchema.getComment(), 
originField.getValue().getComment());
+        }
+    }
+
     @Ignore
     @Test
     public void testSerializeAddColumn() throws IOException, DorisException {
diff --git 
a/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/CdcMysqlSyncDatabaseCase.java
 
b/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/CdcMysqlSyncDatabaseCase.java
index 4a109c2..c20be39 100644
--- 
a/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/CdcMysqlSyncDatabaseCase.java
+++ 
b/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/CdcMysqlSyncDatabaseCase.java
@@ -65,8 +65,9 @@ public class CdcMysqlSyncDatabaseCase {
         String includingTables = "tbl1|tbl2|tbl3";
         String excludingTables = "";
         boolean ignoreDefaultValue = false;
+        boolean useNewSchemaChange = false;
         DatabaseSync databaseSync = new MysqlDatabaseSync();
-        
databaseSync.create(env,database,config,tablePrefix,tableSuffix,includingTables,excludingTables,ignoreDefaultValue,sinkConf,tableConfig,
 false);
+        
databaseSync.create(env,database,config,tablePrefix,tableSuffix,includingTables,excludingTables,ignoreDefaultValue,sinkConf,tableConfig,
 false, useNewSchemaChange);
         databaseSync.build();
         env.execute(String.format("MySQL-Doris Database Sync: %s", database));
 
diff --git 
a/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/CdcOraclelSyncDatabaseCase.java
 
b/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/CdcOraclelSyncDatabaseCase.java
index b3b4384..08cf586 100644
--- 
a/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/CdcOraclelSyncDatabaseCase.java
+++ 
b/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/CdcOraclelSyncDatabaseCase.java
@@ -71,8 +71,9 @@ public class CdcOraclelSyncDatabaseCase {
         String includingTables = "test.*";
         String excludingTables = "";
         boolean ignoreDefaultValue = false;
+        boolean useNewSchemaChange = false;
         DatabaseSync databaseSync = new OracleDatabaseSync();
-        
databaseSync.create(env,database,config,tablePrefix,tableSuffix,includingTables,excludingTables,ignoreDefaultValue,sinkConf,tableConfig,
 false);
+        
databaseSync.create(env,database,config,tablePrefix,tableSuffix,includingTables,excludingTables,ignoreDefaultValue,sinkConf,tableConfig,
 false, useNewSchemaChange);
         databaseSync.build();
         env.execute(String.format("Oracle-Doris Database Sync: %s", database));
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org


Reply via email to