This is an automated email from the ASF dual-hosted git repository.

diwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris-flink-connector.git


The following commit(s) were added to refs/heads/master by this push:
     new 1a92f2a7 [Fix](cdc) Fix sql_parse schema table annotation and field type parsing inaccuracies (#540)
1a92f2a7 is described below

commit 1a92f2a719256b38baa65dcee8f686ccb9580095
Author: SHHH <781240...@qq.com>
AuthorDate: Fri Jan 10 14:04:38 2025 +0800

    [Fix](cdc) Fix sql_parse schema table annotation and field type parsing inaccuracies (#540)
---
 .../flink/sink/schema/SQLParserSchemaManager.java  | 73 +++++++++++++++++-----
 .../sink/schema/SQLParserSchemaManagerTest.java    | 33 +++++++++-
 2 files changed, 86 insertions(+), 20 deletions(-)

diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/schema/SQLParserSchemaManager.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/schema/SQLParserSchemaManager.java
index 67a2ddac..626f17ec 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/schema/SQLParserSchemaManager.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/schema/SQLParserSchemaManager.java
@@ -61,6 +61,8 @@ public class SQLParserSchemaManager implements Serializable {
     private static final String PRIMARY_KEY = "PRIMARY KEY";
     private static final String UNIQUE = "UNIQUE";
     private static final String DORIS_CURRENT_TIMESTAMP = "CURRENT_TIMESTAMP";
+    private static final List<String> TYPE_MODIFIER =
+            Arrays.asList("UNSIGNED", "ZEROFILL", "PRECISION");
     private static final Set<String> sourceConnectorTimeValues =
             new HashSet<>(
                     Arrays.asList(
@@ -139,15 +141,13 @@ public class SQLParserSchemaManager implements 
Serializable {
                         .forEach(
                                 column -> {
                                     String columnName = column.getColumnName();
-                                    ColDataType colDataType = 
column.getColDataType();
-                                    String dataType = 
parseDataType(colDataType, sourceConnector);
                                     List<String> columnSpecs = 
column.getColumnSpecs();
-                                    String defaultValue =
-                                            extractDefaultValue(dataType, 
columnSpecs);
-                                    String comment = 
extractComment(columnSpecs);
                                     FieldSchema fieldSchema =
-                                            new FieldSchema(
-                                                    columnName, dataType, 
defaultValue, comment);
+                                            getFieldSchema(
+                                                    column.getColumnName(),
+                                                    column.getColumnSpecs(),
+                                                    column.getColDataType(),
+                                                    sourceConnector);
                                     columnFields.put(columnName, fieldSchema);
                                     extractColumnPrimaryKey(columnName, 
columnSpecs, pkKeys);
                                 });
@@ -181,6 +181,20 @@ public class SQLParserSchemaManager implements 
Serializable {
         return null;
     }
 
+    private String extractTypeModifier(List<String> columnSpecs) {
+        if (CollectionUtils.isEmpty(columnSpecs)) {
+            return "";
+        }
+        StringBuilder builder = new StringBuilder();
+        for (String columnSpec : columnSpecs) {
+            String columnSpecUpperCase = columnSpec.toUpperCase(Locale.ROOT);
+            if (TYPE_MODIFIER.contains(columnSpecUpperCase)) {
+                builder.append(" ").append(columnSpecUpperCase);
+            }
+        }
+        return builder.toString();
+    }
+
     private void extractIndexesPrimaryKey(List<Index> indexes, List<String> 
pkKeys) {
         if (CollectionUtils.isEmpty(indexes)) {
             return;
@@ -215,10 +229,23 @@ public class SQLParserSchemaManager implements 
Serializable {
         if (CollectionUtils.isEmpty(tableOptionsStrings)) {
             return null;
         }
+
+        for (int i = 0; i < tableOptionsStrings.size(); i++) {
+            String columnSpec = tableOptionsStrings.get(i);
+            // If you encounter a COMMENT and the next element is an equal 
sign (=)
+            if (COMMENT.equalsIgnoreCase(columnSpec)
+                    && i + 1 < tableOptionsStrings.size()
+                    && "=".equals(tableOptionsStrings.get(i + 1))) {
+                tableOptionsStrings.remove(i + 1);
+                break;
+            }
+        }
+
         return extractAdjacentString(tableOptionsStrings, COMMENT);
     }
 
-    private String parseDataType(ColDataType colDataType, SourceConnector 
sourceConnector) {
+    private String parseDataType(
+            ColDataType colDataType, String typeModifier, SourceConnector 
sourceConnector) {
         String dataType = colDataType.getDataType();
         int length = 0;
         int scale = 0;
@@ -229,7 +256,8 @@ public class SQLParserSchemaManager implements Serializable 
{
                 scale = Integer.parseInt(argumentsStringList.get(1));
             }
         }
-        return JsonDebeziumChangeUtils.buildDorisTypeName(sourceConnector, 
dataType, length, scale);
+        return JsonDebeziumChangeUtils.buildDorisTypeName(
+                sourceConnector, dataType + typeModifier, length, scale);
     }
 
     private String processDropColumnOperation(AlterExpression alterExpression, 
String dorisTable) {
@@ -244,14 +272,12 @@ public class SQLParserSchemaManager implements 
Serializable {
         List<ColumnDataType> colDataTypeList = 
alterExpression.getColDataTypeList();
         List<String> addColumnList = new ArrayList<>();
         for (ColumnDataType columnDataType : colDataTypeList) {
-            String columnName = columnDataType.getColumnName();
-            ColDataType colDataType = columnDataType.getColDataType();
-            String datatype = parseDataType(colDataType, sourceConnector);
-
-            List<String> columnSpecs = columnDataType.getColumnSpecs();
-            String defaultValue = extractDefaultValue(datatype, columnSpecs);
-            String comment = extractComment(columnSpecs);
-            FieldSchema fieldSchema = new FieldSchema(columnName, datatype, 
defaultValue, comment);
+            FieldSchema fieldSchema =
+                    getFieldSchema(
+                            columnDataType.getColumnName(),
+                            columnDataType.getColumnSpecs(),
+                            columnDataType.getColDataType(),
+                            sourceConnector);
             String addColumnDDL = 
SchemaChangeHelper.buildAddColumnDDL(dorisTable, fieldSchema);
             LOG.info("Parsed add column DDL SQL is: {}", addColumnDDL);
             addColumnList.add(addColumnDDL);
@@ -259,6 +285,19 @@ public class SQLParserSchemaManager implements 
Serializable {
         return addColumnList;
     }
 
+    private FieldSchema getFieldSchema(
+            String columnName,
+            List<String> columnSpecs,
+            ColDataType colDataType,
+            SourceConnector sourceConnector) {
+        String typeModifier = extractTypeModifier(columnSpecs);
+        String datatype = parseDataType(colDataType, typeModifier, 
sourceConnector);
+
+        String defaultValue = extractDefaultValue(datatype, columnSpecs);
+        String comment = extractComment(columnSpecs);
+        return new FieldSchema(columnName, datatype, defaultValue, comment);
+    }
+
     private String processChangeColumnOperation(
             AlterExpression alterExpression, String dorisTable) {
         String columnNewName = 
alterExpression.getColDataTypeList().get(0).getColumnName();
diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/schema/SQLParserSchemaManagerTest.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/schema/SQLParserSchemaManagerTest.java
index d65deeb0..c7b23c18 100644
--- a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/schema/SQLParserSchemaManagerTest.java
+++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/schema/SQLParserSchemaManagerTest.java
@@ -53,10 +53,12 @@ public class SQLParserSchemaManagerTest {
                 "ALTER TABLE `doris`.`tab` ADD COLUMN `create_time` 
DATETIMEV2(3) DEFAULT CURRENT_TIMESTAMP COMMENT 'time_comment'");
         expectDDLs.add("ALTER TABLE `doris`.`tab` RENAME COLUMN `c10` `c11`");
         expectDDLs.add("ALTER TABLE `doris`.`tab` RENAME COLUMN `c12` `c13`");
+        expectDDLs.add(
+                "ALTER TABLE `doris`.`tab` ADD COLUMN `card` LARGEINT COMMENT 
'card_comment'");
 
         SourceConnector mysql = SourceConnector.MYSQL;
         String ddl =
-                "alter table t1 drop c1, drop column c2, add c3 int default 
100, add column `decimal_type` decimal(38,9) DEFAULT '1.123456789' COMMENT 
'decimal_type_comment', add `create_time` datetime(3) DEFAULT 
CURRENT_TIMESTAMP(3) comment 'time_comment', rename column c10 to c11, change 
column c12 c13 varchar(10)";
+                "alter table t1 drop c1, drop column c2, add c3 int default 
100, add column `decimal_type` decimal(38,9) DEFAULT '1.123456789' COMMENT 
'decimal_type_comment', add `create_time` datetime(3) DEFAULT 
CURRENT_TIMESTAMP(3) comment 'time_comment', rename column c10 to c11, change 
column c12 c13 varchar(10), add card bigint(20) unsigned NOT NULL COMMENT 
'card_comment'";
         List<String> actualDDLs = schemaManager.parseAlterDDLs(mysql, ddl, 
dorisTable);
         for (String actualDDL : actualDDLs) {
             Assert.assertTrue(expectDDLs.contains(actualDDL));
@@ -257,13 +259,38 @@ public class SQLParserSchemaManagerTest {
                         + "  `decimal_type3` decimal(38,9) DEFAULT 
'1.123456789' COMMENT 'comment_test',\n"
                         + "  `create_time3` datetime(3) DEFAULT 
CURRENT_TIMESTAMP(3) COMMENT 'ttime_aaa',\n"
                         + "  PRIMARY KEY (`id`)\n"
-                        + ") ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 
COLLATE=utf8mb4_0900_ai_ci";
+                        + ") ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 
COLLATE=utf8mb4_0900_ai_ci comment='test_sinka'";
+        TableSchema tableSchema =
+                schemaManager.parseCreateTableStatement(
+                        SourceConnector.MYSQL, ddl, dorisTable, null);
+
+        String expected =
+                "TableSchema{database='doris', table='auto_tab', 
tableComment='test_sinka', fields={`id`=FieldSchema{name='`id`', 
typeString='INT', defaultValue='10000', comment='id_test'}, 
`create_time`=FieldSchema{name='`create_time`', typeString='DATETIMEV2(3)', 
defaultValue='CURRENT_TIMESTAMP', comment='null'}, 
`c1`=FieldSchema{name='`c1`', typeString='INT', defaultValue='999', 
comment='null'}, `decimal_type`=FieldSchema{name='`decimal_type`', 
typeString='DECIMALV3(9,3)', defaultValu [...]
+        Assert.assertEquals(expected, tableSchema.toString());
+    }
+
+    @Test
+    public void testParseCreateTableUnsignedStatement() {
+        String dorisTable = "doris.auto_tab";
+        String ddl =
+                "CREATE TABLE `test_sinka` (\n"
+                        + "  `id` BIGINT NOT NULL DEFAULT '10000' COMMENT 
'id_test',\n"
+                        + "  `id2` BIGINT UNSIGNED ZEROFILL NOT NULL DEFAULT 
'10000' COMMENT 'id2_comment',\n"
+                        + "  `create_time` datetime(3) DEFAULT 
CURRENT_TIMESTAMP(3),\n"
+                        + "  `c1` int DEFAULT '999',\n"
+                        + "  `decimal_type` decimal(9,3) DEFAULT '1.000' 
COMMENT 'decimal_tes',\n"
+                        + "  `aaa` varchar(100) DEFAULT NULL,\n"
+                        + "  `decimal_type3` decimal(38,9) DEFAULT 
'1.123456789' COMMENT 'comment_test',\n"
+                        + "  `create_time3` datetime(3) DEFAULT 
CURRENT_TIMESTAMP(3) COMMENT 'ttime_aaa',\n"
+                        + "  PRIMARY KEY (`id`)\n"
+                        + ") ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 
COLLATE=utf8mb4_0900_ai_ci comment 'test_sinka'";
         TableSchema tableSchema =
                 schemaManager.parseCreateTableStatement(
                         SourceConnector.MYSQL, ddl, dorisTable, null);
 
         String expected =
-                "TableSchema{database='doris', table='auto_tab', 
tableComment='null', fields={`id`=FieldSchema{name='`id`', typeString='INT', 
defaultValue='10000', comment='id_test'}, 
`create_time`=FieldSchema{name='`create_time`', typeString='DATETIMEV2(3)', 
defaultValue='CURRENT_TIMESTAMP', comment='null'}, 
`c1`=FieldSchema{name='`c1`', typeString='INT', defaultValue='999', 
comment='null'}, `decimal_type`=FieldSchema{name='`decimal_type`', 
typeString='DECIMALV3(9,3)', defaultValue='1.0 [...]
+                "TableSchema{database='doris', table='auto_tab', 
tableComment='test_sinka', fields={`id`=FieldSchema{name='`id`', 
typeString='BIGINT', defaultValue='10000', comment='id_test'}, 
`id2`=FieldSchema{name='`id2`', typeString='LARGEINT', defaultValue='10000', 
comment='id2_comment'}, `create_time`=FieldSchema{name='`create_time`', 
typeString='DATETIMEV2(3)', defaultValue='CURRENT_TIMESTAMP', comment='null'}, 
`c1`=FieldSchema{name='`c1`', typeString='INT', defaultValue='999', com [...]
+        System.out.println(tableSchema.toString());
         Assert.assertEquals(expected, tableSchema.toString());
     }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to