This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git

commit f1fbfeba2f8846e45841235bd38822b2adcea6f9
Author: minghong <engle...@gmail.com>
AuthorDate: Wed Feb 21 15:53:23 2024 +0800

    [fix](nereids) RuntimeFilterContext.removeFilter() removes more than one RF when there are different types of RF from the buildNode to the target (#31197)
    
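    This bug can be reproduced on TPC-DS query 95 when the runtime filter type is set to min-max + bloom.
    The fix adds removeFilter(RuntimeFilter, ExprId), which removes only the given target from the pushed-down RF (and drops that RF once it has no targets left). The previous method is renamed to removeFilters(), which still removes the target from every RF type built by the given join.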
---
 .../processor/post/RuntimeFilterContext.java       | 31 ++++++++++++++++++++--
 .../processor/post/RuntimeFilterGenerator.java     |  4 +--
 .../processor/post/RuntimeFilterPruner.java        |  4 +--
 .../post/RuntimeFilterPrunerForExternalTable.java  |  2 +-
 .../no_stats_shape/query95.out                     | 26 +++++++++---------
 .../nereids_tpcds_shape_sf100_p0/shape/query95.out | 24 ++++++++---------
 .../no_stats_shape/query95.groovy                  |  2 +-
 .../shape/query95.groovy                           |  2 +-
 8 files changed, 61 insertions(+), 34 deletions(-)

diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/RuntimeFilterContext.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/RuntimeFilterContext.java
index bf137e3c580..c8ffffa12e5 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/RuntimeFilterContext.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/RuntimeFilterContext.java
@@ -181,12 +181,12 @@ public class RuntimeFilterContext {
     }
 
     /**
-     * remove rf from builderNode to target
+     * remove the given target from runtime filters from builderNode to target 
with all runtime filter types
      *
      * @param targetId rf target
      * @param builderNode rf src
      */
-    public void removeFilter(ExprId targetId, PhysicalHashJoin builderNode) {
+    public void removeFilters(ExprId targetId, PhysicalHashJoin builderNode) {
         List<RuntimeFilter> filters = targetExprIdToFilter.get(targetId);
         if (filters != null) {
             Iterator<RuntimeFilter> filterIter = filters.iterator();
@@ -219,6 +219,33 @@ public class RuntimeFilterContext {
         }
     }
 
+    /**
+     * remove one target from rf, and if there is no target, remove the rf
+     */
+    public void removeFilter(RuntimeFilter rf, ExprId targetId) {
+        Iterator<Slot> targetSlotIter = rf.getTargetSlots().listIterator();
+        Iterator<PhysicalRelation> targetScanIter = 
rf.getTargetScans().iterator();
+        Iterator<Expression> targetExpressionIter = 
rf.getTargetExpressions().iterator();
+        Slot targetSlot;
+        PhysicalRelation targetScan;
+        while (targetScanIter.hasNext() && targetSlotIter.hasNext() && 
targetExpressionIter.hasNext()) {
+            targetExpressionIter.next();
+            targetScan = targetScanIter.next();
+            targetSlot = targetSlotIter.next();
+            if (targetSlot.getExprId().equals(targetId)) {
+                targetScan.removeAppliedRuntimeFilter(rf);
+                targetExpressionIter.remove();
+                targetScanIter.remove();
+                targetSlotIter.remove();
+            }
+        }
+        if (rf.getTargetSlots().isEmpty()) {
+            rf.getBuilderNode().getRuntimeFilters().remove(rf);
+            targetExprIdToFilter.get(targetId).remove(rf);
+            prunedRF.add(rf);
+        }
+    }
+
     public void setTargetsOnScanNode(PhysicalRelation relation, Slot slot) {
         this.targetOnOlapScanNodeMap.computeIfAbsent(relation, k -> 
Lists.newArrayList()).add(slot);
     }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/RuntimeFilterGenerator.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/RuntimeFilterGenerator.java
index 5a7fbecb6ec..f7a7c166bd7 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/RuntimeFilterGenerator.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/RuntimeFilterGenerator.java
@@ -224,8 +224,8 @@ public class RuntimeFilterGenerator extends 
PlanPostProcessor {
                                 );
                                 if (pushedDown) {
                                     rfCtx.removeFilter(
-                                            
rightDeepTargetExpressionOnCTE.getInputSlotExprIds().iterator().next(),
-                                            (PhysicalHashJoin) 
rfToPush.getBuilderNode());
+                                            rfToPush,
+                                            
rightDeepTargetExpressionOnCTE.getInputSlotExprIds().iterator().next());
                                 }
                             }
                         }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/RuntimeFilterPruner.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/RuntimeFilterPruner.java
index 3d5c9752ad9..4efafe3af90 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/RuntimeFilterPruner.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/RuntimeFilterPruner.java
@@ -147,7 +147,7 @@ public class RuntimeFilterPruner extends PlanPostProcessor {
                     
outputExprIdOfExpandTargets.addAll(expand.target2.getOutputExprIds());
                     rfContext.getTargetExprIdByFilterJoin(join)
                             .stream().filter(exprId -> 
outputExprIdOfExpandTargets.contains(exprId))
-                            .forEach(exprId -> rfContext.removeFilter(exprId, 
join));
+                            .forEach(exprId -> rfContext.removeFilters(exprId, 
join));
                 }
             }
             RuntimeFilterContext.EffectiveSrcType childType =
@@ -163,7 +163,7 @@ public class RuntimeFilterPruner extends PlanPostProcessor {
                     }
                 }
                 if (!isEffective) {
-                    exprIds.stream().forEach(exprId -> 
rfContext.removeFilter(exprId, join));
+                    exprIds.stream().forEach(exprId -> 
rfContext.removeFilters(exprId, join));
                 }
             }
         }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/RuntimeFilterPrunerForExternalTable.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/RuntimeFilterPrunerForExternalTable.java
index 0a0cfe04ec6..2aa39a588c4 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/RuntimeFilterPrunerForExternalTable.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/RuntimeFilterPrunerForExternalTable.java
@@ -52,7 +52,7 @@ public class RuntimeFilterPrunerForExternalTable extends 
PlanPostProcessor {
                 for (int i = 0; i < rf.getTargetScans().size(); i++) {
                     PhysicalRelation scan = rf.getTargetScans().get(i);
                     if (canPrune(scan, joinAncestors)) {
-                        
rfCtx.removeFilter(rf.getTargetSlots().get(i).getExprId(), (PhysicalHashJoin) 
join);
+                        
rfCtx.removeFilters(rf.getTargetSlots().get(i).getExprId(), (PhysicalHashJoin) 
join);
                     }
                 }
             }
diff --git 
a/regression-test/data/nereids_tpcds_shape_sf100_p0/no_stats_shape/query95.out 
b/regression-test/data/nereids_tpcds_shape_sf100_p0/no_stats_shape/query95.out
index bf426f81483..765c019e5d4 100644
--- 
a/regression-test/data/nereids_tpcds_shape_sf100_p0/no_stats_shape/query95.out
+++ 
b/regression-test/data/nereids_tpcds_shape_sf100_p0/no_stats_shape/query95.out
@@ -3,13 +3,13 @@
 PhysicalCteAnchor ( cteId=CTEId#0 )
 --PhysicalCteProducer ( cteId=CTEId#0 )
 ----PhysicalProject
-------hashJoin[INNER_JOIN] hashCondition=((ws1.ws_order_number = 
ws2.ws_order_number)) otherCondition=(( not (ws_warehouse_sk = 
ws_warehouse_sk))) build RFs:RF0 ws_order_number->[ws_order_number]
+------hashJoin[INNER_JOIN] hashCondition=((ws1.ws_order_number = 
ws2.ws_order_number)) otherCondition=(( not (ws_warehouse_sk = 
ws_warehouse_sk))) build RFs:RF0 ws_order_number->[ws_order_number];RF1 
ws_order_number->[ws_order_number]
 --------PhysicalDistribute[DistributionSpecHash]
 ----------PhysicalProject
-------------PhysicalOlapScan[web_sales] apply RFs: RF0 RF7
+------------PhysicalOlapScan[web_sales] apply RFs: RF0 RF1 RF14 RF15
 --------PhysicalDistribute[DistributionSpecHash]
 ----------PhysicalProject
-------------PhysicalOlapScan[web_sales] apply RFs: RF7
+------------PhysicalOlapScan[web_sales] apply RFs: RF14 RF15
 --PhysicalResultSink
 ----PhysicalTopN[MERGE_SORT]
 ------PhysicalTopN[LOCAL_SORT]
@@ -19,25 +19,25 @@ PhysicalCteAnchor ( cteId=CTEId#0 )
 --------------hashAgg[GLOBAL]
 ----------------hashAgg[LOCAL]
 ------------------PhysicalProject
---------------------hashJoin[RIGHT_SEMI_JOIN] 
hashCondition=((ws1.ws_order_number = ws_wh.ws_order_number)) otherCondition=() 
build RFs:RF6 ws_order_number->[ws_order_number]
+--------------------hashJoin[RIGHT_SEMI_JOIN] 
hashCondition=((ws1.ws_order_number = ws_wh.ws_order_number)) otherCondition=() 
build RFs:RF12 ws_order_number->[ws_order_number];RF13 
ws_order_number->[ws_order_number]
 ----------------------PhysicalDistribute[DistributionSpecHash]
 ------------------------PhysicalProject
---------------------------PhysicalCteConsumer ( cteId=CTEId#0 ) apply RFs: RF6
-----------------------hashJoin[INNER_JOIN] hashCondition=((ws1.ws_web_site_sk 
= web_site.web_site_sk)) otherCondition=() build RFs:RF5 
web_site_sk->[ws_web_site_sk]
-------------------------hashJoin[INNER_JOIN] 
hashCondition=((ws1.ws_ship_addr_sk = customer_address.ca_address_sk)) 
otherCondition=() build RFs:RF4 ca_address_sk->[ws_ship_addr_sk]
---------------------------hashJoin[INNER_JOIN] 
hashCondition=((ws1.ws_ship_date_sk = date_dim.d_date_sk)) otherCondition=() 
build RFs:RF3 d_date_sk->[ws_ship_date_sk]
-----------------------------hashJoin[RIGHT_SEMI_JOIN] 
hashCondition=((ws1.ws_order_number = web_returns.wr_order_number)) 
otherCondition=() build RFs:RF2 ws_order_number->[wr_order_number];RF7 
ws_order_number->[ws_order_number,ws_order_number]
+--------------------------PhysicalCteConsumer ( cteId=CTEId#0 ) apply RFs: 
RF12 RF13
+----------------------hashJoin[INNER_JOIN] hashCondition=((ws1.ws_web_site_sk 
= web_site.web_site_sk)) otherCondition=() build RFs:RF10 
web_site_sk->[ws_web_site_sk];RF11 web_site_sk->[ws_web_site_sk]
+------------------------hashJoin[INNER_JOIN] 
hashCondition=((ws1.ws_ship_addr_sk = customer_address.ca_address_sk)) 
otherCondition=() build RFs:RF8 ca_address_sk->[ws_ship_addr_sk];RF9 
ca_address_sk->[ws_ship_addr_sk]
+--------------------------hashJoin[INNER_JOIN] 
hashCondition=((ws1.ws_ship_date_sk = date_dim.d_date_sk)) otherCondition=() 
build RFs:RF6 d_date_sk->[ws_ship_date_sk];RF7 d_date_sk->[ws_ship_date_sk]
+----------------------------hashJoin[RIGHT_SEMI_JOIN] 
hashCondition=((ws1.ws_order_number = web_returns.wr_order_number)) 
otherCondition=() build RFs:RF4 ws_order_number->[wr_order_number];RF5 
ws_order_number->[wr_order_number];RF14 
ws_order_number->[ws_order_number,ws_order_number];RF15 
ws_order_number->[ws_order_number,ws_order_number]
 ------------------------------PhysicalProject
---------------------------------hashJoin[INNER_JOIN] 
hashCondition=((web_returns.wr_order_number = ws_wh.ws_order_number)) 
otherCondition=() build RFs:RF1 wr_order_number->[ws_order_number]
+--------------------------------hashJoin[INNER_JOIN] 
hashCondition=((web_returns.wr_order_number = ws_wh.ws_order_number)) 
otherCondition=() build RFs:RF2 wr_order_number->[ws_order_number];RF3 
wr_order_number->[ws_order_number]
 ----------------------------------PhysicalDistribute[DistributionSpecHash]
 ------------------------------------PhysicalProject
---------------------------------------PhysicalCteConsumer ( cteId=CTEId#0 ) 
apply RFs: RF1
+--------------------------------------PhysicalCteConsumer ( cteId=CTEId#0 ) 
apply RFs: RF2 RF3
 ----------------------------------PhysicalDistribute[DistributionSpecHash]
 ------------------------------------PhysicalProject
---------------------------------------PhysicalOlapScan[web_returns] apply RFs: 
RF2
+--------------------------------------PhysicalOlapScan[web_returns] apply RFs: 
RF4 RF5
 ------------------------------PhysicalDistribute[DistributionSpecHash]
 --------------------------------PhysicalProject
-----------------------------------PhysicalOlapScan[web_sales] apply RFs: RF3 
RF4 RF5
+----------------------------------PhysicalOlapScan[web_sales] apply RFs: RF6 
RF7 RF8 RF9 RF10 RF11
 ----------------------------PhysicalDistribute[DistributionSpecReplicated]
 ------------------------------PhysicalProject
 --------------------------------filter((date_dim.d_date <= '1999-04-02') and 
(date_dim.d_date >= '1999-02-01'))
diff --git 
a/regression-test/data/nereids_tpcds_shape_sf100_p0/shape/query95.out 
b/regression-test/data/nereids_tpcds_shape_sf100_p0/shape/query95.out
index a2c6bccca1e..17c36c6120e 100644
--- a/regression-test/data/nereids_tpcds_shape_sf100_p0/shape/query95.out
+++ b/regression-test/data/nereids_tpcds_shape_sf100_p0/shape/query95.out
@@ -3,13 +3,13 @@
 PhysicalCteAnchor ( cteId=CTEId#0 )
 --PhysicalCteProducer ( cteId=CTEId#0 )
 ----PhysicalProject
-------hashJoin[INNER_JOIN] hashCondition=((ws1.ws_order_number = 
ws2.ws_order_number)) otherCondition=(( not (ws_warehouse_sk = 
ws_warehouse_sk))) build RFs:RF0 ws_order_number->[ws_order_number]
+------hashJoin[INNER_JOIN] hashCondition=((ws1.ws_order_number = 
ws2.ws_order_number)) otherCondition=(( not (ws_warehouse_sk = 
ws_warehouse_sk))) build RFs:RF0 ws_order_number->[ws_order_number];RF1 
ws_order_number->[ws_order_number]
 --------PhysicalDistribute[DistributionSpecHash]
 ----------PhysicalProject
-------------PhysicalOlapScan[web_sales] apply RFs: RF0 RF7
+------------PhysicalOlapScan[web_sales] apply RFs: RF0 RF1 RF14 RF15
 --------PhysicalDistribute[DistributionSpecHash]
 ----------PhysicalProject
-------------PhysicalOlapScan[web_sales] apply RFs: RF7
+------------PhysicalOlapScan[web_sales] apply RFs: RF14 RF15
 --PhysicalResultSink
 ----PhysicalTopN[MERGE_SORT]
 ------PhysicalTopN[LOCAL_SORT]
@@ -19,25 +19,25 @@ PhysicalCteAnchor ( cteId=CTEId#0 )
 --------------hashAgg[GLOBAL]
 ----------------hashAgg[LOCAL]
 ------------------PhysicalProject
---------------------hashJoin[RIGHT_SEMI_JOIN] 
hashCondition=((ws1.ws_order_number = web_returns.wr_order_number)) 
otherCondition=() build RFs:RF6 
ws_order_number->[ws_order_number,wr_order_number]
+--------------------hashJoin[RIGHT_SEMI_JOIN] 
hashCondition=((ws1.ws_order_number = web_returns.wr_order_number)) 
otherCondition=() build RFs:RF12 
ws_order_number->[ws_order_number,wr_order_number];RF13 
ws_order_number->[ws_order_number,wr_order_number]
 ----------------------PhysicalProject
-------------------------hashJoin[INNER_JOIN] 
hashCondition=((web_returns.wr_order_number = ws_wh.ws_order_number)) 
otherCondition=() build RFs:RF5 wr_order_number->[ws_order_number]
+------------------------hashJoin[INNER_JOIN] 
hashCondition=((web_returns.wr_order_number = ws_wh.ws_order_number)) 
otherCondition=() build RFs:RF10 wr_order_number->[ws_order_number];RF11 
wr_order_number->[ws_order_number]
 --------------------------PhysicalDistribute[DistributionSpecHash]
 ----------------------------PhysicalProject
-------------------------------PhysicalCteConsumer ( cteId=CTEId#0 ) apply RFs: 
RF5 RF6
+------------------------------PhysicalCteConsumer ( cteId=CTEId#0 ) apply RFs: 
RF10 RF11 RF12 RF13
 --------------------------PhysicalDistribute[DistributionSpecHash]
 ----------------------------PhysicalProject
-------------------------------PhysicalOlapScan[web_returns] apply RFs: RF6
-----------------------hashJoin[RIGHT_SEMI_JOIN] 
hashCondition=((ws1.ws_order_number = ws_wh.ws_order_number)) otherCondition=() 
build RFs:RF7 ws_order_number->[ws_order_number,ws_order_number]
+------------------------------PhysicalOlapScan[web_returns] apply RFs: RF12 
RF13
+----------------------hashJoin[RIGHT_SEMI_JOIN] 
hashCondition=((ws1.ws_order_number = ws_wh.ws_order_number)) otherCondition=() 
build RFs:RF14 ws_order_number->[ws_order_number,ws_order_number];RF15 
ws_order_number->[ws_order_number,ws_order_number]
 ------------------------PhysicalDistribute[DistributionSpecHash]
 --------------------------PhysicalProject
 ----------------------------PhysicalCteConsumer ( cteId=CTEId#0 )
 ------------------------PhysicalDistribute[DistributionSpecHash]
---------------------------hashJoin[INNER_JOIN] 
hashCondition=((ws1.ws_web_site_sk = web_site.web_site_sk)) otherCondition=() 
build RFs:RF3 web_site_sk->[ws_web_site_sk]
-----------------------------hashJoin[INNER_JOIN] 
hashCondition=((ws1.ws_ship_date_sk = date_dim.d_date_sk)) otherCondition=() 
build RFs:RF2 d_date_sk->[ws_ship_date_sk]
-------------------------------hashJoin[INNER_JOIN] 
hashCondition=((ws1.ws_ship_addr_sk = customer_address.ca_address_sk)) 
otherCondition=() build RFs:RF1 ca_address_sk->[ws_ship_addr_sk]
+--------------------------hashJoin[INNER_JOIN] 
hashCondition=((ws1.ws_web_site_sk = web_site.web_site_sk)) otherCondition=() 
build RFs:RF6 web_site_sk->[ws_web_site_sk];RF7 web_site_sk->[ws_web_site_sk]
+----------------------------hashJoin[INNER_JOIN] 
hashCondition=((ws1.ws_ship_date_sk = date_dim.d_date_sk)) otherCondition=() 
build RFs:RF4 d_date_sk->[ws_ship_date_sk];RF5 d_date_sk->[ws_ship_date_sk]
+------------------------------hashJoin[INNER_JOIN] 
hashCondition=((ws1.ws_ship_addr_sk = customer_address.ca_address_sk)) 
otherCondition=() build RFs:RF2 ca_address_sk->[ws_ship_addr_sk];RF3 
ca_address_sk->[ws_ship_addr_sk]
 --------------------------------PhysicalProject
-----------------------------------PhysicalOlapScan[web_sales] apply RFs: RF1 
RF2 RF3
+----------------------------------PhysicalOlapScan[web_sales] apply RFs: RF2 
RF3 RF4 RF5 RF6 RF7
 --------------------------------PhysicalDistribute[DistributionSpecReplicated]
 ----------------------------------PhysicalProject
 ------------------------------------filter((customer_address.ca_state = 'NC'))
diff --git 
a/regression-test/suites/nereids_tpcds_shape_sf100_p0/no_stats_shape/query95.groovy
 
b/regression-test/suites/nereids_tpcds_shape_sf100_p0/no_stats_shape/query95.groovy
index dca29abc663..ae03c56f537 100644
--- 
a/regression-test/suites/nereids_tpcds_shape_sf100_p0/no_stats_shape/query95.groovy
+++ 
b/regression-test/suites/nereids_tpcds_shape_sf100_p0/no_stats_shape/query95.groovy
@@ -28,7 +28,7 @@ sql 'set enable_runtime_filter_prune=false'
     sql 'set parallel_pipeline_task_num=8'
     sql 'set forbid_unknown_col_stats=false'
     sql 'set enable_stats=false'
-    sql "set runtime_filter_type=8"
+    sql "set runtime_filter_type=12"
     sql 'set broadcast_row_count_limit = 30000000'
     sql 'set enable_nereids_timeout = false'
     sql 'SET enable_pipeline_engine = true'
diff --git 
a/regression-test/suites/nereids_tpcds_shape_sf100_p0/shape/query95.groovy 
b/regression-test/suites/nereids_tpcds_shape_sf100_p0/shape/query95.groovy
index b43bb33b48e..451ce8219f2 100644
--- a/regression-test/suites/nereids_tpcds_shape_sf100_p0/shape/query95.groovy
+++ b/regression-test/suites/nereids_tpcds_shape_sf100_p0/shape/query95.groovy
@@ -29,7 +29,7 @@ suite("query95") {
     sql 'set forbid_unknown_col_stats=true'
     sql 'set enable_nereids_timeout = false'
     sql 'set enable_runtime_filter_prune=false'
-    sql 'set runtime_filter_type=8'
+    sql 'set runtime_filter_type=12'
     def ds = """with ws_wh as
 (select ws1.ws_order_number,ws1.ws_warehouse_sk wh1,ws2.ws_warehouse_sk wh2
  from web_sales ws1,web_sales ws2


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to