This is an automated email from the ASF dual-hosted git repository.

diwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris-flink-connector.git


The following commit(s) were added to refs/heads/master by this push:
     new d66d55e5 [fix]Fix SQLParser parsing the default value of datetime type 
as the current time (#464)
d66d55e5 is described below

commit d66d55e5bf881ba3462868dd9717f085e4ddd5be
Author: wudongliang <46414265+donglian...@users.noreply.github.com>
AuthorDate: Thu Aug 8 17:26:08 2024 +0800

    [fix]Fix SQLParser parsing the default value of datetime type as the 
current time (#464)
---
 .../flink/sink/schema/SQLParserSchemaManager.java  | 39 +++++++++++++--
 .../sink/schema/SQLParserSchemaManagerTest.java    | 55 ++++++++++++++++++----
 2 files changed, 81 insertions(+), 13 deletions(-)

diff --git 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/schema/SQLParserSchemaManager.java
 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/schema/SQLParserSchemaManager.java
index 9289c886..67a2ddac 100644
--- 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/schema/SQLParserSchemaManager.java
+++ 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/schema/SQLParserSchemaManager.java
@@ -32,6 +32,7 @@ import net.sf.jsqlparser.statement.create.table.CreateTable;
 import net.sf.jsqlparser.statement.create.table.Index;
 import org.apache.commons.collections.CollectionUtils;
 import org.apache.doris.flink.catalog.doris.DorisSchemaFactory;
+import org.apache.doris.flink.catalog.doris.DorisType;
 import org.apache.doris.flink.catalog.doris.FieldSchema;
 import org.apache.doris.flink.catalog.doris.TableSchema;
 import 
org.apache.doris.flink.sink.writer.serializer.jsondebezium.JsonDebeziumChangeUtils;
@@ -42,9 +43,14 @@ import org.slf4j.LoggerFactory;
 
 import java.io.Serializable;
 import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashSet;
 import java.util.LinkedHashMap;
 import java.util.List;
+import java.util.Locale;
 import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
 
 /** Use {@link CCJSqlParserUtil} to parse SQL statements. */
 public class SQLParserSchemaManager implements Serializable {
@@ -54,6 +60,16 @@ public class SQLParserSchemaManager implements Serializable {
     private static final String PRIMARY = "PRIMARY";
     private static final String PRIMARY_KEY = "PRIMARY KEY";
     private static final String UNIQUE = "UNIQUE";
+    private static final String DORIS_CURRENT_TIMESTAMP = "CURRENT_TIMESTAMP";
+    private static final Set<String> sourceConnectorTimeValues =
+            new HashSet<>(
+                    Arrays.asList(
+                            "SYSDATE",
+                            "SYSTIMESTAMP",
+                            "CURRENT_TIMESTAMP",
+                            "NOW()",
+                            "CURRENT TIMESTAMP",
+                            "GETDATE()"));
 
     /**
      * Doris' schema change only supports ADD, DROP, and RENAME operations. 
This method is only used
@@ -126,7 +142,8 @@ public class SQLParserSchemaManager implements Serializable 
{
                                     ColDataType colDataType = 
column.getColDataType();
                                     String dataType = 
parseDataType(colDataType, sourceConnector);
                                     List<String> columnSpecs = 
column.getColumnSpecs();
-                                    String defaultValue = 
extractDefaultValue(columnSpecs);
+                                    String defaultValue =
+                                            extractDefaultValue(dataType, 
columnSpecs);
                                     String comment = 
extractComment(columnSpecs);
                                     FieldSchema fieldSchema =
                                             new FieldSchema(
@@ -232,7 +249,7 @@ public class SQLParserSchemaManager implements Serializable 
{
             String datatype = parseDataType(colDataType, sourceConnector);
 
             List<String> columnSpecs = columnDataType.getColumnSpecs();
-            String defaultValue = extractDefaultValue(columnSpecs);
+            String defaultValue = extractDefaultValue(datatype, columnSpecs);
             String comment = extractComment(columnSpecs);
             FieldSchema fieldSchema = new FieldSchema(columnName, datatype, 
defaultValue, comment);
             String addColumnDDL = 
SchemaChangeHelper.buildAddColumnDDL(dorisTable, fieldSchema);
@@ -267,11 +284,25 @@ public class SQLParserSchemaManager implements 
Serializable {
     }
 
     @VisibleForTesting
-    public String extractDefaultValue(List<String> columnSpecs) {
+    public String extractDefaultValue(String dateType, List<String> 
columnSpecs) {
         if (CollectionUtils.isEmpty(columnSpecs)) {
             return null;
         }
-        return extractAdjacentString(columnSpecs, DEFAULT);
+        String adjacentDefaultValue = extractAdjacentString(columnSpecs, 
DEFAULT);
+        return parseDorisDefaultValue(dateType, adjacentDefaultValue);
+    }
+
+    private String parseDorisDefaultValue(String dateType, String 
defaultValue) {
+        if (Objects.isNull(defaultValue)) {
+            return null;
+        }
+        // In doris, DATETIME supports specifying the current time by default 
through
+        // CURRENT_TIMESTAMP.
+        if ((dateType.startsWith(DorisType.DATETIME) || 
dateType.startsWith(DorisType.DATETIME_V2))
+                && 
sourceConnectorTimeValues.contains(defaultValue.toUpperCase(Locale.ROOT))) {
+            return DORIS_CURRENT_TIMESTAMP;
+        }
+        return defaultValue;
     }
 
     private String extractAdjacentString(List<String> columnSpecs, String key) 
{
diff --git 
a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/schema/SQLParserSchemaManagerTest.java
 
b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/schema/SQLParserSchemaManagerTest.java
index d8cb7c3f..d65deeb0 100644
--- 
a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/schema/SQLParserSchemaManagerTest.java
+++ 
b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/schema/SQLParserSchemaManagerTest.java
@@ -17,6 +17,7 @@
 
 package org.apache.doris.flink.sink.schema;
 
+import org.apache.doris.flink.catalog.doris.DorisType;
 import org.apache.doris.flink.catalog.doris.TableSchema;
 import org.apache.doris.flink.tools.cdc.DorisTableConfig;
 import org.apache.doris.flink.tools.cdc.SourceConnector;
@@ -139,7 +140,7 @@ public class SQLParserSchemaManagerTest {
     public void testExtractDefaultValue() {
         String expectDefault = "100";
         List<String> columnSpecs = Arrays.asList("default", "'100'", 
"comment", "");
-        String actualDefault = schemaManager.extractDefaultValue(columnSpecs);
+        String actualDefault = 
schemaManager.extractDefaultValue(DorisType.INT, columnSpecs);
         Assert.assertEquals(expectDefault, actualDefault);
     }
 
@@ -147,14 +148,14 @@ public class SQLParserSchemaManagerTest {
     public void testExtractDefaultValueQuotes() {
         String expectDefault = "100";
         List<String> columnSpecs = Arrays.asList("default", "\"100\"", 
"comment", "");
-        String actualDefault = schemaManager.extractDefaultValue(columnSpecs);
+        String actualDefault = 
schemaManager.extractDefaultValue(DorisType.BIGINT, columnSpecs);
         Assert.assertEquals(expectDefault, actualDefault);
     }
 
     @Test
     public void testExtractDefaultValueNull() {
         List<String> columnSpecs = Arrays.asList("Default", null, "comment", 
null);
-        String actualDefault = schemaManager.extractDefaultValue(columnSpecs);
+        String actualDefault = 
schemaManager.extractDefaultValue(DorisType.STRING, columnSpecs);
         Assert.assertNull(actualDefault);
     }
 
@@ -162,7 +163,7 @@ public class SQLParserSchemaManagerTest {
     public void testExtractDefaultValueEmpty() {
         String expectDefault = null;
         List<String> columnSpecs = Arrays.asList("DEFAULT", "comment", null);
-        String actualDefault = schemaManager.extractDefaultValue(columnSpecs);
+        String actualDefault = 
schemaManager.extractDefaultValue(DorisType.STRING, columnSpecs);
         Assert.assertEquals(expectDefault, actualDefault);
     }
 
@@ -170,17 +171,53 @@ public class SQLParserSchemaManagerTest {
     public void testExtractDefaultValueA() {
         String expectDefault = "aaa";
         List<String> columnSpecs = Arrays.asList("default", "aaa");
-        String actualDefault = schemaManager.extractDefaultValue(columnSpecs);
+        String actualDefault = 
schemaManager.extractDefaultValue(DorisType.STRING, columnSpecs);
         Assert.assertEquals(expectDefault, actualDefault);
     }
 
     @Test
     public void testExtractDefaultValueNULL() {
         List<String> columnSpecs = Collections.singletonList("default");
-        String actualDefault = schemaManager.extractDefaultValue(columnSpecs);
+        String actualDefault = 
schemaManager.extractDefaultValue(DorisType.STRING, columnSpecs);
         Assert.assertNull(actualDefault);
     }
 
+    @Test
+    public void testExtractDefaultValueDateTime() {
+        List<String> columnSpecs = Arrays.asList("default", "SYSTIMESTAMP");
+        String actualDefault = 
schemaManager.extractDefaultValue(DorisType.DATETIME, columnSpecs);
+        Assert.assertEquals("CURRENT_TIMESTAMP", actualDefault);
+    }
+
+    @Test
+    public void testExtractDefaultValueDateTimeV2() {
+        List<String> columnSpecs = Arrays.asList("default", "GETDATE()");
+        String actualDefault =
+                schemaManager.extractDefaultValue(DorisType.DATETIME_V2, 
columnSpecs);
+        Assert.assertEquals("CURRENT_TIMESTAMP", actualDefault);
+    }
+
+    @Test
+    public void testExtractDefaultValueDateTimeV2Time() {
+        List<String> columnSpecs = Arrays.asList("default", "2024-03-14 
17:50:36.002");
+        String actualDefault = 
schemaManager.extractDefaultValue("DATETIMEV2(3)", columnSpecs);
+        Assert.assertEquals("2024-03-14 17:50:36.002", actualDefault);
+    }
+
+    @Test
+    public void testExtractDefaultValueDateTimeV2CurrentTime() {
+        List<String> columnSpecs = Arrays.asList("default", "now()");
+        String actualDefault = 
schemaManager.extractDefaultValue("DATETIMEV2(3)", columnSpecs);
+        Assert.assertEquals("CURRENT_TIMESTAMP", actualDefault);
+    }
+
+    @Test
+    public void testExtractDefaultValueDate() {
+        List<String> columnSpecs = Arrays.asList("default", "2024-03-14 
17:50:36");
+        String actualDefault = 
schemaManager.extractDefaultValue(DorisType.DATE, columnSpecs);
+        Assert.assertEquals("2024-03-14 17:50:36", actualDefault);
+    }
+
     @Test
     public void testRemoveContinuousChar() {
         // Test removing continuous target characters from both ends
@@ -288,7 +325,7 @@ public class SQLParserSchemaManagerTest {
                         SourceConnector.ORACLE, ddl, dorisTable, null);
 
         String expected =
-                "TableSchema{database='doris', table='auto_tab', 
tableComment='null', fields={employee_id=FieldSchema{name='employee_id', 
typeString='BIGINT', defaultValue='null', comment='null'}, 
first_name=FieldSchema{name='first_name', typeString='VARCHAR(150)', 
defaultValue='null', comment='null'}, last_name=FieldSchema{name='last_name', 
typeString='VARCHAR(150)', defaultValue='null', comment='null'}, 
email=FieldSchema{name='email', typeString='VARCHAR(300)', defaultValue='null', 
com [...]
+                "TableSchema{database='doris', table='auto_tab', 
tableComment='null', fields={employee_id=FieldSchema{name='employee_id', 
typeString='BIGINT', defaultValue='null', comment='null'}, 
first_name=FieldSchema{name='first_name', typeString='VARCHAR(150)', 
defaultValue='null', comment='null'}, last_name=FieldSchema{name='last_name', 
typeString='VARCHAR(150)', defaultValue='null', comment='null'}, 
email=FieldSchema{name='email', typeString='VARCHAR(300)', defaultValue='null', 
com [...]
         Assert.assertEquals(expected, tableSchema.toString());
     }
 
@@ -314,7 +351,7 @@ public class SQLParserSchemaManagerTest {
                         SourceConnector.ORACLE, ddl, dorisTable, null);
 
         String expected =
-                "TableSchema{database='doris', table='auto_tab', 
tableComment='null', fields={employee_id=FieldSchema{name='employee_id', 
typeString='BIGINT', defaultValue='null', comment='null'}, 
first_name=FieldSchema{name='first_name', typeString='VARCHAR(150)', 
defaultValue='null', comment='null'}, last_name=FieldSchema{name='last_name', 
typeString='VARCHAR(150)', defaultValue='null', comment='null'}, 
email=FieldSchema{name='email', typeString='VARCHAR(300)', defaultValue='null', 
com [...]
+                "TableSchema{database='doris', table='auto_tab', 
tableComment='null', fields={employee_id=FieldSchema{name='employee_id', 
typeString='BIGINT', defaultValue='null', comment='null'}, 
first_name=FieldSchema{name='first_name', typeString='VARCHAR(150)', 
defaultValue='null', comment='null'}, last_name=FieldSchema{name='last_name', 
typeString='VARCHAR(150)', defaultValue='null', comment='null'}, 
email=FieldSchema{name='email', typeString='VARCHAR(300)', defaultValue='null', 
com [...]
         Assert.assertEquals(expected, tableSchema.toString());
     }
 
@@ -341,7 +378,7 @@ public class SQLParserSchemaManagerTest {
                         dorisTable,
                         new DorisTableConfig(new HashMap<>()));
         String expected =
-                "TableSchema{database='doris', table='auto_tab', 
tableComment='null', fields={order_id=FieldSchema{name='order_id', 
typeString='BIGINT', defaultValue='null', comment='null'}, 
customer_id=FieldSchema{name='customer_id', typeString='BIGINT', 
defaultValue='null', comment='null'}, order_date=FieldSchema{name='order_date', 
typeString='DATETIMEV2', defaultValue='SYSDATE', comment='null'}, 
status=FieldSchema{name='status', typeString='VARCHAR(60)', 
defaultValue='null', comment=' [...]
+                "TableSchema{database='doris', table='auto_tab', 
tableComment='null', fields={order_id=FieldSchema{name='order_id', 
typeString='BIGINT', defaultValue='null', comment='null'}, 
customer_id=FieldSchema{name='customer_id', typeString='BIGINT', 
defaultValue='null', comment='null'}, order_date=FieldSchema{name='order_date', 
typeString='DATETIMEV2', defaultValue='CURRENT_TIMESTAMP', comment='null'}, 
status=FieldSchema{name='status', typeString='VARCHAR(60)', 
defaultValue='null', [...]
         Assert.assertEquals(expected, tableSchema.toString());
     }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to