This is an automated email from the ASF dual-hosted git repository.

diwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris-flink-connector.git


The following commit(s) were added to refs/heads/master by this push:
     new 46de849  [Improve](cdc) Column add quote in schema change (#296)
46de849 is described below

commit 46de8499cc5438a5d2effe6ca5ebe562de8d1d09
Author: wudi <676366...@qq.com>
AuthorDate: Wed Jan 17 14:08:59 2024 +0800

    [Improve](cdc) Column add quote in schema change (#296)
---
 .../doris/flink/catalog/doris/DorisSystem.java     | 11 ++++++--
 .../flink/sink/schema/SchemaChangeHelper.java      | 21 +++++++++++++---
 .../flink/sink/schema/SchemaChangeManager.java     |  2 +-
 .../flink/sink/schema/SchemaChangeHelperTest.java  |  8 +++---
 .../doris/flink/sink/schema/SchemaManagerTest.java | 29 ++++++++++++++++++++--
 .../TestJsonDebeziumSchemaChangeImplV2.java        | 10 ++++----
 6 files changed, 64 insertions(+), 17 deletions(-)

diff --git 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/catalog/doris/DorisSystem.java
 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/catalog/doris/DorisSystem.java
index 269affa..45266e5 100644
--- 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/catalog/doris/DorisSystem.java
+++ 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/catalog/doris/DorisSystem.java
@@ -18,6 +18,7 @@
 package org.apache.doris.flink.catalog.doris;
 
 import org.apache.flink.annotation.Public;
+import org.apache.flink.util.Preconditions;
 import org.apache.flink.util.StringUtils;
 
 import org.apache.commons.compress.utils.Lists;
@@ -253,7 +254,7 @@ public class DorisSystem implements Serializable {
                 .append("',");
     }
 
-    private static String quoteComment(String comment) {
+    public static String quoteComment(String comment) {
         if (comment == null) {
             return "";
         } else {
@@ -266,10 +267,16 @@ public class DorisSystem implements Serializable {
         return result;
     }
 
-    private static String identifier(String name) {
+    public static String identifier(String name) {
         return "`" + name + "`";
     }
 
+    public static String quoteTableIdentifier(String tableIdentifier) {
+        String[] dbTable = tableIdentifier.split("\\.");
+        Preconditions.checkArgument(dbTable.length == 2);
+        return identifier(dbTable[0]) + "." + identifier(dbTable[1]);
+    }
+
     private static String quoteProperties(String name) {
         return "'" + name + "'";
     }
diff --git 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/schema/SchemaChangeHelper.java
 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/schema/SchemaChangeHelper.java
index 5b9901f..580b990 100644
--- 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/schema/SchemaChangeHelper.java
+++ 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/schema/SchemaChangeHelper.java
@@ -21,6 +21,7 @@ import org.apache.flink.util.StringUtils;
 
 import org.apache.commons.collections.CollectionUtils;
 import org.apache.commons.compress.utils.Lists;
+import org.apache.doris.flink.catalog.doris.DorisSystem;
 import org.apache.doris.flink.catalog.doris.FieldSchema;
 
 import java.util.List;
@@ -103,23 +104,35 @@ public class SchemaChangeHelper {
         String type = fieldSchema.getTypeString();
         String defaultValue = fieldSchema.getDefaultValue();
         String comment = fieldSchema.getComment();
-        String addDDL = String.format(ADD_DDL, tableIdentifier, name, type);
+        String addDDL =
+                String.format(
+                        ADD_DDL,
+                        DorisSystem.quoteTableIdentifier(tableIdentifier),
+                        DorisSystem.identifier(name),
+                        type);
         if (!StringUtils.isNullOrWhitespaceOnly(defaultValue)) {
             addDDL = addDDL + " DEFAULT " + defaultValue;
         }
         if (!StringUtils.isNullOrWhitespaceOnly(comment)) {
-            addDDL = addDDL + " COMMENT " + comment;
+            addDDL = addDDL + " COMMENT '" + DorisSystem.quoteComment(comment) 
+ "'";
         }
         return addDDL;
     }
 
     public static String buildDropColumnDDL(String tableIdentifier, String 
columName) {
-        return String.format(DROP_DDL, tableIdentifier, columName);
+        return String.format(
+                DROP_DDL,
+                DorisSystem.quoteTableIdentifier(tableIdentifier),
+                DorisSystem.identifier(columName));
     }
 
     public static String buildRenameColumnDDL(
             String tableIdentifier, String oldColumnName, String 
newColumnName) {
-        return String.format(RENAME_DDL, tableIdentifier, oldColumnName, 
newColumnName);
+        return String.format(
+                RENAME_DDL,
+                DorisSystem.quoteTableIdentifier(tableIdentifier),
+                DorisSystem.identifier(oldColumnName),
+                DorisSystem.identifier(newColumnName));
     }
 
     public static String buildColumnExistsQuery(String database, String table, 
String column) {
diff --git 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/schema/SchemaChangeManager.java
 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/schema/SchemaChangeManager.java
index 8860f6a..979e353 100644
--- 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/schema/SchemaChangeManager.java
+++ 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/schema/SchemaChangeManager.java
@@ -226,7 +226,7 @@ public class SchemaChangeManager implements Serializable {
                                         .getBytes(StandardCharsets.UTF_8)));
     }
 
-    private String getTableIdentifier(String database, String table) {
+    private static String getTableIdentifier(String database, String table) {
         return String.format("%s.%s", database, table);
     }
 }
diff --git 
a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/schema/SchemaChangeHelperTest.java
 
b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/schema/SchemaChangeHelperTest.java
index f084a84..b577832 100644
--- 
a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/schema/SchemaChangeHelperTest.java
+++ 
b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/schema/SchemaChangeHelperTest.java
@@ -52,15 +52,17 @@ public class SchemaChangeHelperTest {
         List<String> ddlSqls =
                 SchemaChangeHelper.generateRenameDDLSql(
                         table, oldColumnName, newColumnName, 
originFieldSchemaMap);
-        Assert.assertEquals(ddlSqls.get(0), "ALTER TABLE test.test_sink RENAME 
COLUMN c3 c33");
+        Assert.assertEquals(
+                ddlSqls.get(0), "ALTER TABLE `test`.`test_sink` RENAME COLUMN 
`c3` `c33`");
     }
 
     @Test
     public void testGenerateDDLSql() {
         SchemaChangeHelper.compareSchema(updateFieldSchemaMap, 
originFieldSchemaMap);
         List<String> ddlSqls = 
SchemaChangeHelper.generateDDLSql("test.test_sink");
-        Assert.assertEquals(ddlSqls.get(0), "ALTER TABLE test.test_sink ADD 
COLUMN c4 BIGINT");
         Assert.assertEquals(
-                ddlSqls.get(1), "ALTER TABLE test.test_sink ADD COLUMN c5 
DATETIMEV2(0)");
+                ddlSqls.get(0), "ALTER TABLE `test`.`test_sink` ADD COLUMN 
`c4` BIGINT");
+        Assert.assertEquals(
+                ddlSqls.get(1), "ALTER TABLE `test`.`test_sink` ADD COLUMN 
`c5` DATETIMEV2(0)");
     }
 }
diff --git 
a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/schema/SchemaManagerTest.java
 
b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/schema/SchemaManagerTest.java
index 0f76489..977f8da 100644
--- 
a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/schema/SchemaManagerTest.java
+++ 
b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/schema/SchemaManagerTest.java
@@ -17,6 +17,7 @@
 
 package org.apache.doris.flink.sink.schema;
 
+import org.apache.doris.flink.catalog.doris.FieldSchema;
 import org.apache.doris.flink.cfg.DorisOptions;
 import org.apache.doris.flink.exception.IllegalArgumentException;
 import org.apache.doris.flink.sink.HttpEntityMock;
@@ -27,6 +28,7 @@ import org.apache.http.client.methods.CloseableHttpResponse;
 import org.apache.http.impl.client.CloseableHttpClient;
 import org.apache.http.impl.client.HttpClients;
 import org.apache.http.message.BasicStatusLine;
+import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
 import org.mockito.MockedStatic;
@@ -94,13 +96,36 @@ public class SchemaManagerTest {
     public void testColumnExists() throws IOException, 
IllegalArgumentException {
         entityMock.setValue(queryResponse);
         boolean columnExists = schemaChangeManager.checkColumnExists("test", 
"test_flink", "age");
-        System.out.println(columnExists);
+        Assert.assertEquals(true, columnExists);
     }
 
     @Test
     public void testColumnNotExists() throws IOException, 
IllegalArgumentException {
         entityMock.setValue(queryNoExistsResponse);
         boolean columnExists = schemaChangeManager.checkColumnExists("test", 
"test_flink", "age1");
-        System.out.println(columnExists);
+        Assert.assertEquals(false, columnExists);
+    }
+
+    @Test
+    public void testAddColumn() {
+        FieldSchema field = new FieldSchema("col", "int", "comment \"'sdf'");
+        String addColumnDDL = 
SchemaChangeHelper.buildAddColumnDDL("test.test_flink", field);
+        Assert.assertEquals(
+                "ALTER TABLE `test`.`test_flink` ADD COLUMN `col` int COMMENT 
'comment \"\\'sdf\\''",
+                addColumnDDL);
+    }
+
+    @Test
+    public void testDropColumn() {
+        String dropColumnDDL = 
SchemaChangeHelper.buildDropColumnDDL("test.test_flink", "col");
+        Assert.assertEquals("ALTER TABLE `test`.`test_flink` DROP COLUMN 
`col`", dropColumnDDL);
+    }
+
+    @Test
+    public void testRenameColumn() {
+        String renameColumnDDL =
+                SchemaChangeHelper.buildRenameColumnDDL("test.test_flink", 
"col", "col_new");
+        Assert.assertEquals(
+                "ALTER TABLE `test`.`test_flink` RENAME COLUMN `col` 
`col_new`", renameColumnDDL);
     }
 }
diff --git 
a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/TestJsonDebeziumSchemaChangeImplV2.java
 
b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/TestJsonDebeziumSchemaChangeImplV2.java
index adc87b1..c63267d 100644
--- 
a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/TestJsonDebeziumSchemaChangeImplV2.java
+++ 
b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/TestJsonDebeziumSchemaChangeImplV2.java
@@ -64,10 +64,10 @@ public class TestJsonDebeziumSchemaChangeImplV2 extends 
TestJsonDebeziumChangeBa
 
     @Test
     public void testExtractDDLListMultipleColumns() throws IOException {
-        String sql0 = "ALTER TABLE test.t1 ADD COLUMN id INT DEFAULT '10000'";
-        String sql1 = "ALTER TABLE test.t1 ADD COLUMN c199 INT";
-        String sql2 = "ALTER TABLE test.t1 ADD COLUMN c12 INT DEFAULT '100'";
-        String sql3 = "ALTER TABLE test.t1 DROP COLUMN c13";
+        String sql0 = "ALTER TABLE `test`.`t1` ADD COLUMN `id` INT DEFAULT 
'10000'";
+        String sql1 = "ALTER TABLE `test`.`t1` ADD COLUMN `c199` INT";
+        String sql2 = "ALTER TABLE `test`.`t1` ADD COLUMN `c12` INT DEFAULT 
'100'";
+        String sql3 = "ALTER TABLE `test`.`t1` DROP COLUMN `c13`";
         List<String> srcSqlList = Arrays.asList(sql0, sql1, sql2, sql3);
 
         Map<String, FieldSchema> originFiledSchemaMap = new LinkedHashMap<>();
@@ -211,7 +211,7 @@ public class TestJsonDebeziumSchemaChangeImplV2 extends 
TestJsonDebeziumChangeBa
         schemaChange.setOriginFieldSchemaMap(originFieldSchemaMap);
 
         List<String> ddlList = schemaChange.extractDDLList(record);
-        Assert.assertEquals("ALTER TABLE test.t1 RENAME COLUMN c3 c333", 
ddlList.get(0));
+        Assert.assertEquals("ALTER TABLE `test`.`t1` RENAME COLUMN `c3` 
`c333`", ddlList.get(0));
     }
 
     @Test


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to