This is an automated email from the ASF dual-hosted git repository. morrysnow pushed a commit to branch branch-2.1 in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.1 by this push: new 57301920e3f [fix](colocate join) fix wrong use of colocate join (#37361) (#37714) 57301920e3f is described below commit 57301920e3f3b5a85cec8a5c27ccabb20c34d980 Author: camby <camby...@tencent.com> AuthorDate: Mon Jul 15 16:47:17 2024 +0800 [fix](colocate join) fix wrong use of colocate join (#37361) (#37714) cherry-pick from master #37361 --- .../properties/ChildOutputPropertyDeriver.java | 2 +- .../properties/ChildrenPropertiesRegulator.java | 2 +- .../LogicalOlapScanToPhysicalOlapScan.java | 12 ++-- .../org/apache/doris/nereids/util/JoinUtils.java | 53 ++++++++++++-- .../test_colocate_join_of_column_order.groovy | 82 ++++++++++++++++++++++ 5 files changed, 137 insertions(+), 14 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/ChildOutputPropertyDeriver.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/ChildOutputPropertyDeriver.java index 3756c1bcfe3..bb23f46cdc4 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/ChildOutputPropertyDeriver.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/ChildOutputPropertyDeriver.java @@ -279,7 +279,7 @@ public class ChildOutputPropertyDeriver extends PlanVisitor<PhysicalProperties, case RIGHT_SEMI_JOIN: case RIGHT_ANTI_JOIN: case RIGHT_OUTER_JOIN: - if (JoinUtils.couldColocateJoin(leftHashSpec, rightHashSpec)) { + if (JoinUtils.couldColocateJoin(leftHashSpec, rightHashSpec, hashJoin.getHashJoinConjuncts())) { return new PhysicalProperties(rightHashSpec); } else { // retain left shuffle type, since coordinator use left most node to schedule fragment diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/ChildrenPropertiesRegulator.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/ChildrenPropertiesRegulator.java index 3beed014aac..02cb56ef8ea 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/ChildrenPropertiesRegulator.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/ChildrenPropertiesRegulator.java @@ -245,7 +245,7 @@ public class ChildrenPropertiesRegulator extends PlanVisitor<Boolean, Void> { Optional<PhysicalProperties> updatedForLeft = Optional.empty(); Optional<PhysicalProperties> updatedForRight = Optional.empty(); - if (JoinUtils.couldColocateJoin(leftHashSpec, rightHashSpec)) { + if (JoinUtils.couldColocateJoin(leftHashSpec, rightHashSpec, hashJoin.getHashJoinConjuncts())) { // check colocate join with scan return true; } else if (couldNotRightBucketShuffleJoin(hashJoin.getJoinType(), leftHashSpec, rightHashSpec)) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/implementation/LogicalOlapScanToPhysicalOlapScan.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/implementation/LogicalOlapScanToPhysicalOlapScan.java index 43436355ae1..472d2e169db 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/implementation/LogicalOlapScanToPhysicalOlapScan.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/implementation/LogicalOlapScanToPhysicalOlapScan.java @@ -83,8 +83,8 @@ public class LogicalOlapScanToPhysicalOlapScan extends OneImplementationRuleFact List<Slot> output = olapScan.getOutput(); List<Slot> baseOutput = olapScan.getOutputByIndex(olapScan.getTable().getBaseIndexId()); List<ExprId> hashColumns = Lists.newArrayList(); - for (Slot slot : output) { - for (Column column : hashDistributionInfo.getDistributionColumns()) { + for (Column column : hashDistributionInfo.getDistributionColumns()) { + for (Slot slot : output) { if (((SlotReference) slot).getColumn().get().getNameWithoutMvPrefix() .equals(column.getName())) { hashColumns.add(slot.getExprId()); @@ -92,8 +92,8 @@ public class LogicalOlapScanToPhysicalOlapScan extends OneImplementationRuleFact } } if (hashColumns.size() != hashDistributionInfo.getDistributionColumns().size()) { - for (Slot slot : baseOutput) { - for (Column column : hashDistributionInfo.getDistributionColumns()) { + for (Column column : hashDistributionInfo.getDistributionColumns()) { + for (Slot slot : baseOutput) { // If the length of the column in the bucket key changes after DDL, the length cannot be // determined. As a result, some bucket fields are lost in the query execution plan. // So here we use the column name to avoid this problem @@ -109,8 +109,8 @@ public class LogicalOlapScanToPhysicalOlapScan extends OneImplementationRuleFact HashDistributionInfo hashDistributionInfo = (HashDistributionInfo) distributionInfo; List<Slot> output = olapScan.getOutput(); List<ExprId> hashColumns = Lists.newArrayList(); - for (Slot slot : output) { - for (Column column : hashDistributionInfo.getDistributionColumns()) { + for (Column column : hashDistributionInfo.getDistributionColumns()) { + for (Slot slot : output) { // If the length of the column in the bucket key changes after DDL, the length cannot be // determined. As a result, some bucket fields are lost in the query execution plan. // So here we use the column name to avoid this problem diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/util/JoinUtils.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/util/JoinUtils.java index 79462a9d8e7..df0665be3da 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/util/JoinUtils.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/util/JoinUtils.java @@ -32,6 +32,7 @@ import org.apache.doris.nereids.trees.expressions.Expression; import org.apache.doris.nereids.trees.expressions.MarkJoinSlotReference; import org.apache.doris.nereids.trees.expressions.Not; import org.apache.doris.nereids.trees.expressions.Slot; +import org.apache.doris.nereids.trees.expressions.SlotReference; import org.apache.doris.nereids.trees.expressions.functions.scalar.BitmapContains; import org.apache.doris.nereids.trees.plans.JoinType; import org.apache.doris.nereids.trees.plans.Plan; @@ -54,6 +55,7 @@ import com.google.common.collect.Sets; import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Objects; import java.util.Set; import java.util.stream.Collectors; @@ -257,13 +259,14 @@ public class JoinUtils { return false; } return couldColocateJoin((DistributionSpecHash) leftDistributionSpec, - (DistributionSpecHash) rightDistributionSpec); + (DistributionSpecHash) rightDistributionSpec, join.getHashJoinConjuncts()); } /** * could do colocate join with left and right child distribution spec. */ - public static boolean couldColocateJoin(DistributionSpecHash leftHashSpec, DistributionSpecHash rightHashSpec) { + public static boolean couldColocateJoin(DistributionSpecHash leftHashSpec, DistributionSpecHash rightHashSpec, + List<Expression> conjuncts) { if (ConnectContext.get() == null || ConnectContext.get().getSessionVariable().isDisableColocatePlan()) { return false; @@ -285,12 +288,50 @@ public class JoinUtils { boolean noNeedCheckColocateGroup = hitSameIndex && (leftTablePartitions.equals(rightTablePartitions)) && (leftTablePartitions.size() <= 1); ColocateTableIndex colocateIndex = Env.getCurrentColocateIndex(); - if (noNeedCheckColocateGroup - || (colocateIndex.isSameGroup(leftTableId, rightTableId) - && !colocateIndex.isGroupUnstable(colocateIndex.getGroup(leftTableId)))) { + if (noNeedCheckColocateGroup) { return true; } - return false; + if (!colocateIndex.isSameGroup(leftTableId, rightTableId) + || colocateIndex.isGroupUnstable(colocateIndex.getGroup(leftTableId))) { + return false; + } + + Set<Integer> equalIndices = new HashSet<>(); + for (Expression expr : conjuncts) { + // only simple equal predicate can use colocate join + if (!(expr instanceof EqualPredicate)) { + return false; + } + Expression leftChild = ((EqualPredicate) expr).left(); + Expression rightChild = ((EqualPredicate) expr).right(); + if (!(leftChild instanceof SlotReference) || !(rightChild instanceof SlotReference)) { + return false; + } + + SlotReference leftSlot = (SlotReference) leftChild; + SlotReference rightSlot = (SlotReference) rightChild; + Integer leftIndex = null; + Integer rightIndex = null; + if (leftSlot.getTable().isPresent() && leftSlot.getTable().get().getId() == leftHashSpec.getTableId()) { + leftIndex = leftHashSpec.getExprIdToEquivalenceSet().get(leftSlot.getExprId()); + rightIndex = rightHashSpec.getExprIdToEquivalenceSet().get(rightSlot.getExprId()); + } else { + leftIndex = rightHashSpec.getExprIdToEquivalenceSet().get(leftSlot.getExprId()); + rightIndex = leftHashSpec.getExprIdToEquivalenceSet().get(rightSlot.getExprId()); + } + if (!Objects.equals(leftIndex, rightIndex)) { + return false; + } + if (leftIndex != null) { + equalIndices.add(leftIndex); + } + } + // on conditions must contain all distributed columns + if (equalIndices.containsAll(leftHashSpec.getExprIdToEquivalenceSet().values())) { + return true; + } else { + return false; + } } public static Set<ExprId> getJoinOutputExprIdSet(Plan left, Plan right) { diff --git a/regression-test/suites/correctness_p0/test_colocate_join_of_column_order.groovy b/regression-test/suites/correctness_p0/test_colocate_join_of_column_order.groovy new file mode 100644 index 00000000000..663b7da02d6 --- /dev/null +++ b/regression-test/suites/correctness_p0/test_colocate_join_of_column_order.groovy @@ -0,0 +1,82 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +suite("test_colocate_join_of_column_order") { + sql """ DROP TABLE IF EXISTS `test_colocate_join_of_column_order_t1`; """ + // distributed by k1,k2 + sql """ + CREATE TABLE IF NOT EXISTS `test_colocate_join_of_column_order_t1` ( + `k1` varchar(64) NULL, + `k2` varchar(64) NULL, + `v` int NULL + ) ENGINE=OLAP + DUPLICATE KEY(`k1`,`k2`) + COMMENT 'OLAP' + DISTRIBUTED BY HASH(`k1`,`k2`) BUCKETS 1 + PROPERTIES ( + "replication_num" = "1", + "colocate_with" = "group_column_order" + ); + """ + sql """ DROP TABLE IF EXISTS `test_colocate_join_of_column_order_t2`; """ + // distributed by k2,k1 + sql """ + CREATE TABLE IF NOT EXISTS `test_colocate_join_of_column_order_t2` ( + `k1` varchar(64) NULL, + `k2` varchar(64) NULL, + `v` int NULL + ) ENGINE=OLAP + DUPLICATE KEY(`k1`,`k2`) + COMMENT 'OLAP' + DISTRIBUTED BY HASH(`k2`,`k1`) BUCKETS 1 + PROPERTIES ( + "replication_num" = "1", + "colocate_with" = "group_column_order" + ); + """ + sql """insert into test_colocate_join_of_column_order_t1 values('k1','k2',11);""" + sql """insert into test_colocate_join_of_column_order_t2 values('k1','k2',11);""" + + sql """set enable_nereids_planner=true; """ + explain { + sql("select * from test_colocate_join_of_column_order_t1 a join test_colocate_join_of_column_order_t2 b on a.k1=b.k1 and a.k2=b.k2;") + notContains "COLOCATE" + } + explain { + sql("select * from test_colocate_join_of_column_order_t1 a join test_colocate_join_of_column_order_t2 b on a.k1=b.k2;") + notContains "COLOCATE" + } + explain { + sql("select * from test_colocate_join_of_column_order_t1 a join test_colocate_join_of_column_order_t2 b on a.k1=b.k1;") + notContains "COLOCATE" + } + explain { + sql("select * from test_colocate_join_of_column_order_t1 a join test_colocate_join_of_column_order_t2 b on a.k1=b.k2 and a.v=b.v;") + notContains "COLOCATE" + } + explain { + sql("select * from test_colocate_join_of_column_order_t1 a join test_colocate_join_of_column_order_t2 b on a.k1=b.k2 and a.k2=b.k1;") + contains "COLOCATE" + } + explain { + sql("select * from test_colocate_join_of_column_order_t1 a join test_colocate_join_of_column_order_t2 b on a.k1=b.k2 and a.k2=b.k1 and a.v=b.v;") + contains "COLOCATE" + } + + sql """ DROP TABLE IF EXISTS `test_colocate_join_of_column_order_t1`; """ + sql """ DROP TABLE IF EXISTS `test_colocate_join_of_column_order_t2`; """ +} --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org