This is an automated email from the ASF dual-hosted git repository.

gabriellee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 1253ef3dc67 [refactor](exchange) remove duplicate code (#45466)
1253ef3dc67 is described below

commit 1253ef3dc674f4c2b7ac9824dedb708d7c1bb6d0
Author: Gabriel <liwenqi...@selectdb.com>
AuthorDate: Wed Dec 18 10:32:32 2024 +0800

    [refactor](exchange) remove duplicate code (#45466)
---
 be/src/pipeline/exec/exchange_sink_operator.cpp | 8 ++------
 1 file changed, 2 insertions(+), 6 deletions(-)

diff --git a/be/src/pipeline/exec/exchange_sink_operator.cpp b/be/src/pipeline/exec/exchange_sink_operator.cpp
index aa893fc0a26..e7fed76be8f 100644
--- a/be/src/pipeline/exec/exchange_sink_operator.cpp
+++ b/be/src/pipeline/exec/exchange_sink_operator.cpp
@@ -112,6 +112,7 @@ Status ExchangeSinkLocalState::init(RuntimeState* state, LocalSinkStateInfo& inf
         }
     }
     only_local_exchange = local_size == channels.size();
+    _rpc_channels_num = channels.size() - local_size;
 
     if (!only_local_exchange) {
         _sink_buffer = p.get_sink_buffer(state->fragment_instance_id().lo);
@@ -206,17 +207,12 @@ Status ExchangeSinkLocalState::open(RuntimeState* state) {
         std::mt19937 g(rd());
         shuffle(channels.begin(), channels.end(), g);
     }
-    size_t local_size = 0;
     for (int i = 0; i < channels.size(); ++i) {
         RETURN_IF_ERROR(channels[i]->open(state));
         if (channels[i]->is_local()) {
-            local_size++;
             _last_local_channel_idx = i;
         }
     }
-    only_local_exchange = local_size == channels.size();
-
-    _rpc_channels_num = channels.size() - local_size;
 
     PUniqueId id;
     id.set_hi(_state->query_id().hi);
@@ -228,7 +224,7 @@ Status ExchangeSinkLocalState::open(RuntimeState* state) {
                 _parent->operator_id(), _parent->node_id(), "BroadcastDependency", true);
         _broadcast_pb_mem_limiter =
                 vectorized::BroadcastPBlockHolderMemLimiter::create_shared(_broadcast_dependency);
-    } else if (local_size > 0) {
+    } else if (!only_local_exchange) {
         size_t dep_id = 0;
         for (auto& channel : channels) {
             if (channel->is_local()) {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to