This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch dev-1.0.1
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git

commit b8ba96e73e070706c0874018a9248f130928b75d
Author: morrySnow <101034200+morrys...@users.noreply.github.com>
AuthorDate: Sat Jun 11 21:44:47 2022 +0800

    [fix](planner) bucket shuffle join with a colocated left table produces wrong results (#10045)
    
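    When planning a bucket shuffle join, we need to know the bucket number of the left table.
    Currently, we use the tablet number directly, based on the assumption that the left
    table has only one partition.
    However, when the left table is a colocated table, it can have more than one partition,
    so its total tablet number (summed over all partitions) no longer equals its bucket number.
    In this case, some rows of the right table are incorrectly dropped and the query
    returns a wrong result.
    This fix uses the table's default bucket number for colocated tables, because all
    partitions of a colocated table must have the same bucket number.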
    
    To reproduce the problem, follow the regression test added in this PR.
---
 .../main/java/org/apache/doris/qe/Coordinator.java | 16 ++++-
 .../test_bucket_join_with_colocate_table.out       |  7 ++
 .../test_bucket_join_with_colocate_table.groovy    | 80 ++++++++++++++++++++++
 3 files changed, 100 insertions(+), 3 deletions(-)

diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java 
b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
index 86225e38b5..75dc09b9f5 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
@@ -1850,9 +1850,19 @@ public class Coordinator {
         private void computeScanRangeAssignmentByBucket(
                 final OlapScanNode scanNode, ImmutableMap<Long, Backend> 
idToBackend, Map<TNetworkAddress, Long> addressToBackendID) throws Exception {
             if 
(!fragmentIdToSeqToAddressMap.containsKey(scanNode.getFragmentId())) {
-                // The bucket shuffle join only hit when the partition is one. 
so the totalTabletsNum is all tablet of
-                // one hit partition. can be the right bucket num in bucket 
shuffle join
-                fragmentIdToBucketNumMap.put(scanNode.getFragmentId(), 
(int)scanNode.getTotalTabletsNum());
+                // In bucket shuffle join, we have 2 situation.
+                // 1. Only one partition: in this case, we use 
scanNode.getTotalTabletsNum() to get the right bucket num
+                //    because when table turn on dynamic partition, the bucket 
number in default distribution info
+                //    is not correct.
+                // 2. Table is colocated: in this case, table could have more 
than one partition, but all partition's
+                //    bucket number must be same, so we use default bucket num 
is ok.
+                int bucketNum = 0;
+                if (scanNode.getOlapTable().isColocateTable()) {
+                    bucketNum = 
scanNode.getOlapTable().getDefaultDistributionInfo().getBucketNum();
+                } else {
+                    bucketNum = (int) (scanNode.getTotalTabletsNum());
+                }
+                fragmentIdToBucketNumMap.put(scanNode.getFragmentId(), 
bucketNum);
                 fragmentIdToSeqToAddressMap.put(scanNode.getFragmentId(), new 
HashedMap());
                 
fragmentIdBucketSeqToScanRangeMap.put(scanNode.getFragmentId(), new 
BucketSeqToScanRange());
                 
fragmentIdToBuckendIdBucketCountMap.put(scanNode.getFragmentId(), new 
HashMap<>());
diff --git 
a/regression-test/data/correctness/test_bucket_join_with_colocate_table.out 
b/regression-test/data/correctness/test_bucket_join_with_colocate_table.out
new file mode 100644
index 0000000000..0cf6530614
--- /dev/null
+++ b/regression-test/data/correctness/test_bucket_join_with_colocate_table.out
@@ -0,0 +1,7 @@
+-- This file is automatically generated. You should know what you did if you 
want to edit this
+-- !select --
+2      2       2       2       2
+1      1       1       1       1
+3      3       3       3       3
+\N     \N      \N      4       4
+
diff --git 
a/regression-test/suites/correctness/test_bucket_join_with_colocate_table.groovy
 
b/regression-test/suites/correctness/test_bucket_join_with_colocate_table.groovy
new file mode 100644
index 0000000000..4c8f0d0f18
--- /dev/null
+++ 
b/regression-test/suites/correctness/test_bucket_join_with_colocate_table.groovy
@@ -0,0 +1,80 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+ // or more contributor license agreements.  See the NOTICE file
+ // distributed with this work for additional information
+ // regarding copyright ownership.  The ASF licenses this file
+ // to you under the Apache License, Version 2.0 (the
+ // "License"); you may not use this file except in compliance
+ // with the License.  You may obtain a copy of the License at
+ //
+ //   http://www.apache.org/licenses/LICENSE-2.0
+ //
+ // Unless required by applicable law or agreed to in writing,
+ // software distributed under the License is distributed on an
+ // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ // KIND, either express or implied.  See the License for the
+ // specific language governing permissions and limitations
+ // under the License.
+
+ suite("test_bucket_join_with_colocate_table") {
+     def colocateTableName = "colocate_table"
+     def rightTable = "right_table"
+
+     // Setup: left table is colocated (group1), range-partitioned on c2 into p1/p2 with 8 buckets; right table has no partitions and 10 buckets.
+     sql """ DROP TABLE IF EXISTS ${colocateTableName} """
+     sql """ DROP TABLE IF EXISTS ${rightTable} """
+     sql """
+         CREATE TABLE `${colocateTableName}` (
+           `c1` int(11) NULL COMMENT "",
+           `c2` int(11) NULL COMMENT "",
+           `c3` int(11) NULL COMMENT ""
+         ) ENGINE=OLAP
+         DUPLICATE KEY(`c1`, `c2`, `c3`)
+         COMMENT "OLAP"
+         PARTITION BY RANGE(`c2`)
+         (PARTITION p1 VALUES [("-2147483648"), ("2")),
+         PARTITION p2 VALUES [("2"), (MAXVALUE)))
+         DISTRIBUTED BY HASH(`c1`) BUCKETS 8
+         PROPERTIES (
+           "replication_allocation" = "tag.location.default: 1",
+           "colocate_with" = "group1",
+           "in_memory" = "false",
+           "storage_format" = "V2"
+         )
+     """
+     sql """
+         CREATE TABLE `${rightTable}` (
+           `k1` int(11) NOT NULL COMMENT "",
+           `v1` int(11) NOT NULL COMMENT ""
+         ) ENGINE=OLAP
+         DUPLICATE KEY(`k1`, `v1`)
+         COMMENT "OLAP"
+         DISTRIBUTED BY HASH(`k1`) BUCKETS 10
+         PROPERTIES (
+           "replication_allocation" = "tag.location.default: 1",
+           "in_memory" = "false",
+           "storage_format" = "V2"
+         )
+     """
+     // Rows with c2 in {0, 1} land in p1 and rows with c2 in {2, 3} land in p2, so both left partitions hold data.
+     sql """ INSERT INTO ${colocateTableName} VALUES
+         (0, 0, 0),
+         (1, 1, 1),
+         (2, 2, 2),
+         (3, 3, 3)
+         ;
+     """
+
+     sql """ INSERT INTO ${rightTable} VALUES
+         (1, 1),
+         (2, 2),
+         (3, 3),
+         (4, 4)
+         ;
+     """
+     // Expected (see .out): k1 = 1..3 match left rows; k1 = 4 has no left match, so its left-side columns are NULL.
+     // test_vectorized: run the right outer join with the vectorized engine enabled.
+     sql """ set enable_vectorized_engine = true; """
+
+     qt_select """  select * from ${colocateTableName} right outer join 
${rightTable} on ${colocateTableName}.c1 = ${rightTable}.k1; """
+ }
+


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to