This is an automated email from the ASF dual-hosted git repository. morrysnow pushed a commit to branch branch-2.0 in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.0 by this push: new 9e386c02ae7 [feat](Nereids) Reject Commutativity Swap for Nested Loop Joins Affecting Parallelism (#34639) (#34996) 9e386c02ae7 is described below commit 9e386c02ae726a7a806b954a3e3bd2f7e85b9dc2 Author: 谢健 <jianx...@gmail.com> AuthorDate: Tue May 21 10:17:19 2024 +0800 [feat](Nereids) Reject Commutativity Swap for Nested Loop Joins Affecting Parallelism (#34639) (#34996) pick from master #34639 This PR introduces a safeguard to prevent commutativity swaps in nested loop joins that would convert a parallelizable join into a non-parallelizable one, thereby preserving optimal query execution efficiency. By adding a function that assesses the impact of such swaps on parallelism, the system automatically rejects changes that would hinder performance, ensuring that joins can continue to be executed in parallel to fully utilize system resources and maintain high operational throughput. --- .../rules/exploration/join/JoinCommute.java | 8 ++++++++ .../apache/doris/planner/NestedLoopJoinNode.java | 6 +++++- .../rules/exploration/join/JoinCommuteTest.java | 22 +++++++++++++++++++++- 3 files changed, 34 insertions(+), 2 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/exploration/join/JoinCommute.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/exploration/join/JoinCommute.java index 8dd3e6abe71..e9656a1b62d 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/exploration/join/JoinCommute.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/exploration/join/JoinCommute.java @@ -25,8 +25,11 @@ import org.apache.doris.nereids.trees.expressions.Not; import org.apache.doris.nereids.trees.expressions.Slot; import org.apache.doris.nereids.trees.expressions.functions.scalar.BitmapContains; import org.apache.doris.nereids.trees.plans.GroupPlan; +import org.apache.doris.nereids.trees.plans.JoinType; import org.apache.doris.nereids.trees.plans.Plan; import org.apache.doris.nereids.trees.plans.logical.LogicalJoin; +import org.apache.doris.nereids.util.JoinUtils; +import org.apache.doris.planner.NestedLoopJoinNode; import org.apache.doris.qe.ConnectContext; import org.apache.doris.thrift.TRuntimeFilterType; @@ -59,6 +62,11 @@ public class JoinCommute extends OneExplorationRuleFactory { .whenNot(LogicalJoin::hasJoinHint) .whenNot(join -> joinOrderMatchBitmapRuntimeFilterOrder(join)) .whenNot(LogicalJoin::isMarkJoin) + // For a nested loop join, if commutativity causes a join that could originally be executed + // in parallel to become non-parallelizable, then we reject this swap. + .whenNot(join -> JoinUtils.shouldNestedLoopJoin(join) + && NestedLoopJoinNode.canParallelize(JoinType.toJoinOperator(join.getJoinType())) + && !NestedLoopJoinNode.canParallelize(JoinType.toJoinOperator(join.getJoinType().swap()))) .then(join -> { LogicalJoin<Plan, Plan> newJoin = join.withTypeChildren(join.getJoinType().swap(), join.right(), join.left()); diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/NestedLoopJoinNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/NestedLoopJoinNode.java index 05eb34a7815..c1ad5933492 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/NestedLoopJoinNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/NestedLoopJoinNode.java @@ -71,12 +71,16 @@ public class NestedLoopJoinNode extends JoinNodeBase { tupleIds.addAll(inner.getOutputTupleIds()); } - public boolean canParallelize() { + public static boolean canParallelize(JoinOperator joinOp) { return joinOp == JoinOperator.CROSS_JOIN || joinOp == JoinOperator.INNER_JOIN || joinOp == JoinOperator.LEFT_OUTER_JOIN || joinOp == JoinOperator.LEFT_SEMI_JOIN || joinOp == JoinOperator.LEFT_ANTI_JOIN || joinOp == JoinOperator.NULL_AWARE_LEFT_ANTI_JOIN; } + public boolean canParallelize() { + return canParallelize(joinOp); + } + public void setJoinConjuncts(List<Expr> joinConjuncts) { this.joinConjuncts = joinConjuncts; } diff --git a/fe/fe-core/src/test/java/org/apache/doris/nereids/rules/exploration/join/JoinCommuteTest.java b/fe/fe-core/src/test/java/org/apache/doris/nereids/rules/exploration/join/JoinCommuteTest.java index 20323d108e6..022256225b6 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/nereids/rules/exploration/join/JoinCommuteTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/nereids/rules/exploration/join/JoinCommuteTest.java @@ -18,7 +18,9 @@ package org.apache.doris.nereids.rules.exploration.join; import org.apache.doris.common.Pair; +import org.apache.doris.nereids.trees.expressions.GreaterThan; import org.apache.doris.nereids.trees.plans.JoinType; +import org.apache.doris.nereids.trees.plans.logical.LogicalJoin; import org.apache.doris.nereids.trees.plans.logical.LogicalOlapScan; import org.apache.doris.nereids.trees.plans.logical.LogicalPlan; import org.apache.doris.nereids.util.LogicalPlanBuilder; @@ -27,11 +29,12 @@ import org.apache.doris.nereids.util.MemoTestUtils; import org.apache.doris.nereids.util.PlanChecker; import org.apache.doris.nereids.util.PlanConstructor; +import com.google.common.collect.ImmutableList; import org.junit.jupiter.api.Test; public class JoinCommuteTest implements MemoPatternMatchSupported { @Test - public void testInnerJoinCommute() { + void testInnerJoinCommute() { LogicalOlapScan scan1 = PlanConstructor.newLogicalOlapScan(0, "t1", 0); LogicalOlapScan scan2 = PlanConstructor.newLogicalOlapScan(1, "t2", 0); @@ -51,4 +54,21 @@ public class JoinCommuteTest implements MemoPatternMatchSupported { ) ; } + + @Test + void testParallelJoinCommute() { + LogicalOlapScan scan1 = PlanConstructor.newLogicalOlapScan(0, "t1", 0); + LogicalOlapScan scan2 = PlanConstructor.newLogicalOlapScan(1, "t2", 0); + + LogicalJoin<?, ?> join = (LogicalJoin<?, ?>) new LogicalPlanBuilder(scan1) + .join(scan2, JoinType.LEFT_OUTER_JOIN, Pair.of(0, 0)) + .build(); + join = join.withJoinConjuncts( + ImmutableList.of(), + ImmutableList.of(new GreaterThan(scan1.getOutput().get(0), scan2.getOutput().get(0)))); + + PlanChecker.from(MemoTestUtils.createConnectContext(), join) + .applyExploration(JoinCommute.BUSHY.build()) + .printlnTree(); + } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org