This is an automated email from the ASF dual-hosted git repository.

huajianlan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 2a2786fb8b2 [fix](nereids) fix bucket shuffle join to right compute 
wrong output property (#47888)
2a2786fb8b2 is described below

commit 2a2786fb8b23132846a5bf2d9f7d924d0c37dd67
Author: 924060929 <lanhuaj...@selectdb.com>
AuthorDate: Mon Feb 17 10:43:04 2025 +0800

    [fix](nereids) fix bucket shuffle join to right compute wrong output 
property (#47888)
    
    Fix the "Illegal bucket shuffle join or colocate join in fragment" error,
    which is caused by computing the wrong join output property (a regression
    introduced by #41730).
    
    The exception:
    ```
    errCode = 2, detailMessage = Illegal bucket shuffle join or colocate join 
in fragment
    ```
---
 .../properties/ChildOutputPropertyDeriver.java     |  26 ++++---
 .../properties/ChildOutputPropertyDeriverTest.java |  12 +++-
 .../distribute/bucket_shuffle_to_right.out         | Bin 0 -> 192 bytes
 .../distribute/bucket_shuffle_to_right.groovy      |  78 +++++++++++++++++++++
 4 files changed, 105 insertions(+), 11 deletions(-)

diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/ChildOutputPropertyDeriver.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/ChildOutputPropertyDeriver.java
index 8dc77a43f50..ef89c327605 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/ChildOutputPropertyDeriver.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/ChildOutputPropertyDeriver.java
@@ -505,32 +505,42 @@ public class ChildOutputPropertyDeriver extends 
PlanVisitor<PhysicalProperties,
                     return new PhysicalProperties(
                             DistributionSpecHash.merge(rightHashSpec, 
leftHashSpec, outputShuffleType)
                     );
-                } else {
+                } else if (shuffleSide == ShuffleSide.RIGHT || shuffleSide == 
ShuffleSide.NONE) {
+                    return new PhysicalProperties(
+                            DistributionSpecHash.merge(leftHashSpec, 
rightHashSpec, outputShuffleType)
+                    );
+                } else if (shuffleSide == ShuffleSide.BOTH) {
                     return new PhysicalProperties(
                             DistributionSpecHash.merge(leftHashSpec, 
rightHashSpec, outputShuffleType)
+                                    
.withShuffleTypeAndForbidColocateJoin(leftHashSpec.getShuffleType())
                     );
+                } else {
+                    throw new AnalysisException("unknown shuffle side " + 
shuffleSide);
                 }
             case LEFT_SEMI_JOIN:
             case LEFT_ANTI_JOIN:
             case NULL_AWARE_LEFT_ANTI_JOIN:
             case LEFT_OUTER_JOIN:
-                if (shuffleSide == ShuffleSide.LEFT) {
+                if (shuffleSide == ShuffleSide.LEFT || shuffleSide == 
ShuffleSide.BOTH) {
                     return new PhysicalProperties(
                             
leftHashSpec.withShuffleTypeAndForbidColocateJoin(outputShuffleType)
                     );
-                } else {
+                } else if (shuffleSide == ShuffleSide.RIGHT || shuffleSide == 
ShuffleSide.NONE) {
                     return new PhysicalProperties(leftHashSpec);
+                } else {
+                    throw new AnalysisException("unknown shuffle side " + 
shuffleSide);
                 }
             case RIGHT_SEMI_JOIN:
             case RIGHT_ANTI_JOIN:
             case RIGHT_OUTER_JOIN:
-                if (JoinUtils.couldColocateJoin(leftHashSpec, rightHashSpec, 
hashJoin.getHashJoinConjuncts())) {
+                if (shuffleSide == ShuffleSide.RIGHT || shuffleSide == 
ShuffleSide.BOTH) {
+                    return new PhysicalProperties(
+                            
rightHashSpec.withShuffleTypeAndForbidColocateJoin(outputShuffleType)
+                    );
+                } else if (shuffleSide == ShuffleSide.LEFT || shuffleSide == 
ShuffleSide.NONE) {
                     return new PhysicalProperties(rightHashSpec);
                 } else {
-                    // retain left shuffle type, since coordinator use left 
most node to schedule fragment
-                    // forbid colocate join, since right table already shuffle
-                    return new 
PhysicalProperties(rightHashSpec.withShuffleTypeAndForbidColocateJoin(
-                            leftHashSpec.getShuffleType()));
+                    throw new AnalysisException("unknown shuffle side " + 
shuffleSide);
                 }
             case FULL_OUTER_JOIN:
                 return PhysicalProperties.createAnyFromHash(leftHashSpec, 
rightHashSpec);
diff --git 
a/fe/fe-core/src/test/java/org/apache/doris/nereids/properties/ChildOutputPropertyDeriverTest.java
 
b/fe/fe-core/src/test/java/org/apache/doris/nereids/properties/ChildOutputPropertyDeriverTest.java
index 8351b9f8c22..cc26ba2945b 100644
--- 
a/fe/fe-core/src/test/java/org/apache/doris/nereids/properties/ChildOutputPropertyDeriverTest.java
+++ 
b/fe/fe-core/src/test/java/org/apache/doris/nereids/properties/ChildOutputPropertyDeriverTest.java
@@ -55,6 +55,7 @@ import org.apache.doris.nereids.types.TinyIntType;
 import org.apache.doris.nereids.util.ExpressionUtils;
 import org.apache.doris.nereids.util.JoinUtils;
 import org.apache.doris.qe.ConnectContext;
+import org.apache.doris.qe.SessionVariable;
 
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Lists;
@@ -390,11 +391,12 @@ class ChildOutputPropertyDeriverTest {
         GroupExpression groupExpression = new GroupExpression(join);
         new Group(null, groupExpression, null);
 
+        long leftTableId = 0L;
         PhysicalProperties left = new PhysicalProperties(
                 new DistributionSpecHash(
                         Lists.newArrayList(new ExprId(0)),
                         ShuffleType.NATURAL,
-                        0,
+                        leftTableId,
                         Sets.newHashSet(0L)
                 ),
                 new OrderSpec(
@@ -402,10 +404,11 @@ class ChildOutputPropertyDeriverTest {
                                 true, true)))
         );
 
+        long rightTableId = 1L;
         PhysicalProperties right = new PhysicalProperties(new 
DistributionSpecHash(
                 Lists.newArrayList(new ExprId(1)),
                 ShuffleType.NATURAL,
-                1,
+                rightTableId,
                 Sets.newHashSet(1L)
         ));
 
@@ -416,8 +419,11 @@ class ChildOutputPropertyDeriverTest {
         Assertions.assertTrue(result.getOrderSpec().getOrderKeys().isEmpty());
         Assertions.assertInstanceOf(DistributionSpecHash.class, 
result.getDistributionSpec());
         DistributionSpecHash actual = (DistributionSpecHash) 
result.getDistributionSpec();
+
         Assertions.assertEquals(ShuffleType.NATURAL, actual.getShuffleType());
-        Assertions.assertEquals(-1, actual.getTableId());
+        Assertions.assertEquals(
+                SessionVariable.canUseNereidsDistributePlanner() ? 
rightTableId : -1L, actual.getTableId()
+        );
         // check merged
         Assertions.assertEquals(1, actual.getExprIdToEquivalenceSet().size());
         Assertions.assertEquals(1, 
actual.getExprIdToEquivalenceSet().keySet().iterator().next().asInt());
diff --git 
a/regression-test/data/nereids_syntax_p0/distribute/bucket_shuffle_to_right.out 
b/regression-test/data/nereids_syntax_p0/distribute/bucket_shuffle_to_right.out
new file mode 100644
index 00000000000..b17e5bf2ec5
Binary files /dev/null and 
b/regression-test/data/nereids_syntax_p0/distribute/bucket_shuffle_to_right.out 
differ
diff --git 
a/regression-test/suites/nereids_syntax_p0/distribute/bucket_shuffle_to_right.groovy
 
b/regression-test/suites/nereids_syntax_p0/distribute/bucket_shuffle_to_right.groovy
new file mode 100644
index 00000000000..6bd2ff14973
--- /dev/null
+++ 
b/regression-test/suites/nereids_syntax_p0/distribute/bucket_shuffle_to_right.groovy
@@ -0,0 +1,78 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+suite("bucket_shuffle_to_right") {
+    multi_sql """
+        set enable_nereids_distribute_planner=true;
+        set disable_join_reorder=true;
+        
+        drop table if exists 
table_20_undef_partitions2_keys3_properties4_distributed_by53;
+        create table 
table_20_undef_partitions2_keys3_properties4_distributed_by53 (
+                                                                               
        `pk` int,
+                                                                               
        `col_varchar_10__undef_signed` varchar(10)   ,
+                                                                               
        `col_int_undef_signed` int   ,
+                                                                               
        `col_varchar_1024__undef_signed` varchar(1024)
+        ) engine=olap
+        DUPLICATE KEY(pk, col_varchar_10__undef_signed)
+        
+        distributed by hash(pk) buckets 10
+        properties("replication_num" = "1");
+        insert into 
table_20_undef_partitions2_keys3_properties4_distributed_by53(pk,col_int_undef_signed,col_varchar_10__undef_signed,col_varchar_1024__undef_signed)
 values 
(0,null,"at",'b'),(1,null,'e',"your"),(2,6,'h',"look"),(3,3,'z',"to"),(4,6,'q',"did"),(5,7,"come",'g'),(6,null,'x',"his"),(7,null,'w','s'),(8,null,"don't",'l'),(9,null,"and","know"),(10,null,'q','c'),(11,null,'u','w'),(12,9,'c','x'),(13,null,"my","or"),(14,null,'a','i'),(15,null,"look",'u'),(16,2,"were","be"),(17,nul
 [...]
+        
+        
+        drop table if exists 
table_23_undef_partitions2_keys3_properties4_distributed_by52;
+        create table 
table_23_undef_partitions2_keys3_properties4_distributed_by52 (
+                                                                               
        `pk` int,
+                                                                               
        `col_int_undef_signed` int   ,
+                                                                               
        `col_varchar_10__undef_signed` varchar(10)   ,
+                                                                               
        `col_varchar_1024__undef_signed` varchar(1024) MIN
+        ) engine=olap
+        AGGREGATE KEY(pk, col_int_undef_signed, col_varchar_10__undef_signed)
+        
+        distributed by hash(pk) buckets 10
+        properties("replication_num" = "1");
+        insert into 
table_23_undef_partitions2_keys3_properties4_distributed_by52(pk,col_int_undef_signed,col_varchar_10__undef_signed,col_varchar_1024__undef_signed)
 values 
(0,null,"to","so"),(1,null,'u','j'),(2,null,"say","would"),(3,null,'t',"to"),(4,0,"your",'q'),(5,1,'w',"if"),(6,null,"right",'p'),(7,7,'h',"her"),(8,6,"that",'v'),(9,5,'k',"as"),(10,null,"know","did"),(11,9,"to",'q'),(12,null,"look","don't"),(13,9,"say",'v'),(14,null,'m','j'),(15,9,"i","want"),(16,4,"then","why"),(17
 [...]
+        
+        
+        drop table if exists 
table_100_undef_partitions2_keys3_properties4_distributed_by5;
+        create table 
table_100_undef_partitions2_keys3_properties4_distributed_by5 (
+                                                                               
        `col_int_undef_signed` int/*agg_type_placeholder*/   ,
+                                                                               
        `col_varchar_10__undef_signed` varchar(10)/*agg_type_placeholder*/   ,
+                                                                               
        `col_varchar_1024__undef_signed` varchar(1024)/*agg_type_placeholder*/  
 ,
+                                                                               
        `pk` int/*agg_type_placeholder*/
+        ) engine=olap
+        
+        
+        distributed by hash(pk) buckets 10
+        properties("replication_num" = "1");
+        insert into 
table_100_undef_partitions2_keys3_properties4_distributed_by5(pk,col_int_undef_signed,col_varchar_10__undef_signed,col_varchar_1024__undef_signed)
 values 
(0,null,"when","yes"),(1,null,"do",'i'),(2,1,"all","didn't"),(3,null,"don't","who"),(4,9,"your","it"),(5,5,'n','c'),(6,0,"up","it's"),(7,9,'d','a'),(8,3,"yeah",'v'),(9,null,'r','s'),(10,5,'s','n'),(11,null,'w','l'),(12,null,'k',"she"),(13,1,"from","what"),(14,1,'t',"at"),(15,null,"something",'s'),(16,null,'q',"his"),
 [...]
+        """
+
+    order_qt_bucket_shuffle_to_right """
+        select alias1 . `col_int_undef_signed` AS field1
+        from table_100_undef_partitions2_keys3_properties4_distributed_by5 as 
alias3
+        right outer join
+        (
+            select alias1 . `pk`, alias1 . `col_int_undef_signed`
+            from table_23_undef_partitions2_keys3_properties4_distributed_by52 
as alias2
+            right outer join
+            table_20_undef_partitions2_keys3_properties4_distributed_by53 as 
alias1
+            on  alias1 . `pk` = alias2 . `col_int_undef_signed`
+        ) alias1 ON alias1 . `pk` = alias3 . `pk`
+        WHERE alias1 . `pk` >= 0;
+        """
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to