This is an automated email from the ASF dual-hosted git repository.

englefly pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 7276665f1e [enhancement](Nereids) avoiding broadcast join 
heuristically and pruning more in CostAndEnforceJob (#25137)
7276665f1e is described below

commit 7276665f1e4a224a87e8889f5aa84ab6329ad9d9
Author: 谢健 <jianx...@gmail.com>
AuthorDate: Tue Oct 10 13:38:10 2023 +0800

    [enhancement](Nereids) avoiding broadcast join heuristically and pruning 
more in CostAndEnforceJob (#25137)
    
    When the build-side rowCount or data size exceeds its threshold, refrain from generating a 
broadcast join.
    Only enforce the best expression in CostAndEnforcerJob, rather than 
enforcing every expression.
    Remove lower bound group pruning
---
 .../org/apache/doris/nereids/cost/CostModelV1.java |  9 -------
 .../nereids/jobs/cascades/CostAndEnforcerJob.java  | 31 +++++++++++++++++++---
 .../nereids/properties/PhysicalProperties.java     |  4 +++
 .../nereids/properties/RequestPropertyDeriver.java |  8 +++++-
 .../properties/RequestPropertyDeriverTest.java     | 18 +++++++++++--
 5 files changed, 55 insertions(+), 15 deletions(-)

diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/cost/CostModelV1.java 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/cost/CostModelV1.java
index c802148fe0..2e19fa1fb6 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/cost/CostModelV1.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/cost/CostModelV1.java
@@ -207,15 +207,6 @@ class CostModelV1 extends PlanVisitor<Cost, PlanContext> {
 
         // replicate
         if (spec instanceof DistributionSpecReplicated) {
-            double dataSize = childStatistics.computeSize();
-            double memLimit = 
ConnectContext.get().getSessionVariable().getMaxExecMemByte();
-            //if build side is big, avoid use broadcast join
-            double rowsLimit = 
ConnectContext.get().getSessionVariable().getBroadcastRowCountLimit();
-            double brMemlimit = 
ConnectContext.get().getSessionVariable().getBroadcastHashtableMemLimitPercentage();
-            if (dataSize > memLimit * brMemlimit
-                    || childStatistics.getRowCount() > rowsLimit) {
-                return CostV1.of(Double.MAX_VALUE, Double.MAX_VALUE, 
Double.MAX_VALUE);
-            }
             // estimate broadcast cost by an experience formula: beNumber^0.5 
* rowCount
             // - sender number and receiver number is not available at RBO 
stage now, so we use beNumber
             // - senders and receivers work in parallel, that why we use 
square of beNumber
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/jobs/cascades/CostAndEnforcerJob.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/jobs/cascades/CostAndEnforcerJob.java
index dbe45d13bb..7cb73a332d 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/jobs/cascades/CostAndEnforcerJob.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/jobs/cascades/CostAndEnforcerJob.java
@@ -188,9 +188,17 @@ public class CostAndEnforcerJob extends Job implements 
Cloneable {
                         curNodeCost,
                         
lowestCostExpr.getCostValueByProperties(requestChildProperty),
                         curChildIndex);
-                if (curTotalCost.getValue() > context.getCostUpperBound()) {
-                    curTotalCost = Cost.infinite();
-                }
+
+                // Lower bound group pruning is deliberately not performed here, to avoid 
redundant optimization of children.
+                // For example:
+                //      Group1 : betterExpr, currentExpr(child: Group2), 
otherExpr(child: Group2)
+                //      steps
+                //          1. CostAndEnforce(currentExpr) with upperBound 
betterExpr.cost
+                //          2. OptimizeGroup(Group2) with upperBound 
betterExpr.cost - currentExpr.nodeCost
+                //          3. CostAndEnforce(Expr in Group2) trigger here and 
exit
+                //              ...
+                //          n.  CostAndEnforce(otherExpr) can trigger optimize 
group2 again for the same requireProp
+
                 // the request child properties will be covered by the output 
properties
                 // that corresponding to the request properties. so if we run 
a costAndEnforceJob of the same
                 // group expression, that request child properties will be 
different of this.
@@ -275,6 +283,23 @@ public class CostAndEnforcerJob extends Job implements 
Cloneable {
             }
             return;
         }
+
+        if (context.getRequiredProperties().isDistributionOnlyProperties()) {
+            // For properties without an orderSpec, 
enforceMissingPropertiesHelper always adds a distributor
+            // above this group expression. The cost of the enforced plan is 
equal to the cost of the groupExpression
+            // plus the cost of the distributor. The distributor remains 
unchanged for different groupExpressions.
+            // Therefore, if there is a better groupExpr, it is preferable to 
enforce the better groupExpr.
+            // Consequently, we can avoid this enforcement.
+            Optional<Pair<Cost, GroupExpression>> bestExpr = 
groupExpression.getOwnerGroup()
+                    .getLowestCostPlan(context.getRequiredProperties());
+            double bestCost = bestExpr
+                    .map(costGroupExpressionPair -> 
costGroupExpressionPair.first.getValue())
+                    .orElse(Double.POSITIVE_INFINITY);
+            if (curTotalCost.getValue() > bestCost) {
+                return;
+            }
+        }
+
         EnforceMissingPropertiesHelper enforceMissingPropertiesHelper
                 = new EnforceMissingPropertiesHelper(context, groupExpression, 
curTotalCost);
         PhysicalProperties addEnforcedProperty = enforceMissingPropertiesHelper
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/PhysicalProperties.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/PhysicalProperties.java
index e3b5151af7..3c89f87340 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/PhysicalProperties.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/PhysicalProperties.java
@@ -106,6 +106,10 @@ public class PhysicalProperties {
         return distributionSpec;
     }
 
+    public boolean isDistributionOnlyProperties() {
+        return orderSpec.getOrderKeys().isEmpty();
+    }
+
     @Override
     public boolean equals(Object o) {
         if (this == o) {
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/RequestPropertyDeriver.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/RequestPropertyDeriver.java
index ec1d93ca70..acc8ef7886 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/RequestPropertyDeriver.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/RequestPropertyDeriver.java
@@ -165,8 +165,14 @@ public class RequestPropertyDeriver extends 
PlanVisitor<Void, PlanContext> {
         if (JoinUtils.couldShuffle(hashJoin)) {
             addShuffleJoinRequestProperty(hashJoin);
         }
+
         // for broadcast join
-        if (JoinUtils.couldBroadcast(hashJoin)) {
+        double memLimit = 
ConnectContext.get().getSessionVariable().getMaxExecMemByte();
+        double rowsLimit = 
ConnectContext.get().getSessionVariable().getBroadcastRowCountLimit();
+        double brMemlimit = 
ConnectContext.get().getSessionVariable().getBroadcastHashtableMemLimitPercentage();
+        double datasize = 
hashJoin.getGroupExpression().get().child(1).getStatistics().computeSize();
+        double rowCount = 
hashJoin.getGroupExpression().get().child(1).getStatistics().getRowCount();
+        if (JoinUtils.couldBroadcast(hashJoin) && rowCount <= rowsLimit && 
datasize <= memLimit * brMemlimit) {
             addBroadcastJoinRequestProperty();
         }
         return null;
diff --git 
a/fe/fe-core/src/test/java/org/apache/doris/nereids/properties/RequestPropertyDeriverTest.java
 
b/fe/fe-core/src/test/java/org/apache/doris/nereids/properties/RequestPropertyDeriverTest.java
index 4ccee56e4b..37767273b3 100644
--- 
a/fe/fe-core/src/test/java/org/apache/doris/nereids/properties/RequestPropertyDeriverTest.java
+++ 
b/fe/fe-core/src/test/java/org/apache/doris/nereids/properties/RequestPropertyDeriverTest.java
@@ -37,6 +37,7 @@ import 
org.apache.doris.nereids.trees.plans.physical.PhysicalHashJoin;
 import org.apache.doris.nereids.trees.plans.physical.PhysicalNestedLoopJoin;
 import org.apache.doris.nereids.types.IntegerType;
 import org.apache.doris.nereids.util.ExpressionUtils;
+import org.apache.doris.qe.ConnectContext;
 
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Lists;
@@ -60,6 +61,12 @@ class RequestPropertyDeriverTest {
     @Mocked
     LogicalProperties logicalProperties;
 
+    @Mocked
+    ConnectContext connectContext;
+
+    @Injectable
+    Group group;
+
     @Injectable
     JobContext jobContext;
 
@@ -105,7 +112,7 @@ class RequestPropertyDeriverTest {
                 ExpressionUtils.EMPTY_CONDITION, 
ExpressionUtils.EMPTY_CONDITION, JoinHint.NONE, Optional.empty(),
                 logicalProperties,
                 groupPlan, groupPlan);
-        GroupExpression groupExpression = new GroupExpression(join);
+        GroupExpression groupExpression = new GroupExpression(join, 
Lists.newArrayList(group, group));
         new Group(null, groupExpression, null);
 
         RequestPropertyDeriver requestPropertyDeriver = new 
RequestPropertyDeriver(jobContext);
@@ -130,11 +137,18 @@ class RequestPropertyDeriverTest {
             }
         };
 
+        new MockUp<ConnectContext>() {
+            @Mock
+            ConnectContext get() {
+                return connectContext;
+            }
+        };
+
         PhysicalHashJoin<GroupPlan, GroupPlan> join = new 
PhysicalHashJoin<>(JoinType.INNER_JOIN,
                 ExpressionUtils.EMPTY_CONDITION, 
ExpressionUtils.EMPTY_CONDITION, JoinHint.NONE, Optional.empty(),
                 logicalProperties,
                 groupPlan, groupPlan);
-        GroupExpression groupExpression = new GroupExpression(join);
+        GroupExpression groupExpression = new GroupExpression(join, 
Lists.newArrayList(group, group));
         new Group(null, groupExpression, null);
 
         RequestPropertyDeriver requestPropertyDeriver = new 
RequestPropertyDeriver(jobContext);


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to