This is an automated email from the ASF dual-hosted git repository. yiguolei pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push: new f5c5fbadf62 [fix](local shuffle) Fix correctness for bucket hash shuffle exchanger (#39568) f5c5fbadf62 is described below commit f5c5fbadf62f0c553b9efb3d8a92c28d49fec789 Author: Gabriel <gabrielleeb...@gmail.com> AuthorDate: Wed Aug 21 08:56:42 2024 +0800 [fix](local shuffle) Fix correctness for bucket hash shuffle exchanger (#39568) ## Proposed changes For query plan  we will plan local exchangers and get a new plan  and the hash join operator will get probe and build data which are distributed differently (one is HASH shuffle and another is Bucket hash shuffle). This PR fixes it. <!--Describe your changes.--> --- .../local_exchange_sink_operator.cpp | 14 ++-- be/src/pipeline/local_exchange/local_exchanger.cpp | 34 ++++++++- .../nereids_p0/join/test_join_local_shuffle.out | 4 ++ .../nereids_p0/join/test_join_local_shuffle.groovy | 81 ++++++++++++++++++++++ 4 files changed, 123 insertions(+), 10 deletions(-) diff --git a/be/src/pipeline/local_exchange/local_exchange_sink_operator.cpp b/be/src/pipeline/local_exchange/local_exchange_sink_operator.cpp index 97acd2a8070..91a2c630418 100644 --- a/be/src/pipeline/local_exchange/local_exchange_sink_operator.cpp +++ b/be/src/pipeline/local_exchange/local_exchange_sink_operator.cpp @@ -40,7 +40,7 @@ Status LocalExchangeSinkOperatorX::init(ExchangeType type, const int num_buckets const std::map<int, int>& shuffle_idx_to_instance_idx) { _name = "LOCAL_EXCHANGE_SINK_OPERATOR (" + get_exchange_type_name(type) + ")"; _type = type; - if (_type == ExchangeType::HASH_SHUFFLE) { + if (_type == ExchangeType::HASH_SHUFFLE || _type == ExchangeType::BUCKET_HASH_SHUFFLE) { // For shuffle join, if data distribution has been broken by previous operator, we // should use a HASH_SHUFFLE local exchanger to shuffle data again. 
To be mentioned, // we should use map shuffle idx to instance idx because all instances will be @@ -57,17 +57,17 @@ Status LocalExchangeSinkOperatorX::init(ExchangeType type, const int num_buckets _shuffle_idx_to_instance_idx[i] = {i, i}; } } - _partitioner.reset(new vectorized::Crc32HashPartitioner<vectorized::ShuffleChannelIds>( - _num_partitions)); - RETURN_IF_ERROR(_partitioner->init(_texprs)); - } else if (_type == ExchangeType::BUCKET_HASH_SHUFFLE) { _partitioner.reset( - new vectorized::Crc32HashPartitioner<vectorized::ShuffleChannelIds>(num_buckets)); + _type == ExchangeType::HASH_SHUFFLE + ? new vectorized::Crc32HashPartitioner<vectorized::ShuffleChannelIds>( + _num_partitions) + : new vectorized::Crc32HashPartitioner<vectorized::ShuffleChannelIds>( + num_buckets)); RETURN_IF_ERROR(_partitioner->init(_texprs)); } - return Status::OK(); } + Status LocalExchangeSinkOperatorX::prepare(RuntimeState* state) { if (_type == ExchangeType::HASH_SHUFFLE || _type == ExchangeType::BUCKET_HASH_SHUFFLE) { RETURN_IF_ERROR(_partitioner->prepare(state, _child_x->row_desc())); diff --git a/be/src/pipeline/local_exchange/local_exchanger.cpp b/be/src/pipeline/local_exchange/local_exchanger.cpp index e10da2beb72..79fbb0f8d06 100644 --- a/be/src/pipeline/local_exchange/local_exchanger.cpp +++ b/be/src/pipeline/local_exchange/local_exchanger.cpp @@ -194,7 +194,15 @@ Status ShuffleExchanger::_split_rows(RuntimeState* state, const uint32_t* __rest return Status::OK(); } local_state._shared_state->add_total_mem_usage(new_block_wrapper->data_block.allocated_bytes()); + auto bucket_seq_to_instance_idx = + local_state._parent->cast<LocalExchangeSinkOperatorX>()._bucket_seq_to_instance_idx; if (get_type() == ExchangeType::HASH_SHUFFLE) { + /** + * If type is `HASH_SHUFFLE`, data are hash-shuffled and distributed to all instances of + * all BEs. So we need a shuffleId-To-InstanceId mapping. 
+ * For example, row 1 gets a hash value 1 which means we should distribute to instance 1 on + * BE 1 and row 2 gets a hash value 2 which means we should distribute to instance 1 on BE 3. + */ const auto& map = local_state._parent->cast<LocalExchangeSinkOperatorX>() ._shuffle_idx_to_instance_idx; new_block_wrapper->ref(map.size()); @@ -211,6 +219,7 @@ Status ShuffleExchanger::_split_rows(RuntimeState* state, const uint32_t* __rest } } } else if (_num_senders != _num_sources || _ignore_source_data_distribution) { + // In this branch, data should just be distributed equally into all instances. new_block_wrapper->ref(_num_partitions); for (size_t i = 0; i < _num_partitions; i++) { uint32_t start = local_state._partition_rows_histogram[i]; @@ -222,15 +231,34 @@ Status ShuffleExchanger::_split_rows(RuntimeState* state, const uint32_t* __rest new_block_wrapper->unref(local_state._shared_state); } } + } else if (bucket_seq_to_instance_idx.empty()) { + /** + * If type is `BUCKET_HASH_SHUFFLE` and `_bucket_seq_to_instance_idx` is empty, which + * means no scan operators are included in this fragment so we also need a `HASH_SHUFFLE` here. 
+ */ + const auto& map = local_state._parent->cast<LocalExchangeSinkOperatorX>() + ._shuffle_idx_to_instance_idx; + DCHECK(!map.empty()); + new_block_wrapper->ref(map.size()); + for (const auto& it : map) { + DCHECK(it.second >= 0 && it.second < _num_partitions) + << it.first << " : " << it.second << " " << _num_partitions; + uint32_t start = local_state._partition_rows_histogram[it.first]; + uint32_t size = local_state._partition_rows_histogram[it.first + 1] - start; + if (size > 0) { + _enqueue_data_and_set_ready(it.second, local_state, + {new_block_wrapper, {row_idx, start, size}}); + } else { + new_block_wrapper->unref(local_state._shared_state); + } + } } else { new_block_wrapper->ref(_num_partitions); - auto map = - local_state._parent->cast<LocalExchangeSinkOperatorX>()._bucket_seq_to_instance_idx; for (size_t i = 0; i < _num_partitions; i++) { uint32_t start = local_state._partition_rows_histogram[i]; uint32_t size = local_state._partition_rows_histogram[i + 1] - start; if (size > 0) { - _enqueue_data_and_set_ready(map[i], local_state, + _enqueue_data_and_set_ready(bucket_seq_to_instance_idx[i], local_state, {new_block_wrapper, {row_idx, start, size}}); } else { new_block_wrapper->unref(local_state._shared_state); diff --git a/regression-test/data/nereids_p0/join/test_join_local_shuffle.out b/regression-test/data/nereids_p0/join/test_join_local_shuffle.out new file mode 100644 index 00000000000..34b95802f6a --- /dev/null +++ b/regression-test/data/nereids_p0/join/test_join_local_shuffle.out @@ -0,0 +1,4 @@ +-- This file is automatically generated. 
You should know what you did if you want to edit this +-- !sql -- +0 1 2 0 + diff --git a/regression-test/suites/nereids_p0/join/test_join_local_shuffle.groovy b/regression-test/suites/nereids_p0/join/test_join_local_shuffle.groovy new file mode 100644 index 00000000000..c66131b57dc --- /dev/null +++ b/regression-test/suites/nereids_p0/join/test_join_local_shuffle.groovy @@ -0,0 +1,81 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +suite("test_join_local_shuffle", "query,p0") { + sql "SET enable_nereids_planner=true" + sql "SET enable_fallback_to_original_planner=false" + sql """ + CREATE TABLE test_join_local_shuffle_1 ( + `c1` int(11) NULL COMMENT "", + `c2` int(11) NULL COMMENT "" + ) ENGINE=OLAP + DUPLICATE KEY(`c1`) + COMMENT "OLAP" + DISTRIBUTED BY HASH(`c1`) BUCKETS 16 + PROPERTIES ( + "replication_allocation" = "tag.location.default: 1" + ); + """ + sql """ + CREATE TABLE test_join_local_shuffle_2 ( + `c1` int(11) NULL COMMENT "", + `c2` int(11) NULL COMMENT "" + ) ENGINE=OLAP + DUPLICATE KEY(`c1`) + COMMENT "OLAP" + DISTRIBUTED BY HASH(`c1`) BUCKETS 16 + PROPERTIES ( + "replication_allocation" = "tag.location.default: 1" + ); + """ + + sql """ + CREATE TABLE test_join_local_shuffle_3 ( + `c1` int(11) NULL COMMENT "", + `c2` int(11) NULL COMMENT "" + ) ENGINE=OLAP + DUPLICATE KEY(`c1`) + COMMENT "OLAP" + DISTRIBUTED BY HASH(`c1`) BUCKETS 16 + PROPERTIES ( + "replication_allocation" = "tag.location.default: 1" + ); + """ + sql """ + CREATE TABLE test_join_local_shuffle_4 ( + `c1` int(11) NULL COMMENT "", + `c2` int(11) NULL COMMENT "" + ) ENGINE=OLAP + DUPLICATE KEY(`c1`) + COMMENT "OLAP" + DISTRIBUTED BY HASH(`c1`) BUCKETS 16 + PROPERTIES ( + "replication_allocation" = "tag.location.default: 1" + ); + """ + + sql "insert into test_join_local_shuffle_1 values(0, 1);" + sql "insert into test_join_local_shuffle_2 values(2, 0);" + sql "insert into test_join_local_shuffle_3 values(2, 0);" + sql "insert into test_join_local_shuffle_4 values(0, 1);" + qt_sql " select /*+SET_VAR(disable_join_reorder=true,enable_local_shuffle=true) */ * from (select c1, max(c2) from (select b.c1 c1, b.c2 c2 from test_join_local_shuffle_3 a join [shuffle] test_join_local_shuffle_1 b on a.c2 = b.c1 join [broadcast] test_join_local_shuffle_4 c on b.c1 = c.c1) t1 group by c1) t, test_join_local_shuffle_2 where t.c1 = test_join_local_shuffle_2.c2; " + + sql "DROP TABLE IF EXISTS test_join_local_shuffle_1;" + sql 
"DROP TABLE IF EXISTS test_join_local_shuffle_2;" + sql "DROP TABLE IF EXISTS test_join_local_shuffle_3;" + sql "DROP TABLE IF EXISTS test_join_local_shuffle_4;" +} --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org