This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch spill_and_reserve
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/spill_and_reserve by this push:
     new 82f2878bd4e [fix](scan) Avoid memory allocated by buffered_reader from being traced (#42558)
82f2878bd4e is described below

commit 82f2878bd4e632ec4564b5e61f51b84fcce7056c
Author: Jerry Hu <mrh...@gmail.com>
AuthorDate: Mon Oct 28 15:07:06 2024 +0800

    [fix](scan) Avoid memory allocated by buffered_reader from being traced (#42558)
    
    ## Proposed changes
    
    Issue Number: close #xxx
    
    ---------
    
    Co-authored-by: Pxl <pxl...@qq.com>
    Co-authored-by: Gabriel <gabrielleeb...@gmail.com>
---
 be/src/io/fs/buffered_reader.cpp                   |  19 +-
 be/src/io/fs/buffered_reader.h                     |  11 +-
 be/src/pipeline/exec/exchange_sink_buffer.cpp      |  49 +---
 be/src/pipeline/exec/exchange_sink_buffer.h        |  28 +-
 be/src/pipeline/exec/exchange_sink_operator.cpp    | 135 +++++-----
 be/src/pipeline/exec/exchange_sink_operator.h      |  29 +-
 be/src/pipeline/exec/result_file_sink_operator.cpp |  12 +-
 be/src/pipeline/exec/result_file_sink_operator.h   |   9 -
 be/src/util/slice.h                                |   6 +
 be/src/vec/runtime/partitioner.cpp                 |  41 +--
 be/src/vec/runtime/partitioner.h                   |  49 +---
 be/src/vec/sink/vdata_stream_sender.cpp            | 291 +++++----------------
 be/src/vec/sink/vdata_stream_sender.h              | 232 +++++-----------
 13 files changed, 290 insertions(+), 621 deletions(-)

diff --git a/be/src/io/fs/buffered_reader.cpp b/be/src/io/fs/buffered_reader.cpp
index 43445ed42ef..ce4767dd040 100644
--- a/be/src/io/fs/buffered_reader.cpp
+++ b/be/src/io/fs/buffered_reader.cpp
@@ -23,6 +23,7 @@
 
 #include <algorithm>
 #include <chrono>
+#include <memory>
 
 #include "common/compiler_util.h" // IWYU pragma: keep
 #include "common/config.h"
@@ -31,6 +32,7 @@
 #include "runtime/thread_context.h"
 #include "runtime/workload_management/io_throttle.h"
 #include "util/runtime_profile.h"
+#include "util/slice.h"
 #include "util/threadpool.h"
 
 namespace doris {
@@ -270,7 +272,7 @@ void MergeRangeFileReader::_read_in_box(RangeCachedData& cached_data, size_t off
             }
             if (copy_out != nullptr) {
                 memcpy(copy_out + to_handle - remaining,
-                       _boxes[box_index] + cached_data.box_start_offset[i], box_to_handle);
+                       _boxes[box_index].data() + cached_data.box_start_offset[i], box_to_handle);
             }
             remaining -= box_to_handle;
             cached_data.box_start_offset[i] += box_to_handle;
@@ -307,14 +309,15 @@ void MergeRangeFileReader::_read_in_box(RangeCachedData& cached_data, size_t off
 
 Status MergeRangeFileReader::_fill_box(int range_index, size_t start_offset, size_t to_read,
                                        size_t* bytes_read, const IOContext* io_ctx) {
-    if (_read_slice == nullptr) {
-        _read_slice = new char[READ_SLICE_SIZE];
+    if (!_read_slice) {
+        _read_slice = std::make_unique<OwnedSlice>(READ_SLICE_SIZE);
     }
+
     *bytes_read = 0;
     {
         SCOPED_RAW_TIMER(&_statistics.read_time);
-        RETURN_IF_ERROR(
-                _reader->read_at(start_offset, Slice(_read_slice, to_read), bytes_read, io_ctx));
+        RETURN_IF_ERROR(_reader->read_at(start_offset, Slice(_read_slice->data(), to_read),
+                                         bytes_read, io_ctx));
         _statistics.merged_io++;
         _statistics.merged_bytes += *bytes_read;
     }
@@ -328,8 +331,8 @@ Status MergeRangeFileReader::_fill_box(int range_index, size_t start_offset, siz
 
     auto fill_box = [&](int16 fill_box_ref, uint32 box_usage, size_t box_copy_end) {
         size_t copy_size = std::min(box_copy_end - copy_start, BOX_SIZE - box_usage);
-        memcpy(_boxes[fill_box_ref] + box_usage, _read_slice + copy_start - start_offset,
-               copy_size);
+        memcpy(_boxes[fill_box_ref].data() + box_usage,
+               _read_slice->data() + copy_start - start_offset, copy_size);
         filled_boxes.emplace_back(fill_box_ref, box_usage, copy_start, copy_start + copy_size);
         copy_start += copy_size;
         _last_box_ref = fill_box_ref;
@@ -367,7 +370,7 @@ Status MergeRangeFileReader::_fill_box(int range_index, size_t start_offset, siz
         }
         // apply for new box to copy data
         while (copy_start < range_copy_end && _boxes.size() < NUM_BOX) {
-            _boxes.emplace_back(new char[BOX_SIZE]);
+            _boxes.emplace_back(BOX_SIZE);
             _box_ref.emplace_back(0);
             fill_box(_boxes.size() - 1, 0, range_copy_end);
         }
diff --git a/be/src/io/fs/buffered_reader.h b/be/src/io/fs/buffered_reader.h
index 70c8445db23..907ea11b216 100644
--- a/be/src/io/fs/buffered_reader.h
+++ b/be/src/io/fs/buffered_reader.h
@@ -168,12 +168,7 @@ public:
         }
     }
 
-    ~MergeRangeFileReader() override {
-        delete[] _read_slice;
-        for (char* box : _boxes) {
-            delete[] box;
-        }
-    }
+    ~MergeRangeFileReader() override = default;
 
     Status close() override {
         if (!_closed) {
@@ -244,8 +239,8 @@ private:
     bool _closed = false;
     size_t _remaining;
 
-    char* _read_slice = nullptr;
-    std::vector<char*> _boxes;
+    std::unique_ptr<OwnedSlice> _read_slice;
+    std::vector<OwnedSlice> _boxes;
     int16 _last_box_ref = -1;
     uint32 _last_box_usage = 0;
     std::vector<int16> _box_ref;
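
The two hunks above replace manually managed `char*` buffers with RAII-owned
OwnedSlice objects, which is why the hand-written destructor collapses to
`= default`. A minimal standalone sketch of the same ownership pattern
(illustrative names, not the Doris types):

    #include <cstddef>
    #include <memory>
    #include <vector>

    // An owned byte buffer, freed automatically when the object goes away.
    struct OwnedBuffer {
        explicit OwnedBuffer(size_t n) : data_(new char[n]), size_(n) {}
        char* data() const { return data_.get(); }
        size_t size() const { return size_; }

    private:
        std::unique_ptr<char[]> data_; // replaces raw new[]/delete[]
        size_t size_;
    };

    struct Reader {
        std::unique_ptr<OwnedBuffer> read_slice; // was: char* _read_slice
        std::vector<OwnedBuffer> boxes;          // was: std::vector<char*> _boxes
        // ~Reader() = default; no hand-written delete[] loop needed
    };
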
diff --git a/be/src/pipeline/exec/exchange_sink_buffer.cpp b/be/src/pipeline/exec/exchange_sink_buffer.cpp
index f0f66774092..31973997059 100644
--- a/be/src/pipeline/exec/exchange_sink_buffer.cpp
+++ b/be/src/pipeline/exec/exchange_sink_buffer.cpp
@@ -86,14 +86,13 @@ void BroadcastPBlockHolderMemLimiter::release(const BroadcastPBlockHolder& holde
 } // namespace vectorized
 
 namespace pipeline {
-
 ExchangeSinkBuffer::ExchangeSinkBuffer(PUniqueId query_id, PlanNodeId dest_node_id, int send_id,
                                        PlanNodeId node_id, int be_number, RuntimeState* state,
                                        ExchangeSinkLocalState* parent)
         : HasTaskExecutionCtx(state),
           _queue_capacity(0),
           _is_finishing(false),
-          _query_id(query_id),
+          _query_id(std::move(query_id)),
           _dest_node_id(dest_node_id),
           _sender_id(send_id),
           _node_id(node_id),
@@ -111,12 +110,6 @@ void ExchangeSinkBuffer::close() {
     //_instance_to_request.clear();
 }
 
-void ExchangeSinkBuffer::_set_ready_to_finish(bool all_done) {
-    if (_finish_dependency && _should_stop && all_done) {
-        _finish_dependency->set_ready();
-    }
-}
-
 void ExchangeSinkBuffer::register_sink(TUniqueId fragment_instance_id) {
     if (_is_finishing) {
         return;
@@ -136,7 +129,6 @@ void ExchangeSinkBuffer::register_sink(TUniqueId fragment_instance_id) {
     finst_id.set_hi(fragment_instance_id.hi);
     finst_id.set_lo(fragment_instance_id.lo);
     _rpc_channel_is_idle[low_id] = true;
-    _instance_to_rpc_ctx[low_id] = {};
     _instance_to_receiver_eof[low_id] = false;
     _instance_to_rpc_time[low_id] = 0;
     _construct_request(low_id, finst_id);
@@ -161,7 +153,6 @@ Status ExchangeSinkBuffer::add_block(TransmitInfo&& request) {
         if (_rpc_channel_is_idle[ins_id]) {
             send_now = true;
             _rpc_channel_is_idle[ins_id] = false;
-            _busy_channels++;
         }
         if (request.block) {
             RETURN_IF_ERROR(
@@ -202,7 +193,6 @@ Status ExchangeSinkBuffer::add_block(BroadcastTransmitInfo&& request) {
         if (_rpc_channel_is_idle[ins_id]) {
             send_now = true;
             _rpc_channel_is_idle[ins_id] = false;
-            _busy_channels++;
         }
         if (request.block_holder->get_block()) {
             RETURN_IF_ERROR(BeExecVersionManager::check_be_exec_version(
@@ -227,7 +217,7 @@ Status ExchangeSinkBuffer::_send_rpc(InstanceLoId id) {
             _instance_to_broadcast_package_queue[id];
 
     if (_is_finishing) {
-        _turn_off_channel(id);
+        _turn_off_channel(id, lock);
         return Status::OK();
     }
 
@@ -245,9 +235,6 @@ Status ExchangeSinkBuffer::_send_rpc(InstanceLoId id) {
         }
         auto send_callback = request.channel->get_send_callback(id, request.eos);
 
-        _instance_to_rpc_ctx[id]._send_callback = send_callback;
-        _instance_to_rpc_ctx[id].is_cancelled = false;
-
         send_callback->cntl_->set_timeout_ms(request.channel->_brpc_timeout_ms);
         if (config::exchange_sink_ignore_eovercrowded) {
             send_callback->cntl_->ignore_eovercrowded();
@@ -326,12 +313,6 @@ Status ExchangeSinkBuffer::_send_rpc(InstanceLoId id) {
             brpc_request->set_allocated_block(request.block_holder->get_block());
         }
         auto send_callback = request.channel->get_send_callback(id, request.eos);
-
-        ExchangeRpcContext rpc_ctx;
-        rpc_ctx._send_callback = send_callback;
-        rpc_ctx.is_cancelled = false;
-        _instance_to_rpc_ctx[id] = rpc_ctx;
-
         send_callback->cntl_->set_timeout_ms(request.channel->_brpc_timeout_ms);
         if (config::exchange_sink_ignore_eovercrowded) {
             send_callback->cntl_->ignore_eovercrowded();
@@ -395,7 +376,7 @@ Status ExchangeSinkBuffer::_send_rpc(InstanceLoId id) {
         }
         broadcast_q.pop();
     } else {
-        _turn_off_channel(id);
+        _rpc_channel_is_idle[id] = true;
     }
 
     return Status::OK();
@@ -425,7 +406,7 @@ void ExchangeSinkBuffer::_ended(InstanceLoId id) {
         __builtin_unreachable();
     } else {
         std::unique_lock<std::mutex> lock(*_instance_to_package_queue_mutex[id]);
-        _turn_off_channel(id);
+        _turn_off_channel(id, lock);
     }
 }
 
@@ -434,14 +415,12 @@ void ExchangeSinkBuffer::_failed(InstanceLoId id, const std::string& err) {
               << ",_sender_id: " << _sender_id << ", node id: " << _node_id << ", err: " << err;
     _is_finishing = true;
     _context->cancel(Status::Cancelled(err));
-    std::unique_lock<std::mutex> lock(*_instance_to_package_queue_mutex[id]);
-    _turn_off_channel(id, true);
 }
 
 void ExchangeSinkBuffer::_set_receiver_eof(InstanceLoId id) {
     std::unique_lock<std::mutex> lock(*_instance_to_package_queue_mutex[id]);
     _instance_to_receiver_eof[id] = true;
-    _turn_off_channel(id, true);
+    _turn_off_channel(id, lock);
     std::queue<BroadcastTransmitInfo, std::list<BroadcastTransmitInfo>>& broadcast_q =
             _instance_to_broadcast_package_queue[id];
     for (; !broadcast_q.empty(); broadcast_q.pop()) {
@@ -473,17 +452,17 @@ bool ExchangeSinkBuffer::_is_receiver_eof(InstanceLoId id) {
     return _instance_to_receiver_eof[id];
 }
 
-void ExchangeSinkBuffer::_turn_off_channel(InstanceLoId id, bool cleanup) {
+// The unused parameter `with_lock` is to ensure that the function is called when the lock is held.
+void ExchangeSinkBuffer::_turn_off_channel(InstanceLoId id,
+                                           std::unique_lock<std::mutex>& /*with_lock*/) {
     if (!_rpc_channel_is_idle[id]) {
         _rpc_channel_is_idle[id] = true;
-        auto all_done = _busy_channels.fetch_sub(1) == 1;
-        _set_ready_to_finish(all_done);
-        if (cleanup && all_done) {
-            auto weak_task_ctx = weak_task_exec_ctx();
-            if (auto pip_ctx = weak_task_ctx.lock()) {
-                _parent->set_reach_limit();
-            }
-        }
+    }
+    _instance_to_receiver_eof[id] = true;
+
+    auto weak_task_ctx = weak_task_exec_ctx();
+    if (auto pip_ctx = weak_task_ctx.lock()) {
+        _parent->on_channel_finished(id);
     }
 }
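
The `_turn_off_channel` hunk above adopts the lock-witness idiom: the
`std::unique_lock<std::mutex>&` parameter is never read, but a caller cannot
supply one without actually holding the mutex. A small self-contained sketch
of the idiom (illustrative class, not from Doris):

    #include <mutex>

    class Channels {
    public:
        void turn_off(int id) {
            std::unique_lock<std::mutex> lock(_mutex);
            _turn_off(id, lock); // only callable with a lock in hand
        }

    private:
        // The unused parameter documents, and enforces at every call site,
        // that _mutex is held while the idle flag is mutated.
        void _turn_off(int id, std::unique_lock<std::mutex>& /*with_lock*/) {
            _idle[id] = true;
        }

        std::mutex _mutex;
        bool _idle[16] = {};
    };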
 
diff --git a/be/src/pipeline/exec/exchange_sink_buffer.h b/be/src/pipeline/exec/exchange_sink_buffer.h
index 60193b1d7e6..972e366c027 100644
--- a/be/src/pipeline/exec/exchange_sink_buffer.h
+++ b/be/src/pipeline/exec/exchange_sink_buffer.h
@@ -24,6 +24,7 @@
 #include <parallel_hashmap/phmap.h>
 
 #include <atomic>
+#include <cstdint>
 #include <list>
 #include <memory>
 #include <mutex>
@@ -50,7 +51,7 @@ class ExchangeSinkLocalState;
 } // namespace pipeline
 
 namespace vectorized {
-class PipChannel;
+class Channel;
 
 // We use BroadcastPBlockHolder to hold a broadcasted PBlock. For broadcast shuffle, one PBlock
 // will be shared between different channel, so we have to use a ref count to mark if this
@@ -110,14 +111,14 @@ private:
 
 namespace pipeline {
 struct TransmitInfo {
-    vectorized::PipChannel* channel = nullptr;
+    vectorized::Channel* channel = nullptr;
     std::unique_ptr<PBlock> block;
     bool eos;
     Status exec_status;
 };
 
 struct BroadcastTransmitInfo {
-    vectorized::PipChannel* channel = nullptr;
+    vectorized::Channel* channel = nullptr;
     std::shared_ptr<vectorized::BroadcastPBlockHolder> block_holder = nullptr;
     bool eos;
 };
@@ -177,11 +178,6 @@ private:
     bool _eos;
 };
 
-struct ExchangeRpcContext {
-    std::shared_ptr<ExchangeSendCallback<PTransmitDataResult>> _send_callback;
-    bool is_cancelled = false;
-};
-
 // Each ExchangeSinkOperator have one ExchangeSinkBuffer
 class ExchangeSinkBuffer final : public HasTaskExecutionCtx {
 public:
@@ -202,12 +198,10 @@ public:
         _finish_dependency = finish_dependency;
     }
 
-    void set_should_stop() {
-        _should_stop = true;
-        _set_ready_to_finish(_busy_channels == 0);
-    }
-
     void set_low_memory_mode() { _queue_capacity = 8; }
+    void set_broadcast_dependency(std::shared_ptr<Dependency> broadcast_dependency) {
+        _broadcast_dependency = broadcast_dependency;
+    }
 
 private:
     friend class ExchangeSinkLocalState;
@@ -230,11 +224,9 @@ private:
     phmap::flat_hash_map<InstanceLoId, std::shared_ptr<PTransmitDataParams>> _instance_to_request;
     // One channel is corresponding to a downstream instance.
     phmap::flat_hash_map<InstanceLoId, bool> _rpc_channel_is_idle;
-    // Number of busy channels;
-    std::atomic<int> _busy_channels = 0;
+
     phmap::flat_hash_map<InstanceLoId, bool> _instance_to_receiver_eof;
     phmap::flat_hash_map<InstanceLoId, int64_t> _instance_to_rpc_time;
-    phmap::flat_hash_map<InstanceLoId, ExchangeRpcContext> _instance_to_rpc_ctx;
 
     std::atomic<bool> _is_finishing;
     PUniqueId _query_id;
@@ -254,14 +246,14 @@ private:
     inline void _failed(InstanceLoId id, const std::string& err);
     inline void _set_receiver_eof(InstanceLoId id);
     inline bool _is_receiver_eof(InstanceLoId id);
-    inline void _turn_off_channel(InstanceLoId id, bool cleanup = false);
+    inline void _turn_off_channel(InstanceLoId id, std::unique_lock<std::mutex>& with_lock);
     void get_max_min_rpc_time(int64_t* max_time, int64_t* min_time);
     int64_t get_sum_rpc_time();
 
     std::atomic<int> _total_queue_size = 0;
     std::shared_ptr<Dependency> _queue_dependency = nullptr;
     std::shared_ptr<Dependency> _finish_dependency = nullptr;
-    std::atomic<bool> _should_stop = false;
+    std::shared_ptr<Dependency> _broadcast_dependency = nullptr;
     ExchangeSinkLocalState* _parent = nullptr;
 };
 
diff --git a/be/src/pipeline/exec/exchange_sink_operator.cpp b/be/src/pipeline/exec/exchange_sink_operator.cpp
index 3cfc5e45cfe..14290145664 100644
--- a/be/src/pipeline/exec/exchange_sink_operator.cpp
+++ b/be/src/pipeline/exec/exchange_sink_operator.cpp
@@ -23,6 +23,7 @@
 #include <gen_cpp/types.pb.h>
 
 #include <memory>
+#include <mutex>
 #include <random>
 
 #include "common/status.h"
@@ -31,6 +32,8 @@
 #include "pipeline/exec/operator.h"
 #include "pipeline/exec/sort_source_operator.h"
 #include "pipeline/local_exchange/local_exchange_sink_operator.h"
+#include "util/runtime_profile.h"
+#include "util/uid_util.h"
 #include "vec/columns/column_const.h"
 #include "vec/exprs/vexpr.h"
 
@@ -68,8 +71,10 @@ Status ExchangeSinkLocalState::init(RuntimeState* state, LocalSinkStateInfo& inf
     _rows_sent_counter = ADD_COUNTER_WITH_LEVEL(_profile, "RowsProduced", TUnit::UNIT, 1);
     _overall_throughput = _profile->add_derived_counter(
             "OverallThroughput", TUnit::BYTES_PER_SECOND,
-            std::bind<int64_t>(&RuntimeProfile::units_per_second, _bytes_sent_counter,
-                               _profile->total_time_counter()),
+            [this]() {
+                return RuntimeProfile::units_per_second(_bytes_sent_counter,
+                                                        _profile->total_time_counter());
+            },
             "");
     _merge_block_timer = ADD_TIMER(profile(), "MergeBlockTime");
     _local_bytes_send_counter = ADD_COUNTER(_profile, "LocalBytesSent", TUnit::BYTES);
@@ -84,15 +89,15 @@ Status ExchangeSinkLocalState::init(RuntimeState* state, LocalSinkStateInfo& inf
         const auto& fragment_instance_id = p._dests[i].fragment_instance_id;
         if (fragment_id_to_channel_index.find(fragment_instance_id.lo) ==
             fragment_id_to_channel_index.end()) {
-            channel_shared_ptrs.emplace_back(
-                    new vectorized::PipChannel(this, p._row_desc, p._dests[i].brpc_server,
-                                               fragment_instance_id, p._dest_node_id));
-            fragment_id_to_channel_index.emplace(fragment_instance_id.lo,
-                                                 channel_shared_ptrs.size() - 1);
-            channels.push_back(channel_shared_ptrs.back().get());
+            channels.push_back(std::make_shared<vectorized::Channel>(
+                    this, p._dests[i].brpc_server, fragment_instance_id, p._dest_node_id));
+            fragment_id_to_channel_index.emplace(fragment_instance_id.lo, channels.size() - 1);
+
+            if (fragment_instance_id.hi != -1 && fragment_instance_id.lo != -1) {
+                _working_channels_count++;
+            }
         } else {
-            channel_shared_ptrs.emplace_back(
-                    channel_shared_ptrs[fragment_id_to_channel_index[fragment_instance_id.lo]]);
+            channels.emplace_back(channels[fragment_id_to_channel_index[fragment_instance_id.lo]]);
         }
     }
 
@@ -106,6 +111,24 @@ Status ExchangeSinkLocalState::init(RuntimeState* state, LocalSinkStateInfo& inf
     return Status::OK();
 }
 
+void ExchangeSinkLocalState::on_channel_finished(InstanceLoId channel_id) {
+    std::lock_guard<std::mutex> lock(_finished_channels_mutex);
+
+    if (_finished_channels.contains(channel_id)) {
+        LOG(WARNING) << "query: " << print_id(_state->query_id())
+                     << ", on_channel_finished on already finished channel: " 
<< channel_id;
+        return;
+    } else {
+        _finished_channels.emplace(channel_id);
+        if (_working_channels_count.fetch_sub(1) == 1) {
+            set_reach_limit();
+            if (_finish_dependency) {
+                _finish_dependency->set_ready();
+            }
+        }
+    }
+}
+
 Status ExchangeSinkLocalState::open(RuntimeState* state) {
     SCOPED_TIMER(exec_time_counter());
     SCOPED_TIMER(_open_timer);
@@ -140,7 +163,6 @@ Status ExchangeSinkLocalState::open(RuntimeState* state) {
         _queue_dependency = Dependency::create_shared(_parent->operator_id(), _parent->node_id(),
                                                       "ExchangeSinkQueueDependency", true);
         _sink_buffer->set_dependency(_queue_dependency, _finish_dependency);
-        _finish_dependency->block();
     }
 
     if ((_part_type == TPartitionType::UNPARTITIONED || channels.size() == 1) &&
@@ -151,7 +173,7 @@ Status ExchangeSinkLocalState::open(RuntimeState* state) {
                 
vectorized::BroadcastPBlockHolderMemLimiter::create_shared(_broadcast_dependency);
     } else if (local_size > 0) {
         size_t dep_id = 0;
-        for (auto* channel : channels) {
+        for (auto& channel : channels) {
             if (channel->is_local()) {
                 if (auto dep = channel->get_local_channel_dependency()) {
                     _local_channels_dependency.push_back(dep);
@@ -166,16 +188,18 @@ Status ExchangeSinkLocalState::open(RuntimeState* state) {
     }
     if (_part_type == TPartitionType::HASH_PARTITIONED) {
         _partition_count = channels.size();
-        _partitioner.reset(new vectorized::Crc32HashPartitioner<vectorized::ShuffleChannelIds>(
-                channels.size()));
+        _partitioner =
+                std::make_unique<vectorized::Crc32HashPartitioner<vectorized::ShuffleChannelIds>>(
+                        channels.size());
         RETURN_IF_ERROR(_partitioner->init(p._texprs));
         RETURN_IF_ERROR(_partitioner->prepare(state, p._row_desc));
         _profile->add_info_string("Partitioner",
                                   fmt::format("Crc32HashPartitioner({})", 
_partition_count));
     } else if (_part_type == TPartitionType::BUCKET_SHFFULE_HASH_PARTITIONED) {
-        _partition_count = channel_shared_ptrs.size();
-        _partitioner.reset(new vectorized::Crc32HashPartitioner<vectorized::ShuffleChannelIds>(
-                channel_shared_ptrs.size()));
+        _partition_count = channels.size();
+        _partitioner =
+                std::make_unique<vectorized::Crc32HashPartitioner<vectorized::ShuffleChannelIds>>(
+                        channels.size());
         RETURN_IF_ERROR(_partitioner->init(p._texprs));
         RETURN_IF_ERROR(_partitioner->prepare(state, p._row_desc));
         _profile->add_info_string("Partitioner",
@@ -221,12 +245,13 @@ Status ExchangeSinkLocalState::open(RuntimeState* state) {
     } else if (_part_type == TPartitionType::TABLE_SINK_HASH_PARTITIONED) {
         _partition_count =
                 channels.size() * config::table_sink_partition_write_max_partition_nums_per_writer;
-        _partitioner.reset(new vectorized::Crc32HashPartitioner<vectorized::ShuffleChannelIds>(
-                _partition_count));
-        _partition_function.reset(new HashPartitionFunction(_partitioner.get()));
+        _partitioner =
+                std::make_unique<vectorized::Crc32HashPartitioner<vectorized::ShuffleChannelIds>>(
+                        _partition_count);
+        _partition_function = std::make_unique<HashPartitionFunction>(_partitioner.get());
 
-        scale_writer_partitioning_exchanger.reset(new vectorized::ScaleWriterPartitioningExchanger<
-                                                  HashPartitionFunction>(
+        scale_writer_partitioning_exchanger = std::make_unique<
+                vectorized::ScaleWriterPartitioningExchanger<HashPartitionFunction>>(
                 channels.size(), *_partition_function, _partition_count, channels.size(), 1,
                 config::table_sink_partition_write_min_partition_data_processed_rebalance_threshold /
                                         state->task_num() ==
@@ -239,7 +264,7 @@ Status ExchangeSinkLocalState::open(RuntimeState* state) {
                                 0
                        ? config::table_sink_partition_write_min_data_processed_rebalance_threshold
                        : config::table_sink_partition_write_min_data_processed_rebalance_threshold /
-                                  state->task_num()));
+                                  state->task_num());
 
         RETURN_IF_ERROR(_partitioner->init(p._texprs));
         RETURN_IF_ERROR(_partitioner->prepare(state, p._row_desc));
@@ -358,7 +383,7 @@ void ExchangeSinkOperatorX::_handle_eof_channel(RuntimeState* state, ChannelPtrT
                                                 Status st) {
     channel->set_receiver_eof(st);
     // Chanel will not send RPC to the downstream when eof, so close chanel by OK status.
-    static_cast<void>(channel->close(state, Status::OK()));
+    static_cast<void>(channel->close(state));
 }
 
 Status ExchangeSinkOperatorX::sink(RuntimeState* state, vectorized::Block* block, bool eos) {
@@ -367,7 +392,7 @@ Status ExchangeSinkOperatorX::sink(RuntimeState* state, vectorized::Block* block
     COUNTER_UPDATE(local_state.rows_sent_counter(), (int64_t)block->rows());
     SCOPED_TIMER(local_state.exec_time_counter());
     bool all_receiver_eof = true;
-    for (auto* channel : local_state.channels) {
+    for (auto& channel : local_state.channels) {
         if (!channel->is_receiver_eof()) {
             all_receiver_eof = false;
             break;
@@ -394,13 +419,13 @@ Status ExchangeSinkOperatorX::sink(RuntimeState* state, vectorized::Block* block
             if (!block->empty()) {
                 Status status;
                 size_t idx = 0;
-                for (auto* channel : local_state.channels) {
+                for (auto& channel : local_state.channels) {
                     if (!channel->is_receiver_eof()) {
                         // If this channel is the last, we can move this block to downstream pipeline.
                         // Otherwise, this block also need to be broadcasted to other channels so should be copied.
                         DCHECK_GE(local_state._last_local_channel_idx, 0);
                         status = channel->send_local_block(
-                                block, idx == local_state._last_local_channel_idx);
+                                block, eos, idx == local_state._last_local_channel_idx);
                         HANDLE_CHANNEL_STATUS(state, channel, status);
                     }
                     idx++;
@@ -430,7 +455,7 @@ Status ExchangeSinkOperatorX::sink(RuntimeState* state, vectorized::Block* block
 
                     size_t idx = 0;
                     bool moved = false;
-                    for (auto* channel : local_state.channels) {
+                    for (auto& channel : local_state.channels) {
                         if (!channel->is_receiver_eof()) {
                             Status status;
                             if (channel->is_local()) {
@@ -438,7 +463,8 @@ Status ExchangeSinkOperatorX::sink(RuntimeState* state, vectorized::Block* block
                                 // Otherwise, this block also need to be broadcasted to other channels so should be copied.
                                 DCHECK_GE(local_state._last_local_channel_idx, 0);
                                 status = channel->send_local_block(
-                                        &cur_block, idx == local_state._last_local_channel_idx);
+                                        &cur_block, eos,
+                                        idx == local_state._last_local_channel_idx);
                                 moved = idx == local_state._last_local_channel_idx;
                             } else {
                                 status = channel->send_broadcast_block(block_holder, eos);
@@ -459,20 +485,17 @@ Status ExchangeSinkOperatorX::sink(RuntimeState* state, vectorized::Block* block
         }
     } else if (_part_type == TPartitionType::RANDOM) {
         // 1. select channel
-        vectorized::PipChannel* current_channel =
-                local_state.channels[local_state.current_channel_idx];
+        auto& current_channel = local_state.channels[local_state.current_channel_idx];
         if (!current_channel->is_receiver_eof()) {
             // 2. serialize, send and rollover block
             if (current_channel->is_local()) {
-                auto status = current_channel->send_local_block(block, true);
+                auto status = current_channel->send_local_block(block, eos, true);
                 HANDLE_CHANNEL_STATUS(state, current_channel, status);
             } else {
-                RETURN_IF_ERROR(local_state._serializer.serialize_block(
-                        block, current_channel->ch_cur_pb_block()));
-                auto status =
-                        current_channel->send_remote_block(current_channel->ch_cur_pb_block(), eos);
+                auto pblock = std::make_unique<PBlock>();
+                RETURN_IF_ERROR(local_state._serializer.serialize_block(block, pblock.get()));
+                auto status = current_channel->send_remote_block(std::move(pblock), eos);
                 HANDLE_CHANNEL_STATUS(state, current_channel, status);
-                current_channel->ch_roll_pb_block();
             }
         }
         local_state.current_channel_idx =
@@ -494,7 +517,7 @@ Status ExchangeSinkOperatorX::sink(RuntimeState* state, vectorized::Block* block
                     local_state._partitioner->get_channel_ids().get<uint32_t>(), rows, block, eos));
         } else {
             RETURN_IF_ERROR(channel_add_rows(
-                    state, local_state.channel_shared_ptrs, local_state._partition_count,
+                    state, local_state.channels, local_state._partition_count,
                     local_state._partitioner->get_channel_ids().get<uint32_t>(), rows, block, eos));
         }
         int64_t new_channel_mem_usage = 0;
@@ -578,20 +601,17 @@ Status ExchangeSinkOperatorX::sink(RuntimeState* state, vectorized::Block* block
     } else if (_part_type == TPartitionType::TABLE_SINK_RANDOM_PARTITIONED) {
         // Control the number of channels according to the flow, thereby controlling the number of table sink writers.
         // 1. select channel
-        vectorized::PipChannel* current_channel =
-                local_state.channels[local_state.current_channel_idx];
+        auto& current_channel = local_state.channels[local_state.current_channel_idx];
         if (!current_channel->is_receiver_eof()) {
             // 2. serialize, send and rollover block
             if (current_channel->is_local()) {
-                auto status = current_channel->send_local_block(block, true);
+                auto status = current_channel->send_local_block(block, eos, true);
                 HANDLE_CHANNEL_STATUS(state, current_channel, status);
             } else {
-                RETURN_IF_ERROR(local_state._serializer.serialize_block(
-                        block, current_channel->ch_cur_pb_block()));
-                auto status =
-                        current_channel->send_remote_block(current_channel->ch_cur_pb_block(), eos);
+                auto pblock = std::make_unique<PBlock>();
+                RETURN_IF_ERROR(local_state._serializer.serialize_block(block, pblock.get()));
+                auto status = current_channel->send_remote_block(std::move(pblock), eos);
                 HANDLE_CHANNEL_STATUS(state, current_channel, status);
-                current_channel->ch_roll_pb_block();
             }
             _data_processed += block->bytes();
         }
@@ -613,15 +633,12 @@ Status ExchangeSinkOperatorX::sink(RuntimeState* state, vectorized::Block* block
     Status final_st = Status::OK();
     if (eos) {
         local_state._serializer.reset_block();
-        for (int i = 0; i < local_state.channels.size(); ++i) {
-            Status st = local_state.channels[i]->close(state, Status::OK());
+        for (auto& channel : local_state.channels) {
+            Status st = channel->close(state);
             if (!st.ok() && final_st.ok()) {
                 final_st = st;
             }
         }
-        if (local_state._sink_buffer) {
-            local_state._sink_buffer->set_should_stop();
-        }
     }
     return final_st;
 }
@@ -645,8 +662,8 @@ Status ExchangeSinkOperatorX::serialize_block(ExchangeSinkLocalState& state, vec
 }
 
 void ExchangeSinkLocalState::register_channels(pipeline::ExchangeSinkBuffer* buffer) {
-    for (auto channel : channels) {
-        ((vectorized::PipChannel*)channel)->register_exchange_buffer(buffer);
+    for (auto& channel : channels) {
+        channel->register_exchange_buffer(buffer);
     }
 }
 
@@ -693,12 +710,12 @@ std::string ExchangeSinkLocalState::debug_string(int indentation_level) const {
     fmt::memory_buffer debug_string_buffer;
     fmt::format_to(debug_string_buffer, "{}", Base::debug_string(indentation_level));
     if (_sink_buffer) {
-        fmt::format_to(
-                debug_string_buffer,
-                ", Sink Buffer: (_should_stop = {}, _busy_channels = {}, _is_finishing = {}), "
-                "_reach_limit: {}",
-                _sink_buffer->_should_stop.load(), _sink_buffer->_busy_channels.load(),
-                _sink_buffer->_is_finishing.load(), _reach_limit.load());
+        fmt::format_to(debug_string_buffer,
+                       ", Sink Buffer: (_is_finishing = {}, blocks in queue: {}, queue capacity: "
+                       "{}, queue dep: {}), _reach_limit: {}, working channels: {}",
+                       _sink_buffer->_is_finishing.load(), _sink_buffer->_total_queue_size,
+                       _sink_buffer->_queue_capacity, (void*)_sink_buffer->_queue_dependency.get(),
+                       _reach_limit.load(), _working_channels_count.load());
     }
     return fmt::to_string(debug_string_buffer);
 }
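
The `on_channel_finished` hunk above combines a per-channel "finished" set
(to tolerate duplicate callbacks) with an atomic countdown whose last
decrement fires the finish action. A standalone sketch of that pattern,
with illustrative names:

    #include <atomic>
    #include <cstdint>
    #include <iostream>
    #include <mutex>
    #include <set>

    class FinishTracker {
    public:
        explicit FinishTracker(int working) : _working(working) {}

        void on_channel_finished(int64_t id) {
            std::lock_guard<std::mutex> lock(_mutex);
            if (!_finished.insert(id).second) {
                return; // duplicate report; the patch logs a WARNING here
            }
            // fetch_sub returns the previous value, so 1 means "last one out".
            if (_working.fetch_sub(1) == 1) {
                std::cout << "all channels finished\n"; // e.g. set_ready()
            }
        }

    private:
        std::atomic<int> _working;
        std::set<int64_t> _finished;
        std::mutex _mutex;
    };
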
diff --git a/be/src/pipeline/exec/exchange_sink_operator.h b/be/src/pipeline/exec/exchange_sink_operator.h
index a34237145a7..8af944728a2 100644
--- a/be/src/pipeline/exec/exchange_sink_operator.h
+++ b/be/src/pipeline/exec/exchange_sink_operator.h
@@ -19,7 +19,9 @@
 
 #include <stdint.h>
 
+#include <atomic>
 #include <memory>
+#include <mutex>
 
 #include "common/status.h"
 #include "exchange_sink_buffer.h"
@@ -53,13 +55,10 @@ private:
 
 public:
     ExchangeSinkLocalState(DataSinkOperatorXBase* parent, RuntimeState* state)
-            : Base(parent, state),
-              current_channel_idx(0),
-              only_local_exchange(false),
-              _serializer(this) {
+            : Base(parent, state), _serializer(this) {
         _finish_dependency =
                 std::make_shared<Dependency>(parent->operator_id(), parent->node_id(),
-                                             parent->get_name() + "_FINISH_DEPENDENCY", true);
+                                             parent->get_name() + "_FINISH_DEPENDENCY", false);
     }
 
     std::vector<Dependency*> dependencies() const override {
@@ -112,10 +111,11 @@ public:
         return Status::OK();
     }
     Status _send_new_partition_batch();
-    std::vector<vectorized::PipChannel*> channels;
-    std::vector<std::shared_ptr<vectorized::PipChannel>> channel_shared_ptrs;
-    int current_channel_idx; // index of current channel to send to if _random == true
-    bool only_local_exchange;
+    std::vector<std::shared_ptr<vectorized::Channel>> channels;
+    int current_channel_idx {0}; // index of current channel to send to if _random == true
+    bool only_local_exchange {false};
+
+    void on_channel_finished(InstanceLoId channel_id);
 
     // for external table sink hash partition
     std::unique_ptr<vectorized::ScaleWriterPartitioningExchanger<HashPartitionFunction>>
@@ -123,9 +123,8 @@ public:
 
 private:
     friend class ExchangeSinkOperatorX;
-    friend class vectorized::Channel<ExchangeSinkLocalState>;
-    friend class vectorized::PipChannel;
-    friend class vectorized::BlockSerializer<ExchangeSinkLocalState>;
+    friend class vectorized::Channel;
+    friend class vectorized::BlockSerializer;
 
     std::unique_ptr<ExchangeSinkBuffer> _sink_buffer = nullptr;
     RuntimeProfile::Counter* _serialize_batch_timer = nullptr;
@@ -154,7 +153,7 @@ private:
     int _sender_id;
     std::shared_ptr<vectorized::BroadcastPBlockHolderMemLimiter> _broadcast_pb_mem_limiter;
 
-    vectorized::BlockSerializer<ExchangeSinkLocalState> _serializer;
+    vectorized::BlockSerializer _serializer;
 
     std::shared_ptr<Dependency> _queue_dependency = nullptr;
     std::shared_ptr<Dependency> _broadcast_dependency = nullptr;
@@ -203,6 +202,10 @@ private:
     std::unique_ptr<HashPartitionFunction> _partition_function = nullptr;
     std::atomic<bool> _reach_limit = false;
     int _last_local_channel_idx = -1;
+
+    std::atomic_int _working_channels_count = 0;
+    std::set<InstanceLoId> _finished_channels;
+    std::mutex _finished_channels_mutex;
 };
 
 class ExchangeSinkOperatorX final : public DataSinkOperatorX<ExchangeSinkLocalState> {
diff --git a/be/src/pipeline/exec/result_file_sink_operator.cpp b/be/src/pipeline/exec/result_file_sink_operator.cpp
index a11c4df6625..93026427b86 100644
--- a/be/src/pipeline/exec/result_file_sink_operator.cpp
+++ b/be/src/pipeline/exec/result_file_sink_operator.cpp
@@ -31,9 +31,7 @@ namespace doris::pipeline {
 
 ResultFileSinkLocalState::ResultFileSinkLocalState(DataSinkOperatorXBase* parent,
                                                    RuntimeState* state)
-        : AsyncWriterSink<vectorized::VFileResultWriter, ResultFileSinkOperatorX>(parent, state),
-          _serializer(
-                  std::make_unique<vectorized::BlockSerializer<ResultFileSinkLocalState>>(this)) {}
+        : AsyncWriterSink<vectorized::VFileResultWriter, ResultFileSinkOperatorX>(parent, state) {}
 
 ResultFileSinkOperatorX::ResultFileSinkOperatorX(int operator_id, const RowDescriptor& row_desc,
                                                  const std::vector<TExpr>& t_output_expr)
@@ -145,14 +143,6 @@ Status ResultFileSinkLocalState::close(RuntimeState* state, Status exec_status)
     return Base::close(state, exec_status);
 }
 
-template <typename ChannelPtrType>
-void ResultFileSinkLocalState::_handle_eof_channel(RuntimeState* state, ChannelPtrType channel,
-                                                   Status st) {
-    channel->set_receiver_eof(st);
-    // Chanel will not send RPC to the downstream when eof, so close chanel by OK status.
-    static_cast<void>(channel->close(state, Status::OK()));
-}
-
 Status ResultFileSinkOperatorX::sink(RuntimeState* state, vectorized::Block* in_block, bool eos) {
     auto& local_state = get_local_state(state);
     SCOPED_TIMER(local_state.exec_time_counter());
diff --git a/be/src/pipeline/exec/result_file_sink_operator.h b/be/src/pipeline/exec/result_file_sink_operator.h
index e99eb709a9f..7268efe4de4 100644
--- a/be/src/pipeline/exec/result_file_sink_operator.h
+++ b/be/src/pipeline/exec/result_file_sink_operator.h
@@ -21,10 +21,6 @@
 #include "vec/sink/writer/vfile_result_writer.h"
 
 namespace doris::vectorized {
-template <typename Parent>
-class BlockSerializer;
-template <typename Parent>
-class Channel;
 class BroadcastPBlockHolder;
 } // namespace doris::vectorized
 
@@ -55,13 +51,8 @@ public:
 private:
     friend class ResultFileSinkOperatorX;
 
-    template <typename ChannelPtrType>
-    void _handle_eof_channel(RuntimeState* state, ChannelPtrType channel, Status st);
-
     std::shared_ptr<BufferControlBlock> _sender;
 
-    std::vector<vectorized::Channel<ResultFileSinkLocalState>*> _channels;
-    std::unique_ptr<vectorized::BlockSerializer<ResultFileSinkLocalState>> _serializer;
     std::shared_ptr<vectorized::BroadcastPBlockHolder> _block_holder;
     RuntimeProfile::Counter* _brpc_wait_timer = nullptr;
     RuntimeProfile::Counter* _local_send_timer = nullptr;
diff --git a/be/src/util/slice.h b/be/src/util/slice.h
index b38b1147894..fd6bcf0adfb 100644
--- a/be/src/util/slice.h
+++ b/be/src/util/slice.h
@@ -344,6 +344,10 @@ class OwnedSlice : private Allocator<false, false, false, DefaultMemoryAllocator
 public:
     OwnedSlice() : _slice((uint8_t*)nullptr, 0) {}
 
+    OwnedSlice(size_t length)
+            : _slice(reinterpret_cast<char*>(Allocator::alloc(length)), length),
+              _capacity(length) {}
+
+    OwnedSlice(OwnedSlice&& src) : _slice(src._slice), _capacity(src._capacity) {
         src._slice.data = nullptr;
         src._slice.size = 0;
@@ -369,6 +373,8 @@ public:
         }
     }
 
+    char* data() const { return _slice.data; }
+
     const Slice& slice() const { return _slice; }
 
 private:
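
The new OwnedSlice(size_t) constructor above allocates through the private
Allocator<false, false, false, DefaultMemoryAllocator> base, which is what
lets buffered_reader take its buffers outside the traced allocation path
(the exact meaning of those template flags lives in Doris's allocator and is
not shown here). A rough standalone sketch of the shape of the change, with
a stand-in allocator:

    #include <cstddef>
    #include <cstdlib>

    // Stand-in for Doris's Allocator<...> base; assumed here to allocate
    // without charging a query memory tracker.
    struct UntrackedAllocator {
        static void* alloc(size_t n) { return std::malloc(n); }
        static void free(void* p) { std::free(p); }
    };

    class OwnedSliceSketch : private UntrackedAllocator {
    public:
        explicit OwnedSliceSketch(size_t length)
                : _data(static_cast<char*>(UntrackedAllocator::alloc(length))),
                  _capacity(length) {}
        ~OwnedSliceSketch() { UntrackedAllocator::free(_data); }

        OwnedSliceSketch(const OwnedSliceSketch&) = delete;
        OwnedSliceSketch& operator=(const OwnedSliceSketch&) = delete;

        char* data() const { return _data; }
        size_t capacity() const { return _capacity; }

    private:
        char* _data;
        size_t _capacity;
    };
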
diff --git a/be/src/vec/runtime/partitioner.cpp b/be/src/vec/runtime/partitioner.cpp
index 89656a74508..660ffe51a83 100644
--- a/be/src/vec/runtime/partitioner.cpp
+++ b/be/src/vec/runtime/partitioner.cpp
@@ -24,9 +24,8 @@
 
 namespace doris::vectorized {
 
-template <typename HashValueType, typename ChannelIds>
-Status Partitioner<HashValueType, ChannelIds>::do_partitioning(RuntimeState* state,
-                                                               Block* block) const {
+template <typename ChannelIds>
+Status Crc32HashPartitioner<ChannelIds>::do_partitioning(RuntimeState* state, Block* block) const {
     int rows = block->rows();
 
     if (rows > 0) {
@@ -55,47 +54,23 @@ Status Partitioner<HashValueType, ChannelIds>::do_partitioning(RuntimeState* sta
 template <typename ChannelIds>
 void Crc32HashPartitioner<ChannelIds>::_do_hash(const ColumnPtr& column,
                                                 uint32_t* __restrict result, int idx) const {
-    column->update_crcs_with_value(result, Base::_partition_expr_ctxs[idx]->root()->type().type,
+    column->update_crcs_with_value(result, _partition_expr_ctxs[idx]->root()->type().type,
                                    column->size());
 }
 
-template <typename ChannelIds>
-void XXHashPartitioner<ChannelIds>::_do_hash(const ColumnPtr& column, uint64_t* __restrict result,
-                                             int /*idx*/) const {
-    column->update_hashes_with_value(result);
-}
-
-template <typename ChannelIds>
-Status XXHashPartitioner<ChannelIds>::clone(RuntimeState* state,
-                                            std::unique_ptr<PartitionerBase>& partitioner) {
-    auto* new_partitioner = new XXHashPartitioner(Base::_partition_count);
-    partitioner.reset(new_partitioner);
-    new_partitioner->_partition_expr_ctxs.resize(Base::_partition_expr_ctxs.size());
-    for (size_t i = 0; i < Base::_partition_expr_ctxs.size(); i++) {
-        RETURN_IF_ERROR(Base::_partition_expr_ctxs[i]->clone(
-                state, new_partitioner->_partition_expr_ctxs[i]));
-    }
-    return Status::OK();
-}
-
 template <typename ChannelIds>
 Status Crc32HashPartitioner<ChannelIds>::clone(RuntimeState* state,
                                                std::unique_ptr<PartitionerBase>& partitioner) {
-    auto* new_partitioner = new Crc32HashPartitioner(Base::_partition_count);
+    auto* new_partitioner = new Crc32HashPartitioner<ChannelIds>(_partition_count);
     partitioner.reset(new_partitioner);
-    new_partitioner->_partition_expr_ctxs.resize(Base::_partition_expr_ctxs.size());
-    for (size_t i = 0; i < Base::_partition_expr_ctxs.size(); i++) {
-        RETURN_IF_ERROR(Base::_partition_expr_ctxs[i]->clone(
-                state, new_partitioner->_partition_expr_ctxs[i]));
+    new_partitioner->_partition_expr_ctxs.resize(_partition_expr_ctxs.size());
+    for (size_t i = 0; i < _partition_expr_ctxs.size(); i++) {
+        RETURN_IF_ERROR(
+                _partition_expr_ctxs[i]->clone(state, new_partitioner->_partition_expr_ctxs[i]));
     }
     return Status::OK();
 }
 
-template class Partitioner<size_t, pipeline::LocalExchangeChannelIds>;
-template class XXHashPartitioner<pipeline::LocalExchangeChannelIds>;
-template class Partitioner<size_t, ShuffleChannelIds>;
-template class XXHashPartitioner<ShuffleChannelIds>;
-template class Partitioner<uint32_t, ShuffleChannelIds>;
 template class Crc32HashPartitioner<ShuffleChannelIds>;
 template class Crc32HashPartitioner<SpillPartitionChannelIds>;
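
The `template class Crc32HashPartitioner<...>` lines kept at the end of the
file above are explicit instantiations: the member definitions stay in the
.cpp, and one line per ChannelIds type emits the code. A minimal sketch of
the pattern with illustrative types:

    #include <cstddef>

    template <typename ChannelIds>
    class PartitionerSketch {
    public:
        size_t pick(size_t hash, size_t channels) const;
    };

    // Member defined outside the header...
    template <typename ChannelIds>
    size_t PartitionerSketch<ChannelIds>::pick(size_t hash, size_t channels) const {
        return ChannelIds()(hash, channels);
    }

    struct ModIds {
        size_t operator()(size_t l, size_t r) const { return l % r; }
    };

    // ...so each specialization used elsewhere must be instantiated
    // explicitly, or other translation units get undefined symbols.
    template class PartitionerSketch<ModIds>;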
 
diff --git a/be/src/vec/runtime/partitioner.h b/be/src/vec/runtime/partitioner.h
index 5607a83327b..e8feb74335a 100644
--- a/be/src/vec/runtime/partitioner.h
+++ b/be/src/vec/runtime/partitioner.h
@@ -58,11 +58,11 @@ protected:
     const size_t _partition_count;
 };
 
-template <typename HashValueType, typename ChannelIds>
-class Partitioner : public PartitionerBase {
+template <typename ChannelIds>
+class Crc32HashPartitioner : public PartitionerBase {
 public:
-    Partitioner(int partition_count) : PartitionerBase(partition_count) {}
-    ~Partitioner() override = default;
+    Crc32HashPartitioner(int partition_count) : PartitionerBase(partition_count) {}
+    ~Crc32HashPartitioner() override = default;
 
     Status init(const std::vector<TExpr>& texprs) override {
         return VExpr::create_expr_trees(texprs, _partition_expr_ctxs);
@@ -76,9 +76,9 @@ public:
 
     Status do_partitioning(RuntimeState* state, Block* block) const override;
 
-    ChannelField get_channel_ids() const override {
-        return {_hash_vals.data(), sizeof(HashValueType)};
-    }
+    ChannelField get_channel_ids() const override { return {_hash_vals.data(), sizeof(uint32_t)}; }
+
+    Status clone(RuntimeState* state, std::unique_ptr<PartitionerBase>& partitioner) override;
 
 protected:
     Status _get_partition_column_result(Block* block, std::vector<int>& result) const {
@@ -89,38 +89,17 @@ protected:
         return Status::OK();
     }
 
-    virtual void _do_hash(const ColumnPtr& column, HashValueType* __restrict result,
-                          int idx) const = 0;
+    void _do_hash(const ColumnPtr& column, uint32_t* __restrict result, int idx) const;
 
     VExprContextSPtrs _partition_expr_ctxs;
-    mutable std::vector<HashValueType> _hash_vals;
+    mutable std::vector<uint32_t> _hash_vals;
 };
 
-template <typename ChannelIds>
-class XXHashPartitioner final : public Partitioner<uint64_t, ChannelIds> {
-public:
-    using Base = Partitioner<uint64_t, ChannelIds>;
-    XXHashPartitioner(int partition_count) : Partitioner<uint64_t, ChannelIds>(partition_count) {}
-    ~XXHashPartitioner() override = default;
-
-    Status clone(RuntimeState* state, std::unique_ptr<PartitionerBase>& partitioner) override;
-
-private:
-    void _do_hash(const ColumnPtr& column, uint64_t* __restrict result, int idx) const override;
-};
-
-template <typename ChannelIds>
-class Crc32HashPartitioner final : public Partitioner<uint32_t, ChannelIds> {
-public:
-    using Base = Partitioner<uint32_t, ChannelIds>;
-    Crc32HashPartitioner(int partition_count)
-            : Partitioner<uint32_t, ChannelIds>(partition_count) {}
-    ~Crc32HashPartitioner() override = default;
-
-    Status clone(RuntimeState* state, std::unique_ptr<PartitionerBase>& partitioner) override;
-
-private:
-    void _do_hash(const ColumnPtr& column, uint32_t* __restrict result, int idx) const override;
+struct ShuffleChannelIds {
+    template <typename HashValueType>
+    HashValueType operator()(HashValueType l, size_t r) {
+        return l % r;
+    }
 };
 
 struct SpillPartitionChannelIds {
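
ShuffleChannelIds above is the hash-to-channel mapping applied to the crc32
values that _do_hash writes into _hash_vals: each row's hash is reduced to a
channel index by modulo. A tiny usage sketch with made-up hash values:

    #include <cstdint>
    #include <cstdio>
    #include <vector>

    struct ShuffleChannelIds {
        template <typename HashValueType>
        HashValueType operator()(HashValueType l, size_t r) {
            return l % r;
        }
    };

    int main() {
        std::vector<uint32_t> hash_vals = {7, 42, 439041101u, 4294967295u};
        size_t num_channels = 3; // one per downstream instance
        ShuffleChannelIds to_channel;
        for (uint32_t h : hash_vals) {
            std::printf("hash %u -> channel %u\n", h, to_channel(h, num_channels));
        }
        return 0;
    }
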
diff --git a/be/src/vec/sink/vdata_stream_sender.cpp b/be/src/vec/sink/vdata_stream_sender.cpp
index 9c9a11d60aa..9e437afa8e7 100644
--- a/be/src/vec/sink/vdata_stream_sender.cpp
+++ b/be/src/vec/sink/vdata_stream_sender.cpp
@@ -54,8 +54,7 @@
 
 namespace doris::vectorized {
 
-template <typename Parent>
-Status Channel<Parent>::init(RuntimeState* state) {
+Status Channel::init(RuntimeState* state) {
     if (_brpc_dest_addr.hostname.empty()) {
         LOG(WARNING) << "there is no brpc destination address's hostname"
                         ", maybe version is not compatible.";
@@ -64,9 +63,11 @@ Status Channel<Parent>::init(RuntimeState* state) {
     if (state->query_options().__isset.enable_local_exchange) {
         _is_local &= state->query_options().enable_local_exchange;
     }
+
     if (_is_local) {
         return Status::OK();
     }
+
     if (_brpc_dest_addr.hostname == BackendOptions::get_localhost()) {
         _brpc_stub = state->exec_env()->brpc_internal_client_cache()->get_client(
                 "127.0.0.1", _brpc_dest_addr.port);
@@ -83,8 +84,7 @@ Status Channel<Parent>::init(RuntimeState* state) {
     return Status::OK();
 }
 
-template <typename Parent>
-Status Channel<Parent>::open(RuntimeState* state) {
+Status Channel::open(RuntimeState* state) {
     if (_is_local) {
         auto st = _parent->state()->exec_env()->vstream_mgr()->find_recvr(
                 _fragment_instance_id, _dest_node_id, &_local_recvr);
@@ -94,19 +94,6 @@ Status Channel<Parent>::open(RuntimeState* state) {
         }
     }
     _be_number = state->be_number();
-    _brpc_request = std::make_shared<PTransmitDataParams>();
-    // initialize brpc request
-    _brpc_request->mutable_finst_id()->set_hi(_fragment_instance_id.hi);
-    _brpc_request->mutable_finst_id()->set_lo(_fragment_instance_id.lo);
-    _finst_id = _brpc_request->finst_id();
-
-    _brpc_request->mutable_query_id()->set_hi(state->query_id().hi);
-    _brpc_request->mutable_query_id()->set_lo(state->query_id().lo);
-    _query_id = _brpc_request->query_id();
-
-    _brpc_request->set_node_id(_dest_node_id);
-    _brpc_request->set_sender_id(_parent->sender_id());
-    _brpc_request->set_be_number(_be_number);
 
     const auto& query_options = state->query_options();
     if (query_options.__isset.query_timeout) {
@@ -121,28 +108,26 @@ Status Channel<Parent>::open(RuntimeState* state) {
     // to build a camouflaged empty channel. the ip and port is '0.0.0.0:0"
     // so the empty channel not need call function close_internal()
     _need_close = (_fragment_instance_id.hi != -1 && _fragment_instance_id.lo != -1);
+
     _state = state;
     return Status::OK();
 }
 
-std::shared_ptr<pipeline::Dependency> PipChannel::get_local_channel_dependency() {
-    if (!Channel<pipeline::ExchangeSinkLocalState>::_local_recvr) {
+std::shared_ptr<pipeline::Dependency> Channel::get_local_channel_dependency() {
+    if (!_local_recvr) {
         return nullptr;
     }
-    return Channel<pipeline::ExchangeSinkLocalState>::_local_recvr->get_local_channel_dependency(
-            Channel<pipeline::ExchangeSinkLocalState>::_parent->sender_id());
+    return _local_recvr->get_local_channel_dependency(_parent->sender_id());
 }
 
-int64_t PipChannel::mem_usage() const {
-    auto* mutable_block = Channel<pipeline::ExchangeSinkLocalState>::_serializer.get_block();
+int64_t Channel::mem_usage() const {
+    auto* mutable_block = _serializer.get_block();
     int64_t mem_usage = mutable_block ? mutable_block->allocated_bytes() : 0;
     return mem_usage;
 }
 
-Status PipChannel::send_remote_block(PBlock* block, bool eos, Status exec_status) {
-    COUNTER_UPDATE(Channel<pipeline::ExchangeSinkLocalState>::_parent->blocks_sent_counter(), 1);
-    std::unique_ptr<PBlock> pblock_ptr;
-    pblock_ptr.reset(block);
+Status Channel::send_remote_block(std::unique_ptr<PBlock>&& block, bool eos) {
+    COUNTER_UPDATE(_parent->blocks_sent_counter(), 1);
 
     if (eos) {
         if (_eos_send) {
@@ -152,13 +137,13 @@ Status PipChannel::send_remote_block(PBlock* block, bool eos, Status exec_status
         }
     }
     if (eos || block->column_metas_size()) {
-        RETURN_IF_ERROR(_buffer->add_block({this, std::move(pblock_ptr), eos, exec_status}));
+        RETURN_IF_ERROR(_buffer->add_block({this, std::move(block), eos, Status::OK()}));
     }
     return Status::OK();
 }
 
-Status PipChannel::send_broadcast_block(std::shared_ptr<BroadcastPBlockHolder>& block, bool eos) {
-    COUNTER_UPDATE(Channel<pipeline::ExchangeSinkLocalState>::_parent->blocks_sent_counter(), 1);
+Status Channel::send_broadcast_block(std::shared_ptr<BroadcastPBlockHolder>& block, bool eos) {
+    COUNTER_UPDATE(_parent->blocks_sent_counter(), 1);
     if (eos) {
         if (_eos_send) {
             return Status::OK();
@@ -171,210 +156,88 @@ Status PipChannel::send_broadcast_block(std::shared_ptr<BroadcastPBlockHolder>&
     return Status::OK();
 }
 
-Status PipChannel::send_current_block(bool eos, Status exec_status) {
-    if (Channel<pipeline::ExchangeSinkLocalState>::is_local()) {
-        return Channel<pipeline::ExchangeSinkLocalState>::send_local_block(exec_status, eos);
+Status Channel::_send_current_block(bool eos) {
+    if (is_local()) {
+        return _send_local_block(eos);
     }
-    RETURN_IF_ERROR(send_remote_block(_pblock.release(), eos, exec_status));
-    return Status::OK();
+    return send_remote_block(std::move(_pblock), eos);
 }
 
-template <typename Parent>
-Status Channel<Parent>::send_current_block(bool eos, Status exec_status) {
-    // FIXME: Now, local exchange will cause the performance problem is in a multi-threaded scenario
-    // so this feature is turned off here by default. We need to re-examine this logic
-    if (is_local()) {
-        return send_local_block(exec_status, eos);
+Status Channel::_send_local_block(bool eos) {
+    Block block;
+    if (_serializer.get_block() != nullptr) {
+        block = _serializer.get_block()->to_block();
+        _serializer.get_block()->set_mutable_columns(block.clone_empty_columns());
     }
-    if (eos) {
-        RETURN_IF_ERROR(_serializer.serialize_block(_ch_cur_pb_block, 1));
+
+    if (!block.empty() || eos) {
+        RETURN_IF_ERROR(send_local_block(&block, eos, true));
     }
-    RETURN_IF_ERROR(send_remote_block(_ch_cur_pb_block, eos, exec_status));
-    ch_roll_pb_block();
     return Status::OK();
 }
 
-template <typename Parent>
-Status Channel<Parent>::send_local_block(Status exec_status, bool eos) {
+Status Channel::send_local_block(Block* block, bool eos, bool can_be_moved) {
     SCOPED_TIMER(_parent->local_send_timer());
-    Block block = _serializer.get_block()->to_block();
-    _serializer.get_block()->set_mutable_columns(block.clone_empty_columns());
-    if (_recvr_is_valid()) {
-        if constexpr (!std::is_same_v<pipeline::ResultFileSinkLocalState, Parent>) {
-            COUNTER_UPDATE(_parent->local_bytes_send_counter(), block.bytes());
-            COUNTER_UPDATE(_parent->local_sent_rows(), block.rows());
-            COUNTER_UPDATE(_parent->blocks_sent_counter(), 1);
-        }
 
-        _local_recvr->add_block(&block, _parent->sender_id(), true);
-        if (eos) {
-            _local_recvr->remove_sender(_parent->sender_id(), _be_number, exec_status);
+    if (eos) {
+        if (_eos_send) {
+            return Status::OK();
+        } else {
+            _eos_send = true;
         }
-        return Status::OK();
-    } else {
-        _serializer.reset_block();
-        return _receiver_status;
     }
-}
 
-template <typename Parent>
-Status Channel<Parent>::send_local_block(Block* block, bool can_be_moved) {
-    SCOPED_TIMER(_parent->local_send_timer());
-    if (_recvr_is_valid()) {
-        if constexpr (!std::is_same_v<pipeline::ResultFileSinkLocalState, Parent>) {
-            COUNTER_UPDATE(_parent->local_bytes_send_counter(), block->bytes());
-            COUNTER_UPDATE(_parent->local_sent_rows(), block->rows());
-            COUNTER_UPDATE(_parent->blocks_sent_counter(), 1);
-        }
-        _local_recvr->add_block(block, _parent->sender_id(), can_be_moved);
-        return Status::OK();
-    } else {
+    if (is_receiver_eof()) {
         return _receiver_status;
     }
-}
 
-template <typename Parent>
-Status Channel<Parent>::send_remote_block(PBlock* block, bool eos, Status exec_status) {
-    if constexpr (!std::is_same_v<pipeline::ResultFileSinkLocalState, Parent>) {
+    auto receiver_status = _recvr_status();
+    if (receiver_status.ok()) {
+        COUNTER_UPDATE(_parent->local_bytes_send_counter(), block->bytes());
+        COUNTER_UPDATE(_parent->local_sent_rows(), block->rows());
         COUNTER_UPDATE(_parent->blocks_sent_counter(), 1);
-    }
-    SCOPED_TIMER(_parent->brpc_send_timer());
 
-    if (_send_remote_block_callback == nullptr) {
-        _send_remote_block_callback = DummyBrpcCallback<PTransmitDataResult>::create_shared();
-    } else {
-        RETURN_IF_ERROR(_wait_last_brpc());
-        _send_remote_block_callback->cntl_->Reset();
-    }
-    VLOG_ROW << "Channel<Parent>::send_batch() instance_id=" << print_id(_fragment_instance_id)
-             << " dest_node=" << _dest_node_id << " to_host=" << _brpc_dest_addr.hostname
-             << " _packet_seq=" << _packet_seq << " row_desc=" << _row_desc.debug_string();
-
-    _brpc_request->set_eos(eos);
-    if (!exec_status.ok()) {
-        exec_status.to_protobuf(_brpc_request->mutable_exec_status());
-    }
-    if (block != nullptr && !block->column_metas().empty()) {
-        _brpc_request->set_allocated_block(block);
-    }
-    _brpc_request->set_packet_seq(_packet_seq++);
-
-    _send_remote_block_callback->cntl_->set_timeout_ms(_brpc_timeout_ms);
-    if (config::exchange_sink_ignore_eovercrowded) {
-        _send_remote_block_callback->cntl_->ignore_eovercrowded();
-    }
-
-    {
-        auto send_remote_block_closure =
-                AutoReleaseClosure<PTransmitDataParams, DummyBrpcCallback<PTransmitDataResult>>::
-                        create_unique(_brpc_request, _send_remote_block_callback);
-        if (enable_http_send_block(*_brpc_request)) {
-            RETURN_IF_ERROR(transmit_block_httpv2(
-                    _state->exec_env(), std::move(send_remote_block_closure), _brpc_dest_addr));
-        } else {
-            transmit_blockv2(*_brpc_stub, std::move(send_remote_block_closure));
+        const auto sender_id = _parent->sender_id();
+        if (!block->empty()) [[likely]] {
+            _local_recvr->add_block(block, sender_id, can_be_moved);
         }
-    }
-
-    if (block != nullptr) {
-        static_cast<void>(_brpc_request->release_block());
-    }
-    return Status::OK();
-}
 
-template <typename Parent>
-Status Channel<Parent>::add_rows(Block* block, const std::vector<uint32_t>& rows, bool eos) {
-    if (_fragment_instance_id.lo == -1) {
+        if (eos) [[unlikely]] {
+            _local_recvr->remove_sender(sender_id, _be_number, Status::OK());
+            _parent->on_channel_finished(_fragment_instance_id.lo);
+        }
         return Status::OK();
+    } else {
+        _receiver_status = std::move(receiver_status);
+        _parent->on_channel_finished(_fragment_instance_id.lo);
+        return _receiver_status;
     }
-
-    bool serialized = false;
-    RETURN_IF_ERROR(
-            _serializer.next_serialized_block(block, _ch_cur_pb_block, 1, &serialized, eos, &rows));
-    if (serialized) {
-        RETURN_IF_ERROR(send_current_block(false, Status::OK()));
-    }
-
-    return Status::OK();
 }
 
-template <typename Parent>
-Status Channel<Parent>::close_wait(RuntimeState* state) {
-    if (_need_close) {
-        Status st = _wait_last_brpc();
-        if (st.is<ErrorCode::END_OF_FILE>()) {
-            st = Status::OK();
-        } else if (!st.ok()) {
-            state->log_error(st.to_string());
-        }
-        _need_close = false;
-        return st;
+Status Channel::close(RuntimeState* state) {
+    if (_closed) {
+        return Status::OK();
     }
-    _serializer.reset_block();
-    return Status::OK();
-}
+    _closed = true;
 
-template <typename Parent>
-Status Channel<Parent>::close_internal(Status exec_status) {
     if (!_need_close) {
         return Status::OK();
     }
-    VLOG_RPC << "Channel::close_internal() instance_id=" << print_id(_fragment_instance_id)
-             << " dest_node=" << _dest_node_id << " #rows= "
-             << ((_serializer.get_block() == nullptr) ? 0 : _serializer.get_block()->rows())
-             << " receiver status: " << _receiver_status << ", exec_status: " << exec_status;
+
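+    // A receiver that already hit EOF wants no more data, so drop the buffered block instead of flushing it.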
     if (is_receiver_eof()) {
         _serializer.reset_block();
         return Status::OK();
-    }
-    Status status;
-    if (_serializer.get_block() != nullptr && _serializer.get_block()->rows() > 0) {
-        status = send_current_block(true, exec_status);
-    } else {
-        if (is_local()) {
-            if (_recvr_is_valid()) {
-                _local_recvr->remove_sender(_parent->sender_id(), _be_number, exec_status);
-            }
-        } else {
-            // Non pipeline engine will send an empty eos block
-            status = send_remote_block((PBlock*)nullptr, true, exec_status);
-        }
-    }
-    // Don't wait for the last packet to finish, left it to close_wait.
-    if (status.is<ErrorCode::END_OF_FILE>()) {
-        return Status::OK();
     } else {
-        return status;
+        return _send_current_block(true);
     }
 }
 
-template <typename Parent>
-Status Channel<Parent>::close(RuntimeState* state, Status exec_status) {
-    if (_closed) {
-        return Status::OK();
-    }
-    _closed = true;
-
-    Status st = close_internal(exec_status);
-    if (!st.ok()) {
-        state->log_error(st.to_string());
-    }
-    return st;
-}
-
-template <typename Parent>
-void Channel<Parent>::ch_roll_pb_block() {
-    _ch_cur_pb_block = (_ch_cur_pb_block == &_ch_pb_block1 ? &_ch_pb_block2 : &_ch_pb_block1);
-}
-
-template <typename Parent>
-BlockSerializer<Parent>::BlockSerializer(Parent* parent, bool is_local)
+BlockSerializer::BlockSerializer(pipeline::ExchangeSinkLocalState* parent, bool is_local)
         : _parent(parent), _is_local(is_local), _batch_size(parent->state()->batch_size()) {}
 
-template <typename Parent>
-Status BlockSerializer<Parent>::next_serialized_block(Block* block, PBlock* dest, int num_receivers,
-                                                      bool* serialized, bool eos,
-                                                      const std::vector<uint32_t>* rows) {
+Status BlockSerializer::next_serialized_block(Block* block, PBlock* dest, int num_receivers,
+                                              bool* serialized, bool eos,
+                                              const std::vector<uint32_t>* rows) {
     if (_mutable_block == nullptr) {
         _mutable_block = MutableBlock::create_unique(block->clone_empty());
     }
@@ -403,8 +266,7 @@ Status BlockSerializer<Parent>::next_serialized_block(Block* block, PBlock* dest
     return Status::OK();
 }
 
-template <typename Parent>
-Status BlockSerializer<Parent>::serialize_block(PBlock* dest, int num_receivers) {
+Status BlockSerializer::serialize_block(PBlock* dest, int num_receivers) {
     if (_mutable_block && _mutable_block->rows() > 0) {
         auto block = _mutable_block->to_block();
         RETURN_IF_ERROR(serialize_block(&block, dest, num_receivers));
@@ -415,29 +277,20 @@ Status BlockSerializer<Parent>::serialize_block(PBlock* dest, int num_receivers)
     return Status::OK();
 }
 
-template <typename Parent>
-Status BlockSerializer<Parent>::serialize_block(const Block* src, PBlock* dest, int num_receivers) {
-    if constexpr (!std::is_same_v<pipeline::ResultFileSinkLocalState, Parent>) {
-        SCOPED_TIMER(_parent->_serialize_batch_timer);
-        dest->Clear();
-        size_t uncompressed_bytes = 0, compressed_bytes = 0;
-        RETURN_IF_ERROR(src->serialize(
-                _parent->_state->be_exec_version(), dest, &uncompressed_bytes, &compressed_bytes,
-                _parent->compression_type(), _parent->transfer_large_data_by_brpc()));
-        COUNTER_UPDATE(_parent->_bytes_sent_counter, compressed_bytes * num_receivers);
-        COUNTER_UPDATE(_parent->_uncompressed_bytes_counter, uncompressed_bytes * num_receivers);
-        COUNTER_UPDATE(_parent->_compress_timer, src->get_compress_time());
-        _parent->get_query_statistics_ptr()->add_shuffle_send_bytes(compressed_bytes *
-                                                                    num_receivers);
-        _parent->get_query_statistics_ptr()->add_shuffle_send_rows(src->rows() * num_receivers);
-    }
+Status BlockSerializer::serialize_block(const Block* src, PBlock* dest, int num_receivers) {
+    SCOPED_TIMER(_parent->_serialize_batch_timer);
+    dest->Clear();
+    size_t uncompressed_bytes = 0, compressed_bytes = 0;
+    RETURN_IF_ERROR(src->serialize(_parent->_state->be_exec_version(), dest, &uncompressed_bytes,
+                                   &compressed_bytes, _parent->compression_type(),
+                                   _parent->transfer_large_data_by_brpc()));
+    COUNTER_UPDATE(_parent->_bytes_sent_counter, compressed_bytes * num_receivers);
+    COUNTER_UPDATE(_parent->_uncompressed_bytes_counter, uncompressed_bytes * num_receivers);
+    COUNTER_UPDATE(_parent->_compress_timer, src->get_compress_time());
+    _parent->get_query_statistics_ptr()->add_shuffle_send_bytes(compressed_bytes * num_receivers);
+    _parent->get_query_statistics_ptr()->add_shuffle_send_rows(src->rows() * num_receivers);
 
     return Status::OK();
 }
 
-template class Channel<pipeline::ExchangeSinkLocalState>;
-template class Channel<pipeline::ResultFileSinkLocalState>;
-template class BlockSerializer<pipeline::ResultFileSinkLocalState>;
-template class BlockSerializer<pipeline::ExchangeSinkLocalState>;
-
 } // namespace doris::vectorized
diff --git a/be/src/vec/sink/vdata_stream_sender.h b/be/src/vec/sink/vdata_stream_sender.h
index 43d00b0164a..da0ee22ac14 100644
--- a/be/src/vec/sink/vdata_stream_sender.h
+++ b/be/src/vec/sink/vdata_stream_sender.h
@@ -72,13 +72,10 @@ class ExchangeSinkLocalState;
 } // namespace pipeline
 
 namespace vectorized {
-template <typename>
-class Channel;
 
-template <typename Parent>
 class BlockSerializer {
 public:
-    BlockSerializer(Parent* parent, bool is_local = true);
+    BlockSerializer(pipeline::ExchangeSinkLocalState* parent, bool is_local = true);
     Status next_serialized_block(Block* src, PBlock* dest, int num_receivers, bool* serialized,
                                  bool eos, const std::vector<uint32_t>* rows = nullptr);
     Status serialize_block(PBlock* dest, int num_receivers = 1);
@@ -91,21 +88,13 @@ public:
     void set_is_local(bool is_local) { _is_local = is_local; }
 
 private:
-    Parent* _parent;
+    pipeline::ExchangeSinkLocalState* _parent;
     std::unique_ptr<MutableBlock> _mutable_block;
 
     bool _is_local;
     const int _batch_size;
 };
 
-struct ShuffleChannelIds {
-    template <typename HashValueType>
-    HashValueType operator()(HashValueType l, size_t r) {
-        return l % r;
-    }
-};
-
-template <typename Parent>
 class Channel {
 public:
     friend class pipeline::ExchangeSinkBuffer;
@@ -113,23 +102,15 @@ public:
     // combination. buffer_size is specified in bytes and a soft limit on
     // how much tuple data is getting accumulated before being sent; it only applies
     // when data is added via add_row() and not sent directly via send_batch().
-    Channel(Parent* parent, const RowDescriptor& row_desc, TNetworkAddress brpc_dest,
+    Channel(pipeline::ExchangeSinkLocalState* parent, TNetworkAddress brpc_dest,
             TUniqueId fragment_instance_id, PlanNodeId dest_node_id)
             : _parent(parent),
-              _row_desc(row_desc),
               _fragment_instance_id(std::move(fragment_instance_id)),
               _dest_node_id(dest_node_id),
-              _need_close(false),
-              _closed(false),
               _brpc_dest_addr(std::move(brpc_dest)),
               _is_local((_brpc_dest_addr.hostname == BackendOptions::get_localhost()) &&
                         (_brpc_dest_addr.port == config::brpc_port)),
-              _serializer(_parent, _is_local) {
-        if (_is_local) {
-            VLOG_NOTICE << "will use local Exchange, dest_node_id is : " << _dest_node_id;
-        }
-        _ch_cur_pb_block = &_ch_pb_block1;
-    }
+              _serializer(_parent, _is_local) {}
 
     virtual ~Channel() = default;
 
@@ -138,37 +119,12 @@ public:
     Status init(RuntimeState* state);
     Status open(RuntimeState* state);
 
-    // Asynchronously sends a row batch.
-    // Returns the status of the most recently finished transmit_data
-    // rpc (or OK if there wasn't one that hasn't been reported yet).
-    // if batch is nullptr, send the eof packet
-    virtual Status send_remote_block(PBlock* block, bool eos = false,
-                                     Status exec_status = Status::OK());
-
-    virtual Status send_broadcast_block(std::shared_ptr<BroadcastPBlockHolder>& block,
-                                        bool eos = false) {
-        return Status::InternalError("Send BroadcastPBlockHolder is not allowed!");
-    }
-
-    virtual Status add_rows(Block* block, const std::vector<uint32_t>& row, bool eos);
-
-    virtual Status send_current_block(bool eos, Status exec_status);
-
-    Status send_local_block(Status exec_status, bool eos = false);
-
-    Status send_local_block(Block* block, bool can_be_moved);
+    Status send_local_block(Block* block, bool eos, bool can_be_moved);
     // Flush buffered rows and close channel. This function don't wait the response
     // of close operation, client should call close_wait() to finish channel's close.
     // We split one close operation into two phases in order to make multiple channels
     // can run parallel.
-    Status close(RuntimeState* state, Status exec_status);
-
-    // Get close wait's response, to finish channel close operation.
-    Status close_wait(RuntimeState* state);
-
-    int64_t num_data_bytes_sent() const { return _num_data_bytes_sent; }
-
-    PBlock* ch_cur_pb_block() { return _ch_cur_pb_block; }
+    Status close(RuntimeState* state);
 
     std::string get_fragment_instance_id_str() {
         UniqueId uid(_fragment_instance_id);
@@ -177,151 +133,40 @@ public:
 
     bool is_local() const { return _is_local; }
 
-    virtual void ch_roll_pb_block();
-
     bool is_receiver_eof() const { return _receiver_status.is<ErrorCode::END_OF_FILE>(); }
 
     void set_receiver_eof(Status st) { _receiver_status = st; }
 
-protected:
-    bool _recvr_is_valid() {
-        if (_local_recvr && !_local_recvr->is_closed()) {
-            return true;
-        }
-        _receiver_status = Status::EndOfFile(
-                "local data stream receiver closed"); // local data stream 
receiver closed
-        return false;
-    }
-
-    Status _wait_last_brpc() {
-        SCOPED_TIMER(_parent->brpc_wait_timer());
-        if (_send_remote_block_callback == nullptr) {
-            return Status::OK();
-        }
-        _send_remote_block_callback->join();
-        if (_send_remote_block_callback->cntl_->Failed()) {
-            std::string err = fmt::format(
-                    "failed to send brpc batch, error={}, error_text={}, 
client: {}, "
-                    "latency = {}",
-                    berror(_send_remote_block_callback->cntl_->ErrorCode()),
-                    _send_remote_block_callback->cntl_->ErrorText(),
-                    BackendOptions::get_localhost(),
-                    _send_remote_block_callback->cntl_->latency_us());
-            LOG(WARNING) << err;
-            return Status::RpcError(err);
-        }
-        _receiver_status = Status::create(_send_remote_block_callback->response_->status());
-        return _receiver_status;
-    }
-
-    Status close_internal(Status exec_status);
-
-    Parent* _parent = nullptr;
-
-    const RowDescriptor& _row_desc;
-    const TUniqueId _fragment_instance_id;
-    PlanNodeId _dest_node_id;
-
-    // the number of RowBatch.data bytes sent successfully
-    int64_t _num_data_bytes_sent {};
-    int64_t _packet_seq {};
-
-    bool _need_close;
-    bool _closed;
-    int _be_number;
-
-    TNetworkAddress _brpc_dest_addr;
-
-    PUniqueId _finst_id;
-    PUniqueId _query_id;
-    PBlock _pb_block;
-    std::shared_ptr<PTransmitDataParams> _brpc_request = nullptr;
-    std::shared_ptr<PBackendService_Stub> _brpc_stub = nullptr;
-    std::shared_ptr<DummyBrpcCallback<PTransmitDataResult>> _send_remote_block_callback;
-    Status _receiver_status;
-    int32_t _brpc_timeout_ms = 500;
-    RuntimeState* _state = nullptr;
-
-    bool _is_local;
-    std::shared_ptr<VDataStreamRecvr> _local_recvr;
-    // serialized blocks for broadcasting; we need two so we can write
-    // one while the other one is still being sent.
-    // Which is for same reason as `_cur_pb_block`, `_pb_block1` and `_pb_block2`
-    // in VDataStreamSender.
-    PBlock* _ch_cur_pb_block = nullptr;
-    PBlock _ch_pb_block1;
-    PBlock _ch_pb_block2;
-
-    BlockSerializer<Parent> _serializer;
-};
-
-#define HANDLE_CHANNEL_STATUS(state, channel, status)    \
-    do {                                                 \
-        if (status.is<ErrorCode::END_OF_FILE>()) {       \
-            _handle_eof_channel(state, channel, status); \
-        } else {                                         \
-            RETURN_IF_ERROR(status);                     \
-        }                                                \
-    } while (0)
-
-class PipChannel final : public Channel<pipeline::ExchangeSinkLocalState> {
-public:
-    PipChannel(pipeline::ExchangeSinkLocalState* parent, const RowDescriptor& row_desc,
-               const TNetworkAddress& brpc_dest, const TUniqueId& fragment_instance_id,
-               PlanNodeId dest_node_id)
-            : Channel<pipeline::ExchangeSinkLocalState>(parent, row_desc, brpc_dest,
-                                                        fragment_instance_id, dest_node_id) {
-        ch_roll_pb_block();
-    }
-
-    ~PipChannel() override { delete Channel<pipeline::ExchangeSinkLocalState>::_ch_cur_pb_block; }
-
     int64_t mem_usage() const;
 
-    void ch_roll_pb_block() override {
-        // We have two choices here.
-        // 1. Use a PBlock pool and fetch an available PBlock if we need one. In this way, we can
-        //    reuse the memory, but we have to use a lock to synchronize.
-        // 2. Create a new PBlock every time. In this way we don't need a lock but have to allocate
-        //    new memory.
-        // Now we use the second way.
-        Channel<pipeline::ExchangeSinkLocalState>::_ch_cur_pb_block = new PBlock();
-    }
-
     // Asynchronously sends a block
     // Returns the status of the most recently finished transmit_data
     // rpc (or OK if there wasn't one that hasn't been reported yet).
     // if batch is nullptr, send the eof packet
-    Status send_remote_block(PBlock* block, bool eos = false,
-                             Status exec_status = Status::OK()) override;
+    Status send_remote_block(std::unique_ptr<PBlock>&& block, bool eos = false);
+    Status send_broadcast_block(std::shared_ptr<BroadcastPBlockHolder>& block, bool eos = false);
 
-    Status send_broadcast_block(std::shared_ptr<BroadcastPBlockHolder>& block,
-                                bool eos = false) override;
-
-    Status add_rows(Block* block, const std::vector<uint32_t>& rows, bool eos) override {
-        if (Channel<pipeline::ExchangeSinkLocalState>::_fragment_instance_id.lo == -1) {
+    Status add_rows(Block* block, const std::vector<uint32_t>& rows, bool eos) {
+        if (_fragment_instance_id.lo == -1) {
             return Status::OK();
         }
 
         bool serialized = false;
-        _pblock = std::make_unique<PBlock>();
-        RETURN_IF_ERROR(
-                Channel<pipeline::ExchangeSinkLocalState>::_serializer.next_serialized_block(
-                        block, _pblock.get(), 1, &serialized, eos, &rows));
+        if (_pblock == nullptr) {
+            _pblock = std::make_unique<PBlock>();
+        }
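+        // _pblock is created lazily and reused across calls (presumably handed off by
+        // _send_current_block() once a serialized block is sent).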
+        RETURN_IF_ERROR(_serializer.next_serialized_block(block, _pblock.get(), 1, &serialized, eos,
+                                                          &rows));
         if (serialized) {
-            Status exec_status = Status::OK();
-            RETURN_IF_ERROR(send_current_block(eos, exec_status));
+            RETURN_IF_ERROR(_send_current_block(eos));
         }
 
         return Status::OK();
     }
 
-    // send _mutable_block
-    Status send_current_block(bool eos, Status exec_status) override;
-
     void register_exchange_buffer(pipeline::ExchangeSinkBuffer* buffer) {
         _buffer = buffer;
-        _buffer->register_sink(Channel<pipeline::ExchangeSinkLocalState>::_fragment_instance_id);
+        _buffer->register_sink(_fragment_instance_id);
     }
 
     std::shared_ptr<pipeline::ExchangeSendCallback<PTransmitDataResult>> get_send_callback(
@@ -337,12 +182,53 @@ public:
 
     std::shared_ptr<pipeline::Dependency> get_local_channel_dependency();
 
-private:
+protected:
+    Status _send_local_block(bool eos);
+    Status _send_current_block(bool eos);
+
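+    // Reports the local receiver's state: OK while it is open, EndOfFile once it has closed.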
+    Status _recvr_status() const {
+        if (_local_recvr && !_local_recvr->is_closed()) {
+            return Status::OK();
+        }
+        return Status::EndOfFile(
+                "local data stream receiver closed"); // local data stream 
receiver closed
+    }
+
+    pipeline::ExchangeSinkLocalState* _parent = nullptr;
+
+    const TUniqueId _fragment_instance_id;
+    PlanNodeId _dest_node_id;
+    bool _closed {false};
+    bool _need_close {false};
+    int _be_number;
+
+    TNetworkAddress _brpc_dest_addr;
+
+    PBlock _pb_block;
+    std::shared_ptr<PBackendService_Stub> _brpc_stub = nullptr;
+    Status _receiver_status;
+    int32_t _brpc_timeout_ms = 500;
+    RuntimeState* _state = nullptr;
+
+    bool _is_local;
+    std::shared_ptr<VDataStreamRecvr> _local_recvr;
+
+    BlockSerializer _serializer;
+
     pipeline::ExchangeSinkBuffer* _buffer = nullptr;
     bool _eos_send = false;
+    std::shared_ptr<pipeline::ExchangeSendCallback<PTransmitDataResult>> _send_callback;
     std::unique_ptr<PBlock> _pblock;
 };
 
+#define HANDLE_CHANNEL_STATUS(state, channel, status)    \
+    do {                                                 \
+        if (status.is<ErrorCode::END_OF_FILE>()) {       \
+            _handle_eof_channel(state, channel, status); \
+        } else {                                         \
+            RETURN_IF_ERROR(status);                     \
+        }                                                \
+    } while (0)
+
 } // namespace vectorized
 } // namespace doris


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org
