This is an automated email from the ASF dual-hosted git repository.

diwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris-flink-connector.git


The following commit(s) were added to refs/heads/master by this push:
     new 3c7bd664 [improve] Concat doris.filter.query option when push down 
(#552)
3c7bd664 is described below

commit 3c7bd664ee52361a91df85fa24336c335a4dcceb
Author: wudi <676366...@qq.com>
AuthorDate: Tue Feb 11 14:28:54 2025 +0800

    [improve] Concat doris.filter.query option when push down (#552)
---
 .../org/apache/doris/flink/rest/RestService.java   |  4 +-
 .../doris/flink/table/DorisDynamicTableSource.java |  9 ++-
 .../doris/flink/source/DorisSourceITCase.java      | 78 ++++++++++++++++++++++
 3 files changed, 86 insertions(+), 5 deletions(-)

diff --git 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/rest/RestService.java
 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/rest/RestService.java
index 75c1e27d..523a39e2 100644
--- 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/rest/RestService.java
+++ 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/rest/RestService.java
@@ -615,12 +615,12 @@ public class RestService implements Serializable {
         }
 
         if (queryPlan == null) {
-            logger.error(SHOULD_NOT_HAPPEN_MESSAGE);
+            logger.error(SHOULD_NOT_HAPPEN_MESSAGE + " res: " + response);
             throw new ShouldNeverHappenException();
         }
 
         if (queryPlan.getStatus() != REST_RESPONSE_STATUS_OK) {
-            String errMsg = "Doris FE's response is not OK, status is " + 
queryPlan.getStatus();
+            String errMsg = "Doris FE's response is not OK, res: " + response;
             logger.error(errMsg);
             throw new DorisException(errMsg);
         }
diff --git 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableSource.java
 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableSource.java
index 9763a888..a68cf189 100644
--- 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableSource.java
+++ 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableSource.java
@@ -90,8 +90,12 @@ public final class DorisDynamicTableSource
 
     @Override
     public ScanRuntimeProvider getScanRuntimeProvider(ScanContext 
runtimeProviderContext) {
-        if (StringUtils.isNullOrWhitespaceOnly(readOptions.getFilterQuery())) {
+        if (!resolvedFilterQuery.isEmpty()) {
             String filterQuery = 
resolvedFilterQuery.stream().collect(Collectors.joining(" AND "));
+            if 
(!StringUtils.isNullOrWhitespaceOnly(readOptions.getFilterQuery())) {
+                filterQuery =
+                        String.format("(%s) AND (%s)", 
readOptions.getFilterQuery(), filterQuery);
+            }
             readOptions.setFilterQuery(filterQuery);
         }
 
@@ -195,8 +199,7 @@ public final class DorisDynamicTableSource
         DorisExpressionVisitor expressionVisitor = new 
DorisExpressionVisitor();
         for (ResolvedExpression filter : filters) {
             String filterQuery = filter.accept(expressionVisitor);
-            if 
(StringUtils.isNullOrWhitespaceOnly(readOptions.getFilterQuery())
-                    && !StringUtils.isNullOrWhitespaceOnly(filterQuery)) {
+            if (!StringUtils.isNullOrWhitespaceOnly(filterQuery)) {
                 acceptedFilters.add(filter);
                 this.resolvedFilterQuery.add(filterQuery);
             } else {
diff --git 
a/flink-doris-connector/src/test/java/org/apache/doris/flink/source/DorisSourceITCase.java
 
b/flink-doris-connector/src/test/java/org/apache/doris/flink/source/DorisSourceITCase.java
index 18de700e..3eb96597 100644
--- 
a/flink-doris-connector/src/test/java/org/apache/doris/flink/source/DorisSourceITCase.java
+++ 
b/flink-doris-connector/src/test/java/org/apache/doris/flink/source/DorisSourceITCase.java
@@ -44,6 +44,8 @@ import org.junit.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.time.LocalDate;
+import java.time.format.DateTimeFormatter;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Iterator;
@@ -68,6 +70,8 @@ public class DorisSourceITCase extends AbstractITCaseService {
     static final String TABLE_CSV_TM = "tbl_csv_tm_source";
     private static final String 
TABLE_READ_TBL_PUSH_DOWN_WITH_UNION_ALL_NOT_EQ_FILTER =
             "tbl_read_tbl_push_down_with_union_all_not_eq_filter";
+    private static final String TABLE_READ_TBL_PUSH_DOWN_WITH_FILTER_QUERY =
+            "tbl_read_tbl_push_down_with_filter_query";
 
     @Rule
     public final MiniClusterWithClientResource miniClusterResource =
@@ -490,6 +494,80 @@ public class DorisSourceITCase extends 
AbstractITCaseService {
         checkResultInAnyOrder("testTableSourceFilterWithUnionAll", expected, 
actual.toArray());
     }
 
+    @Test
+    public void testTableSourceFilterWithFilterQuery() throws Exception {
+        LOG.info("starting to execute testTableSourceFilterWithFilterQuery 
case.");
+        // init doris table
+        ContainerUtils.executeSQLStatement(
+                getDorisQueryConnection(),
+                LOG,
+                String.format("CREATE DATABASE IF NOT EXISTS %s", DATABASE),
+                String.format(
+                        "DROP TABLE IF EXISTS %s.%s",
+                        DATABASE, TABLE_READ_TBL_PUSH_DOWN_WITH_FILTER_QUERY),
+                String.format(
+                        "CREATE TABLE %s.%s ( \n"
+                                + "`name` varchar(256),\n"
+                                + "`dt` date,\n"
+                                + "`age` int\n"
+                                + ") DISTRIBUTED BY HASH(`name`) BUCKETS 10\n"
+                                + "PROPERTIES (\n"
+                                + "\"replication_num\" = \"1\"\n"
+                                + ")\n",
+                        DATABASE, TABLE_READ_TBL_PUSH_DOWN_WITH_FILTER_QUERY),
+                String.format(
+                        "insert into %s.%s  values 
('doris',date_sub(now(),INTERVAL 7 DAY), 18)",
+                        DATABASE, TABLE_READ_TBL_PUSH_DOWN_WITH_FILTER_QUERY),
+                String.format(
+                        "insert into %s.%s  values ('flink','2025-02-10', 10)",
+                        DATABASE, TABLE_READ_TBL_PUSH_DOWN_WITH_FILTER_QUERY),
+                String.format(
+                        "insert into %s.%s  values ('apache',now(), 12)",
+                        DATABASE, TABLE_READ_TBL_PUSH_DOWN_WITH_FILTER_QUERY));
+
+        final StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+        env.setParallelism(DEFAULT_PARALLELISM);
+        final StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
+
+        String sourceDDL =
+                String.format(
+                        "CREATE TABLE doris_source_filter_with_filter_query ("
+                                + " name STRING,"
+                                + " dt DATE,"
+                                + " age INT"
+                                + ") WITH ("
+                                + " 'connector' = '"
+                                + DorisConfigOptions.IDENTIFIER
+                                + "',"
+                                + " 'fenodes' = '%s',"
+                                + " 'table.identifier' = '%s',"
+                                + " 'username' = '%s',"
+                                + " 'password' = '%s',"
+                                + " 'doris.filter.query' = ' (dt = 
DATE_FORMAT(TIMESTAMPADD(DAY , -7, NOW()), ''yyyy-MM-dd'')) '"
+                                + ")",
+                        getFenodes(),
+                        DATABASE + "." + 
TABLE_READ_TBL_PUSH_DOWN_WITH_FILTER_QUERY,
+                        getDorisUsername(),
+                        getDorisPassword());
+        tEnv.executeSql(sourceDDL);
+        String querySql =
+                "  SELECT * FROM doris_source_filter_with_filter_query where 
name = 'doris' and age > 2";
+        TableResult tableResult = tEnv.executeSql(querySql);
+
+        List<String> actual = new ArrayList<>();
+        try (CloseableIterator<Row> iterator = tableResult.collect()) {
+            while (iterator.hasNext()) {
+                actual.add(iterator.next().toString());
+            }
+        }
+
+        String nowDate =
+                
LocalDate.now().minusDays(7).format(DateTimeFormatter.ofPattern("yyyy-MM-dd"));
+
+        String[] expected = new String[] {"+I[doris, " + nowDate + ", 18]"};
+        checkResultInAnyOrder("testTableSourceFilterWithFilterQuery", 
expected, actual.toArray());
+    }
+
     @Test
     public void testTableSourceFilterWithUnionAllNotEqualFilter() throws 
Exception {
         LOG.info("starting to execute 
testTableSourceFilterWithUnionAllNotEqualFilter case.");


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to