This is an automated email from the ASF dual-hosted git repository.

diwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris-flink-connector.git


The following commit(s) were added to refs/heads/master by this push:
     new c4ae0512 [Improve]Support modify column type without default when 
column exists default value (#490)
c4ae0512 is described below

commit c4ae0512941f98108caaa13fe4cf2dfcece61b6e
Author: North Lin <37775475+qg-...@users.noreply.github.com>
AuthorDate: Fri Sep 20 09:57:07 2024 +0800

    [Improve]Support modify column type without default when column exists 
default value (#490)
---
 .../flink/exception/IllegalArgumentException.java  |  4 ++
 .../flink/sink/schema/SchemaChangeHelper.java      | 11 ++++
 .../flink/sink/schema/SchemaChangeManager.java     | 55 +++++++++++++++--
 .../flink/sink/schema/SchemaManagerITCase.java     | 70 +++++++++++++++++++---
 4 files changed, 127 insertions(+), 13 deletions(-)

diff --git 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/exception/IllegalArgumentException.java
 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/exception/IllegalArgumentException.java
index 4c0ae093..7b242891 100644
--- 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/exception/IllegalArgumentException.java
+++ 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/exception/IllegalArgumentException.java
@@ -25,4 +25,8 @@ public class IllegalArgumentException extends DorisException {
     public IllegalArgumentException(String arg, String value) {
         super("argument '" + arg + "' is illegal, value is '" + value + "'.");
     }
+
+    public IllegalArgumentException(String msg) {
+        super(msg);
+    }
 }
diff --git 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/schema/SchemaChangeHelper.java
 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/schema/SchemaChangeHelper.java
index 74b57417..d0630b03 100644
--- 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/schema/SchemaChangeHelper.java
+++ 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/schema/SchemaChangeHelper.java
@@ -44,6 +44,7 @@ public class SchemaChangeHelper {
     private static final String CREATE_DATABASE_DDL = "CREATE DATABASE IF NOT 
EXISTS %s";
     private static final String MODIFY_TYPE_DDL = "ALTER TABLE %s MODIFY 
COLUMN %s %s";
     private static final String MODIFY_COMMENT_DDL = "ALTER TABLE %s MODIFY 
COLUMN %s COMMENT '%s'";
+    private static final String SHOW_FULL_COLUMN_DDL = "SHOW FULL COLUMNS FROM 
`%s`.`%s`";
 
     public static void compareSchema(
             Map<String, FieldSchema> updateFiledSchemaMap,
@@ -166,6 +167,7 @@ public class SchemaChangeHelper {
         String columnName = fieldSchema.getName();
         String dataType = fieldSchema.getTypeString();
         String comment = fieldSchema.getComment();
+        String defaultValue = fieldSchema.getDefaultValue();
         StringBuilder modifyDDL =
                 new StringBuilder(
                         String.format(
@@ -173,6 +175,11 @@ public class SchemaChangeHelper {
                                 
DorisSchemaFactory.quoteTableIdentifier(tableIdentifier),
                                 DorisSchemaFactory.identifier(columnName),
                                 dataType));
+        if (StringUtils.isNotBlank(defaultValue)) {
+            modifyDDL
+                    .append(" DEFAULT ")
+                    
.append(DorisSchemaFactory.quoteDefaultValue(defaultValue));
+        }
         commentColumn(modifyDDL, comment);
         return modifyDDL.toString();
     }
@@ -183,6 +190,10 @@ public class SchemaChangeHelper {
         }
     }
 
+    public static String buildShowFullColumnDDL(String database, String table) 
{
+        return String.format(SHOW_FULL_COLUMN_DDL, database, table);
+    }
+
     public static List<DDLSchema> getDdlSchemas() {
         return ddlSchemas;
     }
diff --git 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/schema/SchemaChangeManager.java
 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/schema/SchemaChangeManager.java
index c946bee7..50ec1d34 100644
--- 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/schema/SchemaChangeManager.java
+++ 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/schema/SchemaChangeManager.java
@@ -23,6 +23,7 @@ import org.apache.flink.util.StringUtils;
 import com.fasterxml.jackson.core.JsonProcessingException;
 import com.fasterxml.jackson.databind.JsonNode;
 import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.NullNode;
 import org.apache.commons.codec.binary.Base64;
 import org.apache.doris.flink.catalog.doris.DorisSystem;
 import org.apache.doris.flink.catalog.doris.FieldSchema;
@@ -123,11 +124,30 @@ public class SchemaChangeManager implements Serializable {
             throws IOException, IllegalArgumentException {
         if (!checkColumnExists(database, table, field.getName())) {
             LOG.warn(
-                    "The column {} is not exists in table {}, can not modify 
it type",
+                    "The column {} is not exists in table {}, can not modify 
it's type",
                     field.getName(),
                     table);
             return false;
         }
+
+        String ddl = SchemaChangeHelper.buildShowFullColumnDDL(database, 
table);
+        String defaultValue = getDefaultValue(ddl, database, field.getName());
+        if (!StringUtils.isNullOrWhitespaceOnly(field.getDefaultValue())) {
+            // Can not change default value
+            if (!field.getDefaultValue().equals(defaultValue)) {
+                LOG.warn(
+                        "Column:{} can not change default value from {} to {}, 
fallback it",
+                        field.getName(),
+                        defaultValue,
+                        field.getDefaultValue());
+                field.setDefaultValue(defaultValue);
+            }
+        } else {
+            // If user does not give a default value, need fill it from
+            // original table schema to avoid change type failed if default 
value exists
+            field.setDefaultValue(defaultValue);
+        }
+
         // If user does not give a comment, need fill it from
         // original table schema to avoid miss comment
         if (StringUtils.isNullOrWhitespaceOnly(field.getComment())) {
@@ -214,15 +234,42 @@ public class SchemaChangeManager implements Serializable {
         }
     }
 
+    private String getDefaultValue(String ddl, String database, String column)
+            throws IOException, IllegalArgumentException {
+        String responseEntity = executeThenReturnResponse(ddl, database);
+        JsonNode responseNode = objectMapper.readTree(responseEntity);
+        String code = responseNode.get("code").asText("-1");
+        if (code.equals("0")) {
+            JsonNode data = responseNode.get("data").get("data");
+            for (JsonNode node : data) {
+                if (node.get(0).asText().equals(column)) {
+                    JsonNode defaultValueNode = node.get(5);
+                    return (defaultValueNode instanceof NullNode)
+                            ? null
+                            : defaultValueNode.asText();
+                }
+            }
+            return null;
+        } else {
+            throw new DorisSchemaChangeException(
+                    "Failed to get default value, response: " + 
responseEntity);
+        }
+    }
+
     /** execute sql in doris. */
-    public boolean execute(String ddl, String database)
+    private String executeThenReturnResponse(String ddl, String database)
             throws IOException, IllegalArgumentException {
         if (StringUtils.isNullOrWhitespaceOnly(ddl)) {
-            return false;
+            throw new IllegalArgumentException("ddl can not be null or empty 
string!");
         }
         LOG.info("Execute SQL: {}", ddl);
         HttpPost httpPost = buildHttpPost(ddl, database);
-        String responseEntity = handleResponse(httpPost);
+        return handleResponse(httpPost);
+    }
+
+    public boolean execute(String ddl, String database)
+            throws IOException, IllegalArgumentException {
+        String responseEntity = executeThenReturnResponse(ddl, database);
         return handleSchemaChange(responseEntity);
     }
 
diff --git 
a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/schema/SchemaManagerITCase.java
 
b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/schema/SchemaManagerITCase.java
index 37ca3a2d..c4f7f282 100644
--- 
a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/schema/SchemaManagerITCase.java
+++ 
b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/schema/SchemaManagerITCase.java
@@ -17,7 +17,9 @@
 
 package org.apache.doris.flink.sink.schema;
 
+import org.apache.commons.lang3.StringUtils;
 import org.apache.doris.flink.catalog.doris.DataModel;
+import org.apache.doris.flink.catalog.doris.DorisSchemaFactory;
 import org.apache.doris.flink.catalog.doris.FieldSchema;
 import org.apache.doris.flink.catalog.doris.TableSchema;
 import org.apache.doris.flink.cfg.DorisOptions;
@@ -57,7 +59,7 @@ public class SchemaManagerITCase extends 
AbstractITCaseService {
         schemaChangeManager = new SchemaChangeManager(options);
     }
 
-    private void initDorisSchemaChangeTable(String table) {
+    private void initDorisSchemaChangeTable(String table, String defaultValue) 
{
         ContainerUtils.executeSQLStatement(
                 getDorisQueryConnection(),
                 LOG,
@@ -67,17 +69,22 @@ public class SchemaManagerITCase extends 
AbstractITCaseService {
                         "CREATE TABLE %s.%s ( \n"
                                 + "`id` varchar(32),\n"
                                 + "`age` int\n"
+                                + (StringUtils.isNotBlank(defaultValue)
+                                        ? " DEFAULT "
+                                                + 
DorisSchemaFactory.quoteDefaultValue(defaultValue)
+                                        : "")
                                 + ") DISTRIBUTED BY HASH(`id`) BUCKETS 1\n"
                                 + "PROPERTIES (\n"
                                 + "\"replication_num\" = \"1\"\n"
                                 + ")\n",
-                        DATABASE, table));
+                        DATABASE,
+                        table));
     }
 
     @Test
     public void testAddColumn() throws IOException, IllegalArgumentException {
         String addColumnTbls = "add_column";
-        initDorisSchemaChangeTable(addColumnTbls);
+        initDorisSchemaChangeTable(addColumnTbls, null);
         FieldSchema field = new FieldSchema("c1", "int", "");
         schemaChangeManager.addColumn(DATABASE, addColumnTbls, field);
         boolean exists = schemaChangeManager.addColumn(DATABASE, 
addColumnTbls, field);
@@ -91,7 +98,7 @@ public class SchemaManagerITCase extends 
AbstractITCaseService {
     public void testAddColumnWithChineseComment()
             throws IOException, IllegalArgumentException, InterruptedException 
{
         String addColumnTbls = "add_column";
-        initDorisSchemaChangeTable(addColumnTbls);
+        initDorisSchemaChangeTable(addColumnTbls, null);
 
         // add a column by UTF-8 encoding
         String addColumnName = "col_with_comment1";
@@ -147,7 +154,7 @@ public class SchemaManagerITCase extends 
AbstractITCaseService {
     @Test
     public void testDropColumn() throws IOException, IllegalArgumentException {
         String dropColumnTbls = "drop_column";
-        initDorisSchemaChangeTable(dropColumnTbls);
+        initDorisSchemaChangeTable(dropColumnTbls, null);
         schemaChangeManager.dropColumn(DATABASE, dropColumnTbls, "age");
         boolean success = schemaChangeManager.dropColumn(DATABASE, 
dropColumnTbls, "age");
         Assert.assertTrue(success);
@@ -159,7 +166,7 @@ public class SchemaManagerITCase extends 
AbstractITCaseService {
     @Test
     public void testRenameColumn() throws IOException, 
IllegalArgumentException {
         String renameColumnTbls = "rename_column";
-        initDorisSchemaChangeTable(renameColumnTbls);
+        initDorisSchemaChangeTable(renameColumnTbls, null);
         schemaChangeManager.renameColumn(DATABASE, renameColumnTbls, "age", 
"age1");
         boolean exists = schemaChangeManager.checkColumnExists(DATABASE, 
renameColumnTbls, "age1");
         Assert.assertTrue(exists);
@@ -171,7 +178,7 @@ public class SchemaManagerITCase extends 
AbstractITCaseService {
     @Test
     public void testModifyColumnComment() throws IOException, 
IllegalArgumentException {
         String modifyColumnCommentTbls = "modify_column_comment";
-        initDorisSchemaChangeTable(modifyColumnCommentTbls);
+        initDorisSchemaChangeTable(modifyColumnCommentTbls, null);
         String columnName = "age";
         String newComment = "new comment of age";
         schemaChangeManager.modifyColumnComment(
@@ -187,7 +194,7 @@ public class SchemaManagerITCase extends 
AbstractITCaseService {
         String modifyColumnTbls = "modify_column_type";
         String columnName = "age";
         String newColumnType = "bigint";
-        initDorisSchemaChangeTable(modifyColumnTbls);
+        initDorisSchemaChangeTable(modifyColumnTbls, null);
         FieldSchema field = new FieldSchema(columnName, newColumnType, "");
         schemaChangeManager.modifyColumnDataType(DATABASE, modifyColumnTbls, 
field);
 
@@ -200,7 +207,7 @@ public class SchemaManagerITCase extends 
AbstractITCaseService {
     public void testModifyColumnTypeAndComment()
             throws IOException, IllegalArgumentException, InterruptedException 
{
         String modifyColumnTbls = "modify_column_type_and_comment";
-        initDorisSchemaChangeTable(modifyColumnTbls);
+        initDorisSchemaChangeTable(modifyColumnTbls, null);
         String columnName = "age";
         String newColumnType = "bigint";
         String newComment = "new comment of age";
@@ -238,4 +245,49 @@ public class SchemaManagerITCase extends 
AbstractITCaseService {
         Thread.sleep(3_000);
         Assert.assertNotNull(schemaChangeManager.getTableSchema(databaseName, 
tableName));
     }
+
+    @Test
+    public void testModifyColumnTypeWithoutDefault()
+            throws IOException, IllegalArgumentException, InterruptedException 
{
+        String modifyColumnTbls = "modify_column_type_without_default_value";
+        String columnName = "age";
+        String newColumnType = "bigint";
+        initDorisSchemaChangeTable(modifyColumnTbls, "18");
+        FieldSchema field = new FieldSchema(columnName, newColumnType, null, 
"");
+        schemaChangeManager.modifyColumnDataType(DATABASE, modifyColumnTbls, 
field);
+
+        Thread.sleep(3_000);
+        String columnType = getColumnType(modifyColumnTbls, columnName);
+        Assert.assertEquals(newColumnType, columnType.toLowerCase());
+    }
+
+    @Test
+    public void testModifyColumnTypeWithDefault()
+            throws IOException, IllegalArgumentException, InterruptedException 
{
+        String modifyColumnTbls = "modify_column_type_with_default_value";
+        String columnName = "age";
+        String newColumnType = "bigint";
+        initDorisSchemaChangeTable(modifyColumnTbls, "18");
+        FieldSchema field = new FieldSchema(columnName, newColumnType, "18", 
"");
+        schemaChangeManager.modifyColumnDataType(DATABASE, modifyColumnTbls, 
field);
+
+        Thread.sleep(3_000);
+        String columnType = getColumnType(modifyColumnTbls, columnName);
+        Assert.assertEquals(newColumnType, columnType.toLowerCase());
+    }
+
+    @Test
+    public void testModifyColumnTypeWithDefaultAndChange()
+            throws IOException, IllegalArgumentException, InterruptedException 
{
+        String modifyColumnTbls = 
"modify_column_type_with_default_value_and_change";
+        String columnName = "age";
+        String newColumnType = "bigint";
+        initDorisSchemaChangeTable(modifyColumnTbls, "18");
+        FieldSchema field = new FieldSchema(columnName, newColumnType, "19", 
"new comment");
+        schemaChangeManager.modifyColumnDataType(DATABASE, modifyColumnTbls, 
field);
+
+        Thread.sleep(3_000);
+        String columnType = getColumnType(modifyColumnTbls, columnName);
+        Assert.assertEquals(newColumnType, columnType.toLowerCase());
+    }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to