This is an automated email from the ASF dual-hosted git repository. yiguolei pushed a commit to branch branch-2.1 in repository https://gitbox.apache.org/repos/asf/doris.git
commit f1fbfeba2f8846e45841235bd38822b2adcea6f9 Author: minghong <engle...@gmail.com> AuthorDate: Wed Feb 21 15:53:23 2024 +0800 [fix](nereids) removeRuntimeFilter() removes more than one RF if there are different types of RFs from the buildNode to the target (#31197). This bug can be reproduced on TPC-DS query 95 when the runtime filter type is set to min-max + bloom. --- .../processor/post/RuntimeFilterContext.java | 31 ++++++++++++++++++++-- .../processor/post/RuntimeFilterGenerator.java | 4 +-- .../processor/post/RuntimeFilterPruner.java | 4 +-- .../post/RuntimeFilterPrunerForExternalTable.java | 2 +- .../no_stats_shape/query95.out | 26 +++++++++--------- .../nereids_tpcds_shape_sf100_p0/shape/query95.out | 24 ++++++++--------- .../no_stats_shape/query95.groovy | 2 +- .../shape/query95.groovy | 2 +- 8 files changed, 61 insertions(+), 34 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/RuntimeFilterContext.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/RuntimeFilterContext.java index bf137e3c580..c8ffffa12e5 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/RuntimeFilterContext.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/RuntimeFilterContext.java @@ -181,12 +181,12 @@ public class RuntimeFilterContext { } /** - * remove rf from builderNode to target + * remove the given target from runtime filters from builderNode to target with all runtime filter types * * @param targetId rf target * @param builderNode rf src */ - public void removeFilter(ExprId targetId, PhysicalHashJoin builderNode) { + public void removeFilters(ExprId targetId, PhysicalHashJoin builderNode) { List<RuntimeFilter> filters = targetExprIdToFilter.get(targetId); if (filters != null) { Iterator<RuntimeFilter> filterIter = filters.iterator(); @@ -219,6 +219,33 @@ public class RuntimeFilterContext { } } + /** + * remove one target from rf, and if there is no target, remove the rf + */ + public
void removeFilter(RuntimeFilter rf, ExprId targetId) { + Iterator<Slot> targetSlotIter = rf.getTargetSlots().listIterator(); + Iterator<PhysicalRelation> targetScanIter = rf.getTargetScans().iterator(); + Iterator<Expression> targetExpressionIter = rf.getTargetExpressions().iterator(); + Slot targetSlot; + PhysicalRelation targetScan; + while (targetScanIter.hasNext() && targetSlotIter.hasNext() && targetExpressionIter.hasNext()) { + targetExpressionIter.next(); + targetScan = targetScanIter.next(); + targetSlot = targetSlotIter.next(); + if (targetSlot.getExprId().equals(targetId)) { + targetScan.removeAppliedRuntimeFilter(rf); + targetExpressionIter.remove(); + targetScanIter.remove(); + targetSlotIter.remove(); + } + } + if (rf.getTargetSlots().isEmpty()) { + rf.getBuilderNode().getRuntimeFilters().remove(rf); + targetExprIdToFilter.get(targetId).remove(rf); + prunedRF.add(rf); + } + } + public void setTargetsOnScanNode(PhysicalRelation relation, Slot slot) { this.targetOnOlapScanNodeMap.computeIfAbsent(relation, k -> Lists.newArrayList()).add(slot); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/RuntimeFilterGenerator.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/RuntimeFilterGenerator.java index 5a7fbecb6ec..f7a7c166bd7 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/RuntimeFilterGenerator.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/RuntimeFilterGenerator.java @@ -224,8 +224,8 @@ public class RuntimeFilterGenerator extends PlanPostProcessor { ); if (pushedDown) { rfCtx.removeFilter( - rightDeepTargetExpressionOnCTE.getInputSlotExprIds().iterator().next(), - (PhysicalHashJoin) rfToPush.getBuilderNode()); + rfToPush, + rightDeepTargetExpressionOnCTE.getInputSlotExprIds().iterator().next()); } } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/RuntimeFilterPruner.java 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/RuntimeFilterPruner.java index 3d5c9752ad9..4efafe3af90 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/RuntimeFilterPruner.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/RuntimeFilterPruner.java @@ -147,7 +147,7 @@ public class RuntimeFilterPruner extends PlanPostProcessor { outputExprIdOfExpandTargets.addAll(expand.target2.getOutputExprIds()); rfContext.getTargetExprIdByFilterJoin(join) .stream().filter(exprId -> outputExprIdOfExpandTargets.contains(exprId)) - .forEach(exprId -> rfContext.removeFilter(exprId, join)); + .forEach(exprId -> rfContext.removeFilters(exprId, join)); } } RuntimeFilterContext.EffectiveSrcType childType = @@ -163,7 +163,7 @@ public class RuntimeFilterPruner extends PlanPostProcessor { } } if (!isEffective) { - exprIds.stream().forEach(exprId -> rfContext.removeFilter(exprId, join)); + exprIds.stream().forEach(exprId -> rfContext.removeFilters(exprId, join)); } } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/RuntimeFilterPrunerForExternalTable.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/RuntimeFilterPrunerForExternalTable.java index 0a0cfe04ec6..2aa39a588c4 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/RuntimeFilterPrunerForExternalTable.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/RuntimeFilterPrunerForExternalTable.java @@ -52,7 +52,7 @@ public class RuntimeFilterPrunerForExternalTable extends PlanPostProcessor { for (int i = 0; i < rf.getTargetScans().size(); i++) { PhysicalRelation scan = rf.getTargetScans().get(i); if (canPrune(scan, joinAncestors)) { - rfCtx.removeFilter(rf.getTargetSlots().get(i).getExprId(), (PhysicalHashJoin) join); + rfCtx.removeFilters(rf.getTargetSlots().get(i).getExprId(), (PhysicalHashJoin) join); } } } diff --git 
a/regression-test/data/nereids_tpcds_shape_sf100_p0/no_stats_shape/query95.out b/regression-test/data/nereids_tpcds_shape_sf100_p0/no_stats_shape/query95.out index bf426f81483..765c019e5d4 100644 --- a/regression-test/data/nereids_tpcds_shape_sf100_p0/no_stats_shape/query95.out +++ b/regression-test/data/nereids_tpcds_shape_sf100_p0/no_stats_shape/query95.out @@ -3,13 +3,13 @@ PhysicalCteAnchor ( cteId=CTEId#0 ) --PhysicalCteProducer ( cteId=CTEId#0 ) ----PhysicalProject -------hashJoin[INNER_JOIN] hashCondition=((ws1.ws_order_number = ws2.ws_order_number)) otherCondition=(( not (ws_warehouse_sk = ws_warehouse_sk))) build RFs:RF0 ws_order_number->[ws_order_number] +------hashJoin[INNER_JOIN] hashCondition=((ws1.ws_order_number = ws2.ws_order_number)) otherCondition=(( not (ws_warehouse_sk = ws_warehouse_sk))) build RFs:RF0 ws_order_number->[ws_order_number];RF1 ws_order_number->[ws_order_number] --------PhysicalDistribute[DistributionSpecHash] ----------PhysicalProject -------------PhysicalOlapScan[web_sales] apply RFs: RF0 RF7 +------------PhysicalOlapScan[web_sales] apply RFs: RF0 RF1 RF14 RF15 --------PhysicalDistribute[DistributionSpecHash] ----------PhysicalProject -------------PhysicalOlapScan[web_sales] apply RFs: RF7 +------------PhysicalOlapScan[web_sales] apply RFs: RF14 RF15 --PhysicalResultSink ----PhysicalTopN[MERGE_SORT] ------PhysicalTopN[LOCAL_SORT] @@ -19,25 +19,25 @@ PhysicalCteAnchor ( cteId=CTEId#0 ) --------------hashAgg[GLOBAL] ----------------hashAgg[LOCAL] ------------------PhysicalProject ---------------------hashJoin[RIGHT_SEMI_JOIN] hashCondition=((ws1.ws_order_number = ws_wh.ws_order_number)) otherCondition=() build RFs:RF6 ws_order_number->[ws_order_number] +--------------------hashJoin[RIGHT_SEMI_JOIN] hashCondition=((ws1.ws_order_number = ws_wh.ws_order_number)) otherCondition=() build RFs:RF12 ws_order_number->[ws_order_number];RF13 ws_order_number->[ws_order_number] ----------------------PhysicalDistribute[DistributionSpecHash] 
------------------------PhysicalProject ---------------------------PhysicalCteConsumer ( cteId=CTEId#0 ) apply RFs: RF6 -----------------------hashJoin[INNER_JOIN] hashCondition=((ws1.ws_web_site_sk = web_site.web_site_sk)) otherCondition=() build RFs:RF5 web_site_sk->[ws_web_site_sk] -------------------------hashJoin[INNER_JOIN] hashCondition=((ws1.ws_ship_addr_sk = customer_address.ca_address_sk)) otherCondition=() build RFs:RF4 ca_address_sk->[ws_ship_addr_sk] ---------------------------hashJoin[INNER_JOIN] hashCondition=((ws1.ws_ship_date_sk = date_dim.d_date_sk)) otherCondition=() build RFs:RF3 d_date_sk->[ws_ship_date_sk] -----------------------------hashJoin[RIGHT_SEMI_JOIN] hashCondition=((ws1.ws_order_number = web_returns.wr_order_number)) otherCondition=() build RFs:RF2 ws_order_number->[wr_order_number];RF7 ws_order_number->[ws_order_number,ws_order_number] +--------------------------PhysicalCteConsumer ( cteId=CTEId#0 ) apply RFs: RF12 RF13 +----------------------hashJoin[INNER_JOIN] hashCondition=((ws1.ws_web_site_sk = web_site.web_site_sk)) otherCondition=() build RFs:RF10 web_site_sk->[ws_web_site_sk];RF11 web_site_sk->[ws_web_site_sk] +------------------------hashJoin[INNER_JOIN] hashCondition=((ws1.ws_ship_addr_sk = customer_address.ca_address_sk)) otherCondition=() build RFs:RF8 ca_address_sk->[ws_ship_addr_sk];RF9 ca_address_sk->[ws_ship_addr_sk] +--------------------------hashJoin[INNER_JOIN] hashCondition=((ws1.ws_ship_date_sk = date_dim.d_date_sk)) otherCondition=() build RFs:RF6 d_date_sk->[ws_ship_date_sk];RF7 d_date_sk->[ws_ship_date_sk] +----------------------------hashJoin[RIGHT_SEMI_JOIN] hashCondition=((ws1.ws_order_number = web_returns.wr_order_number)) otherCondition=() build RFs:RF4 ws_order_number->[wr_order_number];RF5 ws_order_number->[wr_order_number];RF14 ws_order_number->[ws_order_number,ws_order_number];RF15 ws_order_number->[ws_order_number,ws_order_number] ------------------------------PhysicalProject 
---------------------------------hashJoin[INNER_JOIN] hashCondition=((web_returns.wr_order_number = ws_wh.ws_order_number)) otherCondition=() build RFs:RF1 wr_order_number->[ws_order_number] +--------------------------------hashJoin[INNER_JOIN] hashCondition=((web_returns.wr_order_number = ws_wh.ws_order_number)) otherCondition=() build RFs:RF2 wr_order_number->[ws_order_number];RF3 wr_order_number->[ws_order_number] ----------------------------------PhysicalDistribute[DistributionSpecHash] ------------------------------------PhysicalProject ---------------------------------------PhysicalCteConsumer ( cteId=CTEId#0 ) apply RFs: RF1 +--------------------------------------PhysicalCteConsumer ( cteId=CTEId#0 ) apply RFs: RF2 RF3 ----------------------------------PhysicalDistribute[DistributionSpecHash] ------------------------------------PhysicalProject ---------------------------------------PhysicalOlapScan[web_returns] apply RFs: RF2 +--------------------------------------PhysicalOlapScan[web_returns] apply RFs: RF4 RF5 ------------------------------PhysicalDistribute[DistributionSpecHash] --------------------------------PhysicalProject -----------------------------------PhysicalOlapScan[web_sales] apply RFs: RF3 RF4 RF5 +----------------------------------PhysicalOlapScan[web_sales] apply RFs: RF6 RF7 RF8 RF9 RF10 RF11 ----------------------------PhysicalDistribute[DistributionSpecReplicated] ------------------------------PhysicalProject --------------------------------filter((date_dim.d_date <= '1999-04-02') and (date_dim.d_date >= '1999-02-01')) diff --git a/regression-test/data/nereids_tpcds_shape_sf100_p0/shape/query95.out b/regression-test/data/nereids_tpcds_shape_sf100_p0/shape/query95.out index a2c6bccca1e..17c36c6120e 100644 --- a/regression-test/data/nereids_tpcds_shape_sf100_p0/shape/query95.out +++ b/regression-test/data/nereids_tpcds_shape_sf100_p0/shape/query95.out @@ -3,13 +3,13 @@ PhysicalCteAnchor ( cteId=CTEId#0 ) --PhysicalCteProducer ( 
cteId=CTEId#0 ) ----PhysicalProject -------hashJoin[INNER_JOIN] hashCondition=((ws1.ws_order_number = ws2.ws_order_number)) otherCondition=(( not (ws_warehouse_sk = ws_warehouse_sk))) build RFs:RF0 ws_order_number->[ws_order_number] +------hashJoin[INNER_JOIN] hashCondition=((ws1.ws_order_number = ws2.ws_order_number)) otherCondition=(( not (ws_warehouse_sk = ws_warehouse_sk))) build RFs:RF0 ws_order_number->[ws_order_number];RF1 ws_order_number->[ws_order_number] --------PhysicalDistribute[DistributionSpecHash] ----------PhysicalProject -------------PhysicalOlapScan[web_sales] apply RFs: RF0 RF7 +------------PhysicalOlapScan[web_sales] apply RFs: RF0 RF1 RF14 RF15 --------PhysicalDistribute[DistributionSpecHash] ----------PhysicalProject -------------PhysicalOlapScan[web_sales] apply RFs: RF7 +------------PhysicalOlapScan[web_sales] apply RFs: RF14 RF15 --PhysicalResultSink ----PhysicalTopN[MERGE_SORT] ------PhysicalTopN[LOCAL_SORT] @@ -19,25 +19,25 @@ PhysicalCteAnchor ( cteId=CTEId#0 ) --------------hashAgg[GLOBAL] ----------------hashAgg[LOCAL] ------------------PhysicalProject ---------------------hashJoin[RIGHT_SEMI_JOIN] hashCondition=((ws1.ws_order_number = web_returns.wr_order_number)) otherCondition=() build RFs:RF6 ws_order_number->[ws_order_number,wr_order_number] +--------------------hashJoin[RIGHT_SEMI_JOIN] hashCondition=((ws1.ws_order_number = web_returns.wr_order_number)) otherCondition=() build RFs:RF12 ws_order_number->[ws_order_number,wr_order_number];RF13 ws_order_number->[ws_order_number,wr_order_number] ----------------------PhysicalProject -------------------------hashJoin[INNER_JOIN] hashCondition=((web_returns.wr_order_number = ws_wh.ws_order_number)) otherCondition=() build RFs:RF5 wr_order_number->[ws_order_number] +------------------------hashJoin[INNER_JOIN] hashCondition=((web_returns.wr_order_number = ws_wh.ws_order_number)) otherCondition=() build RFs:RF10 wr_order_number->[ws_order_number];RF11 wr_order_number->[ws_order_number] 
--------------------------PhysicalDistribute[DistributionSpecHash] ----------------------------PhysicalProject -------------------------------PhysicalCteConsumer ( cteId=CTEId#0 ) apply RFs: RF5 RF6 +------------------------------PhysicalCteConsumer ( cteId=CTEId#0 ) apply RFs: RF10 RF11 RF12 RF13 --------------------------PhysicalDistribute[DistributionSpecHash] ----------------------------PhysicalProject -------------------------------PhysicalOlapScan[web_returns] apply RFs: RF6 -----------------------hashJoin[RIGHT_SEMI_JOIN] hashCondition=((ws1.ws_order_number = ws_wh.ws_order_number)) otherCondition=() build RFs:RF7 ws_order_number->[ws_order_number,ws_order_number] +------------------------------PhysicalOlapScan[web_returns] apply RFs: RF12 RF13 +----------------------hashJoin[RIGHT_SEMI_JOIN] hashCondition=((ws1.ws_order_number = ws_wh.ws_order_number)) otherCondition=() build RFs:RF14 ws_order_number->[ws_order_number,ws_order_number];RF15 ws_order_number->[ws_order_number,ws_order_number] ------------------------PhysicalDistribute[DistributionSpecHash] --------------------------PhysicalProject ----------------------------PhysicalCteConsumer ( cteId=CTEId#0 ) ------------------------PhysicalDistribute[DistributionSpecHash] ---------------------------hashJoin[INNER_JOIN] hashCondition=((ws1.ws_web_site_sk = web_site.web_site_sk)) otherCondition=() build RFs:RF3 web_site_sk->[ws_web_site_sk] -----------------------------hashJoin[INNER_JOIN] hashCondition=((ws1.ws_ship_date_sk = date_dim.d_date_sk)) otherCondition=() build RFs:RF2 d_date_sk->[ws_ship_date_sk] -------------------------------hashJoin[INNER_JOIN] hashCondition=((ws1.ws_ship_addr_sk = customer_address.ca_address_sk)) otherCondition=() build RFs:RF1 ca_address_sk->[ws_ship_addr_sk] +--------------------------hashJoin[INNER_JOIN] hashCondition=((ws1.ws_web_site_sk = web_site.web_site_sk)) otherCondition=() build RFs:RF6 web_site_sk->[ws_web_site_sk];RF7 web_site_sk->[ws_web_site_sk] 
+----------------------------hashJoin[INNER_JOIN] hashCondition=((ws1.ws_ship_date_sk = date_dim.d_date_sk)) otherCondition=() build RFs:RF4 d_date_sk->[ws_ship_date_sk];RF5 d_date_sk->[ws_ship_date_sk] +------------------------------hashJoin[INNER_JOIN] hashCondition=((ws1.ws_ship_addr_sk = customer_address.ca_address_sk)) otherCondition=() build RFs:RF2 ca_address_sk->[ws_ship_addr_sk];RF3 ca_address_sk->[ws_ship_addr_sk] --------------------------------PhysicalProject -----------------------------------PhysicalOlapScan[web_sales] apply RFs: RF1 RF2 RF3 +----------------------------------PhysicalOlapScan[web_sales] apply RFs: RF2 RF3 RF4 RF5 RF6 RF7 --------------------------------PhysicalDistribute[DistributionSpecReplicated] ----------------------------------PhysicalProject ------------------------------------filter((customer_address.ca_state = 'NC')) diff --git a/regression-test/suites/nereids_tpcds_shape_sf100_p0/no_stats_shape/query95.groovy b/regression-test/suites/nereids_tpcds_shape_sf100_p0/no_stats_shape/query95.groovy index dca29abc663..ae03c56f537 100644 --- a/regression-test/suites/nereids_tpcds_shape_sf100_p0/no_stats_shape/query95.groovy +++ b/regression-test/suites/nereids_tpcds_shape_sf100_p0/no_stats_shape/query95.groovy @@ -28,7 +28,7 @@ sql 'set enable_runtime_filter_prune=false' sql 'set parallel_pipeline_task_num=8' sql 'set forbid_unknown_col_stats=false' sql 'set enable_stats=false' - sql "set runtime_filter_type=8" + sql "set runtime_filter_type=12" sql 'set broadcast_row_count_limit = 30000000' sql 'set enable_nereids_timeout = false' sql 'SET enable_pipeline_engine = true' diff --git a/regression-test/suites/nereids_tpcds_shape_sf100_p0/shape/query95.groovy b/regression-test/suites/nereids_tpcds_shape_sf100_p0/shape/query95.groovy index b43bb33b48e..451ce8219f2 100644 --- a/regression-test/suites/nereids_tpcds_shape_sf100_p0/shape/query95.groovy +++ b/regression-test/suites/nereids_tpcds_shape_sf100_p0/shape/query95.groovy @@ -29,7 
+29,7 @@ suite("query95") { sql 'set forbid_unknown_col_stats=true' sql 'set enable_nereids_timeout = false' sql 'set enable_runtime_filter_prune=false' - sql 'set runtime_filter_type=8' + sql 'set runtime_filter_type=12' def ds = """with ws_wh as (select ws1.ws_order_number,ws1.ws_warehouse_sk wh1,ws2.ws_warehouse_sk wh2 from web_sales ws1,web_sales ws2 --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org