This is an automated email from the ASF dual-hosted git repository. diwu pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris-flink-connector.git
The following commit(s) were added to refs/heads/master by this push: new d66d55e5 [fix]Fix SQLParser parsing the default value of datetime type as the current time (#464) d66d55e5 is described below commit d66d55e5bf881ba3462868dd9717f085e4ddd5be Author: wudongliang <46414265+donglian...@users.noreply.github.com> AuthorDate: Thu Aug 8 17:26:08 2024 +0800 [fix]Fix SQLParser parsing the default value of datetime type as the current time (#464) --- .../flink/sink/schema/SQLParserSchemaManager.java | 39 +++++++++++++-- .../sink/schema/SQLParserSchemaManagerTest.java | 55 ++++++++++++++++++---- 2 files changed, 81 insertions(+), 13 deletions(-) diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/schema/SQLParserSchemaManager.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/schema/SQLParserSchemaManager.java index 9289c886..67a2ddac 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/schema/SQLParserSchemaManager.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/schema/SQLParserSchemaManager.java @@ -32,6 +32,7 @@ import net.sf.jsqlparser.statement.create.table.CreateTable; import net.sf.jsqlparser.statement.create.table.Index; import org.apache.commons.collections.CollectionUtils; import org.apache.doris.flink.catalog.doris.DorisSchemaFactory; +import org.apache.doris.flink.catalog.doris.DorisType; import org.apache.doris.flink.catalog.doris.FieldSchema; import org.apache.doris.flink.catalog.doris.TableSchema; import org.apache.doris.flink.sink.writer.serializer.jsondebezium.JsonDebeziumChangeUtils; @@ -42,9 +43,14 @@ import org.slf4j.LoggerFactory; import java.io.Serializable; import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashSet; import java.util.LinkedHashMap; import java.util.List; +import java.util.Locale; import java.util.Map; +import java.util.Objects; +import java.util.Set; /** Use {@link CCJSqlParserUtil} to parse SQL statements. */ public class SQLParserSchemaManager implements Serializable { @@ -54,6 +60,16 @@ public class SQLParserSchemaManager implements Serializable { private static final String PRIMARY = "PRIMARY"; private static final String PRIMARY_KEY = "PRIMARY KEY"; private static final String UNIQUE = "UNIQUE"; + private static final String DORIS_CURRENT_TIMESTAMP = "CURRENT_TIMESTAMP"; + private static final Set<String> sourceConnectorTimeValues = + new HashSet<>( + Arrays.asList( + "SYSDATE", + "SYSTIMESTAMP", + "CURRENT_TIMESTAMP", + "NOW()", + "CURRENT TIMESTAMP", + "GETDATE()")); /** * Doris' schema change only supports ADD, DROP, and RENAME operations. This method is only used @@ -126,7 +142,8 @@ public class SQLParserSchemaManager implements Serializable { ColDataType colDataType = column.getColDataType(); String dataType = parseDataType(colDataType, sourceConnector); List<String> columnSpecs = column.getColumnSpecs(); - String defaultValue = extractDefaultValue(columnSpecs); + String defaultValue = + extractDefaultValue(dataType, columnSpecs); String comment = extractComment(columnSpecs); FieldSchema fieldSchema = new FieldSchema( @@ -232,7 +249,7 @@ public class SQLParserSchemaManager implements Serializable { String datatype = parseDataType(colDataType, sourceConnector); List<String> columnSpecs = columnDataType.getColumnSpecs(); - String defaultValue = extractDefaultValue(columnSpecs); + String defaultValue = extractDefaultValue(datatype, columnSpecs); String comment = extractComment(columnSpecs); FieldSchema fieldSchema = new FieldSchema(columnName, datatype, defaultValue, comment); String addColumnDDL = SchemaChangeHelper.buildAddColumnDDL(dorisTable, fieldSchema); @@ -267,11 +284,25 @@ public class SQLParserSchemaManager implements Serializable { } @VisibleForTesting - public String extractDefaultValue(List<String> columnSpecs) { + public String extractDefaultValue(String dateType, List<String> columnSpecs) { if (CollectionUtils.isEmpty(columnSpecs)) { return null; } - return extractAdjacentString(columnSpecs, DEFAULT); + String adjacentDefaultValue = extractAdjacentString(columnSpecs, DEFAULT); + return parseDorisDefaultValue(dateType, adjacentDefaultValue); + } + + private String parseDorisDefaultValue(String dateType, String defaultValue) { + if (Objects.isNull(defaultValue)) { + return null; + } + // In doris, DATETIME supports specifying the current time by default through + // CURRENT_TIMESTAMP. + if ((dateType.startsWith(DorisType.DATETIME) || dateType.startsWith(DorisType.DATETIME_V2)) + && sourceConnectorTimeValues.contains(defaultValue.toUpperCase(Locale.ROOT))) { + return DORIS_CURRENT_TIMESTAMP; + } + return defaultValue; } private String extractAdjacentString(List<String> columnSpecs, String key) { diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/schema/SQLParserSchemaManagerTest.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/schema/SQLParserSchemaManagerTest.java index d8cb7c3f..d65deeb0 100644 --- a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/schema/SQLParserSchemaManagerTest.java +++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/schema/SQLParserSchemaManagerTest.java @@ -17,6 +17,7 @@ package org.apache.doris.flink.sink.schema; +import org.apache.doris.flink.catalog.doris.DorisType; import org.apache.doris.flink.catalog.doris.TableSchema; import org.apache.doris.flink.tools.cdc.DorisTableConfig; import org.apache.doris.flink.tools.cdc.SourceConnector; @@ -139,7 +140,7 @@ public class SQLParserSchemaManagerTest { public void testExtractDefaultValue() { String expectDefault = "100"; List<String> columnSpecs = Arrays.asList("default", "'100'", "comment", ""); - String actualDefault = schemaManager.extractDefaultValue(columnSpecs); + String actualDefault = schemaManager.extractDefaultValue(DorisType.INT, columnSpecs); Assert.assertEquals(expectDefault, actualDefault); } @@ -147,14 +148,14 @@ public class SQLParserSchemaManagerTest { public void testExtractDefaultValueQuotes() { String expectDefault = "100"; List<String> columnSpecs = Arrays.asList("default", "\"100\"", "comment", ""); - String actualDefault = schemaManager.extractDefaultValue(columnSpecs); + String actualDefault = schemaManager.extractDefaultValue(DorisType.BIGINT, columnSpecs); Assert.assertEquals(expectDefault, actualDefault); } @Test public void testExtractDefaultValueNull() { List<String> columnSpecs = Arrays.asList("Default", null, "comment", null); - String actualDefault = schemaManager.extractDefaultValue(columnSpecs); + String actualDefault = schemaManager.extractDefaultValue(DorisType.STRING, columnSpecs); Assert.assertNull(actualDefault); } @@ -162,7 +163,7 @@ public class SQLParserSchemaManagerTest { public void testExtractDefaultValueEmpty() { String expectDefault = null; List<String> columnSpecs = Arrays.asList("DEFAULT", "comment", null); - String actualDefault = schemaManager.extractDefaultValue(columnSpecs); + String actualDefault = schemaManager.extractDefaultValue(DorisType.STRING, columnSpecs); Assert.assertEquals(expectDefault, actualDefault); } @@ -170,17 +171,53 @@ public class SQLParserSchemaManagerTest { public void testExtractDefaultValueA() { String expectDefault = "aaa"; List<String> columnSpecs = Arrays.asList("default", "aaa"); - String actualDefault = schemaManager.extractDefaultValue(columnSpecs); + String actualDefault = schemaManager.extractDefaultValue(DorisType.STRING, columnSpecs); Assert.assertEquals(expectDefault, actualDefault); } @Test public void testExtractDefaultValueNULL() { List<String> columnSpecs = Collections.singletonList("default"); - String actualDefault = schemaManager.extractDefaultValue(columnSpecs); + String actualDefault = schemaManager.extractDefaultValue(DorisType.STRING, columnSpecs); Assert.assertNull(actualDefault); } + @Test + public void testExtractDefaultValueDateTime() { + List<String> columnSpecs = Arrays.asList("default", "SYSTIMESTAMP"); + String actualDefault = schemaManager.extractDefaultValue(DorisType.DATETIME, columnSpecs); + Assert.assertEquals("CURRENT_TIMESTAMP", actualDefault); + } + + @Test + public void testExtractDefaultValueDateTimeV2() { + List<String> columnSpecs = Arrays.asList("default", "GETDATE()"); + String actualDefault = + schemaManager.extractDefaultValue(DorisType.DATETIME_V2, columnSpecs); + Assert.assertEquals("CURRENT_TIMESTAMP", actualDefault); + } + + @Test + public void testExtractDefaultValueDateTimeV2Time() { + List<String> columnSpecs = Arrays.asList("default", "2024-03-14 17:50:36.002"); + String actualDefault = schemaManager.extractDefaultValue("DATETIMEV2(3)", columnSpecs); + Assert.assertEquals("2024-03-14 17:50:36.002", actualDefault); + } + + @Test + public void testExtractDefaultValueDateTimeV2CurrentTime() { + List<String> columnSpecs = Arrays.asList("default", "now()"); + String actualDefault = schemaManager.extractDefaultValue("DATETIMEV2(3)", columnSpecs); + Assert.assertEquals("CURRENT_TIMESTAMP", actualDefault); + } + + @Test + public void testExtractDefaultValueDate() { + List<String> columnSpecs = Arrays.asList("default", "2024-03-14 17:50:36"); + String actualDefault = schemaManager.extractDefaultValue(DorisType.DATE, columnSpecs); + Assert.assertEquals("2024-03-14 17:50:36", actualDefault); + } + @Test public void testRemoveContinuousChar() { // Test removing continuous target characters from both ends @@ -288,7 +325,7 @@ public class SQLParserSchemaManagerTest { SourceConnector.ORACLE, ddl, dorisTable, null); String expected = - "TableSchema{database='doris', table='auto_tab', tableComment='null', fields={employee_id=FieldSchema{name='employee_id', typeString='BIGINT', defaultValue='null', comment='null'}, first_name=FieldSchema{name='first_name', typeString='VARCHAR(150)', defaultValue='null', comment='null'}, last_name=FieldSchema{name='last_name', typeString='VARCHAR(150)', defaultValue='null', comment='null'}, email=FieldSchema{name='email', typeString='VARCHAR(300)', defaultValue='null', com [...] + "TableSchema{database='doris', table='auto_tab', tableComment='null', fields={employee_id=FieldSchema{name='employee_id', typeString='BIGINT', defaultValue='null', comment='null'}, first_name=FieldSchema{name='first_name', typeString='VARCHAR(150)', defaultValue='null', comment='null'}, last_name=FieldSchema{name='last_name', typeString='VARCHAR(150)', defaultValue='null', comment='null'}, email=FieldSchema{name='email', typeString='VARCHAR(300)', defaultValue='null', com [...] Assert.assertEquals(expected, tableSchema.toString()); } @@ -314,7 +351,7 @@ public class SQLParserSchemaManagerTest { SourceConnector.ORACLE, ddl, dorisTable, null); String expected = - "TableSchema{database='doris', table='auto_tab', tableComment='null', fields={employee_id=FieldSchema{name='employee_id', typeString='BIGINT', defaultValue='null', comment='null'}, first_name=FieldSchema{name='first_name', typeString='VARCHAR(150)', defaultValue='null', comment='null'}, last_name=FieldSchema{name='last_name', typeString='VARCHAR(150)', defaultValue='null', comment='null'}, email=FieldSchema{name='email', typeString='VARCHAR(300)', defaultValue='null', com [...] + "TableSchema{database='doris', table='auto_tab', tableComment='null', fields={employee_id=FieldSchema{name='employee_id', typeString='BIGINT', defaultValue='null', comment='null'}, first_name=FieldSchema{name='first_name', typeString='VARCHAR(150)', defaultValue='null', comment='null'}, last_name=FieldSchema{name='last_name', typeString='VARCHAR(150)', defaultValue='null', comment='null'}, email=FieldSchema{name='email', typeString='VARCHAR(300)', defaultValue='null', com [...] Assert.assertEquals(expected, tableSchema.toString()); } @@ -341,7 +378,7 @@ public class SQLParserSchemaManagerTest { dorisTable, new DorisTableConfig(new HashMap<>())); String expected = - "TableSchema{database='doris', table='auto_tab', tableComment='null', fields={order_id=FieldSchema{name='order_id', typeString='BIGINT', defaultValue='null', comment='null'}, customer_id=FieldSchema{name='customer_id', typeString='BIGINT', defaultValue='null', comment='null'}, order_date=FieldSchema{name='order_date', typeString='DATETIMEV2', defaultValue='SYSDATE', comment='null'}, status=FieldSchema{name='status', typeString='VARCHAR(60)', defaultValue='null', comment=' [...] + "TableSchema{database='doris', table='auto_tab', tableComment='null', fields={order_id=FieldSchema{name='order_id', typeString='BIGINT', defaultValue='null', comment='null'}, customer_id=FieldSchema{name='customer_id', typeString='BIGINT', defaultValue='null', comment='null'}, order_date=FieldSchema{name='order_date', typeString='DATETIMEV2', defaultValue='CURRENT_TIMESTAMP', comment='null'}, status=FieldSchema{name='status', typeString='VARCHAR(60)', defaultValue='null', [...] Assert.assertEquals(expected, tableSchema.toString()); } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org