This is an automated email from the ASF dual-hosted git repository. diwu pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris-flink-connector.git
The following commit(s) were added to refs/heads/master by this push: new 1a9e7bc [fix](cdc) fix datetime precision and regular matching format errors after turning on single-sink (#298) 1a9e7bc is described below commit 1a9e7bc57ec7723025cad13476d1c569d7527d60 Author: Petrichor <31833513+vinle...@users.noreply.github.com> AuthorDate: Wed Jan 24 17:06:14 2024 +0800 [fix](cdc) fix datetime precision and regular matching format errors after turning on single-sink (#298) --- .../doris/flink/catalog/DorisTypeMapper.java | 2 + .../apache/doris/flink/tools/cdc/DatabaseSync.java | 7 +++- .../doris/flink/tools/cdc/mysql/MysqlType.java | 40 +++++++++++++++++- .../TestJsonDebeziumSchemaChangeImplV2.java | 48 ++++++++++++++++++++++ 4 files changed, 93 insertions(+), 4 deletions(-) diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/catalog/DorisTypeMapper.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/catalog/DorisTypeMapper.java index cc5fe4b..e125a30 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/catalog/DorisTypeMapper.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/catalog/DorisTypeMapper.java @@ -70,6 +70,8 @@ public class DorisTypeMapper { /** Max size of varchar type of Doris. */ public static final int MAX_VARCHAR_SIZE = 65533; + /* Max precision of datetime type of Doris. 
*/ + public static final int MAX_SUPPORTED_DATE_TIME_PRECISION = 6; public static DataType toFlinkType( String columnName, String columnType, int precision, int scale) { diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/DatabaseSync.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/DatabaseSync.java index e153039..e71ed51 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/DatabaseSync.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/DatabaseSync.java @@ -80,7 +80,7 @@ public abstract class DatabaseSync { protected String tablePrefix; protected String tableSuffix; protected boolean singleSink; - private Map<String, String> tableMapping = new HashMap<>(); + private final Map<String, String> tableMapping = new HashMap<>(); public abstract void registerDriver() throws SQLException; @@ -93,7 +93,7 @@ public abstract class DatabaseSync { /** Get the prefix of a specific tableList, for example, mysql is database, oracle is schema. 
*/ public abstract String getTableListPrefix(); - public DatabaseSync() throws SQLException { + protected DatabaseSync() throws SQLException { registerDriver(); } @@ -315,6 +315,9 @@ public abstract class DatabaseSync { .collect(Collectors.joining("|")); } else { // includingTablePattern and ^excludingPattern + if (includingTables == null) { + includingTables = ".*"; + } String includingPattern = String.format("(%s)\\.(%s)", getTableListPrefix(), includingTables); if (StringUtils.isNullOrWhitespaceOnly(excludingTables)) { diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mysql/MysqlType.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mysql/MysqlType.java index 704b8b3..60a5eda 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mysql/MysqlType.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mysql/MysqlType.java @@ -17,11 +17,20 @@ package org.apache.doris.flink.tools.cdc.mysql; +import org.apache.flink.table.types.logical.TimestampType; import org.apache.flink.util.Preconditions; import org.apache.doris.flink.catalog.doris.DorisType; +import static org.apache.doris.flink.catalog.DorisTypeMapper.MAX_SUPPORTED_DATE_TIME_PRECISION; + public class MysqlType { + + // MySQL driver returns width of timestamp types instead of precision. + // 19 characters are used for zero-precision timestamps while others + // require 19 + precision + 1 characters with the additional character + // required for the decimal separator. + private static final int ZERO_PRECISION_TIMESTAMP_COLUMN_SIZE = 19; private static final String BIT = "BIT"; private static final String BOOLEAN = "BOOLEAN"; private static final String BOOL = "BOOL"; @@ -145,8 +154,35 @@ public class MysqlType { return DorisType.DATE_V2; case DATETIME: case TIMESTAMP: - int dtScale = length > 19 ? 
length - 20 : 0; - return String.format("%s(%s)", DorisType.DATETIME_V2, Math.min(dtScale, 6)); + // default precision is 0 + // see https://dev.mysql.com/doc/refman/8.0/en/date-and-time-type-syntax.html + if (length == null + || length <= 0 + || length == ZERO_PRECISION_TIMESTAMP_COLUMN_SIZE) { + return String.format("%s(%s)", DorisType.DATETIME_V2, 0); + } else if (length > ZERO_PRECISION_TIMESTAMP_COLUMN_SIZE + 1) { + // Timestamp with a fraction of seconds. + // For example, 2024-01-01 01:01:01.1 + // The decimal point will occupy 1 character. + // Thus, the length of the timestamp is 21. + return String.format( + "%s(%s)", + DorisType.DATETIME_V2, + Math.min( + length - ZERO_PRECISION_TIMESTAMP_COLUMN_SIZE - 1, + MAX_SUPPORTED_DATE_TIME_PRECISION)); + } else if (length <= TimestampType.MAX_PRECISION) { + // For Debezium JSON data, the timestamp/datetime length ranges from 0 to 9. + return String.format( + "%s(%s)", + DorisType.DATETIME_V2, + Math.min(length, MAX_SUPPORTED_DATE_TIME_PRECISION)); + } else { + throw new UnsupportedOperationException( + "Unsupported length: " + + length + + " for MySQL TIMESTAMP/DATETIME types"); + } case CHAR: case VARCHAR: Preconditions.checkNotNull(length); diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/TestJsonDebeziumSchemaChangeImplV2.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/TestJsonDebeziumSchemaChangeImplV2.java index 11df3e0..8aca521 100644 --- a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/TestJsonDebeziumSchemaChangeImplV2.java +++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/TestJsonDebeziumSchemaChangeImplV2.java @@ -17,6 +17,7 @@ package org.apache.doris.flink.sink.writer.serializer.jsondebezium; +import com.fasterxml.jackson.core.JsonProcessingException; import
com.fasterxml.jackson.databind.JsonNode; import com.google.common.collect.Maps; import org.apache.commons.collections.CollectionUtils; @@ -267,4 +268,51 @@ public class TestJsonDebeziumSchemaChangeImplV2 extends TestJsonDebeziumChangeBa Assert.assertEquals("age4", tableSchema.getFields().get("age4").getName()); schemaChange.setSourceConnector(SourceConnector.MYSQL.connectorName); } + + @Test + public void testDateTimeFullOrigin() throws JsonProcessingException { + Map<String, FieldSchema> srcFiledSchemaMap = new LinkedHashMap<>(); + srcFiledSchemaMap.put("id", new FieldSchema("id", "INT", null, null)); + srcFiledSchemaMap.put( + "test_dt_0", new FieldSchema("test_dt_0", "DATETIMEV2(0)", null, null)); + srcFiledSchemaMap.put( + "test_dt_1", new FieldSchema("test_dt_1", "DATETIMEV2(1)", null, null)); + srcFiledSchemaMap.put( + "test_dt_3", new FieldSchema("test_dt_3", "DATETIMEV2(3)", null, null)); + srcFiledSchemaMap.put( + "test_dt_6", new FieldSchema("test_dt_6", "DATETIMEV2(6)", null, null)); + srcFiledSchemaMap.put( + "test_ts_0", new FieldSchema("test_ts_0", "DATETIMEV2(0)", null, null)); + srcFiledSchemaMap.put( + "test_ts_1", + new FieldSchema("test_ts_1", "DATETIMEV2(1)", "current_timestamp", null)); + srcFiledSchemaMap.put( + "test_ts_3", + new FieldSchema("test_ts_3", "DATETIMEV2(3)", "current_timestamp", null)); + srcFiledSchemaMap.put( + "test_ts_6", + new FieldSchema("test_ts_6", "DATETIMEV2(6)", "current_timestamp", null)); + + schemaChange.setSourceConnector("mysql"); + String columnsString = + 
"[{\"name\":\"id\",\"jdbcType\":4,\"typeName\":\"INT\",\"typeExpression\":\"INT\",\"charsetName\":null,\"position\":1,\"optional\":false,\"autoIncremented\":false,\"generated\":false,\"comment\":null,\"hasDefaultValue\":false,\"enumValues\":[]},{\"name\":\"test_dt_0\",\"jdbcType\":93,\"typeName\":\"DATETIME\",\"typeExpression\":\"DATETIME\",\"charsetName\":null,\"position\":2,\"optional\":true,\"autoIncremented\":false,\"generated\":false,\"comment\":null,\"hasDefaultValu [...] + JsonNode columns = objectMapper.readTree(columnsString); + schemaChange.fillOriginSchema(columns); + Map<String, FieldSchema> originFieldSchemaMap = schemaChange.getOriginFieldSchemaMap(); + + Iterator<Entry<String, FieldSchema>> originFieldSchemaIterator = + originFieldSchemaMap.entrySet().iterator(); + for (Entry<String, FieldSchema> entry : srcFiledSchemaMap.entrySet()) { + FieldSchema srcFiledSchema = entry.getValue(); + Entry<String, FieldSchema> originField = originFieldSchemaIterator.next(); + + Assert.assertEquals(entry.getKey(), originField.getKey()); + Assert.assertEquals(srcFiledSchema.getName(), originField.getValue().getName()); + Assert.assertEquals( + srcFiledSchema.getTypeString(), originField.getValue().getTypeString()); + Assert.assertEquals( + srcFiledSchema.getDefaultValue(), originField.getValue().getDefaultValue()); + Assert.assertEquals(srcFiledSchema.getComment(), originField.getValue().getComment()); + } + } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org