This is an automated email from the ASF dual-hosted git repository.

gabriellee pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-2.1 by this push:
     new 964c9401312 [fix](coordinator) Fix wrong `recvrId` in fragment 
contains BHJ (#47727) (#47729)
964c9401312 is described below

commit 964c9401312cd1ebc8c603fcdcfaec7df3f2d326
Author: Gabriel <liwenqi...@selectdb.com>
AuthorDate: Tue Feb 11 18:49:34 2025 +0800

    [fix](coordinator) Fix wrong `recvrId` in fragment contains BHJ (#47727) 
(#47729)
    
    In Coordinator, a shuffle map is built from the `recvrId` of each
    instance. For example, if a cluster has 3 BEs, then for a shuffled hash
    join (SHJ) the fragment sent to each BE gets its own map, 3 maps in total:
    BE0: {0:0, 1:1}
    BE1: {2:0, 3:1}
    BE2: {4:0, 5:1}
    In this example, the parallelism is 2. The keys in a shuffle map are the
    global shuffle ids and the values are the instance ids within the current
    BE. In Coordinator, the `recvrId` is the global shuffle id of each
    instance, so a wrong `recvrId` may lead to a wrong query result.
    
    This bug is caused by `recvrId` being set by a BHJ (broadcast hash join)
    in the fragment. If a fragment contains both a BHJ and an SHJ (shuffled
    hash join), `recvrId` should be set by the SHJ and the BHJ should be
    ignored.
---
 .../pipeline/pipeline_x/local_exchange/local_exchanger.cpp | 14 ++++++++++++++
 .../src/main/java/org/apache/doris/qe/Coordinator.java     |  2 --
 2 files changed, 14 insertions(+), 2 deletions(-)

diff --git a/be/src/pipeline/pipeline_x/local_exchange/local_exchanger.cpp 
b/be/src/pipeline/pipeline_x/local_exchange/local_exchanger.cpp
index 016b35b0f25..33312201f4e 100644
--- a/be/src/pipeline/pipeline_x/local_exchange/local_exchanger.cpp
+++ b/be/src/pipeline/pipeline_x/local_exchange/local_exchanger.cpp
@@ -157,12 +157,14 @@ Status ShuffleExchanger::_split_rows(RuntimeState* state, 
const uint32_t* __rest
         const auto& map = 
local_state._parent->cast<LocalExchangeSinkOperatorX>()
                                   ._shuffle_idx_to_instance_idx;
         new_block_wrapper->ref(map.size());
+        uint32_t enqueue_rows = 0;
         for (const auto& it : map) {
             DCHECK(it.second >= 0 && it.second < _num_partitions)
                     << it.first << " : " << it.second << " " << 
_num_partitions;
             uint32_t start = local_state._partition_rows_histogram[it.first];
             uint32_t size = local_state._partition_rows_histogram[it.first + 
1] - start;
             if (size > 0) {
+                enqueue_rows += size;
                 local_state._shared_state->add_mem_usage(
                         it.second, 
new_block_wrapper->data_block.allocated_bytes(), false);
 
@@ -176,6 +178,18 @@ Status ShuffleExchanger::_split_rows(RuntimeState* state, 
const uint32_t* __rest
                 new_block_wrapper->unref(local_state._shared_state);
             }
         }
+        if (enqueue_rows != rows) [[unlikely]] {
+            fmt::memory_buffer debug_string_buffer;
+            fmt::format_to(debug_string_buffer, "Type: {}, Local Exchange Id: 
{}, Shuffled Map: ",
+                           get_exchange_type_name(get_type()), 
local_state.parent()->node_id());
+            for (const auto& it : map) {
+                fmt::format_to(debug_string_buffer, "[{}:{}], ", it.first, 
it.second);
+            }
+            return Status::InternalError(
+                    "Rows mismatched! Data may be lost. [Expected enqueue 
rows={}, Real enqueue "
+                    "rows={}, Detail: {}]",
+                    rows, enqueue_rows, fmt::to_string(debug_string_buffer));
+        }
     } else if (_num_senders != _num_sources || 
_ignore_source_data_distribution) {
         // In this branch, data just should be distributed equally into all 
instances.
         new_block_wrapper->ref(_num_partitions);
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java 
b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
index df6081626ca..82d24ef3fa3 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
@@ -1745,7 +1745,6 @@ public class Coordinator implements CoordInterface {
                             destHosts.put(param.host, param);
                             param.buildHashTableForBroadcastJoin = true;
                             TPlanFragmentDestination dest = new 
TPlanFragmentDestination();
-                            param.recvrId = params.destinations.size();
                             dest.fragment_instance_id = param.instanceId;
                             try {
                                 dest.server = toRpcHost(param.host);
@@ -1870,7 +1869,6 @@ public class Coordinator implements CoordInterface {
                             param.buildHashTableForBroadcastJoin = true;
                             TPlanFragmentDestination dest = new 
TPlanFragmentDestination();
                             dest.fragment_instance_id = param.instanceId;
-                            param.recvrId = params.destinations.size();
                             try {
                                 dest.server = toRpcHost(param.host);
                                 dest.setBrpcServer(toBrpcHost(param.host));


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to