This is an automated email from the ASF dual-hosted git repository.

englefly pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new f4f950b9bd9 [fix](nereids)push more than one runtime filters into cte 
(#30901)
f4f950b9bd9 is described below

commit f4f950b9bd934771074001c23feae67a9793d120
Author: minghong <engle...@gmail.com>
AuthorDate: Wed Feb 21 09:55:30 2024 +0800

    [fix](nereids)push more than one runtime filters into cte (#30901)
    
    * push rf into cte, used by tpcds95
---
 .../glue/translator/PhysicalPlanTranslator.java    |  6 +--
 .../glue/translator/RuntimeFilterTranslator.java   |  7 ---
 .../processor/post/RuntimeFilterContext.java       | 41 ++++++++--------
 .../processor/post/RuntimeFilterGenerator.java     | 57 ++++++++++++----------
 .../nereids/postprocess/RuntimeFilterTest.java     |  2 +-
 .../noStatsRfPrune/query95.out                     |  2 +-
 .../no_stats_shape/query95.out                     |  2 +-
 7 files changed, 58 insertions(+), 59 deletions(-)

diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java
index e1571778a15..c9cb534c9c2 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java
@@ -1284,8 +1284,7 @@ public class PhysicalPlanTranslator extends 
DefaultPlanVisitor<PlanFragment, Pla
                 .forEach(s -> rightChildOutputMap.put(s.getExprId(), s));
 
         // translate runtime filter
-        context.getRuntimeTranslator().ifPresent(runtimeFilterTranslator -> 
runtimeFilterTranslator
-                .getRuntimeFilterOfHashJoinNode(physicalHashJoin)
+        context.getRuntimeTranslator().ifPresent(runtimeFilterTranslator -> 
physicalHashJoin.getRuntimeFilters()
                 .forEach(filter -> 
runtimeFilterTranslator.createLegacyRuntimeFilter(filter, hashJoinNode, 
context)));
 
         // make intermediate tuple
@@ -1484,8 +1483,7 @@ public class PhysicalPlanTranslator extends 
DefaultPlanVisitor<PlanFragment, Pla
             }
             // translate runtime filter
             context.getRuntimeTranslator().ifPresent(runtimeFilterTranslator 
-> {
-                Set<RuntimeFilter> filters = runtimeFilterTranslator
-                        .getRuntimeFilterOfHashJoinNode(nestedLoopJoin);
+                List<RuntimeFilter> filters = 
nestedLoopJoin.getRuntimeFilters();
                 filters.forEach(filter -> runtimeFilterTranslator
                         .createLegacyRuntimeFilter(filter, nestedLoopJoinNode, 
context));
                 if (filters.stream().anyMatch(filter -> filter.getType() == 
TRuntimeFilterType.BITMAP)) {
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/RuntimeFilterTranslator.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/RuntimeFilterTranslator.java
index a2fbc28ecd8..787a2bd8181 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/RuntimeFilterTranslator.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/RuntimeFilterTranslator.java
@@ -28,7 +28,6 @@ import org.apache.doris.nereids.trees.expressions.ExprId;
 import org.apache.doris.nereids.trees.expressions.Expression;
 import org.apache.doris.nereids.trees.expressions.Slot;
 import org.apache.doris.nereids.trees.expressions.SlotReference;
-import org.apache.doris.nereids.trees.plans.physical.AbstractPhysicalJoin;
 import org.apache.doris.nereids.trees.plans.physical.RuntimeFilter;
 import org.apache.doris.planner.CTEScanNode;
 import org.apache.doris.planner.DataStreamSink;
@@ -47,7 +46,6 @@ import com.google.common.collect.ImmutableMap;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
-import java.util.Set;
 
 /**
  * translate runtime filter
@@ -58,11 +56,6 @@ public class RuntimeFilterTranslator {
 
     public RuntimeFilterTranslator(RuntimeFilterContext context) {
         this.context = context;
-        context.generatePhysicalHashJoinToRuntimeFilter();
-    }
-
-    public Set<RuntimeFilter> 
getRuntimeFilterOfHashJoinNode(AbstractPhysicalJoin join) {
-        return context.getRuntimeFilterOnHashJoinNode(join);
     }
 
     public RuntimeFilterContext getContext() {
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/RuntimeFilterContext.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/RuntimeFilterContext.java
index eb5767fc1f6..bf137e3c580 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/RuntimeFilterContext.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/RuntimeFilterContext.java
@@ -110,8 +110,6 @@ public class RuntimeFilterContext {
     // exprId to olap scan node slotRef because the slotRef will be changed 
when translating.
     private final Map<ExprId, SlotRef> exprIdToOlapScanNodeSlotRef = 
Maps.newHashMap();
 
-    private final Map<AbstractPhysicalJoin, Set<RuntimeFilter>> 
runtimeFilterOnHashJoinNode = Maps.newHashMap();
-
     // alias -> alias's child, if there's a key that is alias's child, the 
key-value will change by this way
     // Alias(A) = B, now B -> A in map, and encounter Alias(B) -> C, the kv 
will be C -> A.
     // you can see disjoint set data structure to learn the processing 
detailed.
@@ -191,19 +189,31 @@ public class RuntimeFilterContext {
     public void removeFilter(ExprId targetId, PhysicalHashJoin builderNode) {
         List<RuntimeFilter> filters = targetExprIdToFilter.get(targetId);
         if (filters != null) {
-            Iterator<RuntimeFilter> iter = filters.iterator();
-            while (iter.hasNext()) {
-                RuntimeFilter rf = iter.next();
+            Iterator<RuntimeFilter> filterIter = filters.iterator();
+            while (filterIter.hasNext()) {
+                RuntimeFilter rf = filterIter.next();
                 if (rf.getBuilderNode().equals(builderNode)) {
-                    builderNode.getRuntimeFilters().remove(rf);
-                    for (int i = 0; i < rf.getTargetSlots().size(); i++) {
-                        Slot targetSlot = rf.getTargetSlots().get(i);
+                    Iterator<Slot> targetSlotIter = 
rf.getTargetSlots().listIterator();
+                    Iterator<PhysicalRelation> targetScanIter = 
rf.getTargetScans().iterator();
+                    Iterator<Expression> targetExpressionIter = 
rf.getTargetExpressions().iterator();
+                    Slot targetSlot;
+                    PhysicalRelation targetScan;
+                    while (targetScanIter.hasNext() && 
targetSlotIter.hasNext() && targetExpressionIter.hasNext()) {
+                        targetExpressionIter.next();
+                        targetScan = targetScanIter.next();
+                        targetSlot = targetSlotIter.next();
                         if (targetSlot.getExprId().equals(targetId)) {
-                            
rf.getTargetScans().get(i).removeAppliedRuntimeFilter(rf);
+                            targetScan.removeAppliedRuntimeFilter(rf);
+                            targetExpressionIter.remove();
+                            targetScanIter.remove();
+                            targetSlotIter.remove();
                         }
                     }
-                    iter.remove();
-                    prunedRF.add(rf);
+                    if (rf.getTargetSlots().isEmpty()) {
+                        builderNode.getRuntimeFilters().remove(rf);
+                        filterIter.remove();
+                        prunedRF.add(rf);
+                    }
                 }
             }
         }
@@ -255,15 +265,6 @@ public class RuntimeFilterContext {
         return scanNodeOfLegacyRuntimeFilterTarget;
     }
 
-    public Set<RuntimeFilter> 
getRuntimeFilterOnHashJoinNode(AbstractPhysicalJoin join) {
-        return runtimeFilterOnHashJoinNode.getOrDefault(join, 
Collections.emptySet());
-    }
-
-    public void generatePhysicalHashJoinToRuntimeFilter() {
-        targetExprIdToFilter.values().forEach(filters -> 
filters.forEach(filter -> runtimeFilterOnHashJoinNode
-                .computeIfAbsent(filter.getBuilderNode(), k -> 
Sets.newHashSet()).add(filter)));
-    }
-
     public Map<ExprId, List<RuntimeFilter>> getTargetExprIdToFilter() {
         return targetExprIdToFilter;
     }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/RuntimeFilterGenerator.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/RuntimeFilterGenerator.java
index d4c5a96d3c5..5a7fbecb6ec 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/RuntimeFilterGenerator.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/RuntimeFilterGenerator.java
@@ -158,7 +158,7 @@ public class RuntimeFilterGenerator extends 
PlanPostProcessor {
                             // the most right deep buildNode from 
rfsToPushDown is used as buildNode for pushDown rf
                             // since the srcExpr are the same, all buildNodes 
of rfToPushDown are in the same tree path
                             // the longest ancestors means its corresponding 
rf build node is the most right deep one.
-                            RuntimeFilter rightDeep = rfsToPushDown.get(0);
+                            List<RuntimeFilter> rightDeepRfs = 
Lists.newArrayList();
                             List<Plan> rightDeepAncestors = 
rfsToPushDown.get(0).getBuilderNode().getAncestors();
                             int rightDeepAncestorsSize = 
rightDeepAncestors.size();
                             RuntimeFilter leftTop = rfsToPushDown.get(0);
@@ -166,10 +166,15 @@ public class RuntimeFilterGenerator extends 
PlanPostProcessor {
                             for (RuntimeFilter rf : rfsToPushDown) {
                                 List<Plan> ancestors = 
rf.getBuilderNode().getAncestors();
                                 int currentAncestorsSize = ancestors.size();
-                                if (currentAncestorsSize > 
rightDeepAncestorsSize) {
-                                    rightDeep = rf;
-                                    rightDeepAncestorsSize = 
currentAncestorsSize;
-                                    rightDeepAncestors = ancestors;
+                                if (currentAncestorsSize >= 
rightDeepAncestorsSize) {
+                                    if (currentAncestorsSize == 
rightDeepAncestorsSize) {
+                                        rightDeepRfs.add(rf);
+                                    } else {
+                                        rightDeepAncestorsSize = 
currentAncestorsSize;
+                                        rightDeepAncestors = ancestors;
+                                        rightDeepRfs.clear();
+                                        rightDeepRfs.add(rf);
+                                    }
                                 }
                                 if (currentAncestorsSize < 
leftTopAncestorsSize) {
                                     leftTopAncestorsSize = 
currentAncestorsSize;
@@ -187,7 +192,7 @@ public class RuntimeFilterGenerator extends 
PlanPostProcessor {
                                 if (cursor instanceof AbstractPhysicalJoin) {
                                     AbstractPhysicalJoin cursorJoin = 
(AbstractPhysicalJoin) cursor;
                                     valid = 
(!RuntimeFilterGenerator.DENIED_JOIN_TYPES
-                                                    
.contains(cursorJoin.getJoinType())
+                                            .contains(cursorJoin.getJoinType())
                                             || cursorJoin.isMarkJoin()) && 
valid;
                                 }
                                 if (!valid) {
@@ -199,27 +204,29 @@ public class RuntimeFilterGenerator extends 
PlanPostProcessor {
                                 break;
                             }
 
-                            Expression rightDeepTargetExpressionOnCTE = null;
-                            int targetCount = 
rightDeep.getTargetExpressions().size();
-                            for (int i = 0; i < targetCount; i++) {
-                                PhysicalRelation rel = 
rightDeep.getTargetScans().get(i);
-                                if (rel instanceof PhysicalCTEConsumer
-                                        && ((PhysicalCTEConsumer) 
rel).getCteId().equals(cteId)) {
-                                    rightDeepTargetExpressionOnCTE = 
rightDeep.getTargetExpressions().get(i);
-                                    break;
+                            for (RuntimeFilter rfToPush : rightDeepRfs) {
+                                Expression rightDeepTargetExpressionOnCTE = 
null;
+                                int targetCount = 
rfToPush.getTargetExpressions().size();
+                                for (int i = 0; i < targetCount; i++) {
+                                    PhysicalRelation rel = 
rfToPush.getTargetScans().get(i);
+                                    if (rel instanceof PhysicalCTEConsumer
+                                            && ((PhysicalCTEConsumer) 
rel).getCteId().equals(cteId)) {
+                                        rightDeepTargetExpressionOnCTE = 
rfToPush.getTargetExpressions().get(i);
+                                        break;
+                                    }
                                 }
-                            }
 
-                            boolean pushedDown = 
doPushDownIntoCTEProducerInternal(
-                                    rightDeep,
-                                    rightDeepTargetExpressionOnCTE,
-                                    rfCtx,
-                                    rfCtx.getCteProduceMap().get(cteId)
-                                    );
-                            if (pushedDown) {
-                                rfCtx.removeFilter(
-                                        
rightDeepTargetExpressionOnCTE.getInputSlotExprIds().iterator().next(),
-                                        (PhysicalHashJoin) 
rightDeep.getBuilderNode());
+                                boolean pushedDown = 
doPushDownIntoCTEProducerInternal(
+                                        rfToPush,
+                                        rightDeepTargetExpressionOnCTE,
+                                        rfCtx,
+                                        rfCtx.getCteProduceMap().get(cteId)
+                                );
+                                if (pushedDown) {
+                                    rfCtx.removeFilter(
+                                            
rightDeepTargetExpressionOnCTE.getInputSlotExprIds().iterator().next(),
+                                            (PhysicalHashJoin) 
rfToPush.getBuilderNode());
+                                }
                             }
                         }
                     }
diff --git 
a/fe/fe-core/src/test/java/org/apache/doris/nereids/postprocess/RuntimeFilterTest.java
 
b/fe/fe-core/src/test/java/org/apache/doris/nereids/postprocess/RuntimeFilterTest.java
index ce647427d21..f354ed5b02f 100644
--- 
a/fe/fe-core/src/test/java/org/apache/doris/nereids/postprocess/RuntimeFilterTest.java
+++ 
b/fe/fe-core/src/test/java/org/apache/doris/nereids/postprocess/RuntimeFilterTest.java
@@ -295,7 +295,7 @@ public class RuntimeFilterTest extends SSBTestBase {
                 .rewrite()
                 .implement();
         PhysicalPlan plan = checker.getPhysicalPlan();
-        new PlanPostProcessors(checker.getCascadesContext()).process(plan);
+        plan = new 
PlanPostProcessors(checker.getCascadesContext()).process(plan);
         System.out.println(plan.treeString());
         new PhysicalPlanTranslator(new 
PlanTranslatorContext(checker.getCascadesContext())).translatePlan(plan);
         RuntimeFilterContext context = 
checker.getCascadesContext().getRuntimeFilterContext();
diff --git 
a/regression-test/data/nereids_tpcds_shape_sf100_p0/noStatsRfPrune/query95.out 
b/regression-test/data/nereids_tpcds_shape_sf100_p0/noStatsRfPrune/query95.out
index 6e12e33b54d..82a42a4795c 100644
--- 
a/regression-test/data/nereids_tpcds_shape_sf100_p0/noStatsRfPrune/query95.out
+++ 
b/regression-test/data/nereids_tpcds_shape_sf100_p0/noStatsRfPrune/query95.out
@@ -26,7 +26,7 @@ PhysicalCteAnchor ( cteId=CTEId#0 )
 ----------------------hashJoin[INNER_JOIN] hashCondition=((ws1.ws_web_site_sk 
= web_site.web_site_sk)) otherCondition=() build RFs:RF5 
web_site_sk->[ws_web_site_sk]
 ------------------------hashJoin[INNER_JOIN] 
hashCondition=((ws1.ws_ship_addr_sk = customer_address.ca_address_sk)) 
otherCondition=() build RFs:RF4 ca_address_sk->[ws_ship_addr_sk]
 --------------------------hashJoin[INNER_JOIN] 
hashCondition=((ws1.ws_ship_date_sk = date_dim.d_date_sk)) otherCondition=() 
build RFs:RF3 d_date_sk->[ws_ship_date_sk]
-----------------------------hashJoin[RIGHT_SEMI_JOIN] 
hashCondition=((ws1.ws_order_number = web_returns.wr_order_number)) 
otherCondition=() build RFs:RF7 
ws_order_number->[ws_order_number,ws_order_number]
+----------------------------hashJoin[RIGHT_SEMI_JOIN] 
hashCondition=((ws1.ws_order_number = web_returns.wr_order_number)) 
otherCondition=() build RFs:RF2 ws_order_number->[wr_order_number];RF7 
ws_order_number->[ws_order_number,ws_order_number]
 ------------------------------PhysicalProject
 --------------------------------hashJoin[INNER_JOIN] 
hashCondition=((web_returns.wr_order_number = ws_wh.ws_order_number)) 
otherCondition=()
 ----------------------------------PhysicalDistribute[DistributionSpecHash]
diff --git 
a/regression-test/data/nereids_tpcds_shape_sf100_p0/no_stats_shape/query95.out 
b/regression-test/data/nereids_tpcds_shape_sf100_p0/no_stats_shape/query95.out
index 262c9046490..bf426f81483 100644
--- 
a/regression-test/data/nereids_tpcds_shape_sf100_p0/no_stats_shape/query95.out
+++ 
b/regression-test/data/nereids_tpcds_shape_sf100_p0/no_stats_shape/query95.out
@@ -26,7 +26,7 @@ PhysicalCteAnchor ( cteId=CTEId#0 )
 ----------------------hashJoin[INNER_JOIN] hashCondition=((ws1.ws_web_site_sk 
= web_site.web_site_sk)) otherCondition=() build RFs:RF5 
web_site_sk->[ws_web_site_sk]
 ------------------------hashJoin[INNER_JOIN] 
hashCondition=((ws1.ws_ship_addr_sk = customer_address.ca_address_sk)) 
otherCondition=() build RFs:RF4 ca_address_sk->[ws_ship_addr_sk]
 --------------------------hashJoin[INNER_JOIN] 
hashCondition=((ws1.ws_ship_date_sk = date_dim.d_date_sk)) otherCondition=() 
build RFs:RF3 d_date_sk->[ws_ship_date_sk]
-----------------------------hashJoin[RIGHT_SEMI_JOIN] 
hashCondition=((ws1.ws_order_number = web_returns.wr_order_number)) 
otherCondition=() build RFs:RF7 
ws_order_number->[ws_order_number,ws_order_number]
+----------------------------hashJoin[RIGHT_SEMI_JOIN] 
hashCondition=((ws1.ws_order_number = web_returns.wr_order_number)) 
otherCondition=() build RFs:RF2 ws_order_number->[wr_order_number];RF7 
ws_order_number->[ws_order_number,ws_order_number]
 ------------------------------PhysicalProject
 --------------------------------hashJoin[INNER_JOIN] 
hashCondition=((web_returns.wr_order_number = ws_wh.ws_order_number)) 
otherCondition=() build RFs:RF1 wr_order_number->[ws_order_number]
 ----------------------------------PhysicalDistribute[DistributionSpecHash]


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to