This is an automated email from the ASF dual-hosted git repository.

kxiao pushed a commit to branch branch-2.0
in repository https://gitbox.apache.org/repos/asf/doris.git

commit 70ba9f1ebb7d44d69cd97654c3ddf16a9cee673e
Author: xzj7019 <131111794+xzj7...@users.noreply.github.com>
AuthorDate: Mon Sep 18 11:26:55 2023 +0800

    [fix](nereids) fix cte filter pushdown if the filters can be aggregated 
(#24489)
    
    The current CTE common-filter extraction doesn't work if the filters can 
be aggregated, which prevents the common filter from being pushed down inside 
the CTE. Consider the following case:
    with main as (select c1 from t1) select * from (select m1.* from main m1, 
main m2 where m1.c1 = m2.c1) abc where c1 = 1;
    The common c1=1 filter can't be pushed down.
    
    This PR changes the extraction logic to collect the per-consumer filters 
into a list instead of a set, so that identical filter sets are still counted; 
this also makes the pattern in TPC-DS query4/11 work.
---
 .../nereids/rules/rewrite/RewriteCteChildren.java  |  7 ++--
 .../nereids_p0/cte/test_cte_filter_pushdown.out    | 43 ++++++++++++++++++++
 .../nereids_p0/cte/test_cte_filter_pushdown.groovy | 47 ++++++++++++++++++++++
 3 files changed, 94 insertions(+), 3 deletions(-)

diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/RewriteCteChildren.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/RewriteCteChildren.java
index d88ef62e314..5aa286e67f9 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/RewriteCteChildren.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/RewriteCteChildren.java
@@ -145,10 +145,10 @@ public class RewriteCteChildren extends 
DefaultPlanRewriter<CascadesContext> imp
         Set<RelationId> consumerIds = 
cascadesContext.getCteIdToConsumers().get(cteId).stream()
                 .map(LogicalCTEConsumer::getRelationId)
                 .collect(Collectors.toSet());
-        Set<Set<Expression>> filtersAboveEachConsumer = 
cascadesContext.getConsumerIdToFilters().entrySet().stream()
+        List<Set<Expression>> filtersAboveEachConsumer = 
cascadesContext.getConsumerIdToFilters().entrySet().stream()
                 .filter(kv -> consumerIds.contains(kv.getKey()))
                 .map(Entry::getValue)
-                .collect(Collectors.toSet());
+                .collect(Collectors.toList());
         Set<Expression> someone = 
filtersAboveEachConsumer.stream().findFirst().orElse(null);
         if (someone == null) {
             return child;
@@ -156,11 +156,12 @@ public class RewriteCteChildren extends 
DefaultPlanRewriter<CascadesContext> imp
         int filterSize = 
cascadesContext.getCteIdToConsumers().get(cteId).size();
         Set<Expression> conjuncts = new HashSet<>();
         for (Expression f : someone) {
-            int matchCount = 1;
+            int matchCount = 0;
             Set<SlotReference> slots = f.collect(e -> e instanceof 
SlotReference);
             Set<Expression> mightBeJoined = new HashSet<>();
             for (Set<Expression> another : filtersAboveEachConsumer) {
                 if (another.equals(someone)) {
+                    matchCount++;
                     continue;
                 }
                 Set<Expression> matched = new HashSet<>();
diff --git a/regression-test/data/nereids_p0/cte/test_cte_filter_pushdown.out 
b/regression-test/data/nereids_p0/cte/test_cte_filter_pushdown.out
new file mode 100644
index 00000000000..0c632f4fc29
--- /dev/null
+++ b/regression-test/data/nereids_p0/cte/test_cte_filter_pushdown.out
@@ -0,0 +1,43 @@
+-- This file is automatically generated. You should know what you did if you 
want to edit this
+-- !cte_filter_pushdown_1 --
+PhysicalCteAnchor ( cteId=CTEId#0 )
+--PhysicalCteProducer ( cteId=CTEId#0 )
+----PhysicalWindow
+------PhysicalQuickSort
+--------PhysicalProject
+----------filter((main.k1 = 1))
+------------PhysicalOlapScan[test]
+--PhysicalResultSink
+----PhysicalDistribute
+------PhysicalProject
+--------hashJoin[INNER_JOIN](m1.k1 = m2.k1)
+----------PhysicalDistribute
+------------filter((temp.k1 = 1))
+--------------PhysicalCteConsumer ( cteId=CTEId#0 )
+----------PhysicalDistribute
+------------PhysicalProject
+--------------filter((m2.k1 = 1))
+----------------PhysicalCteConsumer ( cteId=CTEId#0 )
+
+-- !cte_filter_pushdown_2 --
+PhysicalCteAnchor ( cteId=CTEId#0 )
+--PhysicalCteProducer ( cteId=CTEId#0 )
+----PhysicalProject
+------filter((main.k1 = 1))
+--------PhysicalWindow
+----------PhysicalQuickSort
+------------PhysicalDistribute
+--------------PhysicalProject
+----------------PhysicalOlapScan[test]
+--PhysicalResultSink
+----PhysicalDistribute
+------PhysicalProject
+--------hashJoin[INNER_JOIN](m1.k1 = m2.k1)
+----------PhysicalDistribute
+------------filter((temp.k1 = 1))
+--------------PhysicalCteConsumer ( cteId=CTEId#0 )
+----------PhysicalDistribute
+------------PhysicalProject
+--------------filter((m2.k1 = 1))
+----------------PhysicalCteConsumer ( cteId=CTEId#0 )
+
diff --git 
a/regression-test/suites/nereids_p0/cte/test_cte_filter_pushdown.groovy 
b/regression-test/suites/nereids_p0/cte/test_cte_filter_pushdown.groovy
new file mode 100644
index 00000000000..8f08721f6cd
--- /dev/null
+++ b/regression-test/suites/nereids_p0/cte/test_cte_filter_pushdown.groovy
@@ -0,0 +1,47 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+suite("test_cte_filter_pushdown") {
+    sql "SET enable_nereids_planner=true"
+    sql "SET enable_pipeline_engine=true"
+    sql "SET enable_fallback_to_original_planner=false"
+
+    // Both cases put k1 = 1 above a self-join of the CTE; the window partitions by k1 in case 1, by k2 in case 2.
+    qt_cte_filter_pushdown_1 """
+            explain shape plan
+            with main AS (
+               select k1, row_number() over (partition by k1) rn
+               from nereids_test_query_db.test
+           )
+           select * from (
+               select m1.* from main m1, main m2
+               where m1.k1 = m2.k1
+           ) temp
+           where k1 = 1;
+    """
+    qt_cte_filter_pushdown_2 """
+            explain shape plan
+            with main AS (
+               select k1, row_number() over (partition by k2) rn
+               from nereids_test_query_db.test
+           )
+           select * from (
+               select m1.* from main m1, main m2
+               where m1.k1 = m2.k1
+           ) temp
+           where k1 = 1;
+    """
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to