This is an automated email from the ASF dual-hosted git repository.

lihaopeng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 55dd58d8994 [Refactor](Exchange) do exchange sink buffer refactor 
(#49335)
55dd58d8994 is described below

commit 55dd58d8994bdd28ee560ad6c2b6788ce63aafdc
Author: HappenLee <happen...@selectdb.com>
AuthorDate: Thu Apr 10 22:30:52 2025 +0800

    [Refactor](Exchange) do exchange sink buffer refactor (#49335)
    
     Do exchange refactor
     * remove useless checks
     * replace the multiple per-instance maps with a single map of structs, so each instance is looked up only once
     * rename some variables to match code style
---
 be/src/pipeline/exec/exchange_sink_buffer.cpp   | 240 +++++++++++-------------
 be/src/pipeline/exec/exchange_sink_buffer.h     | 130 +++++++------
 be/src/pipeline/exec/exchange_sink_operator.cpp |  32 ++--
 be/src/pipeline/exec/exchange_sink_operator.h   |   8 +-
 be/src/vec/sink/vdata_stream_sender.cpp         |   2 +-
 be/src/vec/sink/vdata_stream_sender.h           |   4 +-
 be/test/pipeline/pipeline_test.cpp              |   2 +-
 be/test/vec/exec/exchange_sink_test.cpp         |  58 +++---
 be/test/vec/exec/exchange_sink_test.h           |   6 +-
 9 files changed, 241 insertions(+), 241 deletions(-)

diff --git a/be/src/pipeline/exec/exchange_sink_buffer.cpp 
b/be/src/pipeline/exec/exchange_sink_buffer.cpp
index 90d54a35aa0..95a5a00f68a 100644
--- a/be/src/pipeline/exec/exchange_sink_buffer.cpp
+++ b/be/src/pipeline/exec/exchange_sink_buffer.cpp
@@ -100,12 +100,7 @@ ExchangeSinkBuffer::ExchangeSinkBuffer(PUniqueId query_id, 
PlanNodeId dest_node_
           _node_id(node_id),
           _state(state),
           _context(state->get_query_ctx()),
-          _exchange_sink_num(sender_ins_ids.size()) {
-    for (auto sender_ins_id : sender_ins_ids) {
-        _queue_deps.emplace(sender_ins_id, nullptr);
-        _parents.emplace(sender_ins_id, nullptr);
-    }
-}
+          _exchange_sink_num(sender_ins_ids.size()) {}
 
 void ExchangeSinkBuffer::close() {
     // Could not clear the queue here, because there maybe a running rpc want 
to
@@ -121,29 +116,34 @@ void ExchangeSinkBuffer::construct_request(TUniqueId 
fragment_instance_id) {
         return;
     }
     auto low_id = fragment_instance_id.lo;
-    if (_instance_to_package_queue_mutex.count(low_id)) {
+    if (_rpc_instances.contains(low_id)) {
         return;
     }
-    _instance_to_package_queue_mutex[low_id] = std::make_unique<std::mutex>();
-    _instance_to_seq[low_id] = 0;
-    _instance_to_package_queue[low_id] = std::queue<TransmitInfo, 
std::list<TransmitInfo>>();
-    _instance_to_broadcast_package_queue[low_id] =
+
+    // Initialize the instance data
+    auto instance_data = std::make_unique<RpcInstance>(low_id);
+    instance_data->mutex = std::make_unique<std::mutex>();
+    instance_data->seq = 0;
+    instance_data->package_queue = std::queue<TransmitInfo, 
std::list<TransmitInfo>>();
+    instance_data->broadcast_package_queue =
             std::queue<BroadcastTransmitInfo, 
std::list<BroadcastTransmitInfo>>();
-    _queue_capacity =
-            config::exchg_buffer_queue_capacity_factor * 
_instance_to_package_queue.size();
+    _queue_capacity = config::exchg_buffer_queue_capacity_factor * 
_rpc_instances.size();
+
     PUniqueId finst_id;
     finst_id.set_hi(fragment_instance_id.hi);
     finst_id.set_lo(fragment_instance_id.lo);
-    _rpc_channel_is_idle[low_id] = true;
-    _rpc_channel_is_turn_off[low_id] = false;
-    
_instance_to_rpc_stats_vec.emplace_back(std::make_shared<RpcInstanceStatistics>(low_id));
-    _instance_to_rpc_stats[low_id] = _instance_to_rpc_stats_vec.back().get();
-    _instance_to_request[low_id] = std::make_shared<PTransmitDataParams>();
-    _instance_to_request[low_id]->mutable_finst_id()->CopyFrom(finst_id);
-    _instance_to_request[low_id]->mutable_query_id()->CopyFrom(_query_id);
-
-    _instance_to_request[low_id]->set_node_id(_dest_node_id);
-    _running_sink_count[low_id] = _exchange_sink_num;
+
+    instance_data->rpc_channel_is_idle = true;
+    instance_data->rpc_channel_is_turn_off = false;
+
+    // Initialize request
+    instance_data->request = std::make_shared<PTransmitDataParams>();
+    instance_data->request->mutable_finst_id()->CopyFrom(finst_id);
+    instance_data->request->mutable_query_id()->CopyFrom(_query_id);
+    instance_data->request->set_node_id(_dest_node_id);
+    instance_data->running_sink_count = _exchange_sink_num;
+
+    _rpc_instances[low_id] = std::move(instance_data);
 }
 
 Status ExchangeSinkBuffer::add_block(TransmitInfo&& request) {
@@ -151,20 +151,21 @@ Status ExchangeSinkBuffer::add_block(TransmitInfo&& 
request) {
         return Status::OK();
     }
     auto ins_id = request.channel->dest_ins_id();
-    if (!_instance_to_package_queue_mutex.contains(ins_id)) {
+    if (!_rpc_instances.contains(ins_id)) {
         return Status::InternalError("fragment_instance_id {} not do 
register_sink",
                                      
print_id(request.channel->_fragment_instance_id));
     }
-    if (_rpc_channel_is_turn_off[ins_id]) {
+    auto& instance_data = *_rpc_instances[ins_id];
+    if (instance_data.rpc_channel_is_turn_off) {
         return Status::EndOfFile("receiver eof");
     }
     bool send_now = false;
     {
-        std::unique_lock<std::mutex> 
lock(*_instance_to_package_queue_mutex[ins_id]);
+        std::unique_lock<std::mutex> lock(*instance_data.mutex);
         // Do not have in process rpc, directly send
-        if (_rpc_channel_is_idle[ins_id]) {
+        if (instance_data.rpc_channel_is_idle) {
             send_now = true;
-            _rpc_channel_is_idle[ins_id] = false;
+            instance_data.rpc_channel_is_idle = false;
         }
         if (request.block) {
             RETURN_IF_ERROR(
@@ -172,16 +173,16 @@ Status ExchangeSinkBuffer::add_block(TransmitInfo&& 
request) {
             COUNTER_UPDATE(request.channel->_parent->memory_used_counter(),
                            request.block->ByteSizeLong());
         }
-        _instance_to_package_queue[ins_id].emplace(std::move(request));
+        instance_data.package_queue.emplace(std::move(request));
         _total_queue_size++;
         if (_total_queue_size > _queue_capacity) {
-            for (auto& [_, dep] : _queue_deps) {
+            for (auto& dep : _queue_deps) {
                 dep->block();
             }
         }
     }
     if (send_now) {
-        RETURN_IF_ERROR(_send_rpc(ins_id));
+        RETURN_IF_ERROR(_send_rpc(instance_data));
     }
 
     return Status::OK();
@@ -192,71 +193,69 @@ Status 
ExchangeSinkBuffer::add_block(BroadcastTransmitInfo&& request) {
         return Status::OK();
     }
     auto ins_id = request.channel->dest_ins_id();
-    if (!_instance_to_package_queue_mutex.contains(ins_id)) {
+    if (!_rpc_instances.contains(ins_id)) {
         return Status::InternalError("fragment_instance_id {} not do 
register_sink",
                                      
print_id(request.channel->_fragment_instance_id));
     }
-    if (_rpc_channel_is_turn_off[ins_id]) {
+    auto& instance_data = *_rpc_instances[ins_id];
+    if (instance_data.rpc_channel_is_turn_off) {
         return Status::EndOfFile("receiver eof");
     }
     bool send_now = false;
     {
-        std::unique_lock<std::mutex> 
lock(*_instance_to_package_queue_mutex[ins_id]);
+        std::unique_lock<std::mutex> lock(*instance_data.mutex);
         // Do not have in process rpc, directly send
-        if (_rpc_channel_is_idle[ins_id]) {
+        if (instance_data.rpc_channel_is_idle) {
             send_now = true;
-            _rpc_channel_is_idle[ins_id] = false;
+            instance_data.rpc_channel_is_idle = false;
         }
         if (request.block_holder->get_block()) {
             RETURN_IF_ERROR(BeExecVersionManager::check_be_exec_version(
                     request.block_holder->get_block()->be_exec_version()));
         }
-        _instance_to_broadcast_package_queue[ins_id].emplace(request);
+        instance_data.broadcast_package_queue.emplace(request);
     }
     if (send_now) {
-        RETURN_IF_ERROR(_send_rpc(ins_id));
+        RETURN_IF_ERROR(_send_rpc(instance_data));
     }
 
     return Status::OK();
 }
 
-Status ExchangeSinkBuffer::_send_rpc(InstanceLoId id) {
-    std::unique_lock<std::mutex> lock(*_instance_to_package_queue_mutex[id]);
+Status ExchangeSinkBuffer::_send_rpc(RpcInstance& instance_data) {
+    std::unique_lock<std::mutex> lock(*(instance_data.mutex));
 
-    std::queue<TransmitInfo, std::list<TransmitInfo>>& q = 
_instance_to_package_queue[id];
+    std::queue<TransmitInfo, std::list<TransmitInfo>>& q = 
instance_data.package_queue;
     std::queue<BroadcastTransmitInfo, std::list<BroadcastTransmitInfo>>& 
broadcast_q =
-            _instance_to_broadcast_package_queue[id];
+            instance_data.broadcast_package_queue;
 
     if (_is_failed) {
-        _turn_off_channel(id, lock);
+        _turn_off_channel(instance_data, lock);
         return Status::OK();
     }
-    if (_rpc_channel_is_turn_off[id]) {
+    if (instance_data.rpc_channel_is_turn_off) {
         return Status::OK();
     }
 
     if (!q.empty()) {
         // If we have data to shuffle which is not broadcasted
         auto& request = q.front();
-        auto& brpc_request = _instance_to_request[id];
+        auto& brpc_request = instance_data.request;
         brpc_request->set_eos(request.eos);
-        brpc_request->set_packet_seq(_instance_to_seq[id]++);
+        brpc_request->set_packet_seq(instance_data.seq++);
         brpc_request->set_sender_id(request.channel->_parent->sender_id());
         brpc_request->set_be_number(request.channel->_parent->be_number());
         if (request.block && !request.block->column_metas().empty()) {
             brpc_request->set_allocated_block(request.block.get());
         }
-        if (!request.exec_status.ok()) {
-            
request.exec_status.to_protobuf(brpc_request->mutable_exec_status());
-        }
-        auto send_callback = request.channel->get_send_callback(id, 
request.eos);
+        auto send_callback = 
request.channel->get_send_callback(&instance_data, request.eos);
 
         
send_callback->cntl_->set_timeout_ms(request.channel->_brpc_timeout_ms);
         if (config::execution_ignore_eovercrowded) {
             send_callback->cntl_->ignore_eovercrowded();
         }
         send_callback->addFailedHandler([&, weak_task_ctx = 
weak_task_exec_ctx()](
-                                                const InstanceLoId& id, const 
std::string& err) {
+                                                RpcInstance* ins, const 
std::string& err) {
             auto task_lock = weak_task_ctx.lock();
             if (task_lock == nullptr) {
                 // This means ExchangeSinkBuffer Ojbect already destroyed, not 
need run failed any more.
@@ -264,11 +263,11 @@ Status ExchangeSinkBuffer::_send_rpc(InstanceLoId id) {
             }
             // attach task for memory tracker and query id when core
             SCOPED_ATTACH_TASK(_state);
-            _failed(id, err);
+            _failed(ins->id, err);
         });
         send_callback->start_rpc_time = GetCurrentTimeNanos();
         send_callback->addSuccessHandler([&, weak_task_ctx = 
weak_task_exec_ctx()](
-                                                 const InstanceLoId& id, const 
bool& eos,
+                                                 RpcInstance* ins_ptr, const 
bool& eos,
                                                  const PTransmitDataResult& 
result,
                                                  const int64_t& 
start_rpc_time) {
             auto task_lock = weak_task_ctx.lock();
@@ -279,24 +278,25 @@ Status ExchangeSinkBuffer::_send_rpc(InstanceLoId id) {
             // attach task for memory tracker and query id when core
             SCOPED_ATTACH_TASK(_state);
 
+            auto& ins = *ins_ptr;
             auto end_rpc_time = GetCurrentTimeNanos();
-            update_rpc_time(id, start_rpc_time, end_rpc_time);
+            update_rpc_time(ins, start_rpc_time, end_rpc_time);
 
             Status s(Status::create(result.status()));
             if (s.is<ErrorCode::END_OF_FILE>()) {
-                _set_receiver_eof(id);
+                _set_receiver_eof(ins);
             } else if (!s.ok()) {
-                _failed(id,
+                _failed(ins.id,
                         fmt::format("exchange req success but status isn't ok: 
{}", s.to_string()));
                 return;
             } else if (eos) {
-                _ended(id);
+                _ended(ins);
             }
             // The eos here only indicates that the current exchange sink has 
reached eos.
             // However, the queue still contains data from other exchange 
sinks, so RPCs need to continue being sent.
-            s = _send_rpc(id);
+            s = _send_rpc(ins);
             if (!s) {
-                _failed(id,
+                _failed(ins.id,
                         fmt::format("exchange req success but status isn't ok: 
{}", s.to_string()));
             }
         });
@@ -322,29 +322,29 @@ Status ExchangeSinkBuffer::_send_rpc(InstanceLoId id) {
         q.pop();
         _total_queue_size--;
         if (_total_queue_size <= _queue_capacity) {
-            for (auto& [_, dep] : _queue_deps) {
+            for (auto& dep : _queue_deps) {
                 dep->set_ready();
             }
         }
     } else if (!broadcast_q.empty()) {
         // If we have data to shuffle which is broadcasted
         auto& request = broadcast_q.front();
-        auto& brpc_request = _instance_to_request[id];
+        auto& brpc_request = instance_data.request;
         brpc_request->set_eos(request.eos);
-        brpc_request->set_packet_seq(_instance_to_seq[id]++);
+        brpc_request->set_packet_seq(instance_data.seq++);
         brpc_request->set_sender_id(request.channel->_parent->sender_id());
         brpc_request->set_be_number(request.channel->_parent->be_number());
         if (request.block_holder->get_block() &&
             !request.block_holder->get_block()->column_metas().empty()) {
             
brpc_request->set_allocated_block(request.block_holder->get_block());
         }
-        auto send_callback = request.channel->get_send_callback(id, 
request.eos);
+        auto send_callback = 
request.channel->get_send_callback(&instance_data, request.eos);
         
send_callback->cntl_->set_timeout_ms(request.channel->_brpc_timeout_ms);
         if (config::execution_ignore_eovercrowded) {
             send_callback->cntl_->ignore_eovercrowded();
         }
         send_callback->addFailedHandler([&, weak_task_ctx = 
weak_task_exec_ctx()](
-                                                const InstanceLoId& id, const 
std::string& err) {
+                                                RpcInstance* ins, const 
std::string& err) {
             auto task_lock = weak_task_ctx.lock();
             if (task_lock == nullptr) {
                 // This means ExchangeSinkBuffer Ojbect already destroyed, not 
need run failed any more.
@@ -352,11 +352,11 @@ Status ExchangeSinkBuffer::_send_rpc(InstanceLoId id) {
             }
             // attach task for memory tracker and query id when core
             SCOPED_ATTACH_TASK(_state);
-            _failed(id, err);
+            _failed(ins->id, err);
         });
         send_callback->start_rpc_time = GetCurrentTimeNanos();
         send_callback->addSuccessHandler([&, weak_task_ctx = 
weak_task_exec_ctx()](
-                                                 const InstanceLoId& id, const 
bool& eos,
+                                                 RpcInstance* ins_ptr, const 
bool& eos,
                                                  const PTransmitDataResult& 
result,
                                                  const int64_t& 
start_rpc_time) {
             auto task_lock = weak_task_ctx.lock();
@@ -366,26 +366,26 @@ Status ExchangeSinkBuffer::_send_rpc(InstanceLoId id) {
             }
             // attach task for memory tracker and query id when core
             SCOPED_ATTACH_TASK(_state);
-
+            auto& ins = *ins_ptr;
             auto end_rpc_time = GetCurrentTimeNanos();
-            update_rpc_time(id, start_rpc_time, end_rpc_time);
+            update_rpc_time(ins, start_rpc_time, end_rpc_time);
 
             Status s(Status::create(result.status()));
             if (s.is<ErrorCode::END_OF_FILE>()) {
-                _set_receiver_eof(id);
+                _set_receiver_eof(ins);
             } else if (!s.ok()) {
-                _failed(id,
+                _failed(ins.id,
                         fmt::format("exchange req success but status isn't ok: 
{}", s.to_string()));
                 return;
             } else if (eos) {
-                _ended(id);
+                _ended(ins);
             }
 
             // The eos here only indicates that the current exchange sink has 
reached eos.
             // However, the queue still contains data from other exchange 
sinks, so RPCs need to continue being sent.
-            s = _send_rpc(id);
+            s = _send_rpc(ins);
             if (!s) {
-                _failed(id,
+                _failed(ins.id,
                         fmt::format("exchange req success but status isn't ok: 
{}", s.to_string()));
             }
         });
@@ -408,29 +408,17 @@ Status ExchangeSinkBuffer::_send_rpc(InstanceLoId id) {
         }
         broadcast_q.pop();
     } else {
-        _rpc_channel_is_idle[id] = true;
+        instance_data.rpc_channel_is_idle = true;
     }
 
     return Status::OK();
 }
 
-void ExchangeSinkBuffer::_ended(InstanceLoId id) {
-    if (!_instance_to_package_queue_mutex.contains(id)) {
-        std::stringstream ss;
-        ss << "failed find the instance id:" << id
-           << " now mutex map size:" << 
_instance_to_package_queue_mutex.size();
-        for (const auto& p : _instance_to_package_queue_mutex) {
-            ss << " key:" << p.first << " value:" << p.second << "\n";
-        }
-        LOG(INFO) << ss.str();
-
-        throw Exception(Status::FatalError("not find the instance id"));
-    } else {
-        std::unique_lock<std::mutex> 
lock(*_instance_to_package_queue_mutex[id]);
-        _running_sink_count[id]--;
-        if (_running_sink_count[id] == 0) {
-            _turn_off_channel(id, lock);
-        }
+void ExchangeSinkBuffer::_ended(RpcInstance& ins) {
+    std::unique_lock<std::mutex> lock(*ins.mutex);
+    ins.running_sink_count--;
+    if (ins.running_sink_count == 0) {
+        _turn_off_channel(ins, lock);
     }
 }
 
@@ -441,15 +429,15 @@ void ExchangeSinkBuffer::_failed(InstanceLoId id, const 
std::string& err) {
     _context->cancel(Status::Cancelled(err));
 }
 
-void ExchangeSinkBuffer::_set_receiver_eof(InstanceLoId id) {
-    std::unique_lock<std::mutex> lock(*_instance_to_package_queue_mutex[id]);
+void ExchangeSinkBuffer::_set_receiver_eof(RpcInstance& ins) {
+    std::unique_lock<std::mutex> lock(*ins.mutex);
     // When the receiving side reaches eof, it means the receiver has finished 
early.
     // The remaining data in the current rpc_channel does not need to be sent,
     // and the rpc_channel should be turned off immediately.
-    Defer turn_off([&]() { _turn_off_channel(id, lock); });
+    Defer turn_off([&]() { _turn_off_channel(ins, lock); });
 
     std::queue<BroadcastTransmitInfo, std::list<BroadcastTransmitInfo>>& 
broadcast_q =
-            _instance_to_broadcast_package_queue[id];
+            ins.broadcast_package_queue;
     for (; !broadcast_q.empty(); broadcast_q.pop()) {
         if (broadcast_q.front().block_holder->get_block()) {
             
COUNTER_UPDATE(broadcast_q.front().channel->_parent->memory_used_counter(),
@@ -461,7 +449,7 @@ void ExchangeSinkBuffer::_set_receiver_eof(InstanceLoId id) 
{
         swap(empty, broadcast_q);
     }
 
-    std::queue<TransmitInfo, std::list<TransmitInfo>>& q = 
_instance_to_package_queue[id];
+    std::queue<TransmitInfo, std::list<TransmitInfo>>& q = ins.package_queue;
     for (; !q.empty(); q.pop()) {
         // Must update _total_queue_size here, otherwise if _total_queue_size 
> _queue_capacity at EOF,
         // ExchangeSinkQueueDependency will be blocked and pipeline will be 
deadlocked
@@ -474,7 +462,7 @@ void ExchangeSinkBuffer::_set_receiver_eof(InstanceLoId id) 
{
 
     // Try to wake up pipeline after clearing the queue
     if (_total_queue_size <= _queue_capacity) {
-        for (auto& [_, dep] : _queue_deps) {
+        for (auto& dep : _queue_deps) {
             dep->set_ready();
         }
     }
@@ -486,20 +474,20 @@ void ExchangeSinkBuffer::_set_receiver_eof(InstanceLoId 
id) {
 }
 
 // The unused parameter `with_lock` is to ensure that the function is called 
when the lock is held.
-void ExchangeSinkBuffer::_turn_off_channel(InstanceLoId id,
+void ExchangeSinkBuffer::_turn_off_channel(RpcInstance& ins,
                                            std::unique_lock<std::mutex>& 
/*with_lock*/) {
-    if (!_rpc_channel_is_idle[id]) {
-        _rpc_channel_is_idle[id] = true;
+    if (!ins.rpc_channel_is_idle) {
+        ins.rpc_channel_is_idle = true;
     }
     // Ensure that each RPC is turned off only once.
-    if (_rpc_channel_is_turn_off[id]) {
+    if (ins.rpc_channel_is_turn_off) {
         return;
     }
-    _rpc_channel_is_turn_off[id] = true;
+    ins.rpc_channel_is_turn_off = true;
     auto weak_task_ctx = weak_task_exec_ctx();
     if (auto pip_ctx = weak_task_ctx.lock()) {
-        for (auto& [_, parent] : _parents) {
-            parent->on_channel_finished(id);
+        for (auto& parent : _parents) {
+            parent->on_channel_finished(ins.id);
         }
     }
 }
@@ -507,10 +495,10 @@ void ExchangeSinkBuffer::_turn_off_channel(InstanceLoId 
id,
 void ExchangeSinkBuffer::get_max_min_rpc_time(int64_t* max_time, int64_t* 
min_time) {
     int64_t local_max_time = 0;
     int64_t local_min_time = INT64_MAX;
-    for (auto& [id, stats] : _instance_to_rpc_stats) {
-        if (stats->sum_time != 0) {
-            local_max_time = std::max(local_max_time, stats->sum_time);
-            local_min_time = std::min(local_min_time, stats->sum_time);
+    for (auto& [_, ins] : _rpc_instances) {
+        if (ins->stats.sum_time != 0) {
+            local_max_time = std::max(local_max_time, ins->stats.sum_time);
+            local_min_time = std::min(local_min_time, ins->stats.sum_time);
         }
     }
     *max_time = local_max_time;
@@ -519,24 +507,22 @@ void ExchangeSinkBuffer::get_max_min_rpc_time(int64_t* 
max_time, int64_t* min_ti
 
 int64_t ExchangeSinkBuffer::get_sum_rpc_time() {
     int64_t sum_time = 0;
-    for (auto& [id, stats] : _instance_to_rpc_stats) {
-        sum_time += stats->sum_time;
+    for (auto& [_, ins] : _rpc_instances) {
+        sum_time += ins->stats.sum_time;
     }
     return sum_time;
 }
 
-void ExchangeSinkBuffer::update_rpc_time(InstanceLoId id, int64_t 
start_rpc_time,
+void ExchangeSinkBuffer::update_rpc_time(RpcInstance& ins, int64_t 
start_rpc_time,
                                          int64_t receive_rpc_time) {
     _rpc_count++;
     int64_t rpc_spend_time = receive_rpc_time - start_rpc_time;
-    DCHECK(_instance_to_rpc_stats.find(id) != _instance_to_rpc_stats.end());
     if (rpc_spend_time > 0) {
-        ++_instance_to_rpc_stats[id]->rpc_count;
-        _instance_to_rpc_stats[id]->sum_time += rpc_spend_time;
-        _instance_to_rpc_stats[id]->max_time =
-                std::max(_instance_to_rpc_stats[id]->max_time, rpc_spend_time);
-        _instance_to_rpc_stats[id]->min_time =
-                std::min(_instance_to_rpc_stats[id]->min_time, rpc_spend_time);
+        auto& stats = ins.stats;
+        ++stats.rpc_count;
+        stats.sum_time += rpc_spend_time;
+        stats.max_time = std::max(stats.max_time, rpc_spend_time);
+        stats.min_time = std::min(stats.min_time, rpc_spend_time);
     }
 }
 
@@ -561,21 +547,21 @@ void ExchangeSinkBuffer::update_profile(RuntimeProfile* 
profile) {
     // This counter will lead to performance degradation.
     // So only collect this information when the profile level is greater than 
3.
     if (_state->profile_level() > 3 && max_count > 0) {
-        std::vector<RpcInstanceStatistics> tmp_rpc_stats_vec;
-        for (const auto& stats : _instance_to_rpc_stats_vec) {
-            tmp_rpc_stats_vec.emplace_back(*stats);
+        std::vector<std::pair<InstanceLoId, RpcInstanceStatistics>> 
tmp_rpc_stats_vec;
+        for (const auto& [id, ins] : _rpc_instances) {
+            tmp_rpc_stats_vec.emplace_back(id, ins->stats);
         }
         pdqsort(tmp_rpc_stats_vec.begin(), tmp_rpc_stats_vec.end(),
-                [](const auto& a, const auto& b) { return a.max_time > 
b.max_time; });
+                [](const auto& a, const auto& b) { return a.second.max_time > 
b.second.max_time; });
         auto count = std::min((size_t)max_count, tmp_rpc_stats_vec.size());
         int i = 0;
         auto* detail_profile = profile->create_child("RpcInstanceDetails", 
true, true);
-        for (const auto& stats : tmp_rpc_stats_vec) {
+        for (const auto& [id, stats] : tmp_rpc_stats_vec) {
             if (0 == stats.rpc_count) {
                 continue;
             }
             std::stringstream out;
-            out << "Instance " << std::hex << stats.inst_lo_id;
+            out << "Instance " << std::hex << id;
             auto stats_str = fmt::format(
                     "Count: {}, MaxTime: {}, MinTime: {}, AvgTime: {}, 
SumTime: {}",
                     stats.rpc_count, PrettyPrinter::print(stats.max_time, 
TUnit::TIME_NS),
@@ -594,10 +580,10 @@ void ExchangeSinkBuffer::update_profile(RuntimeProfile* 
profile) {
 
 std::string ExchangeSinkBuffer::debug_each_instance_queue_size() {
     fmt::memory_buffer debug_string_buffer;
-    for (auto& [id, m] : _instance_to_package_queue_mutex) {
-        std::unique_lock<std::mutex> lock(*m);
+    for (auto& [id, instance_data] : _rpc_instances) {
+        std::unique_lock<std::mutex> lock(*instance_data->mutex);
         fmt::format_to(debug_string_buffer, "Instance: {}, queue size: {}\n", 
id,
-                       _instance_to_package_queue[id].size());
+                       instance_data->package_queue.size());
     }
     return fmt::to_string(debug_string_buffer);
 }
diff --git a/be/src/pipeline/exec/exchange_sink_buffer.h 
b/be/src/pipeline/exec/exchange_sink_buffer.h
index 90be213c9d6..9a00ef072d5 100644
--- a/be/src/pipeline/exec/exchange_sink_buffer.h
+++ b/be/src/pipeline/exec/exchange_sink_buffer.h
@@ -114,7 +114,6 @@ struct TransmitInfo {
     vectorized::Channel* channel = nullptr;
     std::unique_ptr<PBlock> block;
     bool eos;
-    Status exec_status;
 };
 
 struct BroadcastTransmitInfo {
@@ -123,6 +122,49 @@ struct BroadcastTransmitInfo {
     bool eos;
 };
 
+struct RpcInstanceStatistics {
+    int64_t rpc_count = 0;
+    int64_t max_time = 0;
+    int64_t min_time = INT64_MAX;
+    int64_t sum_time = 0;
+};
+
+// Consolidated structure for RPC instance data
+struct RpcInstance {
+    // Constructor initializes the instance with the given ID
+    RpcInstance(InstanceLoId id) : id(id) {}
+
+    // Unique identifier for this RPC instance
+    InstanceLoId id;
+
+    // Mutex for thread-safe access to this instance's data
+    std::unique_ptr<std::mutex> mutex;
+
+    // Sequence number for RPC packets, incremented for each packet sent
+    int64_t seq = 0;
+
+    // Queue for regular data transmission requests
+    std::queue<TransmitInfo, std::list<TransmitInfo>> package_queue;
+
+    // Queue for broadcast data transmission requests
+    std::queue<BroadcastTransmitInfo, std::list<BroadcastTransmitInfo>> 
broadcast_package_queue;
+
+    // RPC request parameters for data transmission
+    std::shared_ptr<PTransmitDataParams> request;
+
+    // Flag indicating if the RPC channel is currently idle (no active RPC)
+    bool rpc_channel_is_idle = true;
+
+    // Flag indicating if the RPC channel has been turned off (no more RPCs 
will be sent)
+    bool rpc_channel_is_turn_off = false;
+
+    // Statistics for monitoring RPC performance (latency, counts, etc.)
+    RpcInstanceStatistics stats;
+
+    // Count of active exchange sinks using this RPC instance
+    int64_t running_sink_count = 0;
+};
+
 template <typename Response>
 class ExchangeSendCallback : public ::doris::DummyBrpcCallback<Response> {
     ENABLE_FACTORY_CREATOR(ExchangeSendCallback);
@@ -130,20 +172,19 @@ class ExchangeSendCallback : public 
::doris::DummyBrpcCallback<Response> {
 public:
     ExchangeSendCallback() = default;
 
-    void init(InstanceLoId id, bool eos) {
-        _id = id;
+    void init(pipeline::RpcInstance* ins, bool eos) {
+        _ins = ins;
         _eos = eos;
     }
 
     ~ExchangeSendCallback() override = default;
     ExchangeSendCallback(const ExchangeSendCallback& other) = delete;
     ExchangeSendCallback& operator=(const ExchangeSendCallback& other) = 
delete;
-    void addFailedHandler(
-            const std::function<void(const InstanceLoId&, const 
std::string&)>& fail_fn) {
+    void addFailedHandler(const std::function<void(RpcInstance*, const 
std::string&)>& fail_fn) {
         _fail_fn = fail_fn;
     }
-    void addSuccessHandler(const std::function<void(const InstanceLoId&, const 
bool&,
-                                                    const Response&, const 
int64_t&)>& suc_fn) {
+    void addSuccessHandler(const std::function<void(RpcInstance*, const bool&, 
const Response&,
+                                                    const int64_t&)>& suc_fn) {
         _suc_fn = suc_fn;
     }
 
@@ -157,9 +198,9 @@ public:
                         
::doris::DummyBrpcCallback<Response>::cntl_->ErrorText(),
                         BackendOptions::get_localhost(),
                         
::doris::DummyBrpcCallback<Response>::cntl_->latency_us());
-                _fail_fn(_id, err);
+                _fail_fn(_ins, err);
             } else {
-                _suc_fn(_id, _eos, 
*(::doris::DummyBrpcCallback<Response>::response_),
+                _suc_fn(_ins, _eos, 
*(::doris::DummyBrpcCallback<Response>::response_),
                         start_rpc_time);
             }
         } catch (const std::exception& exp) {
@@ -172,9 +213,9 @@ public:
     int64_t start_rpc_time;
 
 private:
-    std::function<void(const InstanceLoId&, const std::string&)> _fail_fn;
-    std::function<void(const InstanceLoId&, const bool&, const Response&, 
const int64_t&)> _suc_fn;
-    InstanceLoId _id;
+    std::function<void(RpcInstance*, const std::string&)> _fail_fn;
+    std::function<void(RpcInstance*, const bool&, const Response&, const 
int64_t&)> _suc_fn;
+    RpcInstance* _ins;
     bool _eos;
 };
 
@@ -237,15 +278,14 @@ public:
     Status add_block(TransmitInfo&& request);
     Status add_block(BroadcastTransmitInfo&& request);
     void close();
-    void update_rpc_time(InstanceLoId id, int64_t start_rpc_time, int64_t 
receive_rpc_time);
+    void update_rpc_time(RpcInstance& ins, int64_t start_rpc_time, int64_t 
receive_rpc_time);
     void update_profile(RuntimeProfile* profile);
 
     void set_dependency(InstanceLoId sender_ins_id, 
std::shared_ptr<Dependency> queue_dependency,
                         ExchangeSinkLocalState* local_state) {
-        DCHECK(_queue_deps.contains(sender_ins_id));
-        DCHECK(_parents.contains(sender_ins_id));
-        _queue_deps[sender_ins_id] = queue_dependency;
-        _parents[sender_ins_id] = local_state;
+        std::lock_guard l(_m);
+        _queue_deps.push_back(queue_dependency);
+        _parents.push_back(local_state);
     }
 
     void set_low_memory_mode() { _queue_capacity = 8; }
@@ -257,38 +297,9 @@ private:
 #endif
     friend class ExchangeSinkLocalState;
 
-    phmap::flat_hash_map<InstanceLoId, std::unique_ptr<std::mutex>>
-            _instance_to_package_queue_mutex;
-    // store data in non-broadcast shuffle
-    phmap::flat_hash_map<InstanceLoId, std::queue<TransmitInfo, 
std::list<TransmitInfo>>>
-            _instance_to_package_queue;
+    // Single map to store all RPC instance data
+    phmap::flat_hash_map<InstanceLoId, std::unique_ptr<RpcInstance>> 
_rpc_instances;
     std::atomic<size_t> _queue_capacity;
-    // store data in broadcast shuffle
-    phmap::flat_hash_map<InstanceLoId,
-                         std::queue<BroadcastTransmitInfo, 
std::list<BroadcastTransmitInfo>>>
-            _instance_to_broadcast_package_queue;
-    using PackageSeq = int64_t;
-    // must init zero
-    // TODO: make all flat_hash_map to a STRUT
-    phmap::flat_hash_map<InstanceLoId, PackageSeq> _instance_to_seq;
-    phmap::flat_hash_map<InstanceLoId, std::shared_ptr<PTransmitDataParams>> 
_instance_to_request;
-    // One channel is corresponding to a downstream instance.
-    phmap::flat_hash_map<InstanceLoId, bool> _rpc_channel_is_idle;
-
-    // There could be multiple situations that cause an rpc_channel to be 
turned off,
-    // such as receiving the eof, manual cancellation by the user, or all 
sinks reaching eos.
-    // Therefore, it is necessary to prevent an rpc_channel from being turned 
off multiple times.
-    phmap::flat_hash_map<InstanceLoId, bool> _rpc_channel_is_turn_off;
-    struct RpcInstanceStatistics {
-        RpcInstanceStatistics(InstanceLoId id) : inst_lo_id(id) {}
-        InstanceLoId inst_lo_id;
-        int64_t rpc_count = 0;
-        int64_t max_time = 0;
-        int64_t min_time = INT64_MAX;
-        int64_t sum_time = 0;
-    };
-    std::vector<std::shared_ptr<RpcInstanceStatistics>> 
_instance_to_rpc_stats_vec;
-    phmap::flat_hash_map<InstanceLoId, RpcInstanceStatistics*> 
_instance_to_rpc_stats;
 
     // It is set to true only when an RPC fails. Currently, we do not have an 
error retry mechanism.
     // If an RPC error occurs, the query will be canceled.
@@ -301,19 +312,19 @@ private:
     RuntimeState* _state = nullptr;
     QueryContext* _context = nullptr;
 
-    Status _send_rpc(InstanceLoId);
+    Status _send_rpc(RpcInstance& ins);
 
 #ifndef BE_TEST
-    inline void _ended(InstanceLoId id);
+    inline void _ended(RpcInstance& ins);
     inline void _failed(InstanceLoId id, const std::string& err);
-    inline void _set_receiver_eof(InstanceLoId id);
-    inline void _turn_off_channel(InstanceLoId id, 
std::unique_lock<std::mutex>& with_lock);
+    inline void _set_receiver_eof(RpcInstance& ins);
+    inline void _turn_off_channel(RpcInstance& ins, 
std::unique_lock<std::mutex>& with_lock);
 
 #else
-    virtual void _ended(InstanceLoId id);
+    virtual void _ended(RpcInstance& ins);
     virtual void _failed(InstanceLoId id, const std::string& err);
-    virtual void _set_receiver_eof(InstanceLoId id);
-    virtual void _turn_off_channel(InstanceLoId id, 
std::unique_lock<std::mutex>& with_lock);
+    virtual void _set_receiver_eof(RpcInstance& ins);
+    virtual void _turn_off_channel(RpcInstance& ins, 
std::unique_lock<std::mutex>& with_lock);
 #endif
 
     void get_max_min_rpc_time(int64_t* max_time, int64_t* min_time);
@@ -323,13 +334,12 @@ private:
     // Any modification to instance_to_package_queue requires a corresponding 
modification to _total_queue_size.
     std::atomic<int> _total_queue_size = 0;
 
-    // _running_sink_count is used to track how many sinks have not finished 
yet.
-    // It is only decremented when eos is reached.
-    phmap::flat_hash_map<InstanceLoId, int64_t> _running_sink_count;
+    // Protects `_queue_deps` and `_parents`.
+    std::mutex _m;
     // _queue_deps is used for memory control.
-    phmap::flat_hash_map<InstanceLoId, std::shared_ptr<Dependency>> 
_queue_deps;
+    std::vector<std::shared_ptr<Dependency>> _queue_deps;
     // The ExchangeSinkLocalState in _parents is only used in 
_turn_off_channel.
-    phmap::flat_hash_map<InstanceLoId, ExchangeSinkLocalState*> _parents;
+    std::vector<ExchangeSinkLocalState*> _parents;
     const int64_t _exchange_sink_num;
 };
 
diff --git a/be/src/pipeline/exec/exchange_sink_operator.cpp 
b/be/src/pipeline/exec/exchange_sink_operator.cpp
index b105985e11a..e944cb78f67 100644
--- a/be/src/pipeline/exec/exchange_sink_operator.cpp
+++ b/be/src/pipeline/exec/exchange_sink_operator.cpp
@@ -102,10 +102,10 @@ Status ExchangeSinkLocalState::init(RuntimeState* state, 
LocalSinkStateInfo& inf
             _last_local_channel_idx = i;
         }
     }
-    only_local_exchange = local_size == channels.size();
+    _only_local_exchange = local_size == channels.size();
     _rpc_channels_num = channels.size() - local_size;
 
-    if (!only_local_exchange) {
+    if (!_only_local_exchange) {
         _sink_buffer = p.get_sink_buffer(state->fragment_instance_id().lo);
         register_channels(_sink_buffer.get());
         _queue_dependency = Dependency::create_shared(_parent->operator_id(), 
_parent->node_id(),
@@ -220,12 +220,9 @@ Status ExchangeSinkLocalState::open(RuntimeState* state) {
     id.set_hi(_state->query_id().hi);
     id.set_lo(_state->query_id().lo);
 
-    if ((_part_type == TPartitionType::UNPARTITIONED || channels.size() == 1) 
&&
-        !only_local_exchange) {
-        _broadcast_dependency = Dependency::create_shared(
-                _parent->operator_id(), _parent->node_id(), 
"BroadcastDependency", true);
+    if ((_part_type == TPartitionType::UNPARTITIONED) && 
!_only_local_exchange) {
         _broadcast_pb_mem_limiter =
-                
vectorized::BroadcastPBlockHolderMemLimiter::create_shared(_broadcast_dependency);
+                
vectorized::BroadcastPBlockHolderMemLimiter::create_shared(_queue_dependency);
     } else if (_last_local_channel_idx > -1) {
         size_t dep_id = 0;
         for (auto& channel : channels) {
@@ -298,6 +295,20 @@ ExchangeSinkOperatorX::ExchangeSinkOperatorX(
     if (sink.__isset.output_tuple_id) {
         _output_tuple_id = sink.output_tuple_id;
     }
+
+    // Bucket shuffle may contain some identical buckets, so there is no need to change 
BUCKET_SHFFULE_HASH_PARTITIONED
+    if (_part_type != TPartitionType::UNPARTITIONED &&
+        _part_type != TPartitionType::BUCKET_SHFFULE_HASH_PARTITIONED) {
+        // If all destinations share a single fragment instance id, we need to use broadcast
+        std::unordered_set<UniqueId> dest_fragment_ids_set;
+        for (auto& dest : _dests) {
+            dest_fragment_ids_set.insert(dest.fragment_instance_id);
+            if (dest_fragment_ids_set.size() > 1) {
+                break;
+            }
+        }
+        _part_type = dest_fragment_ids_set.size() == 1 ? 
TPartitionType::UNPARTITIONED : _part_type;
+    }
 }
 
 Status ExchangeSinkOperatorX::init(const TDataSink& tsink) {
@@ -368,11 +379,11 @@ Status ExchangeSinkOperatorX::sink(RuntimeState* state, 
vectorized::Block* block
         set_low_memory_mode(state);
     }
 
-    if (_part_type == TPartitionType::UNPARTITIONED || 
local_state.channels.size() == 1) {
+    if (_part_type == TPartitionType::UNPARTITIONED) {
         // 1. serialize depends on it is not local exchange
         // 2. send block
         // 3. rollover block
-        if (local_state.only_local_exchange) {
+        if (local_state._only_local_exchange) {
             if (!block->empty()) {
                 Status status;
                 size_t idx = 0;
@@ -549,9 +560,6 @@ Status ExchangeSinkLocalState::close(RuntimeState* state, 
Status exec_status) {
     }
 
     COUNTER_SET(_wait_for_finish_dependency_timer, 
_finish_dependency->watcher_elapse_time());
-    if (_broadcast_dependency) {
-        COUNTER_UPDATE(_wait_broadcast_buffer_timer, 
_broadcast_dependency->watcher_elapse_time());
-    }
     for (size_t i = 0; i < _local_channels_dependency.size(); i++) {
         COUNTER_UPDATE(_wait_channel_timer[i],
                        _local_channels_dependency[i]->watcher_elapse_time());
diff --git a/be/src/pipeline/exec/exchange_sink_operator.h 
b/be/src/pipeline/exec/exchange_sink_operator.h
index f9cf4f90748..cdef5e5e119 100644
--- a/be/src/pipeline/exec/exchange_sink_operator.h
+++ b/be/src/pipeline/exec/exchange_sink_operator.h
@@ -62,9 +62,6 @@ public:
         if (_queue_dependency) {
             dep_vec.push_back(_queue_dependency.get());
         }
-        if (_broadcast_dependency) {
-            dep_vec.push_back(_broadcast_dependency.get());
-        }
         std::for_each(_local_channels_dependency.begin(), 
_local_channels_dependency.end(),
                       [&](std::shared_ptr<Dependency> dep) { 
dep_vec.push_back(dep.get()); });
         return dep_vec;
@@ -104,7 +101,7 @@ public:
     }
     std::vector<std::shared_ptr<vectorized::Channel>> channels;
     int current_channel_idx {0}; // index of current channel to send to if 
_random == true
-    bool only_local_exchange {false};
+    bool _only_local_exchange {false};
 
     void on_channel_finished(InstanceLoId channel_id);
     vectorized::PartitionerBase* partitioner() const { return 
_partitioner.get(); }
@@ -145,7 +142,6 @@ private:
     vectorized::BlockSerializer _serializer;
 
     std::shared_ptr<Dependency> _queue_dependency = nullptr;
-    std::shared_ptr<Dependency> _broadcast_dependency = nullptr;
 
     /**
      * We use this to control the execution for local exchange.
@@ -202,7 +198,7 @@ public:
     bool is_serial_operator() const override { return true; }
     void set_low_memory_mode(RuntimeState* state) override {
         auto& local_state = get_local_state(state);
-        // When `local_state.only_local_exchange` the `sink_buffer` is nullptr.
+        // When `local_state._only_local_exchange` the `sink_buffer` is 
nullptr.
         if (local_state._sink_buffer) {
             local_state._sink_buffer->set_low_memory_mode();
         }
diff --git a/be/src/vec/sink/vdata_stream_sender.cpp 
b/be/src/vec/sink/vdata_stream_sender.cpp
index b4d6331c6f6..3f09d02f20f 100644
--- a/be/src/vec/sink/vdata_stream_sender.cpp
+++ b/be/src/vec/sink/vdata_stream_sender.cpp
@@ -174,7 +174,7 @@ Status Channel::send_remote_block(std::unique_ptr<PBlock>&& 
block, bool eos) {
         }
     }
     if (eos || block->column_metas_size()) {
-        RETURN_IF_ERROR(_buffer->add_block({this, std::move(block), eos, 
Status::OK()}));
+        RETURN_IF_ERROR(_buffer->add_block({this, std::move(block), eos}));
     }
     return Status::OK();
 }
diff --git a/be/src/vec/sink/vdata_stream_sender.h 
b/be/src/vec/sink/vdata_stream_sender.h
index c7b96c8638c..f62391d66b1 100644
--- a/be/src/vec/sink/vdata_stream_sender.h
+++ b/be/src/vec/sink/vdata_stream_sender.h
@@ -166,13 +166,13 @@ public:
     InstanceLoId dest_ins_id() const { return _fragment_instance_id.lo; }
 
     std::shared_ptr<pipeline::ExchangeSendCallback<PTransmitDataResult>> 
get_send_callback(
-            InstanceLoId id, bool eos) {
+            pipeline::RpcInstance* ins, bool eos) {
         if (!_send_callback) {
             _send_callback = 
pipeline::ExchangeSendCallback<PTransmitDataResult>::create_shared();
         } else {
             _send_callback->cntl_->Reset();
         }
-        _send_callback->init(id, eos);
+        _send_callback->init(ins, eos);
         return _send_callback;
     }
 
diff --git a/be/test/pipeline/pipeline_test.cpp 
b/be/test/pipeline/pipeline_test.cpp
index 6c2c99931a4..305e895b7b3 100644
--- a/be/test/pipeline/pipeline_test.cpp
+++ b/be/test/pipeline/pipeline_test.cpp
@@ -321,7 +321,7 @@ TEST_F(PipelineTest, HAPPY_PATH) {
     auto& sink_local_state =
             
_runtime_states.back().front()->get_sink_local_state()->cast<ExchangeSinkLocalState>();
     EXPECT_EQ(sink_local_state.channels.size(), 1);
-    EXPECT_EQ(sink_local_state.only_local_exchange, true);
+    EXPECT_EQ(sink_local_state._only_local_exchange, true);
 
     EXPECT_EQ(local_state.stream_recvr->sender_queues().size(), 1);
 
diff --git a/be/test/vec/exec/exchange_sink_test.cpp 
b/be/test/vec/exec/exchange_sink_test.cpp
index 7dbd352bd3a..0643d3c67b8 100644
--- a/be/test/vec/exec/exchange_sink_test.cpp
+++ b/be/test/vec/exec/exchange_sink_test.cpp
@@ -47,12 +47,12 @@ TEST_F(ExchangeSInkTest, test_normal_end) {
         EXPECT_EQ(sink3.add_block(dest_ins_id_2, true), Status::OK());
         EXPECT_EQ(sink3.add_block(dest_ins_id_3, true), Status::OK());
 
-        for (auto [id, count] : buffer->_running_sink_count) {
-            EXPECT_EQ(count, 3) << "id : " << id;
+        for (const auto& [id, instance] : buffer->_rpc_instances) {
+            EXPECT_EQ(instance->running_sink_count, 3) << "id : " << id;
         }
 
-        for (auto [id, is_turn_off] : buffer->_rpc_channel_is_turn_off) {
-            EXPECT_EQ(is_turn_off, false) << "id : " << id;
+        for (const auto& [id, instance] : buffer->_rpc_instances) {
+            EXPECT_EQ(instance->rpc_channel_is_turn_off, false) << "id : " << 
id;
         }
 
         pop_block(dest_ins_id_1, PopState::accept);
@@ -67,12 +67,12 @@ TEST_F(ExchangeSInkTest, test_normal_end) {
         pop_block(dest_ins_id_3, PopState::accept);
         pop_block(dest_ins_id_3, PopState::accept);
 
-        for (auto [id, count] : buffer->_running_sink_count) {
-            EXPECT_EQ(count, 0) << "id : " << id;
+        for (const auto& [id, instance] : buffer->_rpc_instances) {
+            EXPECT_EQ(instance->running_sink_count, 0) << "id : " << id;
         }
 
-        for (auto [id, is_turn_off] : buffer->_rpc_channel_is_turn_off) {
-            EXPECT_EQ(is_turn_off, true) << "id : " << id;
+        for (const auto& [id, instance] : buffer->_rpc_instances) {
+            EXPECT_EQ(instance->rpc_channel_is_turn_off, true) << "id : " << 
id;
         }
         clear_all_done();
     }
@@ -99,17 +99,17 @@ TEST_F(ExchangeSInkTest, test_eof_end) {
         EXPECT_EQ(sink3.add_block(dest_ins_id_2, true), Status::OK());
         EXPECT_EQ(sink3.add_block(dest_ins_id_3, false), Status::OK());
 
-        for (auto [id, count] : buffer->_running_sink_count) {
-            EXPECT_EQ(count, 3) << "id : " << id;
+        for (const auto& [id, instance] : buffer->_rpc_instances) {
+            EXPECT_EQ(instance->running_sink_count, 3) << "id : " << id;
         }
 
-        for (auto [id, is_turn_off] : buffer->_rpc_channel_is_turn_off) {
-            EXPECT_EQ(is_turn_off, false) << "id : " << id;
+        for (const auto& [id, instance] : buffer->_rpc_instances) {
+            EXPECT_EQ(instance->rpc_channel_is_turn_off, false) << "id : " << 
id;
         }
 
         pop_block(dest_ins_id_1, PopState::eof);
-        EXPECT_EQ(buffer->_rpc_channel_is_turn_off[dest_ins_id_1], true);
-        EXPECT_TRUE(buffer->_instance_to_package_queue[dest_ins_id_1].empty());
+        
EXPECT_EQ(buffer->_rpc_instances[dest_ins_id_1]->rpc_channel_is_turn_off, true);
+        
EXPECT_TRUE(buffer->_rpc_instances[dest_ins_id_1]->package_queue.empty());
 
         pop_block(dest_ins_id_2, PopState::accept);
         pop_block(dest_ins_id_2, PopState::accept);
@@ -119,9 +119,11 @@ TEST_F(ExchangeSInkTest, test_eof_end) {
         pop_block(dest_ins_id_3, PopState::accept);
         pop_block(dest_ins_id_3, PopState::accept);
 
-        EXPECT_EQ(buffer->_rpc_channel_is_turn_off[dest_ins_id_1], true);
-        EXPECT_EQ(buffer->_rpc_channel_is_turn_off[dest_ins_id_2], false) << 
"not all eos";
-        EXPECT_EQ(buffer->_rpc_channel_is_turn_off[dest_ins_id_3], false) << " 
not all eos";
+        
EXPECT_EQ(buffer->_rpc_instances[dest_ins_id_1]->rpc_channel_is_turn_off, true);
+        
EXPECT_EQ(buffer->_rpc_instances[dest_ins_id_2]->rpc_channel_is_turn_off, false)
+                << "not all eos";
+        
EXPECT_EQ(buffer->_rpc_instances[dest_ins_id_3]->rpc_channel_is_turn_off, false)
+                << " not all eos";
 
         EXPECT_TRUE(sink1.add_block(dest_ins_id_1, 
true).is<ErrorCode::END_OF_FILE>());
         EXPECT_EQ(sink1.add_block(dest_ins_id_2, true), Status::OK());
@@ -129,10 +131,10 @@ TEST_F(ExchangeSInkTest, test_eof_end) {
         pop_block(dest_ins_id_2, PopState::accept);
         pop_block(dest_ins_id_3, PopState::accept);
 
-        EXPECT_EQ(buffer->_rpc_channel_is_turn_off[dest_ins_id_1], true);
-        EXPECT_EQ(buffer->_rpc_channel_is_turn_off[dest_ins_id_2], true);
-        EXPECT_EQ(buffer->_rpc_channel_is_turn_off[dest_ins_id_3], false);
-        EXPECT_EQ(buffer->_running_sink_count[dest_ins_id_3], 1);
+        
EXPECT_EQ(buffer->_rpc_instances[dest_ins_id_1]->rpc_channel_is_turn_off, true);
+        
EXPECT_EQ(buffer->_rpc_instances[dest_ins_id_2]->rpc_channel_is_turn_off, true);
+        
EXPECT_EQ(buffer->_rpc_instances[dest_ins_id_3]->rpc_channel_is_turn_off, 
false);
+        EXPECT_EQ(buffer->_rpc_instances[dest_ins_id_3]->running_sink_count, 
1);
 
         clear_all_done();
     }
@@ -159,12 +161,12 @@ TEST_F(ExchangeSInkTest, test_error_end) {
         EXPECT_EQ(sink3.add_block(dest_ins_id_2, false), Status::OK());
         EXPECT_EQ(sink3.add_block(dest_ins_id_3, false), Status::OK());
 
-        for (auto [id, count] : buffer->_running_sink_count) {
-            EXPECT_EQ(count, 3) << "id : " << id;
+        for (const auto& [id, instance] : buffer->_rpc_instances) {
+            EXPECT_EQ(instance->running_sink_count, 3) << "id : " << id;
         }
 
-        for (auto [id, is_turn_off] : buffer->_rpc_channel_is_turn_off) {
-            EXPECT_EQ(is_turn_off, false) << "id : " << id;
+        for (const auto& [id, instance] : buffer->_rpc_instances) {
+            EXPECT_EQ(instance->rpc_channel_is_turn_off, false) << "id : " << 
id;
         }
 
         pop_block(dest_ins_id_2, PopState::error);
@@ -226,9 +228,9 @@ TEST_F(ExchangeSInkTest, test_queue_size) {
 
         std::cout << "each queue size : \n" << 
buffer->debug_each_instance_queue_size() << "\n";
 
-        EXPECT_EQ(buffer->_rpc_channel_is_turn_off[dest_ins_id_1], false);
-        EXPECT_EQ(buffer->_rpc_channel_is_turn_off[dest_ins_id_2], true);
-        EXPECT_EQ(buffer->_rpc_channel_is_turn_off[dest_ins_id_3], false);
+        
EXPECT_EQ(buffer->_rpc_instances[dest_ins_id_1]->rpc_channel_is_turn_off, 
false);
+        
EXPECT_EQ(buffer->_rpc_instances[dest_ins_id_2]->rpc_channel_is_turn_off, true);
+        
EXPECT_EQ(buffer->_rpc_instances[dest_ins_id_3]->rpc_channel_is_turn_off, 
false);
         clear_all_done();
     }
 }
diff --git a/be/test/vec/exec/exchange_sink_test.h 
b/be/test/vec/exec/exchange_sink_test.h
index 253d7b267f9..59004a53a9e 100644
--- a/be/test/vec/exec/exchange_sink_test.h
+++ b/be/test/vec/exec/exchange_sink_test.h
@@ -138,10 +138,8 @@ struct SinkWithChannel {
     std::map<int64_t, std::shared_ptr<Channel>> channels;
     Status add_block(int64_t id, bool eos) {
         auto channel = channels[id];
-        TransmitInfo transmitInfo {.channel = channel.get(),
-                                   .block = std::make_unique<PBlock>(),
-                                   .eos = eos,
-                                   .exec_status = Status::OK()};
+        TransmitInfo transmitInfo {
+                .channel = channel.get(), .block = std::make_unique<PBlock>(), 
.eos = eos};
         return buffer->add_block(std::move(transmitInfo));
     }
 };


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to