This is an automated email from the ASF dual-hosted git repository. dataroaring pushed a commit to branch branch-3.0 in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-3.0 by this push: new 197869e5f64 branch-3.0: [opt](nereids) optimize limit on distinct aggregate #47570 (#47816) 197869e5f64 is described below commit 197869e5f64d3e50684acb84753361e9e65a6a68 Author: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com> AuthorDate: Mon Feb 24 17:56:45 2025 +0800 branch-3.0: [opt](nereids) optimize limit on distinct aggregate #47570 (#47816) Cherry-picked from #47570 Co-authored-by: minghong <zhoumingh...@selectdb.com> --- .../glue/translator/PhysicalPlanTranslator.java | 20 ++++- .../nereids/rules/rewrite/LimitAggToTopNAgg.java | 12 ++- .../nereids/trees/plans/algebra/Aggregate.java | 6 ++ .../trees/plans/logical/LogicalAggregate.java | 7 +- .../plans/physical/PhysicalHashAggregate.java | 2 + .../PushDownLimitDistinctThroughJoinTest.java | 2 +- .../push_down_limit_distinct_through_join.out | Bin 543 -> 442 bytes .../data/nereids_tpch_p0/tpch/push_topn_to_agg.out | Bin 158 -> 725 bytes .../nereids_tpch_p0/tpch/push_topn_to_agg.groovy | 82 +++++++++++++++++++++ 9 files changed, 119 insertions(+), 12 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java index c4d015f08eb..7336d244957 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java @@ -106,6 +106,7 @@ import org.apache.doris.nereids.trees.plans.JoinType; import org.apache.doris.nereids.trees.plans.PartitionTopnPhase; import org.apache.doris.nereids.trees.plans.Plan; import org.apache.doris.nereids.trees.plans.PreAggStatus; +import org.apache.doris.nereids.trees.plans.algebra.Aggregate; import org.apache.doris.nereids.trees.plans.physical.AbstractPhysicalJoin; import 
org.apache.doris.nereids.trees.plans.physical.AbstractPhysicalSort; import org.apache.doris.nereids.trees.plans.physical.PhysicalAssertNumRows; @@ -1821,8 +1822,23 @@ public class PhysicalPlanTranslator extends DefaultPlanVisitor<PlanFragment, Pla PlanNode child = inputFragment.getPlanRoot(); if (physicalLimit.getPhase().isLocal()) { - child.setLimit(MergeLimits.mergeLimit(physicalLimit.getLimit(), physicalLimit.getOffset(), - child.getLimit())); + long newLimit = MergeLimits.mergeLimit(physicalLimit.getLimit(), physicalLimit.getOffset(), + child.getLimit()); + child.setLimit(newLimit); + if (newLimit != -1 + && child instanceof AggregationNode && physicalLimit.child() instanceof PhysicalHashAggregate) { + PhysicalHashAggregate<? extends Plan> agg + = (PhysicalHashAggregate<? extends Plan>) physicalLimit.child(); + if (agg.isDistinct()) { + if (agg.child(0) instanceof PhysicalDistribute + && agg.child(0).child(0) instanceof PhysicalHashAggregate + && ((Aggregate) agg.child(0).child(0)).isDistinct() + && child.getChild(0) instanceof ExchangeNode + && child.getChild(0).getChild(0) instanceof AggregationNode) { + child.getChild(0).getChild(0).setLimit(newLimit); + } + } + } } else if (physicalLimit.getPhase().isGlobal()) { if (!(child instanceof ExchangeNode)) { ExchangeNode exchangeNode = new ExchangeNode(context.nextPlanNodeId(), child); diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/LimitAggToTopNAgg.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/LimitAggToTopNAgg.java index 049709dd23a..c3da664b517 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/LimitAggToTopNAgg.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/LimitAggToTopNAgg.java @@ -62,7 +62,8 @@ public class LimitAggToTopNAgg implements RewriteRuleFactory { >= limit.getLimit() + limit.getOffset()) .when(limit -> { LogicalAggregate<? 
extends Plan> agg = limit.child(); - return !agg.getGroupByExpressions().isEmpty() && !agg.getSourceRepeat().isPresent(); + return !agg.getGroupByExpressions().isEmpty() && !agg.getSourceRepeat().isPresent() + && !agg.isDistinct(); }) .then(limit -> { LogicalAggregate<? extends Plan> agg = limit.child(); @@ -77,7 +78,8 @@ public class LimitAggToTopNAgg implements RewriteRuleFactory { >= limit.getLimit() + limit.getOffset()) .when(limit -> { LogicalAggregate<? extends Plan> agg = limit.child().child(); - return !agg.getGroupByExpressions().isEmpty() && !agg.getSourceRepeat().isPresent(); + return !agg.getGroupByExpressions().isEmpty() && !agg.getSourceRepeat().isPresent() + && !agg.isDistinct(); }) .then(limit -> { LogicalProject<? extends Plan> project = limit.child(); @@ -96,7 +98,8 @@ public class LimitAggToTopNAgg implements RewriteRuleFactory { >= topn.getLimit() + topn.getOffset()) .when(topn -> { LogicalAggregate<? extends Plan> agg = topn.child(); - return !agg.getGroupByExpressions().isEmpty() && !agg.getSourceRepeat().isPresent(); + return !agg.getGroupByExpressions().isEmpty() && !agg.getSourceRepeat().isPresent() + && !agg.isDistinct(); }) .then(topn -> { LogicalAggregate<? extends Plan> agg = topn.child(); @@ -117,7 +120,8 @@ public class LimitAggToTopNAgg implements RewriteRuleFactory { >= topn.getLimit() + topn.getOffset()) .when(topn -> { LogicalAggregate<? 
extends Plan> agg = topn.child().child(); - return !agg.getGroupByExpressions().isEmpty() && !agg.getSourceRepeat().isPresent(); + return !agg.getGroupByExpressions().isEmpty() && !agg.getSourceRepeat().isPresent() + && !agg.isDistinct(); }) .then(topn -> { LogicalTopN originTopn = topn; diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/algebra/Aggregate.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/algebra/Aggregate.java index d29f7f8daea..7a283c740e5 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/algebra/Aggregate.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/algebra/Aggregate.java @@ -19,6 +19,7 @@ package org.apache.doris.nereids.trees.plans.algebra; import org.apache.doris.nereids.trees.expressions.Expression; import org.apache.doris.nereids.trees.expressions.NamedExpression; +import org.apache.doris.nereids.trees.expressions.Slot; import org.apache.doris.nereids.trees.expressions.functions.agg.AggregateFunction; import org.apache.doris.nereids.trees.plans.Plan; import org.apache.doris.nereids.trees.plans.UnaryPlan; @@ -100,4 +101,9 @@ public interface Aggregate<CHILD_TYPE extends Plan> extends UnaryPlan<CHILD_TYPE } return false; } + + default boolean isDistinct() { + return getOutputExpressions().stream().allMatch(e -> e instanceof Slot) + && getGroupByExpressions().stream().allMatch(e -> e instanceof Slot); + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalAggregate.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalAggregate.java index df8f886451f..d96dd8a15c2 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalAggregate.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalAggregate.java @@ -151,10 +151,12 @@ public class LogicalAggregate<CHILD_TYPE extends Plan> this.sourceRepeat = 
Objects.requireNonNull(sourceRepeat, "sourceRepeat cannot be null"); } + @Override public List<Expression> getGroupByExpressions() { return groupByExpressions; } + @Override public List<NamedExpression> getOutputExpressions() { return outputExpressions; } @@ -167,11 +169,6 @@ public class LogicalAggregate<CHILD_TYPE extends Plan> return sourceRepeat; } - public boolean isDistinct() { - return outputExpressions.stream().allMatch(e -> e instanceof Slot) - && groupByExpressions.stream().allMatch(e -> e instanceof Slot); - } - public boolean isGenerated() { return generated; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalHashAggregate.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalHashAggregate.java index 7ed39fed8b6..e8749dcee22 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalHashAggregate.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalHashAggregate.java @@ -135,10 +135,12 @@ public class PhysicalHashAggregate<CHILD_TYPE extends Plan> extends PhysicalUnar this.requireProperties = Objects.requireNonNull(requireProperties, "requireProperties cannot be null"); } + @Override public List<Expression> getGroupByExpressions() { return groupByExpressions; } + @Override public List<NamedExpression> getOutputExpressions() { return outputExpressions; } diff --git a/fe/fe-core/src/test/java/org/apache/doris/nereids/rules/rewrite/PushDownLimitDistinctThroughJoinTest.java b/fe/fe-core/src/test/java/org/apache/doris/nereids/rules/rewrite/PushDownLimitDistinctThroughJoinTest.java index 910e4ce669a..bce9a2cd207 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/nereids/rules/rewrite/PushDownLimitDistinctThroughJoinTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/nereids/rules/rewrite/PushDownLimitDistinctThroughJoinTest.java @@ -134,7 +134,7 @@ class PushDownLimitDistinctThroughJoinTest extends 
TestWithFeService implements .rewrite() .matches( logicalProject(logicalJoin( - logicalTopN(logicalAggregate(logicalProject(logicalOlapScan()))) + logicalLimit(logicalAggregate(logicalProject(logicalOlapScan()))) .when(l -> l.getLimit() == 10), logicalProject(logicalOlapScan()) )) diff --git a/regression-test/data/nereids_rules_p0/push_down_limit_distinct/push_down_limit_distinct_through_join.out b/regression-test/data/nereids_rules_p0/push_down_limit_distinct/push_down_limit_distinct_through_join.out index 714ce630f16..9ffe9520387 100644 Binary files a/regression-test/data/nereids_rules_p0/push_down_limit_distinct/push_down_limit_distinct_through_join.out and b/regression-test/data/nereids_rules_p0/push_down_limit_distinct/push_down_limit_distinct_through_join.out differ diff --git a/regression-test/data/nereids_tpch_p0/tpch/push_topn_to_agg.out b/regression-test/data/nereids_tpch_p0/tpch/push_topn_to_agg.out index af01d68ffc6..0008027785a 100644 Binary files a/regression-test/data/nereids_tpch_p0/tpch/push_topn_to_agg.out and b/regression-test/data/nereids_tpch_p0/tpch/push_topn_to_agg.out differ diff --git a/regression-test/suites/nereids_tpch_p0/tpch/push_topn_to_agg.groovy b/regression-test/suites/nereids_tpch_p0/tpch/push_topn_to_agg.groovy index 5ae587910b6..844d76b2194 100644 --- a/regression-test/suites/nereids_tpch_p0/tpch/push_topn_to_agg.groovy +++ b/regression-test/suites/nereids_tpch_p0/tpch/push_topn_to_agg.groovy @@ -101,4 +101,86 @@ suite("push_topn_to_agg") { sql "select sum(ps_availqty), ps_partkey, ps_suppkey from partsupp group by ps_partkey, ps_suppkey order by ps_partkey, ps_suppkey limit 18;" contains("sortByGroupKey:true") } + + qt_shape_distinct_agg "explain shape plan select o_custkey, o_shippriority from orders group by o_custkey, o_shippriority limit 1"; + + qt_shape_distinct "explain shape plan select distinct o_custkey from orders group by o_custkey limit 1" + + explain { + sql "select o_custkey, o_shippriority from orders group by 
o_custkey, o_shippriority limit 1" + multiContains("limit: 1", 3) + } + /** + "limit 1" in 3 plan nodes: + 4:VEXCHANGE/ 3:VAGGREGATE (merge finalize) / 1:VAGGREGATE (update serialize) ++--------------------------------------------------------------------------------+ +| PLAN FRAGMENT 0 | +| OUTPUT EXPRS: | +| o_custkey[#11] | +| PARTITION: UNPARTITIONED | +| | +| HAS_COLO_PLAN_NODE: false | +| | +| VRESULT SINK | +| MYSQL_PROTOCAL | +| | +| 4:VEXCHANGE | +| offset: 0 | +| limit: 1 | +| distribute expr lists: o_custkey[#11] | +| | +| PLAN FRAGMENT 1 | +| | +| PARTITION: HASH_PARTITIONED: o_custkey[#10] | +| | +| HAS_COLO_PLAN_NODE: true | +| | +| STREAM DATA SINK | +| EXCHANGE ID: 04 | +| UNPARTITIONED | +| | +| 3:VAGGREGATE (merge finalize)(233) | +| | group by: o_custkey[#10] | +| | sortByGroupKey:false | +| | cardinality=50,000 | +| | limit: 1 | +| | distribute expr lists: o_custkey[#10] | +| | | +| 2:VEXCHANGE | +| offset: 0 | +| distribute expr lists: | +| | +| PLAN FRAGMENT 2 | +| | +| PARTITION: HASH_PARTITIONED: O_ORDERKEY[#0] | +| | +| HAS_COLO_PLAN_NODE: false | +| | +| STREAM DATA SINK | +| EXCHANGE ID: 02 | +| HASH_PARTITIONED: o_custkey[#10] | +| | +| 1:VAGGREGATE (update serialize)(223) | +| | STREAMING | +| | group by: o_custkey[#9] | +| | sortByGroupKey:false | +| | cardinality=50,000 | +| | limit: 1 | +| | distribute expr lists: | +| | | +| 0:VOlapScanNode(213) | +| TABLE: regression_test_nereids_tpch_p0.orders(orders), PREAGGREGATION: ON | +| partitions=1/1 (orders) | +| tablets=3/3, tabletList=1738740551790,1738740551792,1738740551794 | +| cardinality=150000, avgRowSize=165.10652, numNodes=1 | +| pushAggOp=NONE | +| final projections: O_CUSTKEY[#1] | +| final project output tuple id: 1 | +| | +| | +| | +| ========== STATISTICS ========== | +| planed with unknown column statistics | ++--------------------------------------------------------------------------------+ + **/ } \ No newline at end of file 
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org