This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new f5c5fbadf62 [fix](local shuffle) Fix correctness for bucket hash 
shuffle exchanger (#39568)
f5c5fbadf62 is described below

commit f5c5fbadf62f0c553b9efb3d8a92c28d49fec789
Author: Gabriel <gabrielleeb...@gmail.com>
AuthorDate: Wed Aug 21 08:56:42 2024 +0800

    [fix](local shuffle) Fix correctness for bucket hash shuffle exchanger 
(#39568)
    
    ## Proposed changes
    
    For query plan
    
    
    
![image](https://github.com/user-attachments/assets/334cc4c4-49ae-4330-83ff-03b9bae00e3c)
    
    
    we will plan local exchangers and get a new plan
    
    
    
![image](https://github.com/user-attachments/assets/2b8ece64-3aa0-423c-9db0-fd02024957db)
    
    
    and the hash join operator will get probe and build data which are
    distributed differently (one is HASH shuffle and the other is bucket
    hash shuffle). This PR fixes it.
    <!--Describe your changes.-->
---
 .../local_exchange_sink_operator.cpp               | 14 ++--
 be/src/pipeline/local_exchange/local_exchanger.cpp | 34 ++++++++-
 .../nereids_p0/join/test_join_local_shuffle.out    |  4 ++
 .../nereids_p0/join/test_join_local_shuffle.groovy | 81 ++++++++++++++++++++++
 4 files changed, 123 insertions(+), 10 deletions(-)

diff --git a/be/src/pipeline/local_exchange/local_exchange_sink_operator.cpp 
b/be/src/pipeline/local_exchange/local_exchange_sink_operator.cpp
index 97acd2a8070..91a2c630418 100644
--- a/be/src/pipeline/local_exchange/local_exchange_sink_operator.cpp
+++ b/be/src/pipeline/local_exchange/local_exchange_sink_operator.cpp
@@ -40,7 +40,7 @@ Status LocalExchangeSinkOperatorX::init(ExchangeType type, 
const int num_buckets
                                         const std::map<int, int>& 
shuffle_idx_to_instance_idx) {
     _name = "LOCAL_EXCHANGE_SINK_OPERATOR (" + get_exchange_type_name(type) + 
")";
     _type = type;
-    if (_type == ExchangeType::HASH_SHUFFLE) {
+    if (_type == ExchangeType::HASH_SHUFFLE || _type == 
ExchangeType::BUCKET_HASH_SHUFFLE) {
         // For shuffle join, if data distribution has been broken by previous 
operator, we
         // should use a HASH_SHUFFLE local exchanger to shuffle data again. To 
be mentioned,
         // we should use map shuffle idx to instance idx because all instances 
will be
@@ -57,17 +57,17 @@ Status LocalExchangeSinkOperatorX::init(ExchangeType type, 
const int num_buckets
                 _shuffle_idx_to_instance_idx[i] = {i, i};
             }
         }
-        _partitioner.reset(new 
vectorized::Crc32HashPartitioner<vectorized::ShuffleChannelIds>(
-                _num_partitions));
-        RETURN_IF_ERROR(_partitioner->init(_texprs));
-    } else if (_type == ExchangeType::BUCKET_HASH_SHUFFLE) {
         _partitioner.reset(
-                new 
vectorized::Crc32HashPartitioner<vectorized::ShuffleChannelIds>(num_buckets));
+                _type == ExchangeType::HASH_SHUFFLE
+                        ? new 
vectorized::Crc32HashPartitioner<vectorized::ShuffleChannelIds>(
+                                  _num_partitions)
+                        : new 
vectorized::Crc32HashPartitioner<vectorized::ShuffleChannelIds>(
+                                  num_buckets));
         RETURN_IF_ERROR(_partitioner->init(_texprs));
     }
-
     return Status::OK();
 }
+
 Status LocalExchangeSinkOperatorX::prepare(RuntimeState* state) {
     if (_type == ExchangeType::HASH_SHUFFLE || _type == 
ExchangeType::BUCKET_HASH_SHUFFLE) {
         RETURN_IF_ERROR(_partitioner->prepare(state, _child_x->row_desc()));
diff --git a/be/src/pipeline/local_exchange/local_exchanger.cpp 
b/be/src/pipeline/local_exchange/local_exchanger.cpp
index e10da2beb72..79fbb0f8d06 100644
--- a/be/src/pipeline/local_exchange/local_exchanger.cpp
+++ b/be/src/pipeline/local_exchange/local_exchanger.cpp
@@ -194,7 +194,15 @@ Status ShuffleExchanger::_split_rows(RuntimeState* state, 
const uint32_t* __rest
         return Status::OK();
     }
     
local_state._shared_state->add_total_mem_usage(new_block_wrapper->data_block.allocated_bytes());
+    auto bucket_seq_to_instance_idx =
+            
local_state._parent->cast<LocalExchangeSinkOperatorX>()._bucket_seq_to_instance_idx;
     if (get_type() == ExchangeType::HASH_SHUFFLE) {
+        /**
+         * If type is `HASH_SHUFFLE`, data are hash-shuffled and distributed 
to all instances of
+         * all BEs. So we need a shuffleId-To-InstanceId mapping.
+         * For example, row 1 get a hash value 1 which means we should 
distribute to instance 1 on
+         * BE 1 and row 2 get a hash value 2 which means we should distribute 
to instance 1 on BE 3.
+         */
         const auto& map = 
local_state._parent->cast<LocalExchangeSinkOperatorX>()
                                   ._shuffle_idx_to_instance_idx;
         new_block_wrapper->ref(map.size());
@@ -211,6 +219,7 @@ Status ShuffleExchanger::_split_rows(RuntimeState* state, 
const uint32_t* __rest
             }
         }
     } else if (_num_senders != _num_sources || 
_ignore_source_data_distribution) {
+        // In this branch, data just should be distributed equally into all 
instances.
         new_block_wrapper->ref(_num_partitions);
         for (size_t i = 0; i < _num_partitions; i++) {
             uint32_t start = local_state._partition_rows_histogram[i];
@@ -222,15 +231,34 @@ Status ShuffleExchanger::_split_rows(RuntimeState* state, 
const uint32_t* __rest
                 new_block_wrapper->unref(local_state._shared_state);
             }
         }
+    } else if (bucket_seq_to_instance_idx.empty()) {
+        /**
+         * If type is `BUCKET_HASH_SHUFFLE` and `_bucket_seq_to_instance_idx` 
is empty, which
+         * means no scan operators is included in this fragment so we also 
need a `HASH_SHUFFLE` here.
+         */
+        const auto& map = 
local_state._parent->cast<LocalExchangeSinkOperatorX>()
+                                  ._shuffle_idx_to_instance_idx;
+        DCHECK(!map.empty());
+        new_block_wrapper->ref(map.size());
+        for (const auto& it : map) {
+            DCHECK(it.second >= 0 && it.second < _num_partitions)
+                    << it.first << " : " << it.second << " " << 
_num_partitions;
+            uint32_t start = local_state._partition_rows_histogram[it.first];
+            uint32_t size = local_state._partition_rows_histogram[it.first + 
1] - start;
+            if (size > 0) {
+                _enqueue_data_and_set_ready(it.second, local_state,
+                                            {new_block_wrapper, {row_idx, 
start, size}});
+            } else {
+                new_block_wrapper->unref(local_state._shared_state);
+            }
+        }
     } else {
         new_block_wrapper->ref(_num_partitions);
-        auto map =
-                
local_state._parent->cast<LocalExchangeSinkOperatorX>()._bucket_seq_to_instance_idx;
         for (size_t i = 0; i < _num_partitions; i++) {
             uint32_t start = local_state._partition_rows_histogram[i];
             uint32_t size = local_state._partition_rows_histogram[i + 1] - 
start;
             if (size > 0) {
-                _enqueue_data_and_set_ready(map[i], local_state,
+                _enqueue_data_and_set_ready(bucket_seq_to_instance_idx[i], 
local_state,
                                             {new_block_wrapper, {row_idx, 
start, size}});
             } else {
                 new_block_wrapper->unref(local_state._shared_state);
diff --git a/regression-test/data/nereids_p0/join/test_join_local_shuffle.out 
b/regression-test/data/nereids_p0/join/test_join_local_shuffle.out
new file mode 100644
index 00000000000..34b95802f6a
--- /dev/null
+++ b/regression-test/data/nereids_p0/join/test_join_local_shuffle.out
@@ -0,0 +1,4 @@
+-- This file is automatically generated. You should know what you did if you 
want to edit this
+-- !sql --
+0      1       2       0
+
diff --git 
a/regression-test/suites/nereids_p0/join/test_join_local_shuffle.groovy 
b/regression-test/suites/nereids_p0/join/test_join_local_shuffle.groovy
new file mode 100644
index 00000000000..c66131b57dc
--- /dev/null
+++ b/regression-test/suites/nereids_p0/join/test_join_local_shuffle.groovy
@@ -0,0 +1,81 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+suite("test_join_local_shuffle", "query,p0") {
+    sql "SET enable_nereids_planner=true"
+    sql "SET enable_fallback_to_original_planner=false"
+    sql """
+        CREATE TABLE test_join_local_shuffle_1 (
+            `c1` int(11) NULL COMMENT "",
+            `c2` int(11) NULL COMMENT ""
+          ) ENGINE=OLAP
+          DUPLICATE KEY(`c1`)
+          COMMENT "OLAP"
+          DISTRIBUTED BY HASH(`c1`) BUCKETS 16
+          PROPERTIES (
+            "replication_allocation" = "tag.location.default: 1"
+          );
+        """
+    sql """
+        CREATE TABLE test_join_local_shuffle_2 (
+            `c1` int(11) NULL COMMENT "",
+            `c2` int(11) NULL COMMENT ""
+          ) ENGINE=OLAP
+          DUPLICATE KEY(`c1`)
+          COMMENT "OLAP"
+          DISTRIBUTED BY HASH(`c1`) BUCKETS 16
+          PROPERTIES (
+            "replication_allocation" = "tag.location.default: 1"
+          );
+        """
+
+    sql """
+        CREATE TABLE test_join_local_shuffle_3 (
+            `c1` int(11) NULL COMMENT "",
+            `c2` int(11) NULL COMMENT ""
+          ) ENGINE=OLAP
+          DUPLICATE KEY(`c1`)
+          COMMENT "OLAP"
+          DISTRIBUTED BY HASH(`c1`) BUCKETS 16
+          PROPERTIES (
+            "replication_allocation" = "tag.location.default: 1"
+          );
+            """
+    sql """
+            CREATE TABLE test_join_local_shuffle_4 (
+                `c1` int(11) NULL COMMENT "",
+                `c2` int(11) NULL COMMENT ""
+              ) ENGINE=OLAP
+              DUPLICATE KEY(`c1`)
+              COMMENT "OLAP"
+              DISTRIBUTED BY HASH(`c1`) BUCKETS 16
+              PROPERTIES (
+                "replication_allocation" = "tag.location.default: 1"
+              );
+                """
+
+    sql "insert into test_join_local_shuffle_1 values(0, 1);"
+    sql "insert into test_join_local_shuffle_2 values(2, 0);"
+    sql "insert into test_join_local_shuffle_3 values(2, 0);"
+    sql "insert into test_join_local_shuffle_4 values(0, 1);"
+    qt_sql " select  
/*+SET_VAR(disable_join_reorder=true,enable_local_shuffle=true) */ * from 
(select c1, max(c2) from (select b.c1 c1, b.c2 c2 from 
test_join_local_shuffle_3 a join [shuffle] test_join_local_shuffle_1 b on a.c2 
= b.c1 join [broadcast] test_join_local_shuffle_4 c on b.c1 = c.c1) t1 group by 
c1) t, test_join_local_shuffle_2 where t.c1 = test_join_local_shuffle_2.c2; "
+
+    sql "DROP TABLE IF EXISTS test_join_local_shuffle_1;"
+    sql "DROP TABLE IF EXISTS test_join_local_shuffle_2;"
+    sql "DROP TABLE IF EXISTS test_join_local_shuffle_3;"
+    sql "DROP TABLE IF EXISTS test_join_local_shuffle_4;"
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to