This is an automated email from the ASF dual-hosted git repository.

diwu pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris-flink-connector.git
The following commit(s) were added to refs/heads/master by this push: new c4eed254 [fix](source)fix timestamp format push down error (#528) c4eed254 is described below commit c4eed254f434316bc5e82b8aaf0b655540b04325 Author: Petrichor <31833513+vinle...@users.noreply.github.com> AuthorDate: Tue Dec 17 14:18:10 2024 +0800 [fix](source)fix timestamp format push down error (#528) --- .../doris/flink/table/DorisExpressionVisitor.java | 58 ++++++- .../doris/flink/source/DorisSourceITCase.java | 186 +++++++++++++++++++++ 2 files changed, 239 insertions(+), 5 deletions(-) diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisExpressionVisitor.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisExpressionVisitor.java index 66242e1e..93f15beb 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisExpressionVisitor.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisExpressionVisitor.java @@ -28,9 +28,17 @@ import org.apache.flink.table.functions.BuiltInFunctionDefinitions; import org.apache.flink.table.types.logical.LogicalTypeRoot; import org.apache.flink.util.StringUtils; +import org.apache.doris.flink.exception.DorisRuntimeException; + +import java.time.LocalDateTime; +import java.time.format.DateTimeFormatter; import java.util.List; public class DorisExpressionVisitor implements ExpressionVisitor<String> { + private static final String DATETIME_PATTERN = "yyyy-MM-dd HH:mm:ss"; + private static final String DATETIMEV2_PATTERN = "yyyy-MM-dd HH:mm:ss.SSSSSS"; + DateTimeFormatter dateTimeFormatter = DateTimeFormatter.ofPattern(DATETIME_PATTERN); + DateTimeFormatter dateTimev2Formatter = DateTimeFormatter.ofPattern(DATETIMEV2_PATTERN); @Override public String visit(CallExpression call) { @@ -94,11 +102,47 @@ public class DorisExpressionVisitor implements ExpressionVisitor<String> { @Override public String visit(ValueLiteralExpression valueLiteral) { LogicalTypeRoot 
typeRoot = valueLiteral.getOutputDataType().getLogicalType().getTypeRoot(); - if (typeRoot.equals(LogicalTypeRoot.TIMESTAMP_WITHOUT_TIME_ZONE) - || typeRoot.equals(LogicalTypeRoot.TIMESTAMP_WITH_LOCAL_TIME_ZONE) - || typeRoot.equals(LogicalTypeRoot.TIMESTAMP_WITH_TIME_ZONE) - || typeRoot.equals(LogicalTypeRoot.DATE)) { - return "'" + valueLiteral + "'"; + + switch (typeRoot) { + case TIMESTAMP_WITH_LOCAL_TIME_ZONE: + case TIMESTAMP_WITH_TIME_ZONE: + case DATE: + return "'" + valueLiteral + "'"; + case TIMESTAMP_WITHOUT_TIME_ZONE: + Class<?> conversionClass = valueLiteral.getOutputDataType().getConversionClass(); + if (LocalDateTime.class.isAssignableFrom(conversionClass)) { + try { + LocalDateTime localDateTime = + valueLiteral + .getValueAs(LocalDateTime.class) + .orElseThrow( + () -> + new RuntimeException( + "Failed to get LocalDateTime value")); + int nano = localDateTime.getNano(); + if (nano == 0) { + // if nanoseconds equals to zero, the timestamp is in seconds. + return wrapWithQuotes(localDateTime.format(dateTimeFormatter)); + } else { + // 1. Even though the datetime precision in Doris is set to 3, the + // microseconds format such as "yyyy-MM-dd HH:mm:ss.SSSSSS" can still + // function properly in the Doris query plan. + // 2. If the timestamp is in nanoseconds, format it like 'yyyy-MM-dd + // HH:mm:ss.SSSSSS'. This will have no impact on the result. Because + // when parsing the imported DATETIME type data on the BE side (for + // example, through Stream load, Spark load, etc.), or when using the FE + // side with Nereids enabled, the decimals that exceed the current + // precision will be rounded. 
+ return wrapWithQuotes(localDateTime.format(dateTimev2Formatter)); + } + + } catch (Exception e) { + throw new DorisRuntimeException(e.getMessage()); + } + } + break; + default: + return valueLiteral.toString(); } return valueLiteral.toString(); } @@ -117,4 +161,8 @@ public class DorisExpressionVisitor implements ExpressionVisitor<String> { public String visit(Expression expression) { return null; } + + private static String wrapWithQuotes(String value) { + return "'" + value + "'"; + } } diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/source/DorisSourceITCase.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/source/DorisSourceITCase.java index 6f148301..18de700e 100644 --- a/flink-doris-connector/src/test/java/org/apache/doris/flink/source/DorisSourceITCase.java +++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/source/DorisSourceITCase.java @@ -24,6 +24,7 @@ import org.apache.flink.api.common.restartstrategy.RestartStrategies; import org.apache.flink.runtime.minicluster.RpcServiceSharing; import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.table.api.EnvironmentSettings; import org.apache.flink.table.api.TableResult; import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; import org.apache.flink.test.util.MiniClusterWithClientResource; @@ -59,6 +60,8 @@ public class DorisSourceITCase extends AbstractITCaseService { private static final String TABLE_READ_TBL_OLD_API = "tbl_read_tbl_old_api"; private static final String TABLE_READ_TBL_ALL_OPTIONS = "tbl_read_tbl_all_options"; private static final String TABLE_READ_TBL_PUSH_DOWN = "tbl_read_tbl_push_down"; + private static final String TABLE_READ_TBL_TIMESTAMP_PUSH_DOWN = + "tbl_read_tbl_timestamp_push_down"; private static final String TABLE_READ_TBL_PUSH_DOWN_WITH_UNION_ALL = "tbl_read_tbl_push_down_with_union_all"; static 
final String TABLE_CSV_JM = "tbl_csv_jm_source"; @@ -311,6 +314,138 @@ public class DorisSourceITCase extends AbstractITCaseService { "testTableSourceFilterAndProjectionPushDown", expected, actual.toArray()); } + @Test + public void testTableSourceTimestampFilterAndProjectionPushDown() throws Exception { + initializeTimestampTable(TABLE_READ_TBL_TIMESTAMP_PUSH_DOWN); + final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + env.setParallelism(DEFAULT_PARALLELISM); + EnvironmentSettings settings = EnvironmentSettings.newInstance().inBatchMode().build(); + final StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, settings); + + String sourceDDL = + String.format( + "CREATE TABLE doris_source_datetime_filter_and_projection_push_down (" + + "`id` int ,\n" + + "`name` timestamp,\n" + + "`age` int,\n" + + "`birthday` timestamp,\n" + + "`brilliant_time` timestamp(6)\n" + + ") WITH (" + + " 'connector' = '" + + DorisConfigOptions.IDENTIFIER + + "'," + + " 'fenodes' = '%s'," + + " 'table.identifier' = '%s'," + + " 'username' = '%s'," + + " 'password' = '%s'" + + ")", + getFenodes(), + DATABASE + "." 
+ TABLE_READ_TBL_TIMESTAMP_PUSH_DOWN, + getDorisUsername(), + getDorisPassword()); + tEnv.executeSql(sourceDDL); + + List<String> actualProjectionResult = + generateExecuteSQLResult( + tEnv, + "SELECT id,birthday,brilliant_time FROM doris_source_datetime_filter_and_projection_push_down order by id"); + + List<String> actualPushDownDatetimeResult = + generateExecuteSQLResult( + tEnv, + "SELECT id,birthday FROM doris_source_datetime_filter_and_projection_push_down where birthday >= '2023-01-01 00:00:00' order by id"); + List<String> actualPushDownMicrosecondResult = + generateExecuteSQLResult( + tEnv, + "SELECT id,brilliant_time FROM doris_source_datetime_filter_and_projection_push_down where brilliant_time > '2023-01-01 00:00:00.000001' order by id"); + List<String> actualPushDownNanosecondResult = + generateExecuteSQLResult( + tEnv, + "SELECT id,brilliant_time FROM doris_source_datetime_filter_and_projection_push_down where brilliant_time > '2023-01-01 00:00:00.000009001' order by id"); + + List<String> actualPushDownNanosecondRoundDownResult = + generateExecuteSQLResult( + tEnv, + "SELECT id,brilliant_time FROM doris_source_datetime_filter_and_projection_push_down where brilliant_time >= '2023-01-01 00:00:00.999999001' order by id"); + List<String> actualPushDownNanosecondRoundUpResult = + generateExecuteSQLResult( + tEnv, + "SELECT id,brilliant_time FROM doris_source_datetime_filter_and_projection_push_down where brilliant_time >= '2023-01-01 00:00:00.999999999' order by id"); + + String[] expectedProjectionResult = + new String[] { + "+I[1, 2023-01-01T00:00, 2023-01-01T00:00:00.000001]", + "+I[2, 2023-01-01T00:00:01, 2023-01-01T00:00:00.005]", + "+I[3, 2023-01-01T00:00:02, 2023-01-01T00:00:00.000009]", + "+I[4, 2023-01-01T00:00:02, 2023-01-01T00:00:00.999999]", + "+I[5, 2023-01-01T00:00:02, 2023-01-01T00:00:00.999999]", + "+I[6, 2023-01-01T00:00:02, 2023-01-01T00:00:01]" + }; + String[] expectedPushDownDatetimeResult = + new String[] { + "+I[1, 
2023-01-01T00:00]", + "+I[2, 2023-01-01T00:00:01]", + "+I[3, 2023-01-01T00:00:02]", + "+I[4, 2023-01-01T00:00:02]", + "+I[5, 2023-01-01T00:00:02]", + "+I[6, 2023-01-01T00:00:02]" + }; + String[] expectedPushDownWithMicrosecondResult = + new String[] { + "+I[2, 2023-01-01T00:00:00.005]", + "+I[3, 2023-01-01T00:00:00.000009]", + "+I[4, 2023-01-01T00:00:00.999999]", + "+I[5, 2023-01-01T00:00:00.999999]", + "+I[6, 2023-01-01T00:00:01]" + }; + + String[] expectedPushDownWithNanosecondResult = + new String[] { + "+I[2, 2023-01-01T00:00:00.005]", + "+I[4, 2023-01-01T00:00:00.999999]", + "+I[5, 2023-01-01T00:00:00.999999]", + "+I[6, 2023-01-01T00:00:01]" + }; + + String[] expectedPushDownWithNanosecondRoundDownResult = + new String[] { + "+I[4, 2023-01-01T00:00:00.999999]", + "+I[5, 2023-01-01T00:00:00.999999]", + "+I[6, 2023-01-01T00:00:01]" + }; + + String[] expectedPushDownWithNanosecondRoundUpResult = + new String[] { + "+I[4, 2023-01-01T00:00:00.999999]", + "+I[5, 2023-01-01T00:00:00.999999]", + "+I[6, 2023-01-01T00:00:01]" + }; + checkResultInAnyOrder( + "testTableSourceTimestampFilterAndProjectionPushDown", + expectedProjectionResult, + actualProjectionResult.toArray()); + checkResultInAnyOrder( + "testTableSourceTimestampFilterAndProjectionPushDown", + expectedPushDownDatetimeResult, + actualPushDownDatetimeResult.toArray()); + checkResultInAnyOrder( + "testTableSourceTimestampFilterAndProjectionPushDown", + expectedPushDownWithMicrosecondResult, + actualPushDownMicrosecondResult.toArray()); + checkResultInAnyOrder( + "testTableSourceTimestampFilterAndProjectionPushDown", + expectedPushDownWithNanosecondResult, + actualPushDownNanosecondResult.toArray()); + checkResultInAnyOrder( + "testTableSourceTimestampFilterAndProjectionPushDown", + expectedPushDownWithNanosecondRoundDownResult, + actualPushDownNanosecondRoundDownResult.toArray()); + checkResultInAnyOrder( + "testTableSourceTimestampFilterAndProjectionPushDown", + expectedPushDownWithNanosecondRoundUpResult, + 
actualPushDownNanosecondRoundUpResult.toArray()); + } + @Test public void testTableSourceFilterWithUnionAll() throws Exception { LOG.info("starting to execute testTableSourceFilterWithUnionAll case."); @@ -566,6 +701,44 @@ public class DorisSourceITCase extends AbstractITCaseService { String.format("insert into %s.%s values ('apache',12)", DATABASE, table)); } + private void initializeTimestampTable(String table) { + ContainerUtils.executeSQLStatement( + getDorisQueryConnection(), + LOG, + String.format("CREATE DATABASE IF NOT EXISTS %s", DATABASE), + String.format("DROP TABLE IF EXISTS %s.%s", DATABASE, table), + String.format( + "CREATE TABLE %s.%s ( \n" + + "`id` int,\n" + + "`name` varchar(256),\n" + + "`age` int,\n" + + "`birthday` datetime,\n" + + "`brilliant_time` datetime(6),\n" + + ") DISTRIBUTED BY HASH(`id`) BUCKETS 3\n" + + "PROPERTIES (\n" + + "\"replication_num\" = \"1\"\n" + + ")\n", + DATABASE, table), + String.format( + "insert into %s.%s values (1,'Kevin',54,'2023-01-01T00:00:00','2023-01-01T00:00:00.000001')", + DATABASE, table), + String.format( + "insert into %s.%s values (2,'Dylan',25,'2023-01-01T00:00:01','2023-01-01T00:00:00.005000')", + DATABASE, table), + String.format( + "insert into %s.%s values (3,'Darren',65,'2023-01-01T00:00:02','2023-01-01T00:00:00.000009')", + DATABASE, table), + String.format( + "insert into %s.%s values (4,'Warren',75,'2023-01-01T00:00:02','2023-01-01T00:00:00.999999')", + DATABASE, table), + String.format( + "insert into %s.%s values (5,'Simba',75,'2023-01-01T00:00:02','2023-01-01T00:00:00.999999001')", + DATABASE, table), + String.format( + "insert into %s.%s values (6,'Jimmy',75,'2023-01-01T00:00:02','2023-01-01T00:00:00.999999999')", + DATABASE, table)); + } + private void initializeTableWithData(String table) { ContainerUtils.executeSQLStatement( getDorisQueryConnection(), @@ -610,4 +783,17 @@ public class DorisSourceITCase extends AbstractITCaseService { } return rows; } + + private List<String> 
generateExecuteSQLResult(StreamTableEnvironment tEnv, String executeSql) + throws Exception { + List<String> actualResultList = new ArrayList<>(); + TableResult tableResult = tEnv.executeSql(executeSql); + try (CloseableIterator<Row> iterator = tableResult.collect()) { + while (iterator.hasNext()) { + + actualResultList.add(iterator.next().toString()); + } + } + return actualResultList; + } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org