This is an automated email from the ASF dual-hosted git repository. diwu pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris-flink-connector.git
The following commit(s) were added to refs/heads/master by this push: new 1a92f2a7 [Fix](cdc) Fix sql_parse schema table annotation and field type parsing inaccuracies (#540) 1a92f2a7 is described below commit 1a92f2a719256b38baa65dcee8f686ccb9580095 Author: SHHH <781240...@qq.com> AuthorDate: Fri Jan 10 14:04:38 2025 +0800 [Fix](cdc) Fix sql_parse schema table annotation and field type parsing inaccuracies (#540) --- .../flink/sink/schema/SQLParserSchemaManager.java | 73 +++++++++++++++++----- .../sink/schema/SQLParserSchemaManagerTest.java | 33 +++++++++- 2 files changed, 86 insertions(+), 20 deletions(-) diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/schema/SQLParserSchemaManager.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/schema/SQLParserSchemaManager.java index 67a2ddac..626f17ec 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/schema/SQLParserSchemaManager.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/schema/SQLParserSchemaManager.java @@ -61,6 +61,8 @@ public class SQLParserSchemaManager implements Serializable { private static final String PRIMARY_KEY = "PRIMARY KEY"; private static final String UNIQUE = "UNIQUE"; private static final String DORIS_CURRENT_TIMESTAMP = "CURRENT_TIMESTAMP"; + private static final List<String> TYPE_MODIFIER = + Arrays.asList("UNSIGNED", "ZEROFILL", "PRECISION"); private static final Set<String> sourceConnectorTimeValues = new HashSet<>( Arrays.asList( @@ -139,15 +141,13 @@ public class SQLParserSchemaManager implements Serializable { .forEach( column -> { String columnName = column.getColumnName(); - ColDataType colDataType = column.getColDataType(); - String dataType = parseDataType(colDataType, sourceConnector); List<String> columnSpecs = column.getColumnSpecs(); - String defaultValue = - extractDefaultValue(dataType, columnSpecs); - String comment = extractComment(columnSpecs); FieldSchema fieldSchema = - new FieldSchema( - columnName, dataType, defaultValue, comment); + getFieldSchema( + column.getColumnName(), + column.getColumnSpecs(), + column.getColDataType(), + sourceConnector); columnFields.put(columnName, fieldSchema); extractColumnPrimaryKey(columnName, columnSpecs, pkKeys); }); @@ -181,6 +181,20 @@ public class SQLParserSchemaManager implements Serializable { return null; } + private String extractTypeModifier(List<String> columnSpecs) { + if (CollectionUtils.isEmpty(columnSpecs)) { + return ""; + } + StringBuilder builder = new StringBuilder(); + for (String columnSpec : columnSpecs) { + String columnSpecUpperCase = columnSpec.toUpperCase(Locale.ROOT); + if (TYPE_MODIFIER.contains(columnSpecUpperCase)) { + builder.append(" ").append(columnSpecUpperCase); + } + } + return builder.toString(); + } + private void extractIndexesPrimaryKey(List<Index> indexes, List<String> pkKeys) { if (CollectionUtils.isEmpty(indexes)) { return; @@ -215,10 +229,23 @@ public class SQLParserSchemaManager implements Serializable { if (CollectionUtils.isEmpty(tableOptionsStrings)) { return null; } + + for (int i = 0; i < tableOptionsStrings.size(); i++) { + String columnSpec = tableOptionsStrings.get(i); + // If you encounter a COMMENT and the next element is an equal sign (=) + if (COMMENT.equalsIgnoreCase(columnSpec) + && i + 1 < tableOptionsStrings.size() + && "=".equals(tableOptionsStrings.get(i + 1))) { + tableOptionsStrings.remove(i + 1); + break; + } + } + return extractAdjacentString(tableOptionsStrings, COMMENT); } - private String parseDataType(ColDataType colDataType, SourceConnector sourceConnector) { + private String parseDataType( + ColDataType colDataType, String typeModifier, SourceConnector sourceConnector) { String dataType = colDataType.getDataType(); int length = 0; int scale = 0; @@ -229,7 +256,8 @@ public class SQLParserSchemaManager implements Serializable { scale = Integer.parseInt(argumentsStringList.get(1)); } } - return JsonDebeziumChangeUtils.buildDorisTypeName(sourceConnector, dataType, length, scale); + return JsonDebeziumChangeUtils.buildDorisTypeName( + sourceConnector, dataType + typeModifier, length, scale); } private String processDropColumnOperation(AlterExpression alterExpression, String dorisTable) { @@ -244,14 +272,12 @@ public class SQLParserSchemaManager implements Serializable { List<ColumnDataType> colDataTypeList = alterExpression.getColDataTypeList(); List<String> addColumnList = new ArrayList<>(); for (ColumnDataType columnDataType : colDataTypeList) { - String columnName = columnDataType.getColumnName(); - ColDataType colDataType = columnDataType.getColDataType(); - String datatype = parseDataType(colDataType, sourceConnector); - - List<String> columnSpecs = columnDataType.getColumnSpecs(); - String defaultValue = extractDefaultValue(datatype, columnSpecs); - String comment = extractComment(columnSpecs); - FieldSchema fieldSchema = new FieldSchema(columnName, datatype, defaultValue, comment); + FieldSchema fieldSchema = + getFieldSchema( + columnDataType.getColumnName(), + columnDataType.getColumnSpecs(), + columnDataType.getColDataType(), + sourceConnector); String addColumnDDL = SchemaChangeHelper.buildAddColumnDDL(dorisTable, fieldSchema); LOG.info("Parsed add column DDL SQL is: {}", addColumnDDL); addColumnList.add(addColumnDDL); @@ -259,6 +285,19 @@ public class SQLParserSchemaManager implements Serializable { return addColumnList; } + private FieldSchema getFieldSchema( + String columnName, + List<String> columnSpecs, + ColDataType colDataType, + SourceConnector sourceConnector) { + String typeModifier = extractTypeModifier(columnSpecs); + String datatype = parseDataType(colDataType, typeModifier, sourceConnector); + + String defaultValue = extractDefaultValue(datatype, columnSpecs); + String comment = extractComment(columnSpecs); + return new FieldSchema(columnName, datatype, defaultValue, comment); + } + private String processChangeColumnOperation( AlterExpression alterExpression, String dorisTable) { String columnNewName = alterExpression.getColDataTypeList().get(0).getColumnName(); diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/schema/SQLParserSchemaManagerTest.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/schema/SQLParserSchemaManagerTest.java index d65deeb0..c7b23c18 100644 --- a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/schema/SQLParserSchemaManagerTest.java +++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/schema/SQLParserSchemaManagerTest.java @@ -53,10 +53,12 @@ public class SQLParserSchemaManagerTest { "ALTER TABLE `doris`.`tab` ADD COLUMN `create_time` DATETIMEV2(3) DEFAULT CURRENT_TIMESTAMP COMMENT 'time_comment'"); expectDDLs.add("ALTER TABLE `doris`.`tab` RENAME COLUMN `c10` `c11`"); expectDDLs.add("ALTER TABLE `doris`.`tab` RENAME COLUMN `c12` `c13`"); + expectDDLs.add( + "ALTER TABLE `doris`.`tab` ADD COLUMN `card` LARGEINT COMMENT 'card_comment'"); SourceConnector mysql = SourceConnector.MYSQL; String ddl = - "alter table t1 drop c1, drop column c2, add c3 int default 100, add column `decimal_type` decimal(38,9) DEFAULT '1.123456789' COMMENT 'decimal_type_comment', add `create_time` datetime(3) DEFAULT CURRENT_TIMESTAMP(3) comment 'time_comment', rename column c10 to c11, change column c12 c13 varchar(10)"; + "alter table t1 drop c1, drop column c2, add c3 int default 100, add column `decimal_type` decimal(38,9) DEFAULT '1.123456789' COMMENT 'decimal_type_comment', add `create_time` datetime(3) DEFAULT CURRENT_TIMESTAMP(3) comment 'time_comment', rename column c10 to c11, change column c12 c13 varchar(10), add card bigint(20) unsigned NOT NULL COMMENT 'card_comment'"; List<String> actualDDLs = schemaManager.parseAlterDDLs(mysql, ddl, dorisTable); for (String actualDDL : actualDDLs) { Assert.assertTrue(expectDDLs.contains(actualDDL)); @@ -257,13 +259,38 @@ public class SQLParserSchemaManagerTest { + " `decimal_type3` decimal(38,9) DEFAULT '1.123456789' COMMENT 'comment_test',\n" + " `create_time3` datetime(3) DEFAULT CURRENT_TIMESTAMP(3) COMMENT 'ttime_aaa',\n" + " PRIMARY KEY (`id`)\n" - + ") ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci"; + + ") ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci comment='test_sinka'"; + TableSchema tableSchema = + schemaManager.parseCreateTableStatement( + SourceConnector.MYSQL, ddl, dorisTable, null); + + String expected = + "TableSchema{database='doris', table='auto_tab', tableComment='test_sinka', fields={`id`=FieldSchema{name='`id`', typeString='INT', defaultValue='10000', comment='id_test'}, `create_time`=FieldSchema{name='`create_time`', typeString='DATETIMEV2(3)', defaultValue='CURRENT_TIMESTAMP', comment='null'}, `c1`=FieldSchema{name='`c1`', typeString='INT', defaultValue='999', comment='null'}, `decimal_type`=FieldSchema{name='`decimal_type`', typeString='DECIMALV3(9,3)', defaultValu [...] + Assert.assertEquals(expected, tableSchema.toString()); + } + + @Test + public void testParseCreateTableUnsignedStatement() { + String dorisTable = "doris.auto_tab"; + String ddl = + "CREATE TABLE `test_sinka` (\n" + + " `id` BIGINT NOT NULL DEFAULT '10000' COMMENT 'id_test',\n" + + " `id2` BIGINT UNSIGNED ZEROFILL NOT NULL DEFAULT '10000' COMMENT 'id2_comment',\n" + + " `create_time` datetime(3) DEFAULT CURRENT_TIMESTAMP(3),\n" + + " `c1` int DEFAULT '999',\n" + + " `decimal_type` decimal(9,3) DEFAULT '1.000' COMMENT 'decimal_tes',\n" + + " `aaa` varchar(100) DEFAULT NULL,\n" + + " `decimal_type3` decimal(38,9) DEFAULT '1.123456789' COMMENT 'comment_test',\n" + + " `create_time3` datetime(3) DEFAULT CURRENT_TIMESTAMP(3) COMMENT 'ttime_aaa',\n" + + " PRIMARY KEY (`id`)\n" + + ") ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci comment 'test_sinka'"; TableSchema tableSchema = schemaManager.parseCreateTableStatement( SourceConnector.MYSQL, ddl, dorisTable, null); String expected = - "TableSchema{database='doris', table='auto_tab', tableComment='null', fields={`id`=FieldSchema{name='`id`', typeString='INT', defaultValue='10000', comment='id_test'}, `create_time`=FieldSchema{name='`create_time`', typeString='DATETIMEV2(3)', defaultValue='CURRENT_TIMESTAMP', comment='null'}, `c1`=FieldSchema{name='`c1`', typeString='INT', defaultValue='999', comment='null'}, `decimal_type`=FieldSchema{name='`decimal_type`', typeString='DECIMALV3(9,3)', defaultValue='1.0 [...] + "TableSchema{database='doris', table='auto_tab', tableComment='test_sinka', fields={`id`=FieldSchema{name='`id`', typeString='BIGINT', defaultValue='10000', comment='id_test'}, `id2`=FieldSchema{name='`id2`', typeString='LARGEINT', defaultValue='10000', comment='id2_comment'}, `create_time`=FieldSchema{name='`create_time`', typeString='DATETIMEV2(3)', defaultValue='CURRENT_TIMESTAMP', comment='null'}, `c1`=FieldSchema{name='`c1`', typeString='INT', defaultValue='999', com [...] + System.out.println(tableSchema.toString()); Assert.assertEquals(expected, tableSchema.toString()); } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org