This is an automated email from the ASF dual-hosted git repository. gabriellee pushed a commit to branch branch-3.0 in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-3.0 by this push: new dea016321ec [fix](coordinator) Fix wrong `recvrId` in fragment contains BHJ (#47728) dea016321ec is described below commit dea016321ec92ba63451a6307c134daa47c6df82 Author: Gabriel <liwenqi...@selectdb.com> AuthorDate: Tue Feb 11 10:44:48 2025 +0800 [fix](coordinator) Fix wrong `recvrId` in fragment contains BHJ (#47728) Pick #47727 In Coordinator, a shuffle map is built from the `recvrId` of each instance. For example, if a cluster has 3 BEs, then for a shuffled hash join (SHJ) we get 3 maps, one for the fragment sent to each BE: BE0: {0:0, 1:1}, BE1: {2:0, 3:1}, BE2: {4:0, 5:1}. In this example, the parallelism is 2. The keys in a shuffle map are the global shuffle ids, and the values are the instance ids within the current BE. In Coordinator, the `recvrId` is the global shuffle id of each instance, so a wrong `recvrId` may produce a wrong query result. This bug is caused by `recvrId` being set by a BHJ in the fragment. If a fragment contains both a BHJ and an SHJ, `recvrId` should be set by the SHJ and the BHJ should be ignored. 
--- be/src/pipeline/local_exchange/local_exchanger.cpp | 14 ++++++++++++++ .../src/main/java/org/apache/doris/qe/Coordinator.java | 2 -- 2 files changed, 14 insertions(+), 2 deletions(-) diff --git a/be/src/pipeline/local_exchange/local_exchanger.cpp b/be/src/pipeline/local_exchange/local_exchanger.cpp index 3ec4f537e47..09fdf768af7 100644 --- a/be/src/pipeline/local_exchange/local_exchanger.cpp +++ b/be/src/pipeline/local_exchange/local_exchanger.cpp @@ -205,6 +205,7 @@ Status ShuffleExchanger::_split_rows(RuntimeState* state, const uint32_t* __rest local_state._channel_id); auto bucket_seq_to_instance_idx = local_state._parent->cast<LocalExchangeSinkOperatorX>()._bucket_seq_to_instance_idx; + int32_t enqueue_rows = 0; if (get_type() == ExchangeType::HASH_SHUFFLE) { /** * If type is `HASH_SHUFFLE`, data are hash-shuffled and distributed to all instances of @@ -221,12 +222,25 @@ Status ShuffleExchanger::_split_rows(RuntimeState* state, const uint32_t* __rest uint32_t start = local_state._partition_rows_histogram[it.first]; uint32_t size = local_state._partition_rows_histogram[it.first + 1] - start; if (size > 0) { + enqueue_rows += size; _enqueue_data_and_set_ready(it.second, local_state, {new_block_wrapper, {row_idx, start, size}}); } else { new_block_wrapper->unref(local_state._shared_state, local_state._channel_id); } } + if (enqueue_rows != rows) [[unlikely]] { + fmt::memory_buffer debug_string_buffer; + fmt::format_to(debug_string_buffer, "Type: {}, Local Exchange Id: {}, Shuffled Map: ", + get_exchange_type_name(get_type()), local_state.parent()->node_id()); + for (const auto& it : map) { + fmt::format_to(debug_string_buffer, "[{}:{}], ", it.first, it.second); + } + return Status::InternalError( + "Rows mismatched! Data may be lost. 
[Expected enqueue rows={}, Real enqueue " + "rows={}, Detail: {}]", + rows, enqueue_rows, fmt::to_string(debug_string_buffer)); + } } else { DCHECK(!bucket_seq_to_instance_idx.empty()); new_block_wrapper->ref(_num_partitions); diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java index 774e4efa432..58631524024 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java @@ -1447,7 +1447,6 @@ public class Coordinator implements CoordInterface { } else { destHosts.put(param.host, param); TPlanFragmentDestination dest = new TPlanFragmentDestination(); - param.recvrId = params.destinations.size(); dest.fragment_instance_id = param.instanceId; try { dest.server = toRpcHost(param.host); @@ -1593,7 +1592,6 @@ public class Coordinator implements CoordInterface { destHosts.put(param.host, param); TPlanFragmentDestination dest = new TPlanFragmentDestination(); dest.fragment_instance_id = param.instanceId; - param.recvrId = params.destinations.size(); try { dest.server = toRpcHost(param.host); dest.setBrpcServer(toBrpcHost(param.host)); --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org