This is an automated email from the ASF dual-hosted git repository.

diwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris-flink-connector.git


The following commit(s) were added to refs/heads/master by this push:
     new 1a9e7bc  [fix](cdc) fix datetime precision and regular matching format 
errors after turning on single-sink (#298)
1a9e7bc is described below

commit 1a9e7bc57ec7723025cad13476d1c569d7527d60
Author: Petrichor <31833513+vinle...@users.noreply.github.com>
AuthorDate: Wed Jan 24 17:06:14 2024 +0800

    [fix](cdc) fix datetime precision and regular matching format errors after 
turning on single-sink (#298)
---
 .../doris/flink/catalog/DorisTypeMapper.java       |  2 +
 .../apache/doris/flink/tools/cdc/DatabaseSync.java |  7 +++-
 .../doris/flink/tools/cdc/mysql/MysqlType.java     | 40 +++++++++++++++++-
 .../TestJsonDebeziumSchemaChangeImplV2.java        | 48 ++++++++++++++++++++++
 4 files changed, 93 insertions(+), 4 deletions(-)

diff --git 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/catalog/DorisTypeMapper.java
 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/catalog/DorisTypeMapper.java
index cc5fe4b..e125a30 100644
--- 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/catalog/DorisTypeMapper.java
+++ 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/catalog/DorisTypeMapper.java
@@ -70,6 +70,8 @@ public class DorisTypeMapper {
 
     /** Max size of varchar type of Doris. */
     public static final int MAX_VARCHAR_SIZE = 65533;
+    /* Max precision of datetime type of Doris. */
+    public static final int MAX_SUPPORTED_DATE_TIME_PRECISION = 6;
 
     public static DataType toFlinkType(
             String columnName, String columnType, int precision, int scale) {
diff --git 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/DatabaseSync.java
 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/DatabaseSync.java
index e153039..e71ed51 100644
--- 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/DatabaseSync.java
+++ 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/DatabaseSync.java
@@ -80,7 +80,7 @@ public abstract class DatabaseSync {
     protected String tablePrefix;
     protected String tableSuffix;
     protected boolean singleSink;
-    private Map<String, String> tableMapping = new HashMap<>();
+    private final Map<String, String> tableMapping = new HashMap<>();
 
     public abstract void registerDriver() throws SQLException;
 
@@ -93,7 +93,7 @@ public abstract class DatabaseSync {
     /** Get the prefix of a specific tableList, for example, mysql is 
database, oracle is schema. */
     public abstract String getTableListPrefix();
 
-    public DatabaseSync() throws SQLException {
+    protected DatabaseSync() throws SQLException {
         registerDriver();
     }
 
@@ -315,6 +315,9 @@ public abstract class DatabaseSync {
                     .collect(Collectors.joining("|"));
         } else {
             // includingTablePattern and ^excludingPattern
+            if (includingTables == null) {
+                includingTables = ".*";
+            }
             String includingPattern =
                     String.format("(%s)\\.(%s)", getTableListPrefix(), 
includingTables);
             if (StringUtils.isNullOrWhitespaceOnly(excludingTables)) {
diff --git 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mysql/MysqlType.java
 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mysql/MysqlType.java
index 704b8b3..60a5eda 100644
--- 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mysql/MysqlType.java
+++ 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mysql/MysqlType.java
@@ -17,11 +17,20 @@
 
 package org.apache.doris.flink.tools.cdc.mysql;
 
+import org.apache.flink.table.types.logical.TimestampType;
 import org.apache.flink.util.Preconditions;
 
 import org.apache.doris.flink.catalog.doris.DorisType;
 
+import static 
org.apache.doris.flink.catalog.DorisTypeMapper.MAX_SUPPORTED_DATE_TIME_PRECISION;
+
 public class MysqlType {
+
+    // MySQL driver returns width of timestamp types instead of precision.
+    // 19 characters are used for zero-precision timestamps while others
+    // require 19 + precision + 1 characters with the additional character
+    // required for the decimal separator.
+    private static final int ZERO_PRECISION_TIMESTAMP_COLUMN_SIZE = 19;
     private static final String BIT = "BIT";
     private static final String BOOLEAN = "BOOLEAN";
     private static final String BOOL = "BOOL";
@@ -145,8 +154,35 @@ public class MysqlType {
                 return DorisType.DATE_V2;
             case DATETIME:
             case TIMESTAMP:
-                int dtScale = length > 19 ? length - 20 : 0;
-                return String.format("%s(%s)", DorisType.DATETIME_V2, 
Math.min(dtScale, 6));
+                // default precision is 0
+                // see 
https://dev.mysql.com/doc/refman/8.0/en/date-and-time-type-syntax.html
+                if (length == null
+                        || length <= 0
+                        || length == ZERO_PRECISION_TIMESTAMP_COLUMN_SIZE) {
+                    return String.format("%s(%s)", DorisType.DATETIME_V2, 0);
+                } else if (length > ZERO_PRECISION_TIMESTAMP_COLUMN_SIZE + 1) {
+                    // Timestamp with a fraction of seconds.
+                    // For example, 2024-01-01 01:01:01.1
+                    // The decimal point will occupy 1 character.
+                    // Thus, the length of the timestamp is 21.
+                    return String.format(
+                            "%s(%s)",
+                            DorisType.DATETIME_V2,
+                            Math.min(
+                                    length - 
ZERO_PRECISION_TIMESTAMP_COLUMN_SIZE - 1,
+                                    MAX_SUPPORTED_DATE_TIME_PRECISION));
+                } else if (length <= TimestampType.MAX_PRECISION) {
+                    // For Debezium JSON data, the timestamp/datetime length 
ranges from 0 to 9.
+                    return String.format(
+                            "%s(%s)",
+                            DorisType.DATETIME_V2,
+                            Math.min(length, 
MAX_SUPPORTED_DATE_TIME_PRECISION));
+                } else {
+                    throw new UnsupportedOperationException(
+                            "Unsupported length: "
+                                    + length
+                                    + " for MySQL TIMESTAMP/DATETIME types");
+                }
             case CHAR:
             case VARCHAR:
                 Preconditions.checkNotNull(length);
diff --git 
a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/TestJsonDebeziumSchemaChangeImplV2.java
 
b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/TestJsonDebeziumSchemaChangeImplV2.java
index 11df3e0..8aca521 100644
--- 
a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/TestJsonDebeziumSchemaChangeImplV2.java
+++ 
b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/TestJsonDebeziumSchemaChangeImplV2.java
@@ -17,6 +17,7 @@
 
 package org.apache.doris.flink.sink.writer.serializer.jsondebezium;
 
+import com.fasterxml.jackson.core.JsonProcessingException;
 import com.fasterxml.jackson.databind.JsonNode;
 import com.google.common.collect.Maps;
 import org.apache.commons.collections.CollectionUtils;
@@ -267,4 +268,51 @@ public class TestJsonDebeziumSchemaChangeImplV2 extends 
TestJsonDebeziumChangeBa
         Assert.assertEquals("age4", 
tableSchema.getFields().get("age4").getName());
         schemaChange.setSourceConnector(SourceConnector.MYSQL.connectorName);
     }
+
+    @Test
+    public void testDateTimeFullOrigin() throws JsonProcessingException {
+        Map<String, FieldSchema> srcFiledSchemaMap = new LinkedHashMap<>();
+        srcFiledSchemaMap.put("id", new FieldSchema("id", "INT", null, null));
+        srcFiledSchemaMap.put(
+                "test_dt_0", new FieldSchema("test_dt_0", "DATETIMEV2(0)", 
null, null));
+        srcFiledSchemaMap.put(
+                "test_dt_1", new FieldSchema("test_dt_1", "DATETIMEV2(1)", 
null, null));
+        srcFiledSchemaMap.put(
+                "test_dt_3", new FieldSchema("test_dt_3", "DATETIMEV2(3)", 
null, null));
+        srcFiledSchemaMap.put(
+                "test_dt_6", new FieldSchema("test_dt_6", "DATETIMEV2(6)", 
null, null));
+        srcFiledSchemaMap.put(
+                "test_ts_0", new FieldSchema("test_ts_0", "DATETIMEV2(0)", 
null, null));
+        srcFiledSchemaMap.put(
+                "test_ts_1",
+                new FieldSchema("test_ts_1", "DATETIMEV2(1)", 
"current_timestamp", null));
+        srcFiledSchemaMap.put(
+                "test_ts_3",
+                new FieldSchema("test_ts_3", "DATETIMEV2(3)", 
"current_timestamp", null));
+        srcFiledSchemaMap.put(
+                "test_ts_6",
+                new FieldSchema("test_ts_6", "DATETIMEV2(6)", 
"current_timestamp", null));
+
+        schemaChange.setSourceConnector("mysql");
+        String columnsString =
+                
"[{\"name\":\"id\",\"jdbcType\":4,\"typeName\":\"INT\",\"typeExpression\":\"INT\",\"charsetName\":null,\"position\":1,\"optional\":false,\"autoIncremented\":false,\"generated\":false,\"comment\":null,\"hasDefaultValue\":false,\"enumValues\":[]},{\"name\":\"test_dt_0\",\"jdbcType\":93,\"typeName\":\"DATETIME\",\"typeExpression\":\"DATETIME\",\"charsetName\":null,\"position\":2,\"optional\":true,\"autoIncremented\":false,\"generated\":false,\"comment\":null,\"hasDefaultValu
 [...]
+        JsonNode columns = objectMapper.readTree(columnsString);
+        schemaChange.fillOriginSchema(columns);
+        Map<String, FieldSchema> originFieldSchemaMap = 
schemaChange.getOriginFieldSchemaMap();
+
+        Iterator<Entry<String, FieldSchema>> originFieldSchemaIterator =
+                originFieldSchemaMap.entrySet().iterator();
+        for (Entry<String, FieldSchema> entry : srcFiledSchemaMap.entrySet()) {
+            FieldSchema srcFiledSchema = entry.getValue();
+            Entry<String, FieldSchema> originField = 
originFieldSchemaIterator.next();
+
+            Assert.assertEquals(entry.getKey(), originField.getKey());
+            Assert.assertEquals(srcFiledSchema.getName(), 
originField.getValue().getName());
+            Assert.assertEquals(
+                    srcFiledSchema.getTypeString(), 
originField.getValue().getTypeString());
+            Assert.assertEquals(
+                    srcFiledSchema.getDefaultValue(), 
originField.getValue().getDefaultValue());
+            Assert.assertEquals(srcFiledSchema.getComment(), 
originField.getValue().getComment());
+        }
+    }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to