This is an automated email from the ASF dual-hosted git repository.

diwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris-flink-connector.git
The following commit(s) were added to refs/heads/master by this push: new 46de849 [Improve](cdc) Column add quote in schema change (#296) 46de849 is described below commit 46de8499cc5438a5d2effe6ca5ebe562de8d1d09 Author: wudi <676366...@qq.com> AuthorDate: Wed Jan 17 14:08:59 2024 +0800 [Improve](cdc) Column add quote in schema change (#296) --- .../doris/flink/catalog/doris/DorisSystem.java | 11 ++++++-- .../flink/sink/schema/SchemaChangeHelper.java | 21 +++++++++++++--- .../flink/sink/schema/SchemaChangeManager.java | 2 +- .../flink/sink/schema/SchemaChangeHelperTest.java | 8 +++--- .../doris/flink/sink/schema/SchemaManagerTest.java | 29 ++++++++++++++++++++-- .../TestJsonDebeziumSchemaChangeImplV2.java | 10 ++++---- 6 files changed, 64 insertions(+), 17 deletions(-) diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/catalog/doris/DorisSystem.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/catalog/doris/DorisSystem.java index 269affa..45266e5 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/catalog/doris/DorisSystem.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/catalog/doris/DorisSystem.java @@ -18,6 +18,7 @@ package org.apache.doris.flink.catalog.doris; import org.apache.flink.annotation.Public; +import org.apache.flink.util.Preconditions; import org.apache.flink.util.StringUtils; import org.apache.commons.compress.utils.Lists; @@ -253,7 +254,7 @@ public class DorisSystem implements Serializable { .append("',"); } - private static String quoteComment(String comment) { + public static String quoteComment(String comment) { if (comment == null) { return ""; } else { @@ -266,10 +267,16 @@ public class DorisSystem implements Serializable { return result; } - private static String identifier(String name) { + public static String identifier(String name) { return "`" + name + "`"; } + public static String quoteTableIdentifier(String tableIdentifier) { + String[] dbTable = 
tableIdentifier.split("\\."); + Preconditions.checkArgument(dbTable.length == 2); + return identifier(dbTable[0]) + "." + identifier(dbTable[1]); + } + private static String quoteProperties(String name) { return "'" + name + "'"; } diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/schema/SchemaChangeHelper.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/schema/SchemaChangeHelper.java index 5b9901f..580b990 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/schema/SchemaChangeHelper.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/schema/SchemaChangeHelper.java @@ -21,6 +21,7 @@ import org.apache.flink.util.StringUtils; import org.apache.commons.collections.CollectionUtils; import org.apache.commons.compress.utils.Lists; +import org.apache.doris.flink.catalog.doris.DorisSystem; import org.apache.doris.flink.catalog.doris.FieldSchema; import java.util.List; @@ -103,23 +104,35 @@ public class SchemaChangeHelper { String type = fieldSchema.getTypeString(); String defaultValue = fieldSchema.getDefaultValue(); String comment = fieldSchema.getComment(); - String addDDL = String.format(ADD_DDL, tableIdentifier, name, type); + String addDDL = + String.format( + ADD_DDL, + DorisSystem.quoteTableIdentifier(tableIdentifier), + DorisSystem.identifier(name), + type); if (!StringUtils.isNullOrWhitespaceOnly(defaultValue)) { addDDL = addDDL + " DEFAULT " + defaultValue; } if (!StringUtils.isNullOrWhitespaceOnly(comment)) { - addDDL = addDDL + " COMMENT " + comment; + addDDL = addDDL + " COMMENT '" + DorisSystem.quoteComment(comment) + "'"; } return addDDL; } public static String buildDropColumnDDL(String tableIdentifier, String columName) { - return String.format(DROP_DDL, tableIdentifier, columName); + return String.format( + DROP_DDL, + DorisSystem.quoteTableIdentifier(tableIdentifier), + DorisSystem.identifier(columName)); } public static String buildRenameColumnDDL( String 
tableIdentifier, String oldColumnName, String newColumnName) { - return String.format(RENAME_DDL, tableIdentifier, oldColumnName, newColumnName); + return String.format( + RENAME_DDL, + DorisSystem.quoteTableIdentifier(tableIdentifier), + DorisSystem.identifier(oldColumnName), + DorisSystem.identifier(newColumnName)); } public static String buildColumnExistsQuery(String database, String table, String column) { diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/schema/SchemaChangeManager.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/schema/SchemaChangeManager.java index 8860f6a..979e353 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/schema/SchemaChangeManager.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/schema/SchemaChangeManager.java @@ -226,7 +226,7 @@ public class SchemaChangeManager implements Serializable { .getBytes(StandardCharsets.UTF_8))); } - private String getTableIdentifier(String database, String table) { + private static String getTableIdentifier(String database, String table) { return String.format("%s.%s", database, table); } } diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/schema/SchemaChangeHelperTest.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/schema/SchemaChangeHelperTest.java index f084a84..b577832 100644 --- a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/schema/SchemaChangeHelperTest.java +++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/schema/SchemaChangeHelperTest.java @@ -52,15 +52,17 @@ public class SchemaChangeHelperTest { List<String> ddlSqls = SchemaChangeHelper.generateRenameDDLSql( table, oldColumnName, newColumnName, originFieldSchemaMap); - Assert.assertEquals(ddlSqls.get(0), "ALTER TABLE test.test_sink RENAME COLUMN c3 c33"); + Assert.assertEquals( + ddlSqls.get(0), "ALTER TABLE `test`.`test_sink` RENAME COLUMN `c3` `c33`"); } 
@Test public void testGenerateDDLSql() { SchemaChangeHelper.compareSchema(updateFieldSchemaMap, originFieldSchemaMap); List<String> ddlSqls = SchemaChangeHelper.generateDDLSql("test.test_sink"); - Assert.assertEquals(ddlSqls.get(0), "ALTER TABLE test.test_sink ADD COLUMN c4 BIGINT"); Assert.assertEquals( - ddlSqls.get(1), "ALTER TABLE test.test_sink ADD COLUMN c5 DATETIMEV2(0)"); + ddlSqls.get(0), "ALTER TABLE `test`.`test_sink` ADD COLUMN `c4` BIGINT"); + Assert.assertEquals( + ddlSqls.get(1), "ALTER TABLE `test`.`test_sink` ADD COLUMN `c5` DATETIMEV2(0)"); } } diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/schema/SchemaManagerTest.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/schema/SchemaManagerTest.java index 0f76489..977f8da 100644 --- a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/schema/SchemaManagerTest.java +++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/schema/SchemaManagerTest.java @@ -17,6 +17,7 @@ package org.apache.doris.flink.sink.schema; +import org.apache.doris.flink.catalog.doris.FieldSchema; import org.apache.doris.flink.cfg.DorisOptions; import org.apache.doris.flink.exception.IllegalArgumentException; import org.apache.doris.flink.sink.HttpEntityMock; @@ -27,6 +28,7 @@ import org.apache.http.client.methods.CloseableHttpResponse; import org.apache.http.impl.client.CloseableHttpClient; import org.apache.http.impl.client.HttpClients; import org.apache.http.message.BasicStatusLine; +import org.junit.Assert; import org.junit.Before; import org.junit.Test; import org.mockito.MockedStatic; @@ -94,13 +96,36 @@ public class SchemaManagerTest { public void testColumnExists() throws IOException, IllegalArgumentException { entityMock.setValue(queryResponse); boolean columnExists = schemaChangeManager.checkColumnExists("test", "test_flink", "age"); - System.out.println(columnExists); + Assert.assertEquals(true, columnExists); } @Test public void 
testColumnNotExists() throws IOException, IllegalArgumentException { entityMock.setValue(queryNoExistsResponse); boolean columnExists = schemaChangeManager.checkColumnExists("test", "test_flink", "age1"); - System.out.println(columnExists); + Assert.assertEquals(false, columnExists); + } + + @Test + public void testAddColumn() { + FieldSchema field = new FieldSchema("col", "int", "comment \"'sdf'"); + String addColumnDDL = SchemaChangeHelper.buildAddColumnDDL("test.test_flink", field); + Assert.assertEquals( + "ALTER TABLE `test`.`test_flink` ADD COLUMN `col` int COMMENT 'comment \"\\'sdf\\''", + addColumnDDL); + } + + @Test + public void testDropColumn() { + String dropColumnDDL = SchemaChangeHelper.buildDropColumnDDL("test.test_flink", "col"); + Assert.assertEquals("ALTER TABLE `test`.`test_flink` DROP COLUMN `col`", dropColumnDDL); + } + + @Test + public void testRenameColumn() { + String renameColumnDDL = + SchemaChangeHelper.buildRenameColumnDDL("test.test_flink", "col", "col_new"); + Assert.assertEquals( + "ALTER TABLE `test`.`test_flink` RENAME COLUMN `col` `col_new`", renameColumnDDL); } } diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/TestJsonDebeziumSchemaChangeImplV2.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/TestJsonDebeziumSchemaChangeImplV2.java index adc87b1..c63267d 100644 --- a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/TestJsonDebeziumSchemaChangeImplV2.java +++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/TestJsonDebeziumSchemaChangeImplV2.java @@ -64,10 +64,10 @@ public class TestJsonDebeziumSchemaChangeImplV2 extends TestJsonDebeziumChangeBa @Test public void testExtractDDLListMultipleColumns() throws IOException { - String sql0 = "ALTER TABLE test.t1 ADD COLUMN id INT DEFAULT '10000'"; - String sql1 = "ALTER TABLE test.t1 
ADD COLUMN c199 INT"; - String sql2 = "ALTER TABLE test.t1 ADD COLUMN c12 INT DEFAULT '100'"; - String sql3 = "ALTER TABLE test.t1 DROP COLUMN c13"; + String sql0 = "ALTER TABLE `test`.`t1` ADD COLUMN `id` INT DEFAULT '10000'"; + String sql1 = "ALTER TABLE `test`.`t1` ADD COLUMN `c199` INT"; + String sql2 = "ALTER TABLE `test`.`t1` ADD COLUMN `c12` INT DEFAULT '100'"; + String sql3 = "ALTER TABLE `test`.`t1` DROP COLUMN `c13`"; List<String> srcSqlList = Arrays.asList(sql0, sql1, sql2, sql3); Map<String, FieldSchema> originFiledSchemaMap = new LinkedHashMap<>(); @@ -211,7 +211,7 @@ public class TestJsonDebeziumSchemaChangeImplV2 extends TestJsonDebeziumChangeBa schemaChange.setOriginFieldSchemaMap(originFieldSchemaMap); List<String> ddlList = schemaChange.extractDDLList(record); - Assert.assertEquals("ALTER TABLE test.t1 RENAME COLUMN c3 c333", ddlList.get(0)); + Assert.assertEquals("ALTER TABLE `test`.`t1` RENAME COLUMN `c3` `c333`", ddlList.get(0)); } @Test --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org