This is an automated email from the ASF dual-hosted git repository.

morrysnow pushed a commit to branch branch-2.0
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-2.0 by this push:
     new 9e386c02ae7 [feat](Nereids) Reject Commutativity Swap for Nested Loop Joins Affecting Parallelism (#34639) (#34996)
9e386c02ae7 is described below

commit 9e386c02ae726a7a806b954a3e3bd2f7e85b9dc2
Author: 谢健 <jianx...@gmail.com>
AuthorDate: Tue May 21 10:17:19 2024 +0800

    [feat](Nereids) Reject Commutativity Swap for Nested Loop Joins Affecting Parallelism (#34639) (#34996)
    
    pick from master #34639
    
    This PR introduces a safeguard that prevents commutativity swaps of nested
    loop joins when the swap would convert a parallelizable join into a
    non-parallelizable one. The JoinCommute rule now checks, via a new static
    helper NestedLoopJoinNode.canParallelize(JoinOperator), whether the join
    type can be parallelized before and after the swap, and rejects swaps that
    would lose parallelism. This preserves query execution efficiency: such
    joins continue to be executed in parallel, fully utilizing system resources
    and maintaining high throughput.
---
 .../rules/exploration/join/JoinCommute.java        |  8 ++++++++
 .../apache/doris/planner/NestedLoopJoinNode.java   |  6 +++++-
 .../rules/exploration/join/JoinCommuteTest.java    | 22 +++++++++++++++++++++-
 3 files changed, 34 insertions(+), 2 deletions(-)

diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/exploration/join/JoinCommute.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/exploration/join/JoinCommute.java
index 8dd3e6abe71..e9656a1b62d 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/exploration/join/JoinCommute.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/exploration/join/JoinCommute.java
@@ -25,8 +25,11 @@ import org.apache.doris.nereids.trees.expressions.Not;
 import org.apache.doris.nereids.trees.expressions.Slot;
 import 
org.apache.doris.nereids.trees.expressions.functions.scalar.BitmapContains;
 import org.apache.doris.nereids.trees.plans.GroupPlan;
+import org.apache.doris.nereids.trees.plans.JoinType;
 import org.apache.doris.nereids.trees.plans.Plan;
 import org.apache.doris.nereids.trees.plans.logical.LogicalJoin;
+import org.apache.doris.nereids.util.JoinUtils;
+import org.apache.doris.planner.NestedLoopJoinNode;
 import org.apache.doris.qe.ConnectContext;
 import org.apache.doris.thrift.TRuntimeFilterType;
 
@@ -59,6 +62,11 @@ public class JoinCommute extends OneExplorationRuleFactory {
                 .whenNot(LogicalJoin::hasJoinHint)
                 .whenNot(join -> joinOrderMatchBitmapRuntimeFilterOrder(join))
                 .whenNot(LogicalJoin::isMarkJoin)
+                // For a nested loop join, if commutativity causes a join that could originally be executed
+                // in parallel to become non-parallelizable, then we reject this swap.
+                .whenNot(join -> JoinUtils.shouldNestedLoopJoin(join)
+                        && 
NestedLoopJoinNode.canParallelize(JoinType.toJoinOperator(join.getJoinType()))
+                        && 
!NestedLoopJoinNode.canParallelize(JoinType.toJoinOperator(join.getJoinType().swap())))
                 .then(join -> {
                     LogicalJoin<Plan, Plan> newJoin = 
join.withTypeChildren(join.getJoinType().swap(),
                             join.right(), join.left());
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/planner/NestedLoopJoinNode.java 
b/fe/fe-core/src/main/java/org/apache/doris/planner/NestedLoopJoinNode.java
index 05eb34a7815..c1ad5933492 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/NestedLoopJoinNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/NestedLoopJoinNode.java
@@ -71,12 +71,16 @@ public class NestedLoopJoinNode extends JoinNodeBase {
         tupleIds.addAll(inner.getOutputTupleIds());
     }
 
-    public boolean canParallelize() {
+    public static boolean canParallelize(JoinOperator joinOp) {
         return joinOp == JoinOperator.CROSS_JOIN || joinOp == 
JoinOperator.INNER_JOIN
                 || joinOp == JoinOperator.LEFT_OUTER_JOIN || joinOp == 
JoinOperator.LEFT_SEMI_JOIN
                 || joinOp == JoinOperator.LEFT_ANTI_JOIN || joinOp == 
JoinOperator.NULL_AWARE_LEFT_ANTI_JOIN;
     }
 
+    public boolean canParallelize() {
+        return canParallelize(joinOp);
+    }
+
     public void setJoinConjuncts(List<Expr> joinConjuncts) {
         this.joinConjuncts = joinConjuncts;
     }
diff --git 
a/fe/fe-core/src/test/java/org/apache/doris/nereids/rules/exploration/join/JoinCommuteTest.java
 
b/fe/fe-core/src/test/java/org/apache/doris/nereids/rules/exploration/join/JoinCommuteTest.java
index 20323d108e6..022256225b6 100644
--- 
a/fe/fe-core/src/test/java/org/apache/doris/nereids/rules/exploration/join/JoinCommuteTest.java
+++ 
b/fe/fe-core/src/test/java/org/apache/doris/nereids/rules/exploration/join/JoinCommuteTest.java
@@ -18,7 +18,9 @@
 package org.apache.doris.nereids.rules.exploration.join;
 
 import org.apache.doris.common.Pair;
+import org.apache.doris.nereids.trees.expressions.GreaterThan;
 import org.apache.doris.nereids.trees.plans.JoinType;
+import org.apache.doris.nereids.trees.plans.logical.LogicalJoin;
 import org.apache.doris.nereids.trees.plans.logical.LogicalOlapScan;
 import org.apache.doris.nereids.trees.plans.logical.LogicalPlan;
 import org.apache.doris.nereids.util.LogicalPlanBuilder;
@@ -27,11 +29,12 @@ import org.apache.doris.nereids.util.MemoTestUtils;
 import org.apache.doris.nereids.util.PlanChecker;
 import org.apache.doris.nereids.util.PlanConstructor;
 
+import com.google.common.collect.ImmutableList;
 import org.junit.jupiter.api.Test;
 
 public class JoinCommuteTest implements MemoPatternMatchSupported {
     @Test
-    public void testInnerJoinCommute() {
+    void testInnerJoinCommute() {
         LogicalOlapScan scan1 = PlanConstructor.newLogicalOlapScan(0, "t1", 0);
         LogicalOlapScan scan2 = PlanConstructor.newLogicalOlapScan(1, "t2", 0);
 
@@ -51,4 +54,21 @@ public class JoinCommuteTest implements 
MemoPatternMatchSupported {
                 )
         ;
     }
+
+    @Test
+    void testParallelJoinCommute() {
+        LogicalOlapScan scan1 = PlanConstructor.newLogicalOlapScan(0, "t1", 0);
+        LogicalOlapScan scan2 = PlanConstructor.newLogicalOlapScan(1, "t2", 0);
+
+        LogicalJoin<?, ?> join = (LogicalJoin<?, ?>) new 
LogicalPlanBuilder(scan1)
+                .join(scan2, JoinType.LEFT_OUTER_JOIN, Pair.of(0, 0))
+                .build();
+        join = join.withJoinConjuncts(
+                ImmutableList.of(),
+                ImmutableList.of(new GreaterThan(scan1.getOutput().get(0), 
scan2.getOutput().get(0))));
+
+        PlanChecker.from(MemoTestUtils.createConnectContext(), join)
+                .applyExploration(JoinCommute.BUSHY.build())
+                .printlnTree();
+    }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to