This is an automated email from the ASF dual-hosted git repository.

diwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris-flink-connector.git


The following commit(s) were added to refs/heads/master by this push:
     new c4eed254 [fix](source)fix timestamp format push down error (#528)
c4eed254 is described below

commit c4eed254f434316bc5e82b8aaf0b655540b04325
Author: Petrichor <31833513+vinle...@users.noreply.github.com>
AuthorDate: Tue Dec 17 14:18:10 2024 +0800

    [fix](source)fix timestamp format push down error (#528)
---
 .../doris/flink/table/DorisExpressionVisitor.java  |  58 ++++++-
 .../doris/flink/source/DorisSourceITCase.java      | 186 +++++++++++++++++++++
 2 files changed, 239 insertions(+), 5 deletions(-)

diff --git 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisExpressionVisitor.java
 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisExpressionVisitor.java
index 66242e1e..93f15beb 100644
--- 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisExpressionVisitor.java
+++ 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisExpressionVisitor.java
@@ -28,9 +28,17 @@ import 
org.apache.flink.table.functions.BuiltInFunctionDefinitions;
 import org.apache.flink.table.types.logical.LogicalTypeRoot;
 import org.apache.flink.util.StringUtils;
 
+import org.apache.doris.flink.exception.DorisRuntimeException;
+
+import java.time.LocalDateTime;
+import java.time.format.DateTimeFormatter;
 import java.util.List;
 
 public class DorisExpressionVisitor implements ExpressionVisitor<String> {
+    private static final String DATETIME_PATTERN = "yyyy-MM-dd HH:mm:ss";
+    private static final String DATETIMEV2_PATTERN = "yyyy-MM-dd 
HH:mm:ss.SSSSSS";
+    DateTimeFormatter dateTimeFormatter = 
DateTimeFormatter.ofPattern(DATETIME_PATTERN);
+    DateTimeFormatter dateTimev2Formatter = 
DateTimeFormatter.ofPattern(DATETIMEV2_PATTERN);
 
     @Override
     public String visit(CallExpression call) {
@@ -94,11 +102,47 @@ public class DorisExpressionVisitor implements 
ExpressionVisitor<String> {
     @Override
     public String visit(ValueLiteralExpression valueLiteral) {
         LogicalTypeRoot typeRoot = 
valueLiteral.getOutputDataType().getLogicalType().getTypeRoot();
-        if (typeRoot.equals(LogicalTypeRoot.TIMESTAMP_WITHOUT_TIME_ZONE)
-                || 
typeRoot.equals(LogicalTypeRoot.TIMESTAMP_WITH_LOCAL_TIME_ZONE)
-                || typeRoot.equals(LogicalTypeRoot.TIMESTAMP_WITH_TIME_ZONE)
-                || typeRoot.equals(LogicalTypeRoot.DATE)) {
-            return "'" + valueLiteral + "'";
+
+        switch (typeRoot) {
+            case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
+            case TIMESTAMP_WITH_TIME_ZONE:
+            case DATE:
+                return "'" + valueLiteral + "'";
+            case TIMESTAMP_WITHOUT_TIME_ZONE:
+                Class<?> conversionClass = 
valueLiteral.getOutputDataType().getConversionClass();
+                if (LocalDateTime.class.isAssignableFrom(conversionClass)) {
+                    try {
+                        LocalDateTime localDateTime =
+                                valueLiteral
+                                        .getValueAs(LocalDateTime.class)
+                                        .orElseThrow(
+                                                () ->
+                                                        new RuntimeException(
+                                                                "Failed to get 
LocalDateTime value"));
+                        int nano = localDateTime.getNano();
+                        if (nano == 0) {
+                            // if nanoseconds equals to zero, the timestamp is 
in seconds.
+                            return 
wrapWithQuotes(localDateTime.format(dateTimeFormatter));
+                        } else {
+                            // 1. Even though the datetime precision in Doris 
is set to 3, the
+                            // microseconds format such as "yyyy-MM-dd 
HH:mm:ss.SSSSSS" can still
+                            // function properly in the Doris query plan.
+                            // 2. If the timestamp is in nanoseconds, format 
it like 'yyyy-MM-dd
+                            // HH:mm:ss.SSSSSS'. This will have no impact on 
the result. Because
+                            // when parsing the imported DATETIME type data on 
the BE side (for
+                            // example, through Stream load, Spark load, 
etc.), or when using the FE
+                            // side with Nereids enabled, the decimals that 
exceed the current
+                            // precision will be rounded.
+                            return 
wrapWithQuotes(localDateTime.format(dateTimev2Formatter));
+                        }
+
+                    } catch (Exception e) {
+                        throw new DorisRuntimeException(e.getMessage());
+                    }
+                }
+                break;
+            default:
+                return valueLiteral.toString();
         }
         return valueLiteral.toString();
     }
@@ -117,4 +161,8 @@ public class DorisExpressionVisitor implements 
ExpressionVisitor<String> {
     public String visit(Expression expression) {
         return null;
     }
+
+    private static String wrapWithQuotes(String value) {
+        return "'" + value + "'";
+    }
 }
diff --git 
a/flink-doris-connector/src/test/java/org/apache/doris/flink/source/DorisSourceITCase.java
 
b/flink-doris-connector/src/test/java/org/apache/doris/flink/source/DorisSourceITCase.java
index 6f148301..18de700e 100644
--- 
a/flink-doris-connector/src/test/java/org/apache/doris/flink/source/DorisSourceITCase.java
+++ 
b/flink-doris-connector/src/test/java/org/apache/doris/flink/source/DorisSourceITCase.java
@@ -24,6 +24,7 @@ import 
org.apache.flink.api.common.restartstrategy.RestartStrategies;
 import org.apache.flink.runtime.minicluster.RpcServiceSharing;
 import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.api.EnvironmentSettings;
 import org.apache.flink.table.api.TableResult;
 import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
 import org.apache.flink.test.util.MiniClusterWithClientResource;
@@ -59,6 +60,8 @@ public class DorisSourceITCase extends AbstractITCaseService {
     private static final String TABLE_READ_TBL_OLD_API = 
"tbl_read_tbl_old_api";
     private static final String TABLE_READ_TBL_ALL_OPTIONS = 
"tbl_read_tbl_all_options";
     private static final String TABLE_READ_TBL_PUSH_DOWN = 
"tbl_read_tbl_push_down";
+    private static final String TABLE_READ_TBL_TIMESTAMP_PUSH_DOWN =
+            "tbl_read_tbl_timestamp_push_down";
     private static final String TABLE_READ_TBL_PUSH_DOWN_WITH_UNION_ALL =
             "tbl_read_tbl_push_down_with_union_all";
     static final String TABLE_CSV_JM = "tbl_csv_jm_source";
@@ -311,6 +314,138 @@ public class DorisSourceITCase extends 
AbstractITCaseService {
                 "testTableSourceFilterAndProjectionPushDown", expected, 
actual.toArray());
     }
 
+    @Test
+    public void testTableSourceTimestampFilterAndProjectionPushDown() throws 
Exception {
+        initializeTimestampTable(TABLE_READ_TBL_TIMESTAMP_PUSH_DOWN);
+        final StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+        env.setParallelism(DEFAULT_PARALLELISM);
+        EnvironmentSettings settings = 
EnvironmentSettings.newInstance().inBatchMode().build();
+        final StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, 
settings);
+
+        String sourceDDL =
+                String.format(
+                        "CREATE TABLE 
doris_source_datetime_filter_and_projection_push_down ("
+                                + "`id` int ,\n"
+                                + "`name` timestamp,\n"
+                                + "`age` int,\n"
+                                + "`birthday` timestamp,\n"
+                                + "`brilliant_time` timestamp(6)\n"
+                                + ") WITH ("
+                                + " 'connector' = '"
+                                + DorisConfigOptions.IDENTIFIER
+                                + "',"
+                                + " 'fenodes' = '%s',"
+                                + " 'table.identifier' = '%s',"
+                                + " 'username' = '%s',"
+                                + " 'password' = '%s'"
+                                + ")",
+                        getFenodes(),
+                        DATABASE + "." + TABLE_READ_TBL_TIMESTAMP_PUSH_DOWN,
+                        getDorisUsername(),
+                        getDorisPassword());
+        tEnv.executeSql(sourceDDL);
+
+        List<String> actualProjectionResult =
+                generateExecuteSQLResult(
+                        tEnv,
+                        "SELECT id,birthday,brilliant_time FROM 
doris_source_datetime_filter_and_projection_push_down order by id");
+
+        List<String> actualPushDownDatetimeResult =
+                generateExecuteSQLResult(
+                        tEnv,
+                        "SELECT id,birthday FROM 
doris_source_datetime_filter_and_projection_push_down where birthday >= 
'2023-01-01 00:00:00' order by id");
+        List<String> actualPushDownMicrosecondResult =
+                generateExecuteSQLResult(
+                        tEnv,
+                        "SELECT id,brilliant_time FROM 
doris_source_datetime_filter_and_projection_push_down where brilliant_time > 
'2023-01-01 00:00:00.000001' order by id");
+        List<String> actualPushDownNanosecondResult =
+                generateExecuteSQLResult(
+                        tEnv,
+                        "SELECT id,brilliant_time FROM 
doris_source_datetime_filter_and_projection_push_down where brilliant_time > 
'2023-01-01 00:00:00.000009001' order by id");
+
+        List<String> actualPushDownNanosecondRoundDownResult =
+                generateExecuteSQLResult(
+                        tEnv,
+                        "SELECT id,brilliant_time FROM 
doris_source_datetime_filter_and_projection_push_down where brilliant_time >= 
'2023-01-01 00:00:00.999999001' order by id");
+        List<String> actualPushDownNanosecondRoundUpResult =
+                generateExecuteSQLResult(
+                        tEnv,
+                        "SELECT id,brilliant_time FROM 
doris_source_datetime_filter_and_projection_push_down where brilliant_time >= 
'2023-01-01 00:00:00.999999999' order by id");
+
+        String[] expectedProjectionResult =
+                new String[] {
+                    "+I[1, 2023-01-01T00:00, 2023-01-01T00:00:00.000001]",
+                    "+I[2, 2023-01-01T00:00:01, 2023-01-01T00:00:00.005]",
+                    "+I[3, 2023-01-01T00:00:02, 2023-01-01T00:00:00.000009]",
+                    "+I[4, 2023-01-01T00:00:02, 2023-01-01T00:00:00.999999]",
+                    "+I[5, 2023-01-01T00:00:02, 2023-01-01T00:00:00.999999]",
+                    "+I[6, 2023-01-01T00:00:02, 2023-01-01T00:00:01]"
+                };
+        String[] expectedPushDownDatetimeResult =
+                new String[] {
+                    "+I[1, 2023-01-01T00:00]",
+                    "+I[2, 2023-01-01T00:00:01]",
+                    "+I[3, 2023-01-01T00:00:02]",
+                    "+I[4, 2023-01-01T00:00:02]",
+                    "+I[5, 2023-01-01T00:00:02]",
+                    "+I[6, 2023-01-01T00:00:02]"
+                };
+        String[] expectedPushDownWithMicrosecondResult =
+                new String[] {
+                    "+I[2, 2023-01-01T00:00:00.005]",
+                    "+I[3, 2023-01-01T00:00:00.000009]",
+                    "+I[4, 2023-01-01T00:00:00.999999]",
+                    "+I[5, 2023-01-01T00:00:00.999999]",
+                    "+I[6, 2023-01-01T00:00:01]"
+                };
+
+        String[] expectedPushDownWithNanosecondResult =
+                new String[] {
+                    "+I[2, 2023-01-01T00:00:00.005]",
+                    "+I[4, 2023-01-01T00:00:00.999999]",
+                    "+I[5, 2023-01-01T00:00:00.999999]",
+                    "+I[6, 2023-01-01T00:00:01]"
+                };
+
+        String[] expectedPushDownWithNanosecondRoundDownResult =
+                new String[] {
+                    "+I[4, 2023-01-01T00:00:00.999999]",
+                    "+I[5, 2023-01-01T00:00:00.999999]",
+                    "+I[6, 2023-01-01T00:00:01]"
+                };
+
+        String[] expectedPushDownWithNanosecondRoundUpResult =
+                new String[] {
+                    "+I[4, 2023-01-01T00:00:00.999999]",
+                    "+I[5, 2023-01-01T00:00:00.999999]",
+                    "+I[6, 2023-01-01T00:00:01]"
+                };
+        checkResultInAnyOrder(
+                "testTableSourceTimestampFilterAndProjectionPushDown",
+                expectedProjectionResult,
+                actualProjectionResult.toArray());
+        checkResultInAnyOrder(
+                "testTableSourceTimestampFilterAndProjectionPushDown",
+                expectedPushDownDatetimeResult,
+                actualPushDownDatetimeResult.toArray());
+        checkResultInAnyOrder(
+                "testTableSourceTimestampFilterAndProjectionPushDown",
+                expectedPushDownWithMicrosecondResult,
+                actualPushDownMicrosecondResult.toArray());
+        checkResultInAnyOrder(
+                "testTableSourceTimestampFilterAndProjectionPushDown",
+                expectedPushDownWithNanosecondResult,
+                actualPushDownNanosecondResult.toArray());
+        checkResultInAnyOrder(
+                "testTableSourceTimestampFilterAndProjectionPushDown",
+                expectedPushDownWithNanosecondRoundDownResult,
+                actualPushDownNanosecondRoundDownResult.toArray());
+        checkResultInAnyOrder(
+                "testTableSourceTimestampFilterAndProjectionPushDown",
+                expectedPushDownWithNanosecondRoundUpResult,
+                actualPushDownNanosecondRoundUpResult.toArray());
+    }
+
     @Test
     public void testTableSourceFilterWithUnionAll() throws Exception {
         LOG.info("starting to execute testTableSourceFilterWithUnionAll 
case.");
@@ -566,6 +701,44 @@ public class DorisSourceITCase extends 
AbstractITCaseService {
                 String.format("insert into %s.%s  values ('apache',12)", 
DATABASE, table));
     }
 
+    private void initializeTimestampTable(String table) {
+        ContainerUtils.executeSQLStatement(
+                getDorisQueryConnection(),
+                LOG,
+                String.format("CREATE DATABASE IF NOT EXISTS %s", DATABASE),
+                String.format("DROP TABLE IF EXISTS %s.%s", DATABASE, table),
+                String.format(
+                        "CREATE TABLE %s.%s ( \n"
+                                + "`id` int,\n"
+                                + "`name` varchar(256),\n"
+                                + "`age` int,\n"
+                                + "`birthday` datetime,\n"
+                                + "`brilliant_time` datetime(6),\n"
+                                + ") DISTRIBUTED BY HASH(`id`) BUCKETS 3\n"
+                                + "PROPERTIES (\n"
+                                + "\"replication_num\" = \"1\"\n"
+                                + ")\n",
+                        DATABASE, table),
+                String.format(
+                        "insert into %s.%s  values 
(1,'Kevin',54,'2023-01-01T00:00:00','2023-01-01T00:00:00.000001')",
+                        DATABASE, table),
+                String.format(
+                        "insert into %s.%s  values 
(2,'Dylan',25,'2023-01-01T00:00:01','2023-01-01T00:00:00.005000')",
+                        DATABASE, table),
+                String.format(
+                        "insert into %s.%s  values 
(3,'Darren',65,'2023-01-01T00:00:02','2023-01-01T00:00:00.000009')",
+                        DATABASE, table),
+                String.format(
+                        "insert into %s.%s  values 
(4,'Warren',75,'2023-01-01T00:00:02','2023-01-01T00:00:00.999999')",
+                        DATABASE, table),
+                String.format(
+                        "insert into %s.%s  values 
(5,'Simba',75,'2023-01-01T00:00:02','2023-01-01T00:00:00.999999001')",
+                        DATABASE, table),
+                String.format(
+                        "insert into %s.%s  values 
(6,'Jimmy',75,'2023-01-01T00:00:02','2023-01-01T00:00:00.999999999')",
+                        DATABASE, table));
+    }
+
     private void initializeTableWithData(String table) {
         ContainerUtils.executeSQLStatement(
                 getDorisQueryConnection(),
@@ -610,4 +783,17 @@ public class DorisSourceITCase extends 
AbstractITCaseService {
         }
         return rows;
     }
+
+    private List<String> generateExecuteSQLResult(StreamTableEnvironment tEnv, 
String executeSql)
+            throws Exception {
+        List<String> actualResultList = new ArrayList<>();
+        TableResult tableResult = tEnv.executeSql(executeSql);
+        try (CloseableIterator<Row> iterator = tableResult.collect()) {
+            while (iterator.hasNext()) {
+
+                actualResultList.add(iterator.next().toString());
+            }
+        }
+        return actualResultList;
+    }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to