This is an automated email from the ASF dual-hosted git repository.

diwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris-flink-connector.git


The following commit(s) were added to refs/heads/master by this push:
     new d840ccc  [fix](lookup)enhance compatibility for lookup join with the java.sql.Timestamp  (#341)
d840ccc is described below

commit d840cccb12d2828765998760a22ab92c53122440
Author: Petrichor <31833513+vinle...@users.noreply.github.com>
AuthorDate: Wed Mar 13 14:27:34 2024 +0800

    [fix](lookup)enhance compatibility for lookup join with the java.sql.Timestamp  (#341)
---
 .../converter/DorisRowConverter.java               |  5 +++-
 .../convert/DorisRowConverterTest.java             | 28 ++++++++++++++++------
 2 files changed, 25 insertions(+), 8 deletions(-)

diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/deserialization/converter/DorisRowConverter.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/deserialization/converter/DorisRowConverter.java
index afda237..130d236 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/deserialization/converter/DorisRowConverter.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/deserialization/converter/DorisRowConverter.java
@@ -47,6 +47,7 @@ import org.apache.doris.flink.serialization.RowBatch;
 import java.io.Serializable;
 import java.math.BigDecimal;
 import java.sql.Date;
+import java.sql.Timestamp;
 import java.time.LocalDate;
 import java.time.LocalDateTime;
 import java.util.Arrays;
@@ -195,9 +196,11 @@ public class DorisRowConverter implements Serializable {
                 return val -> {
                     if (val instanceof LocalDateTime) {
                         return TimestampData.fromLocalDateTime((LocalDateTime) val);
+                    } else if (val instanceof Timestamp) {
+                        return TimestampData.fromTimestamp((Timestamp) val);
                     } else {
                         throw new UnsupportedOperationException(
-                                "timestamp type must be java.time.LocalDateTime, the actual type is: "
+                                "timestamp type must be java.time.LocalDateTime or java.sql.Timestamp, the actual type is: "
                                         + val.getClass().getName());
                     }
                 };
diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/deserialization/convert/DorisRowConverterTest.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/deserialization/convert/DorisRowConverterTest.java
index b63d033..c9016c5 100644
--- a/flink-doris-connector/src/test/java/org/apache/doris/flink/deserialization/convert/DorisRowConverterTest.java
+++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/deserialization/convert/DorisRowConverterTest.java
@@ -37,6 +37,7 @@ import org.junit.Test;
 import java.io.IOException;
 import java.io.Serializable;
 import java.math.BigDecimal;
+import java.sql.Timestamp;
 import java.time.LocalDate;
 import java.time.LocalDateTime;
 import java.util.ArrayList;
@@ -65,7 +66,9 @@ public class DorisRowConverterTest implements Serializable {
                         Column.physical("f13", 
DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE()),
                         Column.physical("f14", DataTypes.DATE()),
                         Column.physical("f15", DataTypes.CHAR(1)),
-                        Column.physical("f16", DataTypes.VARCHAR(256)));
+                        Column.physical("f16", DataTypes.VARCHAR(256)),
+                        Column.physical("f17", 
DataTypes.TIMESTAMP_WITH_TIME_ZONE()),
+                        Column.physical("f18", 
DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE()));
 
         DorisRowConverter converter =
                 new DorisRowConverter((RowType) 
schema.toPhysicalRowDataType().getLogicalType());
@@ -73,6 +76,8 @@ public class DorisRowConverterTest implements Serializable {
         LocalDateTime time1 = LocalDateTime.of(2021, 1, 1, 8, 1, 1, 1000);
         LocalDateTime time2 = LocalDateTime.of(2021, 1, 1, 8, 1, 1, 1000);
         LocalDate date1 = LocalDate.of(2021, 1, 1);
+        Timestamp timestamp1 = Timestamp.valueOf(time1);
+        Timestamp timestamp2 = Timestamp.valueOf(time2);
         List<Object> record =
                 Arrays.asList(
                         null,
@@ -90,7 +95,9 @@ public class DorisRowConverterTest implements Serializable {
                         time2,
                         date1,
                         "a",
-                        "doris");
+                        "doris",
+                        timestamp1,
+                        timestamp2);
         GenericRowData rowData = converter.convertInternal(record);
 
         RowDataSerializer serializer =
@@ -101,12 +108,12 @@ public class DorisRowConverterTest implements 
Serializable {
                         .setFieldNames(
                                 new String[] {
                                     "f1", "f2", "f3", "f4", "f5", "f6", "f7", 
"f8", "f9", "f10",
-                                    "f11", "f12", "f13", "f14", "f15", "f16"
+                                    "f11", "f12", "f13", "f14", "f15", "f16", 
"f17", "f18"
                                 })
                         .build();
         String s = new String(serializer.serialize(rowData).getRow());
         Assert.assertEquals(
-                "\\N|true|1.2|1.2345|24|10|1|32|64|128|10.12|2021-01-01 08:01:01.000001|2021-01-01 08:01:01.000001|2021-01-01|a|doris",
+                "\\N|true|1.2|1.2345|24|10|1|32|64|128|10.12|2021-01-01 08:01:01.000001|2021-01-01 08:01:01.000001|2021-01-01|a|doris|2021-01-01 08:01:01.000001|2021-01-01 08:01:01.000001",
                 s);
     }
 
@@ -129,12 +136,16 @@ public class DorisRowConverterTest implements 
Serializable {
                         Column.physical("f13", 
DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE()),
                         Column.physical("f14", DataTypes.DATE()),
                         Column.physical("f15", DataTypes.CHAR(1)),
-                        Column.physical("f16", DataTypes.VARCHAR(256)));
+                        Column.physical("f16", DataTypes.VARCHAR(256)),
+                        Column.physical("f17", 
DataTypes.TIMESTAMP_WITH_TIME_ZONE()),
+                        Column.physical("f18", 
DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE()));
         DorisRowConverter converter =
                 new DorisRowConverter((RowType) 
schema.toPhysicalRowDataType().getLogicalType());
         // Doris DatetimeV2 supports up to 6 decimal places (microseconds).
         LocalDateTime time1 = LocalDateTime.of(2021, 1, 1, 8, 1, 1, 1000);
         LocalDateTime time2 = LocalDateTime.of(2021, 1, 1, 8, 1, 1, 1000);
+        Timestamp timestamp1 = Timestamp.valueOf(time1);
+        Timestamp timestamp2 = Timestamp.valueOf(time2);
         LocalDate date1 = LocalDate.of(2021, 1, 1);
         GenericRowData rowData =
                 GenericRowData.of(
@@ -153,13 +164,16 @@ public class DorisRowConverterTest implements 
Serializable {
                         TimestampData.fromLocalDateTime(time2),
                         (int) date1.toEpochDay(),
                         StringData.fromString("a"),
-                        StringData.fromString("doris"));
+                        StringData.fromString("doris"),
+                        TimestampData.fromTimestamp(timestamp1),
+                        TimestampData.fromTimestamp(timestamp2));
         List<Object> row = new ArrayList<>();
         for (int i = 0; i < rowData.getArity(); i++) {
             row.add(converter.convertExternal(rowData, i));
         }
+        //        System.out.println(row.toString());
         Assert.assertEquals(
-                "[null, true, 1.2, 1.2345, 24, 10, 1, 32, 64, 128, 10.123, 2021-01-01 08:01:01.000001, 2021-01-01 08:01:01.000001, 2021-01-01, a, doris]",
+                "[null, true, 1.2, 1.2345, 24, 10, 1, 32, 64, 128, 10.123, 2021-01-01 08:01:01.000001, 2021-01-01 08:01:01.000001, 2021-01-01, a, doris, 2021-01-01 08:01:01.000001, 2021-01-01 08:01:01.000001]",
                 row.toString());
     }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to