This is an automated email from the ASF dual-hosted git repository.

morrysnow pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-2.1 by this push:
     new 57301920e3f [fix](colocate join) fix wrong use of colocate join (#37361) (#37714)
57301920e3f is described below

commit 57301920e3f3b5a85cec8a5c27ccabb20c34d980
Author: camby <camby...@tencent.com>
AuthorDate: Mon Jul 15 16:47:17 2024 +0800

    [fix](colocate join) fix wrong use of colocate join (#37361) (#37714)
    
    cherry-pick from master #37361
---
 .../properties/ChildOutputPropertyDeriver.java     |  2 +-
 .../properties/ChildrenPropertiesRegulator.java    |  2 +-
 .../LogicalOlapScanToPhysicalOlapScan.java         | 12 ++--
 .../org/apache/doris/nereids/util/JoinUtils.java   | 53 ++++++++++++--
 .../test_colocate_join_of_column_order.groovy      | 82 ++++++++++++++++++++++
 5 files changed, 137 insertions(+), 14 deletions(-)

diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/ChildOutputPropertyDeriver.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/ChildOutputPropertyDeriver.java
index 3756c1bcfe3..bb23f46cdc4 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/ChildOutputPropertyDeriver.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/ChildOutputPropertyDeriver.java
@@ -279,7 +279,7 @@ public class ChildOutputPropertyDeriver extends PlanVisitor<PhysicalProperties,
                 case RIGHT_SEMI_JOIN:
                 case RIGHT_ANTI_JOIN:
                 case RIGHT_OUTER_JOIN:
-                    if (JoinUtils.couldColocateJoin(leftHashSpec, 
rightHashSpec)) {
+                    if (JoinUtils.couldColocateJoin(leftHashSpec, 
rightHashSpec, hashJoin.getHashJoinConjuncts())) {
                         return new PhysicalProperties(rightHashSpec);
                     } else {
                         // retain left shuffle type, since coordinator use 
left most node to schedule fragment
diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/ChildrenPropertiesRegulator.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/ChildrenPropertiesRegulator.java
index 3beed014aac..02cb56ef8ea 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/ChildrenPropertiesRegulator.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/ChildrenPropertiesRegulator.java
@@ -245,7 +245,7 @@ public class ChildrenPropertiesRegulator extends PlanVisitor<Boolean, Void> {
         Optional<PhysicalProperties> updatedForLeft = Optional.empty();
         Optional<PhysicalProperties> updatedForRight = Optional.empty();
 
-        if (JoinUtils.couldColocateJoin(leftHashSpec, rightHashSpec)) {
+        if (JoinUtils.couldColocateJoin(leftHashSpec, rightHashSpec, 
hashJoin.getHashJoinConjuncts())) {
             // check colocate join with scan
             return true;
         } else if (couldNotRightBucketShuffleJoin(hashJoin.getJoinType(), 
leftHashSpec, rightHashSpec)) {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/implementation/LogicalOlapScanToPhysicalOlapScan.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/implementation/LogicalOlapScanToPhysicalOlapScan.java
index 43436355ae1..472d2e169db 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/implementation/LogicalOlapScanToPhysicalOlapScan.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/implementation/LogicalOlapScanToPhysicalOlapScan.java
@@ -83,8 +83,8 @@ public class LogicalOlapScanToPhysicalOlapScan extends OneImplementationRuleFact
                 List<Slot> output = olapScan.getOutput();
                 List<Slot> baseOutput = 
olapScan.getOutputByIndex(olapScan.getTable().getBaseIndexId());
                 List<ExprId> hashColumns = Lists.newArrayList();
-                for (Slot slot : output) {
-                    for (Column column : 
hashDistributionInfo.getDistributionColumns()) {
+                for (Column column : 
hashDistributionInfo.getDistributionColumns()) {
+                    for (Slot slot : output) {
                         if (((SlotReference) 
slot).getColumn().get().getNameWithoutMvPrefix()
                                 .equals(column.getName())) {
                             hashColumns.add(slot.getExprId());
@@ -92,8 +92,8 @@ public class LogicalOlapScanToPhysicalOlapScan extends OneImplementationRuleFact
                     }
                 }
                 if (hashColumns.size() != 
hashDistributionInfo.getDistributionColumns().size()) {
-                    for (Slot slot : baseOutput) {
-                        for (Column column : 
hashDistributionInfo.getDistributionColumns()) {
+                    for (Column column : 
hashDistributionInfo.getDistributionColumns()) {
+                        for (Slot slot : baseOutput) {
                             // If the length of the column in the bucket key 
changes after DDL, the length cannot be
                             // determined. As a result, some bucket fields are 
lost in the query execution plan.
                             // So here we use the column name to avoid this 
problem
@@ -109,8 +109,8 @@ public class LogicalOlapScanToPhysicalOlapScan extends OneImplementationRuleFact
                 HashDistributionInfo hashDistributionInfo = 
(HashDistributionInfo) distributionInfo;
                 List<Slot> output = olapScan.getOutput();
                 List<ExprId> hashColumns = Lists.newArrayList();
-                for (Slot slot : output) {
-                    for (Column column : 
hashDistributionInfo.getDistributionColumns()) {
+                for (Column column : 
hashDistributionInfo.getDistributionColumns()) {
+                    for (Slot slot : output) {
                         // If the length of the column in the bucket key 
changes after DDL, the length cannot be
                         // determined. As a result, some bucket fields are 
lost in the query execution plan.
                         // So here we use the column name to avoid this problem
diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/util/JoinUtils.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/util/JoinUtils.java
index 79462a9d8e7..df0665be3da 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/util/JoinUtils.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/util/JoinUtils.java
@@ -32,6 +32,7 @@ import org.apache.doris.nereids.trees.expressions.Expression;
 import org.apache.doris.nereids.trees.expressions.MarkJoinSlotReference;
 import org.apache.doris.nereids.trees.expressions.Not;
 import org.apache.doris.nereids.trees.expressions.Slot;
+import org.apache.doris.nereids.trees.expressions.SlotReference;
 import 
org.apache.doris.nereids.trees.expressions.functions.scalar.BitmapContains;
 import org.apache.doris.nereids.trees.plans.JoinType;
 import org.apache.doris.nereids.trees.plans.Plan;
@@ -54,6 +55,7 @@ import com.google.common.collect.Sets;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Objects;
 import java.util.Set;
 import java.util.stream.Collectors;
 
@@ -257,13 +259,14 @@ public class JoinUtils {
             return false;
         }
         return couldColocateJoin((DistributionSpecHash) leftDistributionSpec,
-                (DistributionSpecHash) rightDistributionSpec);
+                (DistributionSpecHash) rightDistributionSpec, 
join.getHashJoinConjuncts());
     }
 
     /**
      * could do colocate join with left and right child distribution spec.
      */
-    public static boolean couldColocateJoin(DistributionSpecHash leftHashSpec, 
DistributionSpecHash rightHashSpec) {
+    public static boolean couldColocateJoin(DistributionSpecHash leftHashSpec, 
DistributionSpecHash rightHashSpec,
+            List<Expression> conjuncts) {
         if (ConnectContext.get() == null
                 || 
ConnectContext.get().getSessionVariable().isDisableColocatePlan()) {
             return false;
@@ -285,12 +288,50 @@ public class JoinUtils {
         boolean noNeedCheckColocateGroup = hitSameIndex && 
(leftTablePartitions.equals(rightTablePartitions))
                 && (leftTablePartitions.size() <= 1);
         ColocateTableIndex colocateIndex = Env.getCurrentColocateIndex();
-        if (noNeedCheckColocateGroup
-                || (colocateIndex.isSameGroup(leftTableId, rightTableId)
-                && 
!colocateIndex.isGroupUnstable(colocateIndex.getGroup(leftTableId)))) {
+        if (noNeedCheckColocateGroup) {
             return true;
         }
-        return false;
+        if (!colocateIndex.isSameGroup(leftTableId, rightTableId)
+                || 
colocateIndex.isGroupUnstable(colocateIndex.getGroup(leftTableId))) {
+            return false;
+        }
+
+        Set<Integer> equalIndices = new HashSet<>();
+        for (Expression expr : conjuncts) {
+            // only simple equal predicate can use colocate join
+            if (!(expr instanceof EqualPredicate)) {
+                return false;
+            }
+            Expression leftChild = ((EqualPredicate) expr).left();
+            Expression rightChild = ((EqualPredicate) expr).right();
+            if (!(leftChild instanceof SlotReference) || !(rightChild 
instanceof SlotReference)) {
+                return false;
+            }
+
+            SlotReference leftSlot = (SlotReference) leftChild;
+            SlotReference rightSlot = (SlotReference) rightChild;
+            Integer leftIndex = null;
+            Integer rightIndex = null;
+            if (leftSlot.getTable().isPresent() && 
leftSlot.getTable().get().getId() == leftHashSpec.getTableId()) {
+                leftIndex = 
leftHashSpec.getExprIdToEquivalenceSet().get(leftSlot.getExprId());
+                rightIndex = 
rightHashSpec.getExprIdToEquivalenceSet().get(rightSlot.getExprId());
+            } else {
+                leftIndex = 
rightHashSpec.getExprIdToEquivalenceSet().get(leftSlot.getExprId());
+                rightIndex = 
leftHashSpec.getExprIdToEquivalenceSet().get(rightSlot.getExprId());
+            }
+            if (!Objects.equals(leftIndex, rightIndex)) {
+                return false;
+            }
+            if (leftIndex != null) {
+                equalIndices.add(leftIndex);
+            }
+        }
+        // on conditions must contain all distributed columns
+        if 
(equalIndices.containsAll(leftHashSpec.getExprIdToEquivalenceSet().values())) {
+            return true;
+        } else {
+            return false;
+        }
     }
 
     public static Set<ExprId> getJoinOutputExprIdSet(Plan left, Plan right) {
diff --git a/regression-test/suites/correctness_p0/test_colocate_join_of_column_order.groovy b/regression-test/suites/correctness_p0/test_colocate_join_of_column_order.groovy
new file mode 100644
index 00000000000..663b7da02d6
--- /dev/null
+++ b/regression-test/suites/correctness_p0/test_colocate_join_of_column_order.groovy
@@ -0,0 +1,82 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+suite("test_colocate_join_of_column_order") {
+    sql """ DROP TABLE IF EXISTS `test_colocate_join_of_column_order_t1`; """
+    // distributed by k1,k2
+    sql """
+        CREATE TABLE IF NOT EXISTS `test_colocate_join_of_column_order_t1` (
+        `k1` varchar(64) NULL,
+        `k2` varchar(64) NULL,
+        `v` int NULL
+        ) ENGINE=OLAP
+        DUPLICATE KEY(`k1`,`k2`)
+        COMMENT 'OLAP'
+        DISTRIBUTED BY HASH(`k1`,`k2`) BUCKETS 1
+        PROPERTIES (
+            "replication_num" = "1",
+            "colocate_with" = "group_column_order"
+        );
+    """
+    sql """ DROP TABLE IF EXISTS `test_colocate_join_of_column_order_t2`; """
+    // distributed by k2,k1
+    sql """
+        CREATE TABLE IF NOT EXISTS `test_colocate_join_of_column_order_t2` (
+        `k1` varchar(64) NULL,
+        `k2` varchar(64) NULL,
+        `v` int NULL
+        ) ENGINE=OLAP
+        DUPLICATE KEY(`k1`,`k2`)
+        COMMENT 'OLAP'
+        DISTRIBUTED BY HASH(`k2`,`k1`) BUCKETS 1
+        PROPERTIES (
+            "replication_num" = "1",
+            "colocate_with" = "group_column_order"
+        );
+    """
+    sql """insert into test_colocate_join_of_column_order_t1 
values('k1','k2',11);"""
+    sql """insert into test_colocate_join_of_column_order_t2 
values('k1','k2',11);"""
+
+    sql """set enable_nereids_planner=true; """
+    explain {
+        sql("select * from test_colocate_join_of_column_order_t1 a join 
test_colocate_join_of_column_order_t2 b on a.k1=b.k1 and a.k2=b.k2;")
+        notContains "COLOCATE"
+    }
+    explain {
+        sql("select * from test_colocate_join_of_column_order_t1 a join 
test_colocate_join_of_column_order_t2 b on a.k1=b.k2;")
+        notContains "COLOCATE"
+    }
+    explain {
+        sql("select * from test_colocate_join_of_column_order_t1 a join 
test_colocate_join_of_column_order_t2 b on a.k1=b.k1;")
+        notContains "COLOCATE"
+    }
+    explain {
+        sql("select * from test_colocate_join_of_column_order_t1 a join 
test_colocate_join_of_column_order_t2 b on a.k1=b.k2 and a.v=b.v;")
+        notContains "COLOCATE"
+    }
+    explain {
+        sql("select * from test_colocate_join_of_column_order_t1 a join 
test_colocate_join_of_column_order_t2 b on a.k1=b.k2 and a.k2=b.k1;")
+        contains "COLOCATE"
+    }
+    explain {
+        sql("select * from test_colocate_join_of_column_order_t1 a join 
test_colocate_join_of_column_order_t2 b on a.k1=b.k2 and a.k2=b.k1 and 
a.v=b.v;")
+        contains "COLOCATE"
+    }
+
+    sql """ DROP TABLE IF EXISTS `test_colocate_join_of_column_order_t1`; """
+    sql """ DROP TABLE IF EXISTS `test_colocate_join_of_column_order_t2`; """
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to