This is an automated email from the ASF dual-hosted git repository.

kxiao pushed a commit to branch branch-2.0
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-2.0 by this push:
     new dab8ed6aa2 [fix](Nereids)Runtimefilter pushdown through TopN or Window 
bug (#24432)
dab8ed6aa2 is described below

commit dab8ed6aa2fc4d7f294ef2459d54f124f58e1956
Author: minghong <engle...@gmail.com>
AuthorDate: Fri Sep 15 14:42:46 2023 +0800

    [fix](Nereids)Runtimefilter pushdown through TopN or Window bug (#24432)
---
 .../processor/post/RuntimeFilterGenerator.java     | 50 ++++++++++------------
 .../doris/nereids/trees/plans/algebra/Window.java  | 34 +++++++++++++++
 .../nereids/postprocess/RuntimeFilterTest.java     | 21 +++++++++
 3 files changed, 78 insertions(+), 27 deletions(-)

diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/RuntimeFilterGenerator.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/RuntimeFilterGenerator.java
index de4d65f6c5..a572d3c7bd 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/RuntimeFilterGenerator.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/RuntimeFilterGenerator.java
@@ -51,7 +51,9 @@ import 
org.apache.doris.nereids.trees.plans.physical.PhysicalPlan;
 import org.apache.doris.nereids.trees.plans.physical.PhysicalProject;
 import org.apache.doris.nereids.trees.plans.physical.PhysicalRelation;
 import org.apache.doris.nereids.trees.plans.physical.PhysicalSchemaScan;
+import org.apache.doris.nereids.trees.plans.physical.PhysicalTopN;
 import org.apache.doris.nereids.trees.plans.physical.PhysicalUnion;
+import org.apache.doris.nereids.trees.plans.physical.PhysicalWindow;
 import org.apache.doris.nereids.trees.plans.physical.RuntimeFilter;
 import org.apache.doris.nereids.util.ExpressionUtils;
 import org.apache.doris.nereids.util.JoinUtils;
@@ -124,33 +126,6 @@ public class RuntimeFilterGenerator extends 
PlanPostProcessor {
         } else {
             pushDownRuntimeFilterCommon(join, context);
         }
-        //
-        // if (DENIED_JOIN_TYPES.contains(join.getJoinType()) || 
join.isMarkJoin()) {
-        //     // aliasTransMap is also used for judging whether the slot can 
be as rf target.
-        //     // for denied join type, the forbidden slots will be removed 
from the map.
-        //     // for example: a full outer join b on a.id = b.id, all slots 
will be removed out.
-        //     // for left outer join, only remove the right side slots and 
leave the left side.
-        //     // in later visit, the additional checking for the join type 
will be invoked for different cases:
-        //     // case 1: a left join b on a.id = b.id, checking whether rf on 
b.id can be pushed to a,
-        //     the answer is no,
-        //     //         since current join type is left outer join which is 
in denied list;
-        //     // case 2: (a left join b on a.id = b.id) inner join c on a.id2 
= c.id2, checking whether rf on c.id2 can
-        //     //         be pushed to a, the answer is yes, since the current 
join is inner join which is permitted.
-        //     if (join.getJoinType() == JoinType.LEFT_OUTER_JOIN) {
-        //         Set<Slot> slots = join.right().getOutputSet();
-        //         slots.forEach(aliasTransferMap::remove);
-        //     } else {
-        //         Set<Slot> slots = join.getOutputSet();
-        //         slots.forEach(aliasTransferMap::remove);
-        //     }
-        // } else {
-        //     collectPushDownCTEInfos(join, context);
-        //     if (!getPushDownCTECandidates(ctx).isEmpty()) {
-        //         pushDownRuntimeFilterIntoCTE(ctx);
-        //     } else {
-        //         pushDownRuntimeFilterCommon(join, context);
-        //     }
-        // }
         return join;
     }
 
@@ -168,6 +143,26 @@ public class RuntimeFilterGenerator extends 
PlanPostProcessor {
         return producer;
     }
 
+    @Override
+    public PhysicalPlan visitPhysicalTopN(PhysicalTopN<? extends Plan> topN, CascadesContext context) {
+        topN.child().accept(this, context);
+        // every slot produced below the TopN is dropped from the alias transfer map,
+        // so the joins above can no longer pick any of them as a runtime filter target
+        PhysicalPlan input = (PhysicalPlan) topN.child();
+        input.getOutput().forEach(slot -> context.getRuntimeFilterContext().getAliasTransferMap().remove(slot));
+        return topN;
+    }
+
+    @Override
+    public PhysicalPlan visitPhysicalWindow(PhysicalWindow<? extends Plan> window, CascadesContext context) {
+        window.child().accept(this, context);
+        Set<SlotReference> sharedKeys = window.getCommonPartitionKeyFromWindowExpressions();
+        // a child slot stays in the alias transfer map only when it is a partition key
+        // common to the window expressions; every other child slot is removed
+        for (Slot slot : window.child().getOutput()) {
+            if (!sharedKeys.contains(slot)) {
+                context.getRuntimeFilterContext().getAliasTransferMap().remove(slot);
+            }
+        }
+        return window;
+    }
+
     @Override
     public PhysicalPlan visitPhysicalNestedLoopJoin(PhysicalNestedLoopJoin<? 
extends Plan, ? extends Plan> join,
             CascadesContext context) {
@@ -326,6 +321,7 @@ public class RuntimeFilterGenerator extends 
PlanPostProcessor {
         if (!checkPhysicalRelationType(scan)) {
             return;
         }
+
         if (scan instanceof PhysicalCTEConsumer) {
             Set<CTEId> processedCTE = 
context.getRuntimeFilterContext().getProcessedCTE();
             CTEId cteId = ((PhysicalCTEConsumer) scan).getCteId();
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/algebra/Window.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/algebra/Window.java
index 35f6547b8a..00d290940e 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/algebra/Window.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/algebra/Window.java
@@ -22,16 +22,24 @@ import org.apache.doris.analysis.Expr;
 import org.apache.doris.nereids.exceptions.AnalysisException;
 import org.apache.doris.nereids.glue.translator.ExpressionTranslator;
 import org.apache.doris.nereids.glue.translator.PlanTranslatorContext;
+import org.apache.doris.nereids.trees.expressions.Alias;
 import org.apache.doris.nereids.trees.expressions.Expression;
 import org.apache.doris.nereids.trees.expressions.NamedExpression;
+import org.apache.doris.nereids.trees.expressions.SlotReference;
+import org.apache.doris.nereids.trees.expressions.WindowExpression;
 import org.apache.doris.nereids.trees.expressions.WindowFrame;
 import org.apache.doris.nereids.trees.expressions.WindowFrame.FrameBoundType;
 import org.apache.doris.nereids.trees.expressions.WindowFrame.FrameBoundary;
 import org.apache.doris.nereids.trees.expressions.WindowFrame.FrameUnitsType;
 import org.apache.doris.nereids.trees.expressions.literal.Literal;
 
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Maps;
+
 import java.math.BigDecimal;
 import java.util.List;
+import java.util.Map;
+import java.util.Set;
 
 /**
  * interface for LogicalWindow and PhysicalWindow
@@ -86,4 +94,30 @@ public interface Window {
         }
     }
 
+    /**
+     * Collects the slot partition keys that are shared by every window expression of this window.
+     * e.g. select rank() over (partition by A, B) as r, sum(x) over (partition by A, C) as s from T;
+     * A is the only common partition key, so a filter such as A=1 could be pushed through this window.
+     * A key repeated inside one partition-by list is counted once for that window expression.
+     */
+    default Set<SlotReference> getCommonPartitionKeyFromWindowExpressions() {
+        ImmutableSet.Builder<SlotReference> commonPartitionKeySet = ImmutableSet.builder();
+        Map<Expression, Integer> partitionKeyCount = Maps.newHashMap();
+        for (Expression expr : getWindowExpressions()) {
+            if (expr instanceof Alias && expr.child(0) instanceof WindowExpression) {
+                WindowExpression winExpr = (WindowExpression) expr.child(0);
+                // de-duplicate first: "partition by A, A" must not count A as if it came from two windows
+                for (Expression partitionKey : ImmutableSet.copyOf(winExpr.getPartitionKeys())) {
+                    partitionKeyCount.merge(partitionKey, 1, Integer::sum);
+                }
+            }
+        }
+        int winExprCount = getWindowExpressions().size();
+        for (Map.Entry<Expression, Integer> entry : partitionKeyCount.entrySet()) {
+            if (entry.getValue() == winExprCount && entry.getKey() instanceof SlotReference) {
+                commonPartitionKeySet.add((SlotReference) entry.getKey());
+            }
+        }
+        return commonPartitionKeySet.build();
+    }
 }
diff --git 
a/fe/fe-core/src/test/java/org/apache/doris/nereids/postprocess/RuntimeFilterTest.java
 
b/fe/fe-core/src/test/java/org/apache/doris/nereids/postprocess/RuntimeFilterTest.java
index fc08b0c35d..b429c9defa 100644
--- 
a/fe/fe-core/src/test/java/org/apache/doris/nereids/postprocess/RuntimeFilterTest.java
+++ 
b/fe/fe-core/src/test/java/org/apache/doris/nereids/postprocess/RuntimeFilterTest.java
@@ -255,4 +255,25 @@ public class RuntimeFilterTest extends SSBTestBase {
                     filter.getTargetExprs().get(0).getName())));
         }
     }
+
+    // The join key lo_custkey is not the partition key of the window (lo_partkey),
+    // so this case expects that no runtime filter is generated.
+    @Test
+    public void testRuntimeFilterBlockByWindow() {
+        String sql = "SELECT * FROM (select rank() over(partition by 
lo_partkey), lo_custkey from lineorder) t JOIN customer on lo_custkey = 
c_custkey";
+        List<RuntimeFilter> filters = getRuntimeFilters(sql).get();
+        Assertions.assertEquals(0, filters.size());
+    }
+
+    // The join key lo_custkey is also the only partition key of the window,
+    // so this case expects exactly one runtime filter.
+    @Test
+    public void testRuntimeFilterNotBlockByWindow() {
+        String sql = "SELECT * FROM (select rank() over(partition by 
lo_custkey), lo_custkey from lineorder) t JOIN customer on lo_custkey = 
c_custkey";
+        List<RuntimeFilter> filters = getRuntimeFilters(sql).get();
+        Assertions.assertEquals(1, filters.size());
+    }
+
+    // The join input is a subquery with "order by ... limit 10",
+    // so this case expects that no runtime filter is generated.
+    @Test
+    public void testRuntimeFilterBlockByTopN() {
+        String sql = "SELECT * FROM (select lo_custkey from lineorder order by 
lo_custkey limit 10) t JOIN customer on lo_custkey = c_custkey";
+        List<RuntimeFilter> filters = getRuntimeFilters(sql).get();
+        Assertions.assertEquals(0, filters.size());
+    }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to