This is an automated email from the ASF dual-hosted git repository. diwu pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris-flink-connector.git
The following commit(s) were added to refs/heads/master by this push: new d840ccc [fix](lookup)enhance compatibility for lookup join with the java.sql.Timestamp (#341) d840ccc is described below commit d840cccb12d2828765998760a22ab92c53122440 Author: Petrichor <31833513+vinle...@users.noreply.github.com> AuthorDate: Wed Mar 13 14:27:34 2024 +0800 [fix](lookup)enhance compatibility for lookup join with the java.sql.Timestamp (#341) --- .../converter/DorisRowConverter.java | 5 +++- .../convert/DorisRowConverterTest.java | 28 ++++++++++++++++------ 2 files changed, 25 insertions(+), 8 deletions(-) diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/deserialization/converter/DorisRowConverter.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/deserialization/converter/DorisRowConverter.java index afda237..130d236 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/deserialization/converter/DorisRowConverter.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/deserialization/converter/DorisRowConverter.java @@ -47,6 +47,7 @@ import org.apache.doris.flink.serialization.RowBatch; import java.io.Serializable; import java.math.BigDecimal; import java.sql.Date; +import java.sql.Timestamp; import java.time.LocalDate; import java.time.LocalDateTime; import java.util.Arrays; @@ -195,9 +196,11 @@ public class DorisRowConverter implements Serializable { return val -> { if (val instanceof LocalDateTime) { return TimestampData.fromLocalDateTime((LocalDateTime) val); + } else if (val instanceof Timestamp) { + return TimestampData.fromTimestamp((Timestamp) val); } else { throw new UnsupportedOperationException( - "timestamp type must be java.time.LocalDateTime, the actual type is: " + "timestamp type must be java.time.LocalDateTime or java.sql.Timestamp, the actual type is: " + val.getClass().getName()); } }; diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/deserialization/convert/DorisRowConverterTest.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/deserialization/convert/DorisRowConverterTest.java index b63d033..c9016c5 100644 --- a/flink-doris-connector/src/test/java/org/apache/doris/flink/deserialization/convert/DorisRowConverterTest.java +++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/deserialization/convert/DorisRowConverterTest.java @@ -37,6 +37,7 @@ import org.junit.Test; import java.io.IOException; import java.io.Serializable; import java.math.BigDecimal; +import java.sql.Timestamp; import java.time.LocalDate; import java.time.LocalDateTime; import java.util.ArrayList; @@ -65,7 +66,9 @@ public class DorisRowConverterTest implements Serializable { Column.physical("f13", DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE()), Column.physical("f14", DataTypes.DATE()), Column.physical("f15", DataTypes.CHAR(1)), - Column.physical("f16", DataTypes.VARCHAR(256))); + Column.physical("f16", DataTypes.VARCHAR(256)), + Column.physical("f17", DataTypes.TIMESTAMP_WITH_TIME_ZONE()), + Column.physical("f18", DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE())); DorisRowConverter converter = new DorisRowConverter((RowType) schema.toPhysicalRowDataType().getLogicalType()); @@ -73,6 +76,8 @@ public class DorisRowConverterTest implements Serializable { LocalDateTime time1 = LocalDateTime.of(2021, 1, 1, 8, 1, 1, 1000); LocalDateTime time2 = LocalDateTime.of(2021, 1, 1, 8, 1, 1, 1000); LocalDate date1 = LocalDate.of(2021, 1, 1); + Timestamp timestamp1 = Timestamp.valueOf(time1); + Timestamp timestamp2 = Timestamp.valueOf(time2); List<Object> record = Arrays.asList( null, @@ -90,7 +95,9 @@ public class DorisRowConverterTest implements Serializable { time2, date1, "a", - "doris"); + "doris", + timestamp1, + timestamp2); GenericRowData rowData = converter.convertInternal(record); RowDataSerializer serializer = @@ -101,12 +108,12 @@ public class DorisRowConverterTest implements Serializable { .setFieldNames( new String[] { "f1", "f2", "f3", "f4", "f5", "f6", "f7", "f8", "f9", "f10", - "f11", "f12", "f13", "f14", "f15", "f16" + "f11", "f12", "f13", "f14", "f15", "f16", "f17", "f18" }) .build(); String s = new String(serializer.serialize(rowData).getRow()); Assert.assertEquals( - "\\N|true|1.2|1.2345|24|10|1|32|64|128|10.12|2021-01-01 08:01:01.000001|2021-01-01 08:01:01.000001|2021-01-01|a|doris", + "\\N|true|1.2|1.2345|24|10|1|32|64|128|10.12|2021-01-01 08:01:01.000001|2021-01-01 08:01:01.000001|2021-01-01|a|doris|2021-01-01 08:01:01.000001|2021-01-01 08:01:01.000001", s); } @@ -129,12 +136,16 @@ public class DorisRowConverterTest implements Serializable { Column.physical("f13", DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE()), Column.physical("f14", DataTypes.DATE()), Column.physical("f15", DataTypes.CHAR(1)), - Column.physical("f16", DataTypes.VARCHAR(256))); + Column.physical("f16", DataTypes.VARCHAR(256)), + Column.physical("f17", DataTypes.TIMESTAMP_WITH_TIME_ZONE()), + Column.physical("f18", DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE())); DorisRowConverter converter = new DorisRowConverter((RowType) schema.toPhysicalRowDataType().getLogicalType()); // Doris DatetimeV2 supports up to 6 decimal places (microseconds). LocalDateTime time1 = LocalDateTime.of(2021, 1, 1, 8, 1, 1, 1000); LocalDateTime time2 = LocalDateTime.of(2021, 1, 1, 8, 1, 1, 1000); + Timestamp timestamp1 = Timestamp.valueOf(time1); + Timestamp timestamp2 = Timestamp.valueOf(time2); LocalDate date1 = LocalDate.of(2021, 1, 1); GenericRowData rowData = GenericRowData.of( @@ -153,13 +164,16 @@ public class DorisRowConverterTest implements Serializable { TimestampData.fromLocalDateTime(time2), (int) date1.toEpochDay(), StringData.fromString("a"), - StringData.fromString("doris")); + StringData.fromString("doris"), + TimestampData.fromTimestamp(timestamp1), + TimestampData.fromTimestamp(timestamp2)); List<Object> row = new ArrayList<>(); for (int i = 0; i < rowData.getArity(); i++) { row.add(converter.convertExternal(rowData, i)); } + // System.out.println(row.toString()); Assert.assertEquals( - "[null, true, 1.2, 1.2345, 24, 10, 1, 32, 64, 128, 10.123, 2021-01-01 08:01:01.000001, 2021-01-01 08:01:01.000001, 2021-01-01, a, doris]", + "[null, true, 1.2, 1.2345, 24, 10, 1, 32, 64, 128, 10.123, 2021-01-01 08:01:01.000001, 2021-01-01 08:01:01.000001, 2021-01-01, a, doris, 2021-01-01 08:01:01.000001, 2021-01-01 08:01:01.000001]", row.toString()); } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org