This is an automated email from the ASF dual-hosted git repository. diwu pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris-flink-connector.git
The following commit(s) were added to refs/heads/master by this push: new 14be15b [Improve](schemaChange)schema change ddl supports multi-column changes, synchronous defaults (#167) 14be15b is described below commit 14be15bc140e18337fc927891946050e00230097 Author: DongLiang-0 <46414265+donglian...@users.noreply.github.com> AuthorDate: Thu Aug 3 18:02:59 2023 +0800 [Improve](schemaChange)schema change ddl supports multi-column changes, synchronous defaults (#167) --- flink-doris-connector/pom.xml | 2 +- .../doris/flink/catalog/doris/FieldSchema.java | 16 ++ .../sink/writer/JsonDebeziumSchemaSerializer.java | 181 ++++++++++++++++++++- .../flink/sink/writer/SchemaChangeHelper.java | 109 +++++++++++++ .../org/apache/doris/flink/tools/cdc/CdcTools.java | 3 +- .../apache/doris/flink/tools/cdc/DatabaseSync.java | 9 +- .../apache/doris/flink/CDCSchemaChangeExample.java | 2 +- .../writer/TestJsonDebeziumSchemaSerializer.java | 66 ++++++++ .../flink/tools/cdc/CdcMysqlSyncDatabaseCase.java | 3 +- .../tools/cdc/CdcOraclelSyncDatabaseCase.java | 3 +- 10 files changed, 383 insertions(+), 11 deletions(-) diff --git a/flink-doris-connector/pom.xml b/flink-doris-connector/pom.xml index b637fef..e10ad27 100644 --- a/flink-doris-connector/pom.xml +++ b/flink-doris-connector/pom.xml @@ -245,7 +245,7 @@ under the License. <dependency> <groupId>com.ververica</groupId> <artifactId>flink-sql-connector-mysql-cdc</artifactId> - <version>2.3.0</version> + <version>2.4.1</version> <scope>provided</scope> </dependency> <dependency> diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/catalog/doris/FieldSchema.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/catalog/doris/FieldSchema.java index 8255bd3..3c28b74 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/catalog/doris/FieldSchema.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/catalog/doris/FieldSchema.java @@ -19,6 +19,7 @@ package org.apache.doris.flink.catalog.doris; public class FieldSchema { private String name; private String typeString; + private String defaultValue; private String comment; public FieldSchema() { @@ -30,6 +31,13 @@ public class FieldSchema { this.comment = comment; } + public FieldSchema(String name, String typeString, String defaultValue, String comment) { + this.name = name; + this.typeString = typeString; + this.defaultValue = defaultValue; + this.comment = comment; + } + public String getName() { return name; } @@ -53,4 +61,12 @@ public class FieldSchema { public void setComment(String comment) { this.comment = comment; } + + public String getDefaultValue() { + return defaultValue; + } + + public void setDefaultValue(String defaultValue) { + this.defaultValue = defaultValue; + } } diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/JsonDebeziumSchemaSerializer.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/JsonDebeziumSchemaSerializer.java index 3329b23..6b66e76 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/JsonDebeziumSchemaSerializer.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/JsonDebeziumSchemaSerializer.java @@ -23,11 +23,18 @@ import com.fasterxml.jackson.databind.DeserializationFeature; import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.node.JsonNodeFactory; +import com.fasterxml.jackson.databind.node.NullNode; import org.apache.commons.codec.binary.Base64; + +import org.apache.doris.flink.catalog.doris.FieldSchema; import org.apache.doris.flink.cfg.DorisOptions; import org.apache.doris.flink.exception.IllegalArgumentException; import org.apache.doris.flink.rest.RestService; import org.apache.doris.flink.sink.HttpGetWithEntity; +import org.apache.doris.flink.sink.writer.SchemaChangeHelper.DDLSchema; +import org.apache.doris.flink.tools.cdc.mysql.MysqlType; + +import org.apache.commons.collections.CollectionUtils; import org.apache.flink.annotation.VisibleForTesting; import org.apache.flink.util.StringUtils; import org.apache.http.HttpHeaders; @@ -44,8 +51,11 @@ import org.slf4j.LoggerFactory; import java.io.IOException; import java.nio.charset.StandardCharsets; import java.util.HashMap; +import java.util.LinkedHashMap; +import java.util.List; import java.util.Map; import java.util.Objects; +import java.util.Set; import java.util.regex.Matcher; import java.util.regex.Pattern; @@ -70,8 +80,12 @@ public class JsonDebeziumSchemaSerializer implements DorisRecordSerializer<Strin private String table; //table name of the cdc upstream, format is db.tbl private String sourceTableName; + private boolean firstLoad; + private boolean firstSchemaChange; + private Map<String, FieldSchema> originFieldSchemaMap; + private final boolean newSchemaChange; - public JsonDebeziumSchemaSerializer(DorisOptions dorisOptions, Pattern pattern, String sourceTableName) { + public JsonDebeziumSchemaSerializer(DorisOptions dorisOptions, Pattern pattern, String sourceTableName, boolean newSchemaChange) { this.dorisOptions = dorisOptions; this.addDropDDLPattern = pattern == null ? Pattern.compile(addDropDDLRegex, Pattern.CASE_INSENSITIVE) : pattern; String[] tableInfo = dorisOptions.getTableIdentifier().split("\\."); @@ -82,6 +96,9 @@ public class JsonDebeziumSchemaSerializer implements DorisRecordSerializer<Strin this.objectMapper.enable(DeserializationFeature.USE_BIG_DECIMAL_FOR_FLOATS); JsonNodeFactory jsonNodeFactory = JsonNodeFactory.withExactBigDecimals(true); this.objectMapper.setNodeFactory(jsonNodeFactory); + this.newSchemaChange = newSchemaChange; + this.firstLoad = true; + this.firstSchemaChange = true; } @Override @@ -91,9 +108,17 @@ public class JsonDebeziumSchemaSerializer implements DorisRecordSerializer<Strin String op = extractJsonNode(recordRoot, "op"); if (Objects.isNull(op)) { //schema change ddl - schemaChange(recordRoot); + if (newSchemaChange) { + schemaChangeV2(recordRoot); + } else { + schemaChange(recordRoot); + } return null; } + + if (newSchemaChange && firstLoad) { + initOriginFieldSchema(recordRoot); + } Map<String, String> valueMap; switch (op) { case OP_READ: @@ -113,6 +138,70 @@ public class JsonDebeziumSchemaSerializer implements DorisRecordSerializer<Strin return objectMapper.writeValueAsString(valueMap).getBytes(StandardCharsets.UTF_8); } + public boolean schemaChangeV2(JsonNode recordRoot) { + boolean status = false; + try { + if (!StringUtils.isNullOrWhitespaceOnly(sourceTableName) && !checkTable(recordRoot)) { + return false; + } + List<String> ddlSqlList = extractDDLList(recordRoot); + if (CollectionUtils.isEmpty(ddlSqlList)) { + LOG.info("ddl can not do schema change:{}", recordRoot); + return false; + } + + List<DDLSchema> ddlSchemas = SchemaChangeHelper.getDdlSchemas(); + for (int i = 0; i < ddlSqlList.size(); i++) { + DDLSchema ddlSchema = ddlSchemas.get(i); + String ddlSql = ddlSqlList.get(i); + boolean doSchemaChange = checkSchemaChange(ddlSchema); + status = doSchemaChange && execSchemaChange(ddlSql); + LOG.info("schema change status:{}", status); + } + } catch (Exception ex) { + LOG.warn("schema change error :", ex); + } + return status; + } + + private boolean checkSchemaChange(DDLSchema ddlSchema) throws IOException, IllegalArgumentException { + String requestUrl = String.format(CHECK_SCHEMA_CHANGE_API, RestService.randomEndpoint(dorisOptions.getFenodes(), LOG), database, table); + Map<String,Object> param = buildRequestParam(ddlSchema); + HttpGetWithEntity httpGet = new HttpGetWithEntity(requestUrl); + httpGet.setHeader(HttpHeaders.AUTHORIZATION, authHeader()); + httpGet.setEntity(new StringEntity(objectMapper.writeValueAsString(param))); + boolean success = handleResponse(httpGet); + if (!success) { + LOG.warn("schema change can not do table {}.{}",database,table); + } + return success; + } + + @VisibleForTesting + public List<String> extractDDLList(JsonNode record) throws JsonProcessingException { + JsonNode historyRecord = objectMapper.readTree(extractJsonNode(record, "historyRecord")); + JsonNode tableChanges = historyRecord.get("tableChanges"); + JsonNode tableChange = tableChanges.get(0); + String ddl = extractJsonNode(historyRecord, "ddl"); + LOG.debug("received debezium ddl :{}", ddl); + + Matcher matcher = addDropDDLPattern.matcher(ddl); + if (Objects.isNull(tableChange)|| !tableChange.get("type").asText().equals("ALTER") || !matcher.find()) { + return null; + } + + JsonNode columns = tableChange.get("table").get("columns"); + if (firstSchemaChange) { + fillOriginSchema(columns); + } + Map<String, FieldSchema> updateFiledSchema = new LinkedHashMap<>(); + for (JsonNode column : columns) { + buildFieldSchema(updateFiledSchema, column); + } + SchemaChangeHelper.compareSchema(updateFiledSchema, originFieldSchemaMap); + return SchemaChangeHelper.generateDDLSql(dorisOptions.getTableIdentifier()); + } + @VisibleForTesting public boolean schemaChange(JsonNode recordRoot) { boolean status = false; @@ -168,6 +257,13 @@ public class JsonDebeziumSchemaSerializer implements DorisRecordSerializer<Strin return success; } + protected Map<String, Object> buildRequestParam(DDLSchema ddlSchema) { + Map<String, Object> params = new HashMap<>(); + params.put("isDropColumn", ddlSchema.isDropColumn()); + params.put("columnName", ddlSchema.getColumnName()); + return params; + } + /** * Build param * { @@ -233,7 +329,8 @@ public class JsonDebeziumSchemaSerializer implements DorisRecordSerializer<Strin } private String extractJsonNode(JsonNode record, String key) { - return record != null && record.get(key) != null ? record.get(key).asText() : null; + return record != null && record.get(key) != null && + !(record.get(key) instanceof NullNode) ? record.get(key).asText() : null; } private Map<String, String> extractBeforeRow(JsonNode record) { @@ -277,6 +374,76 @@ public class JsonDebeziumSchemaSerializer implements DorisRecordSerializer<Strin return "Basic " + new String(Base64.encodeBase64((dorisOptions.getUsername() + ":" + dorisOptions.getPassword()).getBytes(StandardCharsets.UTF_8))); } + @VisibleForTesting + public void fillOriginSchema(JsonNode columns) { + if (Objects.nonNull(originFieldSchemaMap)) { + for (JsonNode column : columns) { + String fieldName = column.get("name").asText(); + if (originFieldSchemaMap.containsKey(fieldName)) { + JsonNode length = column.get("length"); + JsonNode scale = column.get("scale"); + String type = MysqlType.toDorisType(column.get("typeName").asText(), + length == null ? 0 : length.asInt(), + scale == null ? 0 : scale.asInt()); + String defaultValue = handleDefaultValue(extractJsonNode(column, "defaultValueExpression")); + String comment = extractJsonNode(column, "comment"); + FieldSchema fieldSchema = originFieldSchemaMap.get(fieldName); + fieldSchema.setName(fieldName); + fieldSchema.setTypeString(type); + fieldSchema.setComment(comment); + fieldSchema.setDefaultValue(defaultValue); + } + } + } else { + originFieldSchemaMap = new LinkedHashMap<>(); + columns.forEach(column -> buildFieldSchema(originFieldSchemaMap, column)); + } + firstSchemaChange = false; + firstLoad = false; + } + + private void buildFieldSchema(Map<String, FieldSchema> filedSchemaMap, JsonNode column) { + String fieldName = column.get("name").asText(); + JsonNode length = column.get("length"); + JsonNode scale = column.get("scale"); + String type = MysqlType.toDorisType(column.get("typeName").asText(), + length == null ? 0 : length.asInt(), scale == null ? 0 : scale.asInt()); + String defaultValue = handleDefaultValue(extractJsonNode(column, "defaultValueExpression")); + String comment = extractJsonNode(column, "comment"); + filedSchemaMap.put(fieldName, new FieldSchema(fieldName, type, defaultValue, comment)); + } + + private String handleDefaultValue(String defaultValue) { + if (StringUtils.isNullOrWhitespaceOnly(defaultValue)) { + return null; + } + // Due to historical reasons, doris needs to add quotes to the default value of the new column + // For example in mysql: alter table add column c1 int default 100 + // In Doris: alter table add column c1 int default '100' + if (Pattern.matches("['\"].*?['\"]", defaultValue)) { + return defaultValue; + } else if (defaultValue.equals("1970-01-01 00:00:00")) { + // TODO: The default value of setting the current time in CDC is 1970-01-01 00:00:00 + return "current_timestamp"; + } + return "'" + defaultValue + "'"; + } + + private void initOriginFieldSchema(JsonNode recordRoot) { + originFieldSchemaMap = new LinkedHashMap<>(); + Set<String> columnNameSet = extractAfterRow(recordRoot).keySet(); + if (CollectionUtils.isEmpty(columnNameSet)) { + columnNameSet = extractBeforeRow(recordRoot).keySet(); + } + columnNameSet.forEach(columnName -> originFieldSchemaMap.put(columnName, new FieldSchema())); + firstLoad = false; + } + + @VisibleForTesting + public Map<String, FieldSchema> getOriginFieldSchemaMap() { + return originFieldSchemaMap; + } + public static JsonDebeziumSchemaSerializer.Builder builder() { return new JsonDebeziumSchemaSerializer.Builder(); } @@ -288,12 +455,18 @@ public class JsonDebeziumSchemaSerializer implements DorisRecordSerializer<Strin private DorisOptions dorisOptions; private Pattern addDropDDLPattern; private String sourceTableName; + private boolean newSchemaChange; public JsonDebeziumSchemaSerializer.Builder setDorisOptions(DorisOptions dorisOptions) { this.dorisOptions = dorisOptions; return this; } + public JsonDebeziumSchemaSerializer.Builder setNewSchemaChange(boolean newSchemaChange) { + this.newSchemaChange = newSchemaChange; + return this; + } + public JsonDebeziumSchemaSerializer.Builder setPattern(Pattern addDropDDLPattern) { this.addDropDDLPattern = addDropDDLPattern; return this; @@ -305,7 +478,7 @@ public class JsonDebeziumSchemaSerializer implements DorisRecordSerializer<Strin } public JsonDebeziumSchemaSerializer build() { - return new JsonDebeziumSchemaSerializer(dorisOptions, addDropDDLPattern, sourceTableName); + return new JsonDebeziumSchemaSerializer(dorisOptions, addDropDDLPattern, sourceTableName, newSchemaChange); } } diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/SchemaChangeHelper.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/SchemaChangeHelper.java new file mode 100644 index 0000000..302770c --- /dev/null +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/SchemaChangeHelper.java @@ -0,0 +1,109 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.flink.sink.writer; + +import org.apache.doris.flink.catalog.doris.FieldSchema; + +import org.apache.commons.collections.CollectionUtils; +import org.apache.commons.compress.utils.Lists; +import org.apache.flink.util.StringUtils; + +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; + +public class SchemaChangeHelper { + private static final List<String> dropFieldSchemas = Lists.newArrayList(); + private static final List<FieldSchema> addFieldSchemas = Lists.newArrayList(); + // Used to determine whether the doris table supports ddl + private static final List<DDLSchema> ddlSchemas = Lists.newArrayList(); + public static final String ADD_DDL = "ALTER TABLE %s ADD COLUMN %s %s"; + public static final String DROP_DDL = "ALTER TABLE %s DROP COLUMN %s"; + + public static void compareSchema(Map<String, FieldSchema> updateFiledSchemaMap, + Map<String, FieldSchema> originFieldSchemaMap) { + for (Entry<String, FieldSchema> updateFieldSchema : updateFiledSchemaMap.entrySet()) { + String columName = updateFieldSchema.getKey(); + if (!originFieldSchemaMap.containsKey(columName)) { + addFieldSchemas.add(updateFieldSchema.getValue()); + originFieldSchemaMap.put(columName, updateFieldSchema.getValue()); + } + } + for (Entry<String, FieldSchema> originFieldSchema : originFieldSchemaMap.entrySet()) { + String columName = originFieldSchema.getKey(); + if (!updateFiledSchemaMap.containsKey(columName)) { + dropFieldSchemas.add(columName); + } + } + if (CollectionUtils.isNotEmpty(dropFieldSchemas)) { + dropFieldSchemas.forEach(originFieldSchemaMap::remove); + } + } + + public static List<String> generateDDLSql(String table) { + ddlSchemas.clear(); + List<String> ddlList = Lists.newArrayList(); + for (FieldSchema fieldSchema : addFieldSchemas) { + String name = fieldSchema.getName(); + String type = fieldSchema.getTypeString(); + String defaultValue = fieldSchema.getDefaultValue(); + String comment = fieldSchema.getComment(); + String addDDL = String.format(ADD_DDL, table, name, type); + if (!StringUtils.isNullOrWhitespaceOnly(defaultValue)) { + addDDL = addDDL + " DEFAULT " + defaultValue; + } + if (!StringUtils.isNullOrWhitespaceOnly(comment)) { + addDDL = addDDL + " COMMENT " + comment; + } + ddlList.add(addDDL); + ddlSchemas.add(new DDLSchema(name, false)); + } + for (String columName : dropFieldSchemas) { + String dropDDL = String.format(DROP_DDL, table, columName); + ddlList.add(dropDDL); + ddlSchemas.add(new DDLSchema(columName, true)); + } + + dropFieldSchemas.clear(); + addFieldSchemas.clear(); + return ddlList; + } + + public static List<DDLSchema> getDdlSchemas() { + return ddlSchemas; + } + + static class DDLSchema { + private final String columnName; + private final boolean isDropColumn; + + public DDLSchema(String columnName, boolean isDropColumn) { + this.columnName = columnName; + this.isDropColumn = isDropColumn; + } + + public String getColumnName() { + return columnName; + } + + public boolean isDropColumn() { + return isDropColumn; + } + } + +} diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/CdcTools.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/CdcTools.java index 4a44be9..6d10266 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/CdcTools.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/CdcTools.java @@ -79,13 +79,14 @@ public class CdcTools { String excludingTables = params.get("excluding-tables"); boolean createTableOnly = params.has("create-table-only"); boolean ignoreDefaultValue = params.has("ignore-default-value"); + boolean useNewSchemaChange = params.has("use-new-schema-change"); Map<String, String> sinkMap = getConfigMap(params, "sink-conf"); Map<String, String> tableMap = getConfigMap(params, "table-conf"); Configuration sinkConfig = Configuration.fromMap(sinkMap); StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); - databaseSync.create(env, database, config, tablePrefix, tableSuffix, includingTables, excludingTables, ignoreDefaultValue, sinkConfig, tableMap, createTableOnly); + databaseSync.create(env, database, config, tablePrefix, tableSuffix, includingTables, excludingTables, ignoreDefaultValue, sinkConfig, tableMap, createTableOnly, useNewSchemaChange); databaseSync.build(); if(StringUtils.isNullOrWhitespaceOnly(jobName)){ jobName = String.format("%s-Doris Sync Database: %s", type, config.getString("database-name","db")); diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/DatabaseSync.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/DatabaseSync.java index 82424c1..9604dcd 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/DatabaseSync.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/DatabaseSync.java @@ -60,6 +60,7 @@ public abstract class DatabaseSync { protected boolean ignoreDefaultValue; public StreamExecutionEnvironment env; private boolean createTableOnly = false; + private boolean newSchemaChange; public abstract Connection getConnection() throws SQLException; @@ -70,7 +71,7 @@ public abstract class DatabaseSync { public void create(StreamExecutionEnvironment env, String database, Configuration config, String tablePrefix, String tableSuffix, String includingTables, String excludingTables, boolean ignoreDefaultValue, Configuration sinkConfig, - Map<String, String> tableConfig, boolean createTableOnly) { + Map<String, String> tableConfig, boolean createTableOnly, boolean useNewSchemaChange) { this.env = env; this.config = config; this.database = database; @@ -85,6 +86,7 @@ public abstract class DatabaseSync { this.tableConfig.put(LIGHT_SCHEMA_CHANGE, "true"); } this.createTableOnly = createTableOnly; + this.newSchemaChange = useNewSchemaChange; } public void build() throws Exception { @@ -185,7 +187,10 @@ public abstract class DatabaseSync { } builder.setDorisReadOptions(DorisReadOptions.builder().build()) .setDorisExecutionOptions(executionBuilder.build()) - .setSerializer(JsonDebeziumSchemaSerializer.builder().setDorisOptions(dorisBuilder.build()).build()) + .setSerializer(JsonDebeziumSchemaSerializer.builder() + .setDorisOptions(dorisBuilder.build()) + .setNewSchemaChange(newSchemaChange) + .build()) .setDorisOptions(dorisBuilder.build()); return builder.build(); } diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/CDCSchemaChangeExample.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/CDCSchemaChangeExample.java index 3bad9be..62578d3 100644 --- a/flink-doris-connector/src/test/java/org/apache/doris/flink/CDCSchemaChangeExample.java +++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/CDCSchemaChangeExample.java @@ -78,7 +78,7 @@ public class CDCSchemaChangeExample { builder.setDorisReadOptions(DorisReadOptions.builder().build()) .setDorisExecutionOptions(executionBuilder.build()) .setDorisOptions(dorisOptions) - .setSerializer(JsonDebeziumSchemaSerializer.builder().setDorisOptions(dorisOptions).build()); + .setSerializer(JsonDebeziumSchemaSerializer.builder().setDorisOptions(dorisOptions).setNewSchemaChange(true).build()); env.fromSource(mySqlSource, WatermarkStrategy.noWatermarks(), "MySQL Source")//.print(); .sinkTo(builder.build()); diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/TestJsonDebeziumSchemaSerializer.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/TestJsonDebeziumSchemaSerializer.java index de5a4de..5c2827c 100644 --- a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/TestJsonDebeziumSchemaSerializer.java +++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/TestJsonDebeziumSchemaSerializer.java @@ -20,12 +20,16 @@ package org.apache.doris.flink.sink.writer; import com.fasterxml.jackson.core.type.TypeReference; import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.ObjectMapper; + +import org.apache.doris.flink.catalog.doris.FieldSchema; import org.apache.doris.flink.cfg.DorisOptions; import org.apache.doris.flink.cfg.DorisReadOptions; import org.apache.doris.flink.exception.DorisException; import org.apache.doris.flink.rest.RestService; import org.apache.doris.flink.rest.models.Field; import org.apache.doris.flink.rest.models.Schema; + +import org.apache.commons.collections.CollectionUtils; import org.junit.Assert; import org.junit.BeforeClass; import org.junit.Ignore; @@ -35,8 +39,12 @@ import org.slf4j.LoggerFactory; import java.io.IOException; import java.nio.charset.StandardCharsets; +import java.util.Arrays; +import java.util.Iterator; +import java.util.LinkedHashMap; import java.util.List; import java.util.Map; +import java.util.Map.Entry; /** * test for JsonDebeziumSchemaSerializer. @@ -114,6 +122,64 @@ public class TestJsonDebeziumSchemaSerializer { } + @Test + public void testExtractDDLListMultipleColumns() throws IOException { + String sql0 = "ALTER TABLE test.t1 ADD COLUMN c2 INT"; + String sql1 = "ALTER TABLE test.t1 ADD COLUMN c555 VARCHAR(300)"; + String sql2 = "ALTER TABLE test.t1 ADD COLUMN c666 INT DEFAULT '100'"; + String sql3 = "ALTER TABLE test.t1 ADD COLUMN c4 BIGINT DEFAULT '555'"; + String sql4 = "ALTER TABLE test.t1 ADD COLUMN c199 INT"; + String sql5 = "ALTER TABLE test.t1 ADD COLUMN c12 INT DEFAULT '100'"; + String sql6 = "ALTER TABLE test.t1 DROP COLUMN name"; + String sql7 = "ALTER TABLE test.t1 DROP COLUMN test_time"; + String sql8 = "ALTER TABLE test.t1 DROP COLUMN c1"; + String sql9 = "ALTER TABLE test.t1 DROP COLUMN cc"; + List<String> srcSqlList = Arrays.asList(sql0, sql1, sql2, sql3, sql4,sql5,sql6,sql7,sql8,sql9); + + String record = "{\"source\":{\"version\":\"1.9.7.Final\",\"connector\":\"mysql\",\"name\":\"mysql_binlog_source\",\"ts_ms\":1691033764674,\"snapshot\":\"false\",\"db\":\"test\",\"sequence\":null,\"table\":\"t1\",\"server_id\":1,\"gtid\":null,\"file\":\"binlog.000029\",\"pos\":23305,\"row\":0,\"thread\":null,\"query\":null},\"historyRecord\":\"{\\\"source\\\":{\\\"file\\\":\\\"binlog.000029\\\",\\\"pos\\\":23305,\\\"server_id\\\":1},\\\"position\\\":{\\\"transaction_id\\\":null,\ [...] + JsonNode recordRoot = objectMapper.readTree(record); + List<String> ddlSQLList = serializer.extractDDLList(recordRoot); + for (int i = 0; i < ddlSQLList.size(); i++) { + String srcSQL = srcSqlList.get(i); + String targetSQL = ddlSQLList.get(i); + Assert.assertEquals(srcSQL, targetSQL); + } + } + + @Test + public void testExtractDDLListRenameColumn() throws IOException { + String record = "{\"source\":{\"version\":\"1.9.7.Final\",\"connector\":\"mysql\",\"name\":\"mysql_binlog_source\",\"ts_ms\":1691034519226,\"snapshot\":\"false\",\"db\":\"test\",\"sequence\":null,\"table\":\"t1\",\"server_id\":1,\"gtid\":null,\"file\":\"binlog.000029\",\"pos\":23752,\"row\":0,\"thread\":null,\"query\":null},\"historyRecord\":\"{\\\"source\\\":{\\\"file\\\":\\\"binlog.000029\\\",\\\"pos\\\":23752,\\\"server_id\\\":1},\\\"position\\\":{\\\"transaction_id\\\":null,\ [...] + JsonNode recordRoot = objectMapper.readTree(record); + List<String> ddlSQLList = serializer.extractDDLList(recordRoot); + Assert.assertTrue(CollectionUtils.isEmpty(ddlSQLList)); + } + + @Test + public void testFillOriginSchema() throws IOException { + Map<String, FieldSchema> srcFiledSchemaMap = new LinkedHashMap<>(); + srcFiledSchemaMap.put("id", new FieldSchema("id", "INT", null, null)); + srcFiledSchemaMap.put("name", new FieldSchema("name", "VARCHAR(150)", null, null)); + srcFiledSchemaMap.put("test_time", new FieldSchema("test_time", "DATETIMEV2(0)", null, null)); + srcFiledSchemaMap.put("c1", new FieldSchema("c1", "INT", "'100'", null)); + + String columnsString = "[{\"name\":\"id\",\"jdbcType\":4,\"typeName\":\"INT\",\"typeExpression\":\"INT\",\"charsetName\":null,\"position\":1,\"optional\":false,\"autoIncremented\":false,\"generated\":false,\"comment\":null,\"hasDefaultValue\":false,\"enumValues\":[]},{\"name\":\"name\",\"jdbcType\":12,\"typeName\":\"VARCHAR\",\"typeExpression\":\"VARCHAR\",\"charsetName\":\"utf8mb4\",\"length\":50,\"position\":2,\"optional\":true,\"autoIncremented\":false,\"generated\":false,\"co [...] + JsonNode columns = objectMapper.readTree(columnsString); + serializer.fillOriginSchema(columns); + Map<String, FieldSchema> originFieldSchemaMap = serializer.getOriginFieldSchemaMap(); + + Iterator<Entry<String, FieldSchema>> originFieldSchemaIterator = originFieldSchemaMap.entrySet().iterator(); + for (Entry<String, FieldSchema> entry : srcFiledSchemaMap.entrySet()) { + FieldSchema srcFiledSchema = entry.getValue(); + Entry<String, FieldSchema> originField = originFieldSchemaIterator.next(); + + Assert.assertEquals(entry.getKey(), originField.getKey()); + Assert.assertEquals(srcFiledSchema.getName(), originField.getValue().getName()); + Assert.assertEquals(srcFiledSchema.getTypeString(), originField.getValue().getTypeString()); + Assert.assertEquals(srcFiledSchema.getDefaultValue(), originField.getValue().getDefaultValue()); + Assert.assertEquals(srcFiledSchema.getComment(), originField.getValue().getComment()); + } + } + @Ignore @Test public void testSerializeAddColumn() throws IOException, DorisException { diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/CdcMysqlSyncDatabaseCase.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/CdcMysqlSyncDatabaseCase.java index 4a109c2..c20be39 100644 --- a/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/CdcMysqlSyncDatabaseCase.java +++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/CdcMysqlSyncDatabaseCase.java @@ -65,8 +65,9 @@ public class CdcMysqlSyncDatabaseCase { String includingTables = "tbl1|tbl2|tbl3"; String excludingTables = ""; boolean ignoreDefaultValue = false; + boolean useNewSchemaChange = false; DatabaseSync databaseSync = new MysqlDatabaseSync(); - databaseSync.create(env,database,config,tablePrefix,tableSuffix,includingTables,excludingTables,ignoreDefaultValue,sinkConf,tableConfig, false); + databaseSync.create(env,database,config,tablePrefix,tableSuffix,includingTables,excludingTables,ignoreDefaultValue,sinkConf,tableConfig, false, useNewSchemaChange); databaseSync.build(); env.execute(String.format("MySQL-Doris Database Sync: %s", database)); diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/CdcOraclelSyncDatabaseCase.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/CdcOraclelSyncDatabaseCase.java index b3b4384..08cf586 100644 --- a/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/CdcOraclelSyncDatabaseCase.java +++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/CdcOraclelSyncDatabaseCase.java @@ -71,8 +71,9 @@ public class CdcOraclelSyncDatabaseCase { String includingTables = "test.*"; String excludingTables = ""; boolean ignoreDefaultValue = false; + boolean useNewSchemaChange = false; DatabaseSync databaseSync = new OracleDatabaseSync(); - databaseSync.create(env,database,config,tablePrefix,tableSuffix,includingTables,excludingTables,ignoreDefaultValue,sinkConf,tableConfig, false); + databaseSync.create(env,database,config,tablePrefix,tableSuffix,includingTables,excludingTables,ignoreDefaultValue,sinkConf,tableConfig, false, useNewSchemaChange); databaseSync.build(); env.execute(String.format("Oracle-Doris Database Sync: %s", database)); --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org