This is an automated email from the ASF dual-hosted git repository.

dataroaring pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-3.0 by this push:
     new 197869e5f64 branch-3.0: [opt](nereids) optimize limit on distinct aggregate #47570 (#47816)
197869e5f64 is described below

commit 197869e5f64d3e50684acb84753361e9e65a6a68
Author: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com>
AuthorDate: Mon Feb 24 17:56:45 2025 +0800

    branch-3.0: [opt](nereids) optimize limit on distinct aggregate #47570 (#47816)
    
    Cherry-picked from #47570
    
    Co-authored-by: minghong <zhoumingh...@selectdb.com>
---
 .../glue/translator/PhysicalPlanTranslator.java    |  20 ++++-
 .../nereids/rules/rewrite/LimitAggToTopNAgg.java   |  12 ++-
 .../nereids/trees/plans/algebra/Aggregate.java     |   6 ++
 .../trees/plans/logical/LogicalAggregate.java      |   7 +-
 .../plans/physical/PhysicalHashAggregate.java      |   2 +
 .../PushDownLimitDistinctThroughJoinTest.java      |   2 +-
 .../push_down_limit_distinct_through_join.out      | Bin 543 -> 442 bytes
 .../data/nereids_tpch_p0/tpch/push_topn_to_agg.out | Bin 158 -> 725 bytes
 .../nereids_tpch_p0/tpch/push_topn_to_agg.groovy   |  82 +++++++++++++++++++++
 9 files changed, 119 insertions(+), 12 deletions(-)

diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java
index c4d015f08eb..7336d244957 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java
@@ -106,6 +106,7 @@ import org.apache.doris.nereids.trees.plans.JoinType;
 import org.apache.doris.nereids.trees.plans.PartitionTopnPhase;
 import org.apache.doris.nereids.trees.plans.Plan;
 import org.apache.doris.nereids.trees.plans.PreAggStatus;
+import org.apache.doris.nereids.trees.plans.algebra.Aggregate;
 import org.apache.doris.nereids.trees.plans.physical.AbstractPhysicalJoin;
 import org.apache.doris.nereids.trees.plans.physical.AbstractPhysicalSort;
 import org.apache.doris.nereids.trees.plans.physical.PhysicalAssertNumRows;
@@ -1821,8 +1822,23 @@ public class PhysicalPlanTranslator extends 
DefaultPlanVisitor<PlanFragment, Pla
         PlanNode child = inputFragment.getPlanRoot();
 
         if (physicalLimit.getPhase().isLocal()) {
-            child.setLimit(MergeLimits.mergeLimit(physicalLimit.getLimit(), 
physicalLimit.getOffset(),
-                    child.getLimit()));
+            long newLimit = MergeLimits.mergeLimit(physicalLimit.getLimit(), 
physicalLimit.getOffset(),
+                    child.getLimit());
+            child.setLimit(newLimit);
+            if (newLimit != -1
+                    && child instanceof AggregationNode && 
physicalLimit.child() instanceof PhysicalHashAggregate) {
+                PhysicalHashAggregate<? extends Plan> agg
+                        = (PhysicalHashAggregate<? extends Plan>) 
physicalLimit.child();
+                if (agg.isDistinct()) {
+                    if (agg.child(0) instanceof PhysicalDistribute
+                            && agg.child(0).child(0) instanceof 
PhysicalHashAggregate
+                            && ((Aggregate) agg.child(0).child(0)).isDistinct()
+                            && child.getChild(0) instanceof ExchangeNode
+                            && child.getChild(0).getChild(0) instanceof 
AggregationNode) {
+                        child.getChild(0).getChild(0).setLimit(newLimit);
+                    }
+                }
+            }
         } else if (physicalLimit.getPhase().isGlobal()) {
             if (!(child instanceof ExchangeNode)) {
                 ExchangeNode exchangeNode = new 
ExchangeNode(context.nextPlanNodeId(), child);
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/LimitAggToTopNAgg.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/LimitAggToTopNAgg.java
index 049709dd23a..c3da664b517 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/LimitAggToTopNAgg.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/LimitAggToTopNAgg.java
@@ -62,7 +62,8 @@ public class LimitAggToTopNAgg implements RewriteRuleFactory {
                                 >= limit.getLimit() + limit.getOffset())
                         .when(limit -> {
                             LogicalAggregate<? extends Plan> agg = 
limit.child();
-                            return !agg.getGroupByExpressions().isEmpty() && 
!agg.getSourceRepeat().isPresent();
+                            return !agg.getGroupByExpressions().isEmpty() && 
!agg.getSourceRepeat().isPresent()
+                                    && !agg.isDistinct();
                         })
                         .then(limit -> {
                             LogicalAggregate<? extends Plan> agg = 
limit.child();
@@ -77,7 +78,8 @@ public class LimitAggToTopNAgg implements RewriteRuleFactory {
                                 >= limit.getLimit() + limit.getOffset())
                         .when(limit -> {
                             LogicalAggregate<? extends Plan> agg = 
limit.child().child();
-                            return !agg.getGroupByExpressions().isEmpty() && 
!agg.getSourceRepeat().isPresent();
+                            return !agg.getGroupByExpressions().isEmpty() && 
!agg.getSourceRepeat().isPresent()
+                                    && !agg.isDistinct();
                         })
                         .then(limit -> {
                             LogicalProject<? extends Plan> project = 
limit.child();
@@ -96,7 +98,8 @@ public class LimitAggToTopNAgg implements RewriteRuleFactory {
                                 >= topn.getLimit() + topn.getOffset())
                         .when(topn -> {
                             LogicalAggregate<? extends Plan> agg = 
topn.child();
-                            return !agg.getGroupByExpressions().isEmpty() && 
!agg.getSourceRepeat().isPresent();
+                            return !agg.getGroupByExpressions().isEmpty() && 
!agg.getSourceRepeat().isPresent()
+                                    && !agg.isDistinct();
                         })
                         .then(topn -> {
                             LogicalAggregate<? extends Plan> agg = 
topn.child();
@@ -117,7 +120,8 @@ public class LimitAggToTopNAgg implements 
RewriteRuleFactory {
                                 >= topn.getLimit() + topn.getOffset())
                         .when(topn -> {
                             LogicalAggregate<? extends Plan> agg = 
topn.child().child();
-                            return !agg.getGroupByExpressions().isEmpty() && 
!agg.getSourceRepeat().isPresent();
+                            return !agg.getGroupByExpressions().isEmpty() && 
!agg.getSourceRepeat().isPresent()
+                                    && !agg.isDistinct();
                         })
                         .then(topn -> {
                             LogicalTopN originTopn = topn;
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/algebra/Aggregate.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/algebra/Aggregate.java
index d29f7f8daea..7a283c740e5 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/algebra/Aggregate.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/algebra/Aggregate.java
@@ -19,6 +19,7 @@ package org.apache.doris.nereids.trees.plans.algebra;
 
 import org.apache.doris.nereids.trees.expressions.Expression;
 import org.apache.doris.nereids.trees.expressions.NamedExpression;
+import org.apache.doris.nereids.trees.expressions.Slot;
 import 
org.apache.doris.nereids.trees.expressions.functions.agg.AggregateFunction;
 import org.apache.doris.nereids.trees.plans.Plan;
 import org.apache.doris.nereids.trees.plans.UnaryPlan;
@@ -100,4 +101,9 @@ public interface Aggregate<CHILD_TYPE extends Plan> extends 
UnaryPlan<CHILD_TYPE
         }
         return false;
     }
+
+    default boolean isDistinct() {
+        return getOutputExpressions().stream().allMatch(e -> e instanceof Slot)
+                && getGroupByExpressions().stream().allMatch(e -> e instanceof 
Slot);
+    }
 }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalAggregate.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalAggregate.java
index df8f886451f..d96dd8a15c2 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalAggregate.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalAggregate.java
@@ -151,10 +151,12 @@ public class LogicalAggregate<CHILD_TYPE extends Plan>
         this.sourceRepeat = Objects.requireNonNull(sourceRepeat, "sourceRepeat 
cannot be null");
     }
 
+    @Override
     public List<Expression> getGroupByExpressions() {
         return groupByExpressions;
     }
 
+    @Override
     public List<NamedExpression> getOutputExpressions() {
         return outputExpressions;
     }
@@ -167,11 +169,6 @@ public class LogicalAggregate<CHILD_TYPE extends Plan>
         return sourceRepeat;
     }
 
-    public boolean isDistinct() {
-        return outputExpressions.stream().allMatch(e -> e instanceof Slot)
-                && groupByExpressions.stream().allMatch(e -> e instanceof 
Slot);
-    }
-
     public boolean isGenerated() {
         return generated;
     }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalHashAggregate.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalHashAggregate.java
index 7ed39fed8b6..e8749dcee22 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalHashAggregate.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalHashAggregate.java
@@ -135,10 +135,12 @@ public class PhysicalHashAggregate<CHILD_TYPE extends 
Plan> extends PhysicalUnar
         this.requireProperties = Objects.requireNonNull(requireProperties, 
"requireProperties cannot be null");
     }
 
+    @Override
     public List<Expression> getGroupByExpressions() {
         return groupByExpressions;
     }
 
+    @Override
     public List<NamedExpression> getOutputExpressions() {
         return outputExpressions;
     }
diff --git 
a/fe/fe-core/src/test/java/org/apache/doris/nereids/rules/rewrite/PushDownLimitDistinctThroughJoinTest.java
 
b/fe/fe-core/src/test/java/org/apache/doris/nereids/rules/rewrite/PushDownLimitDistinctThroughJoinTest.java
index 910e4ce669a..bce9a2cd207 100644
--- 
a/fe/fe-core/src/test/java/org/apache/doris/nereids/rules/rewrite/PushDownLimitDistinctThroughJoinTest.java
+++ 
b/fe/fe-core/src/test/java/org/apache/doris/nereids/rules/rewrite/PushDownLimitDistinctThroughJoinTest.java
@@ -134,7 +134,7 @@ class PushDownLimitDistinctThroughJoinTest extends 
TestWithFeService implements
                 .rewrite()
                 .matches(
                         logicalProject(logicalJoin(
-                                
logicalTopN(logicalAggregate(logicalProject(logicalOlapScan())))
+                                
logicalLimit(logicalAggregate(logicalProject(logicalOlapScan())))
                                         .when(l -> l.getLimit() == 10),
                                 logicalProject(logicalOlapScan())
                         ))
diff --git 
a/regression-test/data/nereids_rules_p0/push_down_limit_distinct/push_down_limit_distinct_through_join.out
 
b/regression-test/data/nereids_rules_p0/push_down_limit_distinct/push_down_limit_distinct_through_join.out
index 714ce630f16..9ffe9520387 100644
Binary files 
a/regression-test/data/nereids_rules_p0/push_down_limit_distinct/push_down_limit_distinct_through_join.out
 and 
b/regression-test/data/nereids_rules_p0/push_down_limit_distinct/push_down_limit_distinct_through_join.out
 differ
diff --git a/regression-test/data/nereids_tpch_p0/tpch/push_topn_to_agg.out 
b/regression-test/data/nereids_tpch_p0/tpch/push_topn_to_agg.out
index af01d68ffc6..0008027785a 100644
Binary files a/regression-test/data/nereids_tpch_p0/tpch/push_topn_to_agg.out 
and b/regression-test/data/nereids_tpch_p0/tpch/push_topn_to_agg.out differ
diff --git 
a/regression-test/suites/nereids_tpch_p0/tpch/push_topn_to_agg.groovy 
b/regression-test/suites/nereids_tpch_p0/tpch/push_topn_to_agg.groovy
index 5ae587910b6..844d76b2194 100644
--- a/regression-test/suites/nereids_tpch_p0/tpch/push_topn_to_agg.groovy
+++ b/regression-test/suites/nereids_tpch_p0/tpch/push_topn_to_agg.groovy
@@ -101,4 +101,86 @@ suite("push_topn_to_agg") {
         sql "select sum(ps_availqty), ps_partkey, ps_suppkey from partsupp 
group by ps_partkey, ps_suppkey order by ps_partkey, ps_suppkey limit 18;"
         contains("sortByGroupKey:true")
     }
+
+    qt_shape_distinct_agg "explain shape plan select o_custkey, o_shippriority 
from orders group by o_custkey, o_shippriority limit 1";
+
+    qt_shape_distinct "explain shape plan select distinct o_custkey from 
orders group by o_custkey limit 1"
+
+    explain {
+        sql "select o_custkey, o_shippriority from orders group by o_custkey, 
o_shippriority limit 1"
+        multiContains("limit: 1", 3)
+    }
+    /**
+    "limit 1" in 3 plan nodes:
+    4:VEXCHANGE/ 3:VAGGREGATE (merge finalize) / 1:VAGGREGATE (update 
serialize)
++--------------------------------------------------------------------------------+
+| PLAN FRAGMENT 0                                                              
  |
+|   OUTPUT EXPRS:                                                              
  |
+|     o_custkey[#11]                                                           
  |
+|   PARTITION: UNPARTITIONED                                                   
  |
+|                                                                              
  |
+|   HAS_COLO_PLAN_NODE: false                                                  
  |
+|                                                                              
  |
+|   VRESULT SINK                                                               
  |
+|      MYSQL_PROTOCAL                                                          
  |
+|                                                                              
  |
+|   4:VEXCHANGE                                                                
  |
+|      offset: 0                                                               
  |
+|      limit: 1                                                                
  |
+|      distribute expr lists: o_custkey[#11]                                   
  |
+|                                                                              
  |
+| PLAN FRAGMENT 1                                                              
  |
+|                                                                              
  |
+|   PARTITION: HASH_PARTITIONED: o_custkey[#10]                                
  |
+|                                                                              
  |
+|   HAS_COLO_PLAN_NODE: true                                                   
  |
+|                                                                              
  |
+|   STREAM DATA SINK                                                           
  |
+|     EXCHANGE ID: 04                                                          
  |
+|     UNPARTITIONED                                                            
  |
+|                                                                              
  |
+|   3:VAGGREGATE (merge finalize)(233)                                         
  |
+|   |  group by: o_custkey[#10]                                                
  |
+|   |  sortByGroupKey:false                                                    
  |
+|   |  cardinality=50,000                                                      
  |
+|   |  limit: 1                                                                
  |
+|   |  distribute expr lists: o_custkey[#10]                                   
  |
+|   |                                                                          
  |
+|   2:VEXCHANGE                                                                
  |
+|      offset: 0                                                               
  |
+|      distribute expr lists:                                                  
  |
+|                                                                              
  |
+| PLAN FRAGMENT 2                                                              
  |
+|                                                                              
  |
+|   PARTITION: HASH_PARTITIONED: O_ORDERKEY[#0]                                
  |
+|                                                                              
  |
+|   HAS_COLO_PLAN_NODE: false                                                  
  |
+|                                                                              
  |
+|   STREAM DATA SINK                                                           
  |
+|     EXCHANGE ID: 02                                                          
  |
+|     HASH_PARTITIONED: o_custkey[#10]                                         
  |
+|                                                                              
  |
+|   1:VAGGREGATE (update serialize)(223)                                       
  |
+|   |  STREAMING                                                               
  |
+|   |  group by: o_custkey[#9]                                                 
  |
+|   |  sortByGroupKey:false                                                    
  |
+|   |  cardinality=50,000                                                      
  |
+|   |  limit: 1                                                                
  |
+|   |  distribute expr lists:                                                  
  |
+|   |                                                                          
  |
+|   0:VOlapScanNode(213)                                                       
  |
+|      TABLE: regression_test_nereids_tpch_p0.orders(orders), PREAGGREGATION: 
ON |
+|      partitions=1/1 (orders)                                                 
  |
+|      tablets=3/3, tabletList=1738740551790,1738740551792,1738740551794       
  |
+|      cardinality=150000, avgRowSize=165.10652, numNodes=1                    
  |
+|      pushAggOp=NONE                                                          
  |
+|      final projections: O_CUSTKEY[#1]                                        
  |
+|      final project output tuple id: 1                                        
  |
+|                                                                              
  |
+|                                                                              
  |
+|                                                                              
  |
+| ========== STATISTICS ==========                                             
  |
+| planed with unknown column statistics                                        
  |
++--------------------------------------------------------------------------------+
+    **/
 }
\ No newline at end of file


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to