This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 2bec12ed7d9 [refactor](exchange) Optimize the logic related to sending and closing (#41968)
2bec12ed7d9 is described below

commit 2bec12ed7d9eeea10a788b3e94e2084d29241501
Author: Jerry Hu <mrh...@gmail.com>
AuthorDate: Wed Oct 23 11:49:44 2024 +0800

    [refactor](exchange) Optimize the logic related to sending and closing (#41968)
    
    ## Proposed changes
    
    Issue Number: close #xxx
    
    <!--Describe your changes.-->
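    
    What ties the sending and closing changes together: each channel now reports
    its own completion to ExchangeSinkLocalState::on_channel_finished, which
    counts down the working channels and readies the finish dependency when the
    last one is done (replacing the old _busy_channels / set_should_stop
    bookkeeping in ExchangeSinkBuffer). A reduced, self-contained sketch of that
    pattern; Dependency is stubbed here, the real types live under
    be/src/pipeline:
    
        #include <atomic>
        #include <cstdint>
        #include <mutex>
        #include <set>
        
        using InstanceLoId = int64_t;
        
        struct Dependency {
            void set_ready() { ready.store(true); } // stub: wakes the blocked pipeline task
            std::atomic<bool> ready {false};
        };
        
        struct SinkLocalState {
            // Called once per channel: locally on eos/receiver error, or via
            // ExchangeSinkBuffer::_turn_off_channel for remote channels.
            void on_channel_finished(InstanceLoId channel_id) {
                std::lock_guard<std::mutex> lock(_finished_channels_mutex);
                if (!_finished_channels.insert(channel_id).second) {
                    return; // duplicate notification, ignore
                }
                // fetch_sub returns the previous value, so 1 means this call
                // retired the last working channel.
                if (_working_channels_count.fetch_sub(1) == 1) {
                    _finish_dependency.set_ready();
                }
            }
        
            std::atomic_int _working_channels_count {0};
            std::set<InstanceLoId> _finished_channels;
            std::mutex _finished_channels_mutex;
            Dependency _finish_dependency;
        };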
---
 be/src/pipeline/exec/exchange_sink_buffer.cpp      |  49 ++---
 be/src/pipeline/exec/exchange_sink_buffer.h        |  25 +--
 be/src/pipeline/exec/exchange_sink_operator.cpp    | 135 +++++++------
 be/src/pipeline/exec/exchange_sink_operator.h      |  29 +--
 be/src/pipeline/exec/result_file_sink_operator.cpp |  12 +-
 be/src/pipeline/exec/result_file_sink_operator.h   |   9 -
 be/src/vec/sink/vdata_stream_sender.cpp            | 223 ++++++++-------------
 be/src/vec/sink/vdata_stream_sender.h              | 212 ++++++--------------
 8 files changed, 253 insertions(+), 441 deletions(-)

diff --git a/be/src/pipeline/exec/exchange_sink_buffer.cpp b/be/src/pipeline/exec/exchange_sink_buffer.cpp
index ff06bc37e5d..016802f8f73 100644
--- a/be/src/pipeline/exec/exchange_sink_buffer.cpp
+++ b/be/src/pipeline/exec/exchange_sink_buffer.cpp
@@ -86,14 +86,13 @@ void BroadcastPBlockHolderMemLimiter::release(const BroadcastPBlockHolder& holde
 } // namespace vectorized
 
 namespace pipeline {
-
 ExchangeSinkBuffer::ExchangeSinkBuffer(PUniqueId query_id, PlanNodeId dest_node_id, int send_id,
                                        int be_number, RuntimeState* state,
                                        ExchangeSinkLocalState* parent)
         : HasTaskExecutionCtx(state),
           _queue_capacity(0),
           _is_finishing(false),
-          _query_id(query_id),
+          _query_id(std::move(query_id)),
           _dest_node_id(dest_node_id),
           _sender_id(send_id),
           _be_number(be_number),
@@ -110,12 +109,6 @@ void ExchangeSinkBuffer::close() {
     //_instance_to_request.clear();
 }
 
-void ExchangeSinkBuffer::_set_ready_to_finish(bool all_done) {
-    if (_finish_dependency && _should_stop && all_done) {
-        _finish_dependency->set_ready();
-    }
-}
-
 void ExchangeSinkBuffer::register_sink(TUniqueId fragment_instance_id) {
     if (_is_finishing) {
         return;
@@ -135,7 +128,6 @@ void ExchangeSinkBuffer::register_sink(TUniqueId fragment_instance_id) {
     finst_id.set_hi(fragment_instance_id.hi);
     finst_id.set_lo(fragment_instance_id.lo);
     _rpc_channel_is_idle[low_id] = true;
-    _instance_to_rpc_ctx[low_id] = {};
     _instance_to_receiver_eof[low_id] = false;
     _instance_to_rpc_time[low_id] = 0;
     _construct_request(low_id, finst_id);
@@ -160,7 +152,6 @@ Status ExchangeSinkBuffer::add_block(TransmitInfo&& request) {
         if (_rpc_channel_is_idle[ins_id]) {
             send_now = true;
             _rpc_channel_is_idle[ins_id] = false;
-            _busy_channels++;
         }
         if (request.block) {
             RETURN_IF_ERROR(
@@ -201,7 +192,6 @@ Status ExchangeSinkBuffer::add_block(BroadcastTransmitInfo&& request) {
         if (_rpc_channel_is_idle[ins_id]) {
             send_now = true;
             _rpc_channel_is_idle[ins_id] = false;
-            _busy_channels++;
         }
         if (request.block_holder->get_block()) {
             RETURN_IF_ERROR(BeExecVersionManager::check_be_exec_version(
@@ -226,7 +216,7 @@ Status ExchangeSinkBuffer::_send_rpc(InstanceLoId id) {
             _instance_to_broadcast_package_queue[id];
 
     if (_is_finishing) {
-        _turn_off_channel(id);
+        _turn_off_channel(id, lock);
         return Status::OK();
     }
 
@@ -244,9 +234,6 @@ Status ExchangeSinkBuffer::_send_rpc(InstanceLoId id) {
         }
         auto send_callback = request.channel->get_send_callback(id, request.eos);
 
-        _instance_to_rpc_ctx[id]._send_callback = send_callback;
-        _instance_to_rpc_ctx[id].is_cancelled = false;
-
         send_callback->cntl_->set_timeout_ms(request.channel->_brpc_timeout_ms);
         if (config::exchange_sink_ignore_eovercrowded) {
             send_callback->cntl_->ignore_eovercrowded();
@@ -325,12 +312,6 @@ Status ExchangeSinkBuffer::_send_rpc(InstanceLoId id) {
             brpc_request->set_allocated_block(request.block_holder->get_block());
         }
         auto send_callback = request.channel->get_send_callback(id, request.eos);
-
-        ExchangeRpcContext rpc_ctx;
-        rpc_ctx._send_callback = send_callback;
-        rpc_ctx.is_cancelled = false;
-        _instance_to_rpc_ctx[id] = rpc_ctx;
-
         send_callback->cntl_->set_timeout_ms(request.channel->_brpc_timeout_ms);
         if (config::exchange_sink_ignore_eovercrowded) {
             send_callback->cntl_->ignore_eovercrowded();
@@ -394,7 +375,7 @@ Status ExchangeSinkBuffer::_send_rpc(InstanceLoId id) {
         }
         broadcast_q.pop();
     } else {
-        _turn_off_channel(id);
+        _rpc_channel_is_idle[id] = true;
     }
 
     return Status::OK();
@@ -424,21 +405,19 @@ void ExchangeSinkBuffer::_ended(InstanceLoId id) {
         __builtin_unreachable();
     } else {
         std::unique_lock<std::mutex> lock(*_instance_to_package_queue_mutex[id]);
-        _turn_off_channel(id);
+        _turn_off_channel(id, lock);
     }
 }
 
 void ExchangeSinkBuffer::_failed(InstanceLoId id, const std::string& err) {
     _is_finishing = true;
     _context->cancel(Status::Cancelled(err));
-    std::unique_lock<std::mutex> lock(*_instance_to_package_queue_mutex[id]);
-    _turn_off_channel(id, true);
 }
 
 void ExchangeSinkBuffer::_set_receiver_eof(InstanceLoId id) {
     std::unique_lock<std::mutex> lock(*_instance_to_package_queue_mutex[id]);
     _instance_to_receiver_eof[id] = true;
-    _turn_off_channel(id, true);
+    _turn_off_channel(id, lock);
     std::queue<BroadcastTransmitInfo, std::list<BroadcastTransmitInfo>>& broadcast_q =
             _instance_to_broadcast_package_queue[id];
     for (; !broadcast_q.empty(); broadcast_q.pop()) {
@@ -470,17 +449,17 @@ bool ExchangeSinkBuffer::_is_receiver_eof(InstanceLoId id) {
     return _instance_to_receiver_eof[id];
 }
 
-void ExchangeSinkBuffer::_turn_off_channel(InstanceLoId id, bool cleanup) {
+// The unused parameter `with_lock` is to ensure that the function is called when the lock is held.
+void ExchangeSinkBuffer::_turn_off_channel(InstanceLoId id,
+                                           std::unique_lock<std::mutex>& /*with_lock*/) {
     if (!_rpc_channel_is_idle[id]) {
         _rpc_channel_is_idle[id] = true;
-        auto all_done = _busy_channels.fetch_sub(1) == 1;
-        _set_ready_to_finish(all_done);
-        if (cleanup && all_done) {
-            auto weak_task_ctx = weak_task_exec_ctx();
-            if (auto pip_ctx = weak_task_ctx.lock()) {
-                _parent->set_reach_limit();
-            }
-        }
+    }
+    _instance_to_receiver_eof[id] = true;
+
+    auto weak_task_ctx = weak_task_exec_ctx();
+    if (auto pip_ctx = weak_task_ctx.lock()) {
+        _parent->on_channel_finished(id);
     }
 }
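
A note on the `with_lock` parameter introduced above: passing a
std::unique_lock<std::mutex>& that the callee never reads is a lightweight way
to encode "caller must hold the lock" in the signature, since the caller has to
have a lock object in hand for the call to compile. A minimal sketch of the
idiom (names here are illustrative, not from the patch):

    #include <mutex>
    #include <unordered_map>

    class SinkBuffer {
    public:
        void finish_channel(int id) {
            std::unique_lock<std::mutex> lock(_mutex);
            _turn_off_channel(id, lock); // cannot be called without a lock object
        }

    private:
        // The unused reference parameter documents (and nudges the compiler
        // to enforce) that _mutex is already held when this body runs.
        void _turn_off_channel(int id, std::unique_lock<std::mutex>& /*with_lock*/) {
            _idle[id] = true;
        }

        std::mutex _mutex;
        std::unordered_map<int, bool> _idle;
    };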
 
diff --git a/be/src/pipeline/exec/exchange_sink_buffer.h b/be/src/pipeline/exec/exchange_sink_buffer.h
index 2d30a492a0d..2ff7a200864 100644
--- a/be/src/pipeline/exec/exchange_sink_buffer.h
+++ b/be/src/pipeline/exec/exchange_sink_buffer.h
@@ -22,9 +22,9 @@
 #include <gen_cpp/internal_service.pb.h>
 #include <gen_cpp/types.pb.h>
 #include <parallel_hashmap/phmap.h>
-#include <stdint.h>
 
 #include <atomic>
+#include <cstdint>
 #include <list>
 #include <memory>
 #include <mutex>
@@ -51,7 +51,7 @@ class ExchangeSinkLocalState;
 } // namespace pipeline
 
 namespace vectorized {
-class PipChannel;
+class Channel;
 
 // We use BroadcastPBlockHolder to hold a broadcasted PBlock. For broadcast shuffle, one PBlock
 // will be shared between different channel, so we have to use a ref count to mark if this
@@ -102,14 +102,14 @@ private:
 
 namespace pipeline {
 struct TransmitInfo {
-    vectorized::PipChannel* channel = nullptr;
+    vectorized::Channel* channel = nullptr;
     std::unique_ptr<PBlock> block;
     bool eos;
     Status exec_status;
 };
 
 struct BroadcastTransmitInfo {
-    vectorized::PipChannel* channel = nullptr;
+    vectorized::Channel* channel = nullptr;
     std::shared_ptr<vectorized::BroadcastPBlockHolder> block_holder = nullptr;
     bool eos;
 };
@@ -169,11 +169,6 @@ private:
     bool _eos;
 };
 
-struct ExchangeRpcContext {
-    std::shared_ptr<ExchangeSendCallback<PTransmitDataResult>> _send_callback;
-    bool is_cancelled = false;
-};
-
 // Each ExchangeSinkOperator have one ExchangeSinkBuffer
 class ExchangeSinkBuffer final : public HasTaskExecutionCtx {
 public:
@@ -198,11 +193,6 @@ public:
         _broadcast_dependency = broadcast_dependency;
     }
 
-    void set_should_stop() {
-        _should_stop = true;
-        _set_ready_to_finish(_busy_channels == 0);
-    }
-
 private:
     friend class ExchangeSinkLocalState;
     void _set_ready_to_finish(bool all_done);
@@ -224,11 +214,9 @@ private:
     phmap::flat_hash_map<InstanceLoId, std::shared_ptr<PTransmitDataParams>> _instance_to_request;
     // One channel is corresponding to a downstream instance.
     phmap::flat_hash_map<InstanceLoId, bool> _rpc_channel_is_idle;
-    // Number of busy channels;
-    std::atomic<int> _busy_channels = 0;
+
     phmap::flat_hash_map<InstanceLoId, bool> _instance_to_receiver_eof;
     phmap::flat_hash_map<InstanceLoId, int64_t> _instance_to_rpc_time;
-    phmap::flat_hash_map<InstanceLoId, ExchangeRpcContext> _instance_to_rpc_ctx;
 
     std::atomic<bool> _is_finishing;
     PUniqueId _query_id;
@@ -247,7 +235,7 @@ private:
     inline void _failed(InstanceLoId id, const std::string& err);
     inline void _set_receiver_eof(InstanceLoId id);
     inline bool _is_receiver_eof(InstanceLoId id);
-    inline void _turn_off_channel(InstanceLoId id, bool cleanup = false);
+    inline void _turn_off_channel(InstanceLoId id, std::unique_lock<std::mutex>& with_lock);
     void get_max_min_rpc_time(int64_t* max_time, int64_t* min_time);
     int64_t get_sum_rpc_time();
 
@@ -255,7 +243,6 @@ private:
     std::shared_ptr<Dependency> _queue_dependency = nullptr;
     std::shared_ptr<Dependency> _finish_dependency = nullptr;
     std::shared_ptr<Dependency> _broadcast_dependency = nullptr;
-    std::atomic<bool> _should_stop = false;
     ExchangeSinkLocalState* _parent = nullptr;
 };
 
diff --git a/be/src/pipeline/exec/exchange_sink_operator.cpp b/be/src/pipeline/exec/exchange_sink_operator.cpp
index 2f4919c78e2..e138cf010b6 100644
--- a/be/src/pipeline/exec/exchange_sink_operator.cpp
+++ b/be/src/pipeline/exec/exchange_sink_operator.cpp
@@ -23,6 +23,7 @@
 #include <gen_cpp/types.pb.h>
 
 #include <memory>
+#include <mutex>
 #include <random>
 
 #include "common/status.h"
@@ -31,6 +32,8 @@
 #include "pipeline/exec/operator.h"
 #include "pipeline/exec/sort_source_operator.h"
 #include "pipeline/local_exchange/local_exchange_sink_operator.h"
+#include "util/runtime_profile.h"
+#include "util/uid_util.h"
 #include "vec/columns/column_const.h"
 #include "vec/exprs/vexpr.h"
 
@@ -68,8 +71,10 @@ Status ExchangeSinkLocalState::init(RuntimeState* state, LocalSinkStateInfo& inf
     _rows_sent_counter = ADD_COUNTER_WITH_LEVEL(_profile, "RowsProduced", TUnit::UNIT, 1);
     _overall_throughput = _profile->add_derived_counter(
             "OverallThroughput", TUnit::BYTES_PER_SECOND,
-            std::bind<int64_t>(&RuntimeProfile::units_per_second, _bytes_sent_counter,
-                               _profile->total_time_counter()),
+            [this]() {
+                return RuntimeProfile::units_per_second(_bytes_sent_counter,
+                                                        _profile->total_time_counter());
+            },
             "");
     _merge_block_timer = ADD_TIMER(profile(), "MergeBlockTime");
     _local_bytes_send_counter = ADD_COUNTER(_profile, "LocalBytesSent", TUnit::BYTES);
@@ -84,15 +89,15 @@ Status ExchangeSinkLocalState::init(RuntimeState* state, LocalSinkStateInfo& inf
         const auto& fragment_instance_id = p._dests[i].fragment_instance_id;
         if (fragment_id_to_channel_index.find(fragment_instance_id.lo) ==
             fragment_id_to_channel_index.end()) {
-            channel_shared_ptrs.emplace_back(
-                    new vectorized::PipChannel(this, p._row_desc, p._dests[i].brpc_server,
-                                               fragment_instance_id, p._dest_node_id));
-            fragment_id_to_channel_index.emplace(fragment_instance_id.lo,
-                                                 channel_shared_ptrs.size() - 1);
-            channels.push_back(channel_shared_ptrs.back().get());
+            channels.push_back(std::make_shared<vectorized::Channel>(
+                    this, p._dests[i].brpc_server, fragment_instance_id, p._dest_node_id));
+            fragment_id_to_channel_index.emplace(fragment_instance_id.lo, channels.size() - 1);
+
+            if (fragment_instance_id.hi != -1 && fragment_instance_id.lo != -1) {
+                _working_channels_count++;
+            }
         } else {
-            channel_shared_ptrs.emplace_back(
-                    channel_shared_ptrs[fragment_id_to_channel_index[fragment_instance_id.lo]]);
+            channels.emplace_back(channels[fragment_id_to_channel_index[fragment_instance_id.lo]]);
         }
     }
 
@@ -106,6 +111,24 @@ Status ExchangeSinkLocalState::init(RuntimeState* state, LocalSinkStateInfo& inf
     return Status::OK();
 }
 
+void ExchangeSinkLocalState::on_channel_finished(InstanceLoId channel_id) {
+    std::lock_guard<std::mutex> lock(_finished_channels_mutex);
+
+    if (_finished_channels.contains(channel_id)) {
+        LOG(WARNING) << "query: " << print_id(_state->query_id())
+                     << ", on_channel_finished on already finished channel: " << channel_id;
+        return;
+    } else {
+        _finished_channels.emplace(channel_id);
+        if (_working_channels_count.fetch_sub(1) == 1) {
+            set_reach_limit();
+            if (_finish_dependency) {
+                _finish_dependency->set_ready();
+            }
+        }
+    }
+}
+
 Status ExchangeSinkLocalState::open(RuntimeState* state) {
     SCOPED_TIMER(exec_time_counter());
     SCOPED_TIMER(_open_timer);
@@ -139,7 +162,6 @@ Status ExchangeSinkLocalState::open(RuntimeState* state) {
         _queue_dependency = Dependency::create_shared(_parent->operator_id(), _parent->node_id(),
                                                       "ExchangeSinkQueueDependency", true);
         _sink_buffer->set_dependency(_queue_dependency, _finish_dependency);
-        _finish_dependency->block();
     }
 
     if ((_part_type == TPartitionType::UNPARTITIONED || channels.size() == 1) &&
@@ -151,7 +173,7 @@ Status ExchangeSinkLocalState::open(RuntimeState* state) {
                 vectorized::BroadcastPBlockHolderMemLimiter::create_shared(_broadcast_dependency);
     } else if (local_size > 0) {
         size_t dep_id = 0;
-        for (auto* channel : channels) {
+        for (auto& channel : channels) {
             if (channel->is_local()) {
                 if (auto dep = channel->get_local_channel_dependency()) {
                     _local_channels_dependency.push_back(dep);
@@ -166,16 +188,18 @@ Status ExchangeSinkLocalState::open(RuntimeState* state) {
     }
     if (_part_type == TPartitionType::HASH_PARTITIONED) {
         _partition_count = channels.size();
-        _partitioner.reset(new vectorized::Crc32HashPartitioner<vectorized::ShuffleChannelIds>(
-                channels.size()));
+        _partitioner =
+                std::make_unique<vectorized::Crc32HashPartitioner<vectorized::ShuffleChannelIds>>(
+                        channels.size());
         RETURN_IF_ERROR(_partitioner->init(p._texprs));
         RETURN_IF_ERROR(_partitioner->prepare(state, p._row_desc));
         _profile->add_info_string("Partitioner",
                                   fmt::format("Crc32HashPartitioner({})", _partition_count));
     } else if (_part_type == TPartitionType::BUCKET_SHFFULE_HASH_PARTITIONED) {
-        _partition_count = channel_shared_ptrs.size();
-        _partitioner.reset(new vectorized::Crc32HashPartitioner<vectorized::ShuffleChannelIds>(
-                channel_shared_ptrs.size()));
+        _partition_count = channels.size();
+        _partitioner =
+                std::make_unique<vectorized::Crc32HashPartitioner<vectorized::ShuffleChannelIds>>(
+                        channels.size());
         RETURN_IF_ERROR(_partitioner->init(p._texprs));
         RETURN_IF_ERROR(_partitioner->prepare(state, p._row_desc));
         _profile->add_info_string("Partitioner",
@@ -221,12 +245,13 @@ Status ExchangeSinkLocalState::open(RuntimeState* state) {
     } else if (_part_type == TPartitionType::TABLE_SINK_HASH_PARTITIONED) {
         _partition_count =
                 channels.size() * config::table_sink_partition_write_max_partition_nums_per_writer;
-        _partitioner.reset(new vectorized::Crc32HashPartitioner<vectorized::ShuffleChannelIds>(
-                _partition_count));
-        _partition_function.reset(new HashPartitionFunction(_partitioner.get()));
+        _partitioner =
+                std::make_unique<vectorized::Crc32HashPartitioner<vectorized::ShuffleChannelIds>>(
+                        _partition_count);
+        _partition_function = std::make_unique<HashPartitionFunction>(_partitioner.get());
 
-        scale_writer_partitioning_exchanger.reset(new vectorized::ScaleWriterPartitioningExchanger<
-                                                  HashPartitionFunction>(
+        scale_writer_partitioning_exchanger = std::make_unique<
+                vectorized::ScaleWriterPartitioningExchanger<HashPartitionFunction>>(
                 channels.size(), *_partition_function, _partition_count, channels.size(), 1,
                 config::table_sink_partition_write_min_partition_data_processed_rebalance_threshold /
                                         state->task_num() ==
@@ -239,7 +264,7 @@ Status ExchangeSinkLocalState::open(RuntimeState* state) {
                                 0
                         ? config::table_sink_partition_write_min_data_processed_rebalance_threshold
                         : config::table_sink_partition_write_min_data_processed_rebalance_threshold /
-                                  state->task_num()));
+                                  state->task_num());
 
         RETURN_IF_ERROR(_partitioner->init(p._texprs));
         RETURN_IF_ERROR(_partitioner->prepare(state, p._row_desc));
@@ -358,7 +383,7 @@ void ExchangeSinkOperatorX::_handle_eof_channel(RuntimeState* state, ChannelPtrT
                                                 Status st) {
     channel->set_receiver_eof(st);
     // Chanel will not send RPC to the downstream when eof, so close chanel by OK status.
-    static_cast<void>(channel->close(state, Status::OK()));
+    static_cast<void>(channel->close(state));
 }
 
 Status ExchangeSinkOperatorX::sink(RuntimeState* state, vectorized::Block* block, bool eos) {
@@ -367,7 +392,7 @@ Status ExchangeSinkOperatorX::sink(RuntimeState* state, vectorized::Block* block
     COUNTER_UPDATE(local_state.rows_sent_counter(), (int64_t)block->rows());
     SCOPED_TIMER(local_state.exec_time_counter());
     bool all_receiver_eof = true;
-    for (auto* channel : local_state.channels) {
+    for (auto& channel : local_state.channels) {
         if (!channel->is_receiver_eof()) {
             all_receiver_eof = false;
             break;
@@ -389,13 +414,13 @@ Status ExchangeSinkOperatorX::sink(RuntimeState* state, vectorized::Block* block
             if (!block->empty()) {
                 Status status;
                 size_t idx = 0;
-                for (auto* channel : local_state.channels) {
+                for (auto& channel : local_state.channels) {
                     if (!channel->is_receiver_eof()) {
                         // If this channel is the last, we can move this block to downstream pipeline.
                         // Otherwise, this block also need to be broadcasted to other channels so should be copied.
                         DCHECK_GE(local_state._last_local_channel_idx, 0);
                         status = channel->send_local_block(
-                                block, idx == local_state._last_local_channel_idx);
+                                block, eos, idx == local_state._last_local_channel_idx);
                         HANDLE_CHANNEL_STATUS(state, channel, status);
                     }
                     idx++;
@@ -422,7 +447,7 @@ Status ExchangeSinkOperatorX::sink(RuntimeState* state, vectorized::Block* block
 
                     size_t idx = 0;
                     bool moved = false;
-                    for (auto* channel : local_state.channels) {
+                    for (auto& channel : local_state.channels) {
                         if (!channel->is_receiver_eof()) {
                             Status status;
                             if (channel->is_local()) {
@@ -430,7 +455,8 @@ Status ExchangeSinkOperatorX::sink(RuntimeState* state, vectorized::Block* block
                                 // Otherwise, this block also need to be broadcasted to other channels so should be copied.
                                 DCHECK_GE(local_state._last_local_channel_idx, 0);
                                 status = channel->send_local_block(
-                                        &cur_block, idx == local_state._last_local_channel_idx);
+                                        &cur_block, eos,
+                                        idx == local_state._last_local_channel_idx);
                                 moved = idx == local_state._last_local_channel_idx;
                             } else {
                                 status = channel->send_broadcast_block(block_holder, eos);
@@ -451,20 +477,17 @@ Status ExchangeSinkOperatorX::sink(RuntimeState* state, vectorized::Block* block
         }
     } else if (_part_type == TPartitionType::RANDOM) {
         // 1. select channel
-        vectorized::PipChannel* current_channel =
-                local_state.channels[local_state.current_channel_idx];
+        auto& current_channel = local_state.channels[local_state.current_channel_idx];
         if (!current_channel->is_receiver_eof()) {
             // 2. serialize, send and rollover block
             if (current_channel->is_local()) {
-                auto status = current_channel->send_local_block(block, true);
+                auto status = current_channel->send_local_block(block, eos, true);
                 HANDLE_CHANNEL_STATUS(state, current_channel, status);
             } else {
-                RETURN_IF_ERROR(local_state._serializer.serialize_block(
-                        block, current_channel->ch_cur_pb_block()));
-                auto status =
-                        current_channel->send_remote_block(current_channel->ch_cur_pb_block(), eos);
+                auto pblock = std::make_unique<PBlock>();
+                RETURN_IF_ERROR(local_state._serializer.serialize_block(block, pblock.get()));
+                auto status = current_channel->send_remote_block(std::move(pblock), eos);
                 HANDLE_CHANNEL_STATUS(state, current_channel, status);
-                current_channel->ch_roll_pb_block();
             }
         }
         local_state.current_channel_idx =
@@ -486,7 +509,7 @@ Status ExchangeSinkOperatorX::sink(RuntimeState* state, vectorized::Block* block
                     local_state._partitioner->get_channel_ids().get<uint32_t>(), rows, block, eos));
         } else {
             RETURN_IF_ERROR(channel_add_rows(
-                    state, local_state.channel_shared_ptrs, local_state._partition_count,
+                    state, local_state.channels, local_state._partition_count,
                     local_state._partitioner->get_channel_ids().get<uint32_t>(), rows, block, eos));
         }
         int64_t new_channel_mem_usage = 0;
@@ -570,20 +593,17 @@ Status ExchangeSinkOperatorX::sink(RuntimeState* state, vectorized::Block* block
     } else if (_part_type == TPartitionType::TABLE_SINK_RANDOM_PARTITIONED) {
         // Control the number of channels according to the flow, thereby controlling the number of table sink writers.
         // 1. select channel
-        vectorized::PipChannel* current_channel =
-                local_state.channels[local_state.current_channel_idx];
+        auto& current_channel = local_state.channels[local_state.current_channel_idx];
         if (!current_channel->is_receiver_eof()) {
             // 2. serialize, send and rollover block
             if (current_channel->is_local()) {
-                auto status = current_channel->send_local_block(block, true);
+                auto status = current_channel->send_local_block(block, eos, true);
                 HANDLE_CHANNEL_STATUS(state, current_channel, status);
             } else {
-                RETURN_IF_ERROR(local_state._serializer.serialize_block(
-                        block, current_channel->ch_cur_pb_block()));
-                auto status =
-                        current_channel->send_remote_block(current_channel->ch_cur_pb_block(), eos);
+                auto pblock = std::make_unique<PBlock>();
+                RETURN_IF_ERROR(local_state._serializer.serialize_block(block, pblock.get()));
+                auto status = current_channel->send_remote_block(std::move(pblock), eos);
                 HANDLE_CHANNEL_STATUS(state, current_channel, status);
-                current_channel->ch_roll_pb_block();
             }
             _data_processed += block->bytes();
         }
@@ -605,15 +625,12 @@ Status ExchangeSinkOperatorX::sink(RuntimeState* state, vectorized::Block* block
     Status final_st = Status::OK();
     if (eos) {
         local_state._serializer.reset_block();
-        for (int i = 0; i < local_state.channels.size(); ++i) {
-            Status st = local_state.channels[i]->close(state, Status::OK());
+        for (auto& channel : local_state.channels) {
+            Status st = channel->close(state);
             if (!st.ok() && final_st.ok()) {
                 final_st = st;
             }
         }
-        if (local_state._sink_buffer) {
-            local_state._sink_buffer->set_should_stop();
-        }
     }
     return final_st;
 }
@@ -637,8 +654,8 @@ Status ExchangeSinkOperatorX::serialize_block(ExchangeSinkLocalState& state, vec
 }
 
 void ExchangeSinkLocalState::register_channels(pipeline::ExchangeSinkBuffer* buffer) {
-    for (auto channel : channels) {
-        ((vectorized::PipChannel*)channel)->register_exchange_buffer(buffer);
+    for (auto& channel : channels) {
+        channel->register_exchange_buffer(buffer);
     }
 }
 
@@ -685,12 +702,12 @@ std::string ExchangeSinkLocalState::debug_string(int indentation_level) const {
     fmt::memory_buffer debug_string_buffer;
     fmt::format_to(debug_string_buffer, "{}", Base::debug_string(indentation_level));
     if (_sink_buffer) {
-        fmt::format_to(
-                debug_string_buffer,
-                ", Sink Buffer: (_should_stop = {}, _busy_channels = {}, _is_finishing = {}), "
-                "_reach_limit: {}",
-                _sink_buffer->_should_stop.load(), _sink_buffer->_busy_channels.load(),
-                _sink_buffer->_is_finishing.load(), _reach_limit.load());
+        fmt::format_to(debug_string_buffer,
+                       ", Sink Buffer: (_is_finishing = {}, blocks in queue: {}, queue capacity: "
+                       "{}, queue dep: {}), _reach_limit: {}, working channels: {}",
+                       _sink_buffer->_is_finishing.load(), _sink_buffer->_total_queue_size,
+                       _sink_buffer->_queue_capacity, (void*)_sink_buffer->_queue_dependency.get(),
+                       _reach_limit.load(), _working_channels_count.load());
     }
     return fmt::to_string(debug_string_buffer);
 }
diff --git a/be/src/pipeline/exec/exchange_sink_operator.h b/be/src/pipeline/exec/exchange_sink_operator.h
index a34237145a7..8af944728a2 100644
--- a/be/src/pipeline/exec/exchange_sink_operator.h
+++ b/be/src/pipeline/exec/exchange_sink_operator.h
@@ -19,7 +19,9 @@
 
 #include <stdint.h>
 
+#include <atomic>
 #include <memory>
+#include <mutex>
 
 #include "common/status.h"
 #include "exchange_sink_buffer.h"
@@ -53,13 +55,10 @@ private:
 
 public:
     ExchangeSinkLocalState(DataSinkOperatorXBase* parent, RuntimeState* state)
-            : Base(parent, state),
-              current_channel_idx(0),
-              only_local_exchange(false),
-              _serializer(this) {
+            : Base(parent, state), _serializer(this) {
         _finish_dependency =
                 std::make_shared<Dependency>(parent->operator_id(), parent->node_id(),
-                                             parent->get_name() + "_FINISH_DEPENDENCY", true);
+                                             parent->get_name() + "_FINISH_DEPENDENCY", false);
     }
 
     std::vector<Dependency*> dependencies() const override {
@@ -112,10 +111,11 @@ public:
         return Status::OK();
     }
     Status _send_new_partition_batch();
-    std::vector<vectorized::PipChannel*> channels;
-    std::vector<std::shared_ptr<vectorized::PipChannel>> channel_shared_ptrs;
-    int current_channel_idx; // index of current channel to send to if _random == true
-    bool only_local_exchange;
+    std::vector<std::shared_ptr<vectorized::Channel>> channels;
+    int current_channel_idx {0}; // index of current channel to send to if _random == true
+    bool only_local_exchange {false};
+
+    void on_channel_finished(InstanceLoId channel_id);
 
     // for external table sink hash partition
     std::unique_ptr<vectorized::ScaleWriterPartitioningExchanger<HashPartitionFunction>>
@@ -123,9 +123,8 @@ public:
 
 private:
     friend class ExchangeSinkOperatorX;
-    friend class vectorized::Channel<ExchangeSinkLocalState>;
-    friend class vectorized::PipChannel;
-    friend class vectorized::BlockSerializer<ExchangeSinkLocalState>;
+    friend class vectorized::Channel;
+    friend class vectorized::BlockSerializer;
 
     std::unique_ptr<ExchangeSinkBuffer> _sink_buffer = nullptr;
     RuntimeProfile::Counter* _serialize_batch_timer = nullptr;
@@ -154,7 +153,7 @@ private:
     int _sender_id;
     std::shared_ptr<vectorized::BroadcastPBlockHolderMemLimiter> _broadcast_pb_mem_limiter;
 
-    vectorized::BlockSerializer<ExchangeSinkLocalState> _serializer;
+    vectorized::BlockSerializer _serializer;
 
     std::shared_ptr<Dependency> _queue_dependency = nullptr;
     std::shared_ptr<Dependency> _broadcast_dependency = nullptr;
@@ -203,6 +202,10 @@ private:
     std::unique_ptr<HashPartitionFunction> _partition_function = nullptr;
     std::atomic<bool> _reach_limit = false;
     int _last_local_channel_idx = -1;
+
+    std::atomic_int _working_channels_count = 0;
+    std::set<InstanceLoId> _finished_channels;
+    std::mutex _finished_channels_mutex;
 };
 
 class ExchangeSinkOperatorX final : public DataSinkOperatorX<ExchangeSinkLocalState> {
diff --git a/be/src/pipeline/exec/result_file_sink_operator.cpp b/be/src/pipeline/exec/result_file_sink_operator.cpp
index a11c4df6625..93026427b86 100644
--- a/be/src/pipeline/exec/result_file_sink_operator.cpp
+++ b/be/src/pipeline/exec/result_file_sink_operator.cpp
@@ -31,9 +31,7 @@ namespace doris::pipeline {
 
 ResultFileSinkLocalState::ResultFileSinkLocalState(DataSinkOperatorXBase* parent,
                                                    RuntimeState* state)
-        : AsyncWriterSink<vectorized::VFileResultWriter, ResultFileSinkOperatorX>(parent, state),
-          _serializer(
-                  std::make_unique<vectorized::BlockSerializer<ResultFileSinkLocalState>>(this)) {}
+        : AsyncWriterSink<vectorized::VFileResultWriter, ResultFileSinkOperatorX>(parent, state) {}
 
 ResultFileSinkOperatorX::ResultFileSinkOperatorX(int operator_id, const RowDescriptor& row_desc,
                                                  const std::vector<TExpr>& t_output_expr)
@@ -145,14 +143,6 @@ Status ResultFileSinkLocalState::close(RuntimeState* state, Status exec_status)
     return Base::close(state, exec_status);
 }
 
-template <typename ChannelPtrType>
-void ResultFileSinkLocalState::_handle_eof_channel(RuntimeState* state, ChannelPtrType channel,
-                                                   Status st) {
-    channel->set_receiver_eof(st);
-    // Chanel will not send RPC to the downstream when eof, so close chanel by OK status.
-    static_cast<void>(channel->close(state, Status::OK()));
-}
-
 Status ResultFileSinkOperatorX::sink(RuntimeState* state, vectorized::Block* in_block, bool eos) {
     auto& local_state = get_local_state(state);
     SCOPED_TIMER(local_state.exec_time_counter());
diff --git a/be/src/pipeline/exec/result_file_sink_operator.h b/be/src/pipeline/exec/result_file_sink_operator.h
index e99eb709a9f..7268efe4de4 100644
--- a/be/src/pipeline/exec/result_file_sink_operator.h
+++ b/be/src/pipeline/exec/result_file_sink_operator.h
@@ -21,10 +21,6 @@
 #include "vec/sink/writer/vfile_result_writer.h"
 
 namespace doris::vectorized {
-template <typename Parent>
-class BlockSerializer;
-template <typename Parent>
-class Channel;
 class BroadcastPBlockHolder;
 } // namespace doris::vectorized
 
@@ -55,13 +51,8 @@ public:
 private:
     friend class ResultFileSinkOperatorX;
 
-    template <typename ChannelPtrType>
-    void _handle_eof_channel(RuntimeState* state, ChannelPtrType channel, Status st);
-
     std::shared_ptr<BufferControlBlock> _sender;
 
-    std::vector<vectorized::Channel<ResultFileSinkLocalState>*> _channels;
-    std::unique_ptr<vectorized::BlockSerializer<ResultFileSinkLocalState>> _serializer;
     std::shared_ptr<vectorized::BroadcastPBlockHolder> _block_holder;
     RuntimeProfile::Counter* _brpc_wait_timer = nullptr;
     RuntimeProfile::Counter* _local_send_timer = nullptr;
diff --git a/be/src/vec/sink/vdata_stream_sender.cpp b/be/src/vec/sink/vdata_stream_sender.cpp
index b139c503c9a..9405ed2e43e 100644
--- a/be/src/vec/sink/vdata_stream_sender.cpp
+++ b/be/src/vec/sink/vdata_stream_sender.cpp
@@ -54,8 +54,7 @@
 
 namespace doris::vectorized {
 
-template <typename Parent>
-Status Channel<Parent>::init(RuntimeState* state) {
+Status Channel::init(RuntimeState* state) {
     if (_brpc_dest_addr.hostname.empty()) {
         LOG(WARNING) << "there is no brpc destination address's hostname"
                         ", maybe version is not compatible.";
@@ -64,9 +63,11 @@ Status Channel<Parent>::init(RuntimeState* state) {
     if (state->query_options().__isset.enable_local_exchange) {
         _is_local &= state->query_options().enable_local_exchange;
     }
+
     if (_is_local) {
         return Status::OK();
     }
+
     if (_brpc_dest_addr.hostname == BackendOptions::get_localhost()) {
         _brpc_stub = state->exec_env()->brpc_internal_client_cache()->get_client(
                 "127.0.0.1", _brpc_dest_addr.port);
@@ -83,8 +84,7 @@ Status Channel<Parent>::init(RuntimeState* state) {
     return Status::OK();
 }
 
-template <typename Parent>
-Status Channel<Parent>::open(RuntimeState* state) {
+Status Channel::open(RuntimeState* state) {
     if (_is_local) {
         auto st = _parent->state()->exec_env()->vstream_mgr()->find_recvr(
                 _fragment_instance_id, _dest_node_id, &_local_recvr);
@@ -94,19 +94,6 @@ Status Channel<Parent>::open(RuntimeState* state) {
         }
     }
     _be_number = state->be_number();
-    _brpc_request = std::make_shared<PTransmitDataParams>();
-    // initialize brpc request
-    _brpc_request->mutable_finst_id()->set_hi(_fragment_instance_id.hi);
-    _brpc_request->mutable_finst_id()->set_lo(_fragment_instance_id.lo);
-    _finst_id = _brpc_request->finst_id();
-
-    _brpc_request->mutable_query_id()->set_hi(state->query_id().hi);
-    _brpc_request->mutable_query_id()->set_lo(state->query_id().lo);
-    _query_id = _brpc_request->query_id();
-
-    _brpc_request->set_node_id(_dest_node_id);
-    _brpc_request->set_sender_id(_parent->sender_id());
-    _brpc_request->set_be_number(_be_number);
 
     _brpc_timeout_ms = std::min(3600, state->execution_timeout()) * 1000;
 
@@ -116,28 +103,26 @@ Status Channel<Parent>::open(RuntimeState* state) {
     // to build a camouflaged empty channel. the ip and port is '0.0.0.0:0"
     // so the empty channel not need call function close_internal()
     _need_close = (_fragment_instance_id.hi != -1 && _fragment_instance_id.lo != -1);
+
     _state = state;
     return Status::OK();
 }
 
-std::shared_ptr<pipeline::Dependency> PipChannel::get_local_channel_dependency() {
-    if (!Channel<pipeline::ExchangeSinkLocalState>::_local_recvr) {
+std::shared_ptr<pipeline::Dependency> Channel::get_local_channel_dependency() {
+    if (!_local_recvr) {
         return nullptr;
     }
-    return Channel<pipeline::ExchangeSinkLocalState>::_local_recvr->get_local_channel_dependency(
-            Channel<pipeline::ExchangeSinkLocalState>::_parent->sender_id());
+    return _local_recvr->get_local_channel_dependency(_parent->sender_id());
 }
 
-int64_t PipChannel::mem_usage() const {
-    auto* mutable_block = Channel<pipeline::ExchangeSinkLocalState>::_serializer.get_block();
+int64_t Channel::mem_usage() const {
+    auto* mutable_block = _serializer.get_block();
     int64_t mem_usage = mutable_block ? mutable_block->allocated_bytes() : 0;
     return mem_usage;
 }
 
-Status PipChannel::send_remote_block(PBlock* block, bool eos, Status exec_status) {
-    COUNTER_UPDATE(Channel<pipeline::ExchangeSinkLocalState>::_parent->blocks_sent_counter(), 1);
-    std::unique_ptr<PBlock> pblock_ptr;
-    pblock_ptr.reset(block);
+Status Channel::send_remote_block(std::unique_ptr<PBlock>&& block, bool eos) {
+    COUNTER_UPDATE(_parent->blocks_sent_counter(), 1);
 
     if (eos) {
         if (_eos_send) {
@@ -147,13 +132,13 @@ Status PipChannel::send_remote_block(PBlock* block, bool eos, Status exec_status
         }
     }
     if (eos || block->column_metas_size()) {
-        RETURN_IF_ERROR(_buffer->add_block({this, std::move(pblock_ptr), eos, exec_status}));
+        RETURN_IF_ERROR(_buffer->add_block({this, std::move(block), eos, Status::OK()}));
     }
     return Status::OK();
 }
 
-Status PipChannel::send_broadcast_block(std::shared_ptr<BroadcastPBlockHolder>& block, bool eos) {
-    COUNTER_UPDATE(Channel<pipeline::ExchangeSinkLocalState>::_parent->blocks_sent_counter(), 1);
+Status Channel::send_broadcast_block(std::shared_ptr<BroadcastPBlockHolder>& block, bool eos) {
+    COUNTER_UPDATE(_parent->blocks_sent_counter(), 1);
     if (eos) {
         if (_eos_send) {
             return Status::OK();
@@ -166,128 +151,88 @@ Status PipChannel::send_broadcast_block(std::shared_ptr<BroadcastPBlockHolder>&
     return Status::OK();
 }
 
-Status PipChannel::send_current_block(bool eos, Status exec_status) {
-    if (Channel<pipeline::ExchangeSinkLocalState>::is_local()) {
-        return Channel<pipeline::ExchangeSinkLocalState>::send_local_block(exec_status, eos);
+Status Channel::_send_current_block(bool eos) {
+    if (is_local()) {
+        return _send_local_block(eos);
     }
-    RETURN_IF_ERROR(send_remote_block(_pblock.release(), eos, exec_status));
-    return Status::OK();
+    return send_remote_block(std::move(_pblock), eos);
 }
 
-template <typename Parent>
-Status Channel<Parent>::send_local_block(Status exec_status, bool eos) {
-    SCOPED_TIMER(_parent->local_send_timer());
-    Block block = _serializer.get_block()->to_block();
-    _serializer.get_block()->set_mutable_columns(block.clone_empty_columns());
-    if (_recvr_is_valid()) {
-        if constexpr (!std::is_same_v<pipeline::ResultFileSinkLocalState, Parent>) {
-            COUNTER_UPDATE(_parent->local_bytes_send_counter(), block.bytes());
-            COUNTER_UPDATE(_parent->local_sent_rows(), block.rows());
-            COUNTER_UPDATE(_parent->blocks_sent_counter(), 1);
-        }
+Status Channel::_send_local_block(bool eos) {
+    Block block;
+    if (_serializer.get_block() != nullptr) {
+        block = _serializer.get_block()->to_block();
+        _serializer.get_block()->set_mutable_columns(block.clone_empty_columns());
+    }
 
-        _local_recvr->add_block(&block, _parent->sender_id(), true);
-        if (eos) {
-            _local_recvr->remove_sender(_parent->sender_id(), _be_number, exec_status);
-        }
-        return Status::OK();
-    } else {
-        _serializer.reset_block();
-        return _receiver_status;
+    if (!block.empty() || eos) {
+        RETURN_IF_ERROR(send_local_block(&block, eos, true));
     }
+    return Status::OK();
 }
 
-template <typename Parent>
-Status Channel<Parent>::send_local_block(Block* block, bool can_be_moved) {
+Status Channel::send_local_block(Block* block, bool eos, bool can_be_moved) {
     SCOPED_TIMER(_parent->local_send_timer());
-    if (_recvr_is_valid()) {
-        if constexpr (!std::is_same_v<pipeline::ResultFileSinkLocalState, Parent>) {
-            COUNTER_UPDATE(_parent->local_bytes_send_counter(), block->bytes());
-            COUNTER_UPDATE(_parent->local_sent_rows(), block->rows());
-            COUNTER_UPDATE(_parent->blocks_sent_counter(), 1);
-        }
-        _local_recvr->add_block(block, _parent->sender_id(), can_be_moved);
-        return Status::OK();
-    } else {
-        return _receiver_status;
-    }
-}
 
-template <typename Parent>
-Status Channel<Parent>::close_wait(RuntimeState* state) {
-    if (_need_close) {
-        Status st = _wait_last_brpc();
-        if (st.is<ErrorCode::END_OF_FILE>()) {
-            st = Status::OK();
-        } else if (!st.ok()) {
-            state->log_error(st.to_string());
+    if (eos) {
+        if (_eos_send) {
+            return Status::OK();
+        } else {
+            _eos_send = true;
         }
-        _need_close = false;
-        return st;
     }
-    _serializer.reset_block();
-    return Status::OK();
-}
 
-Status PipChannel::close_internal(Status exec_status) {
-    if (!_need_close) {
-        return Status::OK();
-    }
-    VLOG_RPC << "Channel::close_internal() instance_id=" << print_id(_fragment_instance_id)
-             << " dest_node=" << _dest_node_id << " #rows= "
-             << ((_serializer.get_block() == nullptr) ? 0 : _serializer.get_block()->rows())
-             << " receiver status: " << _receiver_status << ", exec_status: " << exec_status;
     if (is_receiver_eof()) {
-        _serializer.reset_block();
-        return Status::OK();
+        return _receiver_status;
     }
-    Status status;
-    if (_serializer.get_block() != nullptr && _serializer.get_block()->rows() > 0) {
-        status = send_current_block(true, exec_status);
-    } else {
-        if (is_local()) {
-            if (_recvr_is_valid()) {
-                _local_recvr->remove_sender(_parent->sender_id(), _be_number, exec_status);
-            }
-        } else {
-            // Non pipeline engine will send an empty eos block
-            status = send_remote_block((PBlock*)nullptr, true, exec_status);
+
+    auto receiver_status = _recvr_status();
+    if (receiver_status.ok()) {
+        COUNTER_UPDATE(_parent->local_bytes_send_counter(), block->bytes());
+        COUNTER_UPDATE(_parent->local_sent_rows(), block->rows());
+        COUNTER_UPDATE(_parent->blocks_sent_counter(), 1);
+
+        const auto sender_id = _parent->sender_id();
+        if (!block->empty()) [[likely]] {
+            _local_recvr->add_block(block, sender_id, can_be_moved);
+        }
+
+        if (eos) [[unlikely]] {
+            _local_recvr->remove_sender(sender_id, _be_number, Status::OK());
+            _parent->on_channel_finished(_fragment_instance_id.lo);
         }
-    }
-    // Don't wait for the last packet to finish, left it to close_wait.
-    if (status.is<ErrorCode::END_OF_FILE>()) {
         return Status::OK();
     } else {
-        return status;
+        _receiver_status = std::move(receiver_status);
+        _parent->on_channel_finished(_fragment_instance_id.lo);
+        return _receiver_status;
     }
 }
 
-Status PipChannel::close(RuntimeState* state, Status exec_status) {
+Status Channel::close(RuntimeState* state) {
     if (_closed) {
         return Status::OK();
     }
     _closed = true;
 
-    Status st = close_internal(exec_status);
-    if (!st.ok()) {
-        state->log_error(st.to_string());
+    if (!_need_close) {
+        return Status::OK();
     }
-    return st;
-}
 
-template <typename Parent>
-void Channel<Parent>::ch_roll_pb_block() {
-    _ch_cur_pb_block = (_ch_cur_pb_block == &_ch_pb_block1 ? &_ch_pb_block2 : &_ch_pb_block1);
+    if (is_receiver_eof()) {
+        _serializer.reset_block();
+        return Status::OK();
+    } else {
+        return _send_current_block(true);
+    }
 }
 
-template <typename Parent>
-BlockSerializer<Parent>::BlockSerializer(Parent* parent, bool is_local)
+BlockSerializer::BlockSerializer(pipeline::ExchangeSinkLocalState* parent, bool is_local)
         : _parent(parent), _is_local(is_local), _batch_size(parent->state()->batch_size()) {}
 
-template <typename Parent>
-Status BlockSerializer<Parent>::next_serialized_block(Block* block, PBlock* dest, int num_receivers,
-                                                      bool* serialized, bool eos,
-                                                      const std::vector<uint32_t>* rows) {
+Status BlockSerializer::next_serialized_block(Block* block, PBlock* dest, int num_receivers,
+                                              bool* serialized, bool eos,
+                                              const std::vector<uint32_t>* rows) {
     if (_mutable_block == nullptr) {
         _mutable_block = MutableBlock::create_unique(block->clone_empty());
     }
@@ -316,8 +261,7 @@ Status BlockSerializer<Parent>::next_serialized_block(Block* block, PBlock* dest
     return Status::OK();
 }
 
-template <typename Parent>
-Status BlockSerializer<Parent>::serialize_block(PBlock* dest, int num_receivers) {
+Status BlockSerializer::serialize_block(PBlock* dest, int num_receivers) {
     if (_mutable_block && _mutable_block->rows() > 0) {
         auto block = _mutable_block->to_block();
         RETURN_IF_ERROR(serialize_block(&block, dest, num_receivers));
@@ -328,29 +272,20 @@ Status BlockSerializer<Parent>::serialize_block(PBlock* dest, int num_receivers)
     return Status::OK();
 }
 
-template <typename Parent>
-Status BlockSerializer<Parent>::serialize_block(const Block* src, PBlock* dest, int num_receivers) {
-    if constexpr (!std::is_same_v<pipeline::ResultFileSinkLocalState, Parent>) {
-        SCOPED_TIMER(_parent->_serialize_batch_timer);
-        dest->Clear();
-        size_t uncompressed_bytes = 0, compressed_bytes = 0;
-        RETURN_IF_ERROR(src->serialize(
-                _parent->_state->be_exec_version(), dest, &uncompressed_bytes, &compressed_bytes,
-                _parent->compression_type(), _parent->transfer_large_data_by_brpc()));
-        COUNTER_UPDATE(_parent->_bytes_sent_counter, compressed_bytes * num_receivers);
-        COUNTER_UPDATE(_parent->_uncompressed_bytes_counter, uncompressed_bytes * num_receivers);
-        COUNTER_UPDATE(_parent->_compress_timer, src->get_compress_time());
-        _parent->get_query_statistics_ptr()->add_shuffle_send_bytes(compressed_bytes *
-                                                                    num_receivers);
-        _parent->get_query_statistics_ptr()->add_shuffle_send_rows(src->rows() * num_receivers);
-    }
+Status BlockSerializer::serialize_block(const Block* src, PBlock* dest, int num_receivers) {
+    SCOPED_TIMER(_parent->_serialize_batch_timer);
+    dest->Clear();
+    size_t uncompressed_bytes = 0, compressed_bytes = 0;
+    RETURN_IF_ERROR(src->serialize(_parent->_state->be_exec_version(), dest, &uncompressed_bytes,
+                                   &compressed_bytes, _parent->compression_type(),
+                                   _parent->transfer_large_data_by_brpc()));
+    COUNTER_UPDATE(_parent->_bytes_sent_counter, compressed_bytes * num_receivers);
+    COUNTER_UPDATE(_parent->_uncompressed_bytes_counter, uncompressed_bytes * num_receivers);
+    COUNTER_UPDATE(_parent->_compress_timer, src->get_compress_time());
+    _parent->get_query_statistics_ptr()->add_shuffle_send_bytes(compressed_bytes * num_receivers);
+    _parent->get_query_statistics_ptr()->add_shuffle_send_rows(src->rows() * num_receivers);
 
     return Status::OK();
 }
 
-template class Channel<pipeline::ExchangeSinkLocalState>;
-template class Channel<pipeline::ResultFileSinkLocalState>;
-template class BlockSerializer<pipeline::ResultFileSinkLocalState>;
-template class BlockSerializer<pipeline::ExchangeSinkLocalState>;
-
 } // namespace doris::vectorized
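
One API shift in this file worth spelling out: send_remote_block now takes
std::unique_ptr<PBlock>&& instead of a raw PBlock*, so a fresh PBlock is
serialized per send and its ownership moves explicitly into the exchange
buffer, which is what allows deleting the ch_cur_pb_block/ch_roll_pb_block
double-buffering above. A reduced sketch of the handoff, with PBlock and the
buffer stubbed (not the real Doris types):

    #include <memory>
    #include <utility>
    #include <vector>

    struct PBlock {}; // stand-in for the serialized protobuf block

    struct Buffer {
        // The buffer owns each in-flight block until its RPC completes.
        std::vector<std::unique_ptr<PBlock>> in_flight;
    };

    struct Channel {
        Buffer* _buffer = nullptr;

        void send_remote_block(std::unique_ptr<PBlock>&& block, bool /*eos*/) {
            _buffer->in_flight.push_back(std::move(block)); // explicit ownership transfer
        }
    };

    // Caller side, mirroring the pattern in ExchangeSinkOperatorX::sink:
    //     auto pblock = std::make_unique<PBlock>();
    //     RETURN_IF_ERROR(serializer.serialize_block(block, pblock.get()));
    //     RETURN_IF_ERROR(channel->send_remote_block(std::move(pblock), eos));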
diff --git a/be/src/vec/sink/vdata_stream_sender.h b/be/src/vec/sink/vdata_stream_sender.h
index f59a39e4969..da0ee22ac14 100644
--- a/be/src/vec/sink/vdata_stream_sender.h
+++ b/be/src/vec/sink/vdata_stream_sender.h
@@ -72,13 +72,10 @@ class ExchangeSinkLocalState;
 } // namespace pipeline
 
 namespace vectorized {
-template <typename>
-class Channel;
 
-template <typename Parent>
 class BlockSerializer {
 public:
-    BlockSerializer(Parent* parent, bool is_local = true);
+    BlockSerializer(pipeline::ExchangeSinkLocalState* parent, bool is_local = true);
     Status next_serialized_block(Block* src, PBlock* dest, int num_receivers, bool* serialized,
                                  bool eos, const std::vector<uint32_t>* rows = nullptr);
     Status serialize_block(PBlock* dest, int num_receivers = 1);
@@ -91,14 +88,13 @@ public:
     void set_is_local(bool is_local) { _is_local = is_local; }
 
 private:
-    Parent* _parent;
+    pipeline::ExchangeSinkLocalState* _parent;
     std::unique_ptr<MutableBlock> _mutable_block;
 
     bool _is_local;
     const int _batch_size;
 };
 
-template <typename Parent>
 class Channel {
 public:
     friend class pipeline::ExchangeSinkBuffer;
@@ -106,23 +102,15 @@ public:
     // combination. buffer_size is specified in bytes and a soft limit on
     // how much tuple data is getting accumulated before being sent; it only applies
     // when data is added via add_row() and not sent directly via send_batch().
-    Channel(Parent* parent, const RowDescriptor& row_desc, TNetworkAddress brpc_dest,
+    Channel(pipeline::ExchangeSinkLocalState* parent, TNetworkAddress brpc_dest,
             TUniqueId fragment_instance_id, PlanNodeId dest_node_id)
             : _parent(parent),
-              _row_desc(row_desc),
               _fragment_instance_id(std::move(fragment_instance_id)),
               _dest_node_id(dest_node_id),
-              _need_close(false),
-              _closed(false),
               _brpc_dest_addr(std::move(brpc_dest)),
               _is_local((_brpc_dest_addr.hostname == BackendOptions::get_localhost()) &&
                         (_brpc_dest_addr.port == config::brpc_port)),
-              _serializer(_parent, _is_local) {
-        if (_is_local) {
-            VLOG_NOTICE << "will use local Exchange, dest_node_id is : " << _dest_node_id;
-        }
-        _ch_cur_pb_block = &_ch_pb_block1;
-    }
+              _serializer(_parent, _is_local) {}
 
     virtual ~Channel() = default;
 
@@ -131,16 +119,12 @@ public:
     Status init(RuntimeState* state);
     Status open(RuntimeState* state);
 
-    Status send_local_block(Status exec_status, bool eos = false);
-
-    Status send_local_block(Block* block, bool can_be_moved);
-
-    // Get close wait's response, to finish channel close operation.
-    Status close_wait(RuntimeState* state);
-
-    int64_t num_data_bytes_sent() const { return _num_data_bytes_sent; }
-
-    PBlock* ch_cur_pb_block() { return _ch_cur_pb_block; }
+    Status send_local_block(Block* block, bool eos, bool can_be_moved);
+    // Flush buffered rows and close channel. This function don't wait the response
+    // of close operation, client should call close_wait() to finish channel's close.
+    // We split one close operation into two phases in order to make multiple channels
+    // can run parallel.
+    Status close(RuntimeState* state);
 
     std::string get_fragment_instance_id_str() {
         UniqueId uid(_fragment_instance_id);
@@ -149,155 +133,40 @@ public:
 
     bool is_local() const { return _is_local; }
 
-    virtual void ch_roll_pb_block();
-
     bool is_receiver_eof() const { return _receiver_status.is<ErrorCode::END_OF_FILE>(); }
 
     void set_receiver_eof(Status st) { _receiver_status = st; }
 
-protected:
-    bool _recvr_is_valid() {
-        if (_local_recvr && !_local_recvr->is_closed()) {
-            return true;
-        }
-        _receiver_status = Status::EndOfFile(
-                "local data stream receiver closed"); // local data stream 
receiver closed
-        return false;
-    }
-
-    Status _wait_last_brpc() {
-        SCOPED_TIMER(_parent->brpc_wait_timer());
-        if (_send_remote_block_callback == nullptr) {
-            return Status::OK();
-        }
-        _send_remote_block_callback->join();
-        if (_send_remote_block_callback->cntl_->Failed()) {
-            std::string err = fmt::format(
-                    "failed to send brpc batch, error={}, error_text={}, 
client: {}, "
-                    "latency = {}",
-                    berror(_send_remote_block_callback->cntl_->ErrorCode()),
-                    _send_remote_block_callback->cntl_->ErrorText(),
-                    BackendOptions::get_localhost(),
-                    _send_remote_block_callback->cntl_->latency_us());
-            LOG(WARNING) << err;
-            return Status::RpcError(err);
-        }
-        _receiver_status = Status::create(_send_remote_block_callback->response_->status());
-        return _receiver_status;
-    }
-
-    Parent* _parent = nullptr;
-
-    const RowDescriptor& _row_desc;
-    const TUniqueId _fragment_instance_id;
-    PlanNodeId _dest_node_id;
-
-    // the number of RowBatch.data bytes sent successfully
-    int64_t _num_data_bytes_sent {};
-    int64_t _packet_seq {};
-
-    bool _need_close;
-    bool _closed;
-    int _be_number;
-
-    TNetworkAddress _brpc_dest_addr;
-
-    PUniqueId _finst_id;
-    PUniqueId _query_id;
-    PBlock _pb_block;
-    std::shared_ptr<PTransmitDataParams> _brpc_request = nullptr;
-    std::shared_ptr<PBackendService_Stub> _brpc_stub = nullptr;
-    std::shared_ptr<DummyBrpcCallback<PTransmitDataResult>> _send_remote_block_callback;
-    Status _receiver_status;
-    int32_t _brpc_timeout_ms = 500;
-    RuntimeState* _state = nullptr;
-
-    bool _is_local;
-    std::shared_ptr<VDataStreamRecvr> _local_recvr;
-    // serialized blocks for broadcasting; we need two so we can write
-    // one while the other one is still being sent.
-    // Which is for same reason as `_cur_pb_block`, `_pb_block1` and `_pb_block2`
-    // in VDataStreamSender.
-    PBlock* _ch_cur_pb_block = nullptr;
-    PBlock _ch_pb_block1;
-    PBlock _ch_pb_block2;
-
-    BlockSerializer<Parent> _serializer;
-};
-
-#define HANDLE_CHANNEL_STATUS(state, channel, status)    \
-    do {                                                 \
-        if (status.is<ErrorCode::END_OF_FILE>()) {       \
-            _handle_eof_channel(state, channel, status); \
-        } else {                                         \
-            RETURN_IF_ERROR(status);                     \
-        }                                                \
-    } while (0)
-
-class PipChannel final : public Channel<pipeline::ExchangeSinkLocalState> {
-public:
-    PipChannel(pipeline::ExchangeSinkLocalState* parent, const RowDescriptor& row_desc,
-               const TNetworkAddress& brpc_dest, const TUniqueId& fragment_instance_id,
-               PlanNodeId dest_node_id)
-            : Channel<pipeline::ExchangeSinkLocalState>(parent, row_desc, brpc_dest,
-                                                        fragment_instance_id, dest_node_id) {
-        ch_roll_pb_block();
-    }
-
-    ~PipChannel() override { delete Channel<pipeline::ExchangeSinkLocalState>::_ch_cur_pb_block; }
-
     int64_t mem_usage() const;
 
-    void ch_roll_pb_block() override {
-        // We have two choices here.
-        // 1. Use a PBlock pool and fetch an available PBlock if we need one. In this way, we can
-        //    reuse the memory, but we have to use a lock to synchronize.
-        // 2. Create a new PBlock every time. In this way we don't need a lock but have to allocate
-        //    new memory.
-        // Now we use the second way.
-        Channel<pipeline::ExchangeSinkLocalState>::_ch_cur_pb_block = new PBlock();
-    }
-
-    // Flush buffered rows and close channel. This function don't wait the response
-    // of close operation, client should call close_wait() to finish channel's close.
-    // We split one close operation into two phases in order to make multiple channels
-    // can run parallel.
-    Status close(RuntimeState* state, Status exec_status);
-
-    Status close_internal(Status exec_status);
-
     // Asynchronously sends a block
     // Returns the status of the most recently finished transmit_data
     // rpc (or OK if there wasn't one that hasn't been reported yet).
     // if batch is nullptr, send the eof packet
-    Status send_remote_block(PBlock* block, bool eos = false, Status exec_status = Status::OK());
-
+    Status send_remote_block(std::unique_ptr<PBlock>&& block, bool eos = false);
     Status send_broadcast_block(std::shared_ptr<BroadcastPBlockHolder>& block, bool eos = false);
 
     Status add_rows(Block* block, const std::vector<uint32_t>& rows, bool eos) {
-        if (Channel<pipeline::ExchangeSinkLocalState>::_fragment_instance_id.lo == -1) {
+        if (_fragment_instance_id.lo == -1) {
             return Status::OK();
         }
 
         bool serialized = false;
-        _pblock = std::make_unique<PBlock>();
-        RETURN_IF_ERROR(
-                Channel<pipeline::ExchangeSinkLocalState>::_serializer.next_serialized_block(
-                        block, _pblock.get(), 1, &serialized, eos, &rows));
+        if (_pblock == nullptr) {
+            _pblock = std::make_unique<PBlock>();
+        }
+        RETURN_IF_ERROR(_serializer.next_serialized_block(block, _pblock.get(), 1, &serialized, eos,
+                                                          &rows));
         if (serialized) {
-            Status exec_status = Status::OK();
-            RETURN_IF_ERROR(send_current_block(eos, exec_status));
+            RETURN_IF_ERROR(_send_current_block(eos));
         }
 
         return Status::OK();
     }
 
-    // send _mutable_block
-    Status send_current_block(bool eos, Status exec_status);
-
     void register_exchange_buffer(pipeline::ExchangeSinkBuffer* buffer) {
         _buffer = buffer;
-        _buffer->register_sink(Channel<pipeline::ExchangeSinkLocalState>::_fragment_instance_id);
+        _buffer->register_sink(_fragment_instance_id);
     }
 
     std::shared_ptr<pipeline::ExchangeSendCallback<PTransmitDataResult>> get_send_callback(
@@ -313,12 +182,53 @@ public:
 
     std::shared_ptr<pipeline::Dependency> get_local_channel_dependency();
 
-private:
+protected:
+    Status _send_local_block(bool eos);
+    Status _send_current_block(bool eos);
+
+    Status _recvr_status() const {
+        if (_local_recvr && !_local_recvr->is_closed()) {
+            return Status::OK();
+        }
+        return Status::EndOfFile(
+                "local data stream receiver closed"); // local data stream 
receiver closed
+    }
+
+    pipeline::ExchangeSinkLocalState* _parent = nullptr;
+
+    const TUniqueId _fragment_instance_id;
+    PlanNodeId _dest_node_id;
+    bool _closed {false};
+    bool _need_close {false};
+    int _be_number;
+
+    TNetworkAddress _brpc_dest_addr;
+
+    PBlock _pb_block;
+    std::shared_ptr<PBackendService_Stub> _brpc_stub = nullptr;
+    Status _receiver_status;
+    int32_t _brpc_timeout_ms = 500;
+    RuntimeState* _state = nullptr;
+
+    bool _is_local;
+    std::shared_ptr<VDataStreamRecvr> _local_recvr;
+
+    BlockSerializer _serializer;
+
     pipeline::ExchangeSinkBuffer* _buffer = nullptr;
     bool _eos_send = false;
     std::shared_ptr<pipeline::ExchangeSendCallback<PTransmitDataResult>> _send_callback;
     std::unique_ptr<PBlock> _pblock;
 };
 
+#define HANDLE_CHANNEL_STATUS(state, channel, status)    \
+    do {                                                 \
+        if (status.is<ErrorCode::END_OF_FILE>()) {       \
+            _handle_eof_channel(state, channel, status); \
+        } else {                                         \
+            RETURN_IF_ERROR(status);                     \
+        }                                                \
+    } while (0)
+
 } // namespace vectorized
 } // namespace doris
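
For reference, the close path that replaces PipChannel::close/close_internal
boils down to: the first close wins, the camouflaged empty channel (no valid
fragment instance id) skips the work, a channel whose receiver already hit EOF
just drops its buffered rows, and otherwise the last buffered block is flushed
with eos=true. A sketch of that control flow with Status, the receiver, and
the serializer stubbed (assumptions, not the real types):

    struct Status {
        static Status OK() { return {}; }
    };

    struct Channel {
        bool _closed = false;
        bool _need_close = false; // stays false for the camouflaged empty channel
        bool _receiver_eof = false;

        Status close() {
            if (_closed) {
                return Status::OK(); // idempotent: only the first close acts
            }
            _closed = true;
            if (!_need_close) {
                return Status::OK();
            }
            if (_receiver_eof) {
                reset_serializer_block(); // nobody is listening, drop buffered rows
                return Status::OK();
            }
            return send_current_block(/*eos=*/true); // flush remaining rows plus eos
        }

        void reset_serializer_block() {}
        Status send_current_block(bool /*eos*/) { return Status::OK(); }
    };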

