github-actions[bot] commented on code in PR #42341:
URL: https://github.com/apache/doris/pull/42341#discussion_r1812483465


##########
be/src/pipeline/exec/exchange_sink_buffer.h:
##########
@@ -22,9 +22,9 @@
 #include <gen_cpp/internal_service.pb.h>
 #include <gen_cpp/types.pb.h>
 #include <parallel_hashmap/phmap.h>
+#include <stdint.h>

Review Comment:
   warning: inclusion of deprecated C++ header 'stdint.h'; consider using 
'cstdint' instead [modernize-deprecated-headers]
   
   ```suggestion
   #include <cstdint>
   ```
   



##########
be/src/pipeline/exec/exchange_sink_operator.cpp:
##########
@@ -107,28 +108,14 @@ Status ExchangeSinkLocalState::init(RuntimeState* state, 
LocalSinkStateInfo& inf
         _wait_channel_timer.push_back(_profile->add_nonzero_counter(
                 fmt::format("WaitForLocalExchangeBuffer{}", i), TUnit::TIME_NS, timer_name, 1));
     }
+
+    _sink_buffer = p._sink_buffer;
+    _sink_buffer->set_dependency(_queue_dependency, _finish_dependency);
+    _sink_buffer->inc_running_sink(this);
     _wait_broadcast_buffer_timer = ADD_CHILD_TIMER(_profile, 
"WaitForBroadcastBuffer", timer_name);
     return Status::OK();
 }
 
-void ExchangeSinkLocalState::on_channel_finished(InstanceLoId channel_id) {
-    std::lock_guard<std::mutex> lock(_finished_channels_mutex);
-
-    if (_finished_channels.contains(channel_id)) {
-        LOG(WARNING) << "query: " << print_id(_state->query_id())
-                     << ", on_channel_finished on already finished channel: " 
<< channel_id;
-        return;
-    } else {
-        _finished_channels.emplace(channel_id);
-        if (_working_channels_count.fetch_sub(1) == 1) {
-            set_reach_limit();
-            if (_finish_dependency) {
-                _finish_dependency->set_ready();
-            }
-        }
-    }
-}
-
 Status ExchangeSinkLocalState::open(RuntimeState* state) {

Review Comment:
   warning: function 'open' has cognitive complexity of 64 (threshold 50) 
[readability-function-cognitive-complexity]
   ```cpp
   Status ExchangeSinkLocalState::open(RuntimeState* state) {
                                  ^
   ```
   <details>
   <summary>Additional context</summary>
   
   **be/src/pipeline/exec/exchange_sink_operator.cpp:120:** nesting level 
increased to 1
   ```cpp
       SCOPED_TIMER(_open_timer);
       ^
   ```
   **be/src/util/runtime_profile.h:67:** expanded from macro 'SCOPED_TIMER'
   ```cpp
   #define SCOPED_TIMER(c) ScopedTimer<MonotonicStopWatch> 
MACRO_CONCAT(SCOPED_TIMER, __COUNTER__)(c)
                           ^
   ```
   **be/src/pipeline/exec/exchange_sink_operator.cpp:121:** +1, including 
nesting penalty of 0, nesting level increased to 1
   ```cpp
       RETURN_IF_ERROR(Base::open(state));
       ^
   ```
   **be/src/common/status.h:629:** expanded from macro 'RETURN_IF_ERROR'
   ```cpp
       do {                                \
       ^
   ```
   **be/src/pipeline/exec/exchange_sink_operator.cpp:121:** +2, including 
nesting penalty of 1, nesting level increased to 2
   ```cpp
       RETURN_IF_ERROR(Base::open(state));
       ^
   ```
   **be/src/common/status.h:631:** expanded from macro 'RETURN_IF_ERROR'
   ```cpp
           if (UNLIKELY(!_status_.ok())) { \
           ^
   ```
   **be/src/pipeline/exec/exchange_sink_operator.cpp:124:** +1, including 
nesting penalty of 0, nesting level increased to 1
   ```cpp
       if (_part_type == TPartitionType::UNPARTITIONED || _part_type == 
TPartitionType::RANDOM ||
       ^
   ```
   **be/src/pipeline/exec/exchange_sink_operator.cpp:144:** +1, including 
nesting penalty of 0, nesting level increased to 1
   ```cpp
       if (!only_local_exchange) {
       ^
   ```
   **be/src/pipeline/exec/exchange_sink_operator.cpp:147:** +1, nesting level 
increased to 1
   ```cpp
       } else {
         ^
   ```
   **be/src/pipeline/exec/exchange_sink_operator.cpp:151:** +1, including 
nesting penalty of 0, nesting level increased to 1
   ```cpp
       if ((_part_type == TPartitionType::UNPARTITIONED || channels.size() == 
1) &&
       ^
   ```
   **be/src/pipeline/exec/exchange_sink_operator.cpp:157:** +1, nesting level 
increased to 1
   ```cpp
       } else if (local_size > 0) {
              ^
   ```
   **be/src/pipeline/exec/exchange_sink_operator.cpp:172:** +1, including 
nesting penalty of 0, nesting level increased to 1
   ```cpp
       if (_part_type == TPartitionType::HASH_PARTITIONED) {
       ^
   ```
   **be/src/pipeline/exec/exchange_sink_operator.cpp:176:** +2, including 
nesting penalty of 1, nesting level increased to 2
   ```cpp
           RETURN_IF_ERROR(_partitioner->init(p._texprs));
           ^
   ```
   **be/src/common/status.h:629:** expanded from macro 'RETURN_IF_ERROR'
   ```cpp
       do {                                \
       ^
   ```
   **be/src/pipeline/exec/exchange_sink_operator.cpp:176:** +3, including 
nesting penalty of 2, nesting level increased to 3
   ```cpp
           RETURN_IF_ERROR(_partitioner->init(p._texprs));
           ^
   ```
   **be/src/common/status.h:631:** expanded from macro 'RETURN_IF_ERROR'
   ```cpp
           if (UNLIKELY(!_status_.ok())) { \
           ^
   ```
   **be/src/pipeline/exec/exchange_sink_operator.cpp:177:** +2, including 
nesting penalty of 1, nesting level increased to 2
   ```cpp
           RETURN_IF_ERROR(_partitioner->prepare(state, p._row_desc));
           ^
   ```
   **be/src/common/status.h:629:** expanded from macro 'RETURN_IF_ERROR'
   ```cpp
       do {                                \
       ^
   ```
   **be/src/pipeline/exec/exchange_sink_operator.cpp:177:** +3, including 
nesting penalty of 2, nesting level increased to 3
   ```cpp
           RETURN_IF_ERROR(_partitioner->prepare(state, p._row_desc));
           ^
   ```
   **be/src/common/status.h:631:** expanded from macro 'RETURN_IF_ERROR'
   ```cpp
           if (UNLIKELY(!_status_.ok())) { \
           ^
   ```
   **be/src/pipeline/exec/exchange_sink_operator.cpp:180:** +1, nesting level 
increased to 1
   ```cpp
       } else if (_part_type == 
TPartitionType::BUCKET_SHFFULE_HASH_PARTITIONED) {
              ^
   ```
   **be/src/pipeline/exec/exchange_sink_operator.cpp:184:** +2, including 
nesting penalty of 1, nesting level increased to 2
   ```cpp
           RETURN_IF_ERROR(_partitioner->init(p._texprs));
           ^
   ```
   **be/src/common/status.h:629:** expanded from macro 'RETURN_IF_ERROR'
   ```cpp
       do {                                \
       ^
   ```
   **be/src/pipeline/exec/exchange_sink_operator.cpp:184:** +3, including 
nesting penalty of 2, nesting level increased to 3
   ```cpp
           RETURN_IF_ERROR(_partitioner->init(p._texprs));
           ^
   ```
   **be/src/common/status.h:631:** expanded from macro 'RETURN_IF_ERROR'
   ```cpp
           if (UNLIKELY(!_status_.ok())) { \
           ^
   ```
   **be/src/pipeline/exec/exchange_sink_operator.cpp:185:** +2, including 
nesting penalty of 1, nesting level increased to 2
   ```cpp
           RETURN_IF_ERROR(_partitioner->prepare(state, p._row_desc));
           ^
   ```
   **be/src/common/status.h:629:** expanded from macro 'RETURN_IF_ERROR'
   ```cpp
       do {                                \
       ^
   ```
   **be/src/pipeline/exec/exchange_sink_operator.cpp:185:** +3, including 
nesting penalty of 2, nesting level increased to 3
   ```cpp
           RETURN_IF_ERROR(_partitioner->prepare(state, p._row_desc));
           ^
   ```
   **be/src/common/status.h:631:** expanded from macro 'RETURN_IF_ERROR'
   ```cpp
           if (UNLIKELY(!_status_.ok())) { \
           ^
   ```
   **be/src/pipeline/exec/exchange_sink_operator.cpp:188:** +1, nesting level 
increased to 1
   ```cpp
       } else if (_part_type == 
TPartitionType::TABLET_SINK_SHUFFLE_PARTITIONED) {
              ^
   ```
   **be/src/pipeline/exec/exchange_sink_operator.cpp:194:** +2, including 
nesting penalty of 1, nesting level increased to 2
   ```cpp
           RETURN_IF_ERROR(_schema->init(p._tablet_sink_schema));
           ^
   ```
   **be/src/common/status.h:629:** expanded from macro 'RETURN_IF_ERROR'
   ```cpp
       do {                                \
       ^
   ```
   **be/src/pipeline/exec/exchange_sink_operator.cpp:194:** +3, including 
nesting penalty of 2, nesting level increased to 3
   ```cpp
           RETURN_IF_ERROR(_schema->init(p._tablet_sink_schema));
           ^
   ```
   **be/src/common/status.h:631:** expanded from macro 'RETURN_IF_ERROR'
   ```cpp
           if (UNLIKELY(!_status_.ok())) { \
           ^
   ```
   **be/src/pipeline/exec/exchange_sink_operator.cpp:196:** +2, including 
nesting penalty of 1, nesting level increased to 2
   ```cpp
           RETURN_IF_ERROR(_vpartition->init());
           ^
   ```
   **be/src/common/status.h:629:** expanded from macro 'RETURN_IF_ERROR'
   ```cpp
       do {                                \
       ^
   ```
   **be/src/pipeline/exec/exchange_sink_operator.cpp:196:** +3, including 
nesting penalty of 2, nesting level increased to 3
   ```cpp
           RETURN_IF_ERROR(_vpartition->init());
           ^
   ```
   **be/src/common/status.h:631:** expanded from macro 'RETURN_IF_ERROR'
   ```cpp
           if (UNLIKELY(!_status_.ok())) { \
           ^
   ```
   **be/src/pipeline/exec/exchange_sink_operator.cpp:226:** +1, nesting level 
increased to 1
   ```cpp
       } else if (_part_type == TPartitionType::TABLE_SINK_HASH_PARTITIONED) {
              ^
   ```
   **be/src/pipeline/exec/exchange_sink_operator.cpp:249:** +2, including 
nesting penalty of 1, nesting level increased to 2
   ```cpp
           RETURN_IF_ERROR(_partitioner->init(p._texprs));
           ^
   ```
   **be/src/common/status.h:629:** expanded from macro 'RETURN_IF_ERROR'
   ```cpp
       do {                                \
       ^
   ```
   **be/src/pipeline/exec/exchange_sink_operator.cpp:249:** +3, including 
nesting penalty of 2, nesting level increased to 3
   ```cpp
           RETURN_IF_ERROR(_partitioner->init(p._texprs));
           ^
   ```
   **be/src/common/status.h:631:** expanded from macro 'RETURN_IF_ERROR'
   ```cpp
           if (UNLIKELY(!_status_.ok())) { \
           ^
   ```
   **be/src/pipeline/exec/exchange_sink_operator.cpp:250:** +2, including 
nesting penalty of 1, nesting level increased to 2
   ```cpp
           RETURN_IF_ERROR(_partitioner->prepare(state, p._row_desc));
           ^
   ```
   **be/src/common/status.h:629:** expanded from macro 'RETURN_IF_ERROR'
   ```cpp
       do {                                \
       ^
   ```
   **be/src/pipeline/exec/exchange_sink_operator.cpp:250:** +3, including 
nesting penalty of 2, nesting level increased to 3
   ```cpp
           RETURN_IF_ERROR(_partitioner->prepare(state, p._row_desc));
           ^
   ```
   **be/src/common/status.h:631:** expanded from macro 'RETURN_IF_ERROR'
   ```cpp
           if (UNLIKELY(!_status_.ok())) { \
           ^
   ```
   **be/src/pipeline/exec/exchange_sink_operator.cpp:255:** +1, including 
nesting penalty of 0, nesting level increased to 1
   ```cpp
       if (_part_type == TPartitionType::HASH_PARTITIONED ||
       ^
   ```
   **be/src/pipeline/exec/exchange_sink_operator.cpp:258:** +2, including 
nesting penalty of 1, nesting level increased to 2
   ```cpp
           RETURN_IF_ERROR(_partitioner->open(state));
           ^
   ```
   **be/src/common/status.h:629:** expanded from macro 'RETURN_IF_ERROR'
   ```cpp
       do {                                \
       ^
   ```
   **be/src/pipeline/exec/exchange_sink_operator.cpp:258:** +3, including 
nesting penalty of 2, nesting level increased to 3
   ```cpp
           RETURN_IF_ERROR(_partitioner->open(state));
           ^
   ```
   **be/src/common/status.h:631:** expanded from macro 'RETURN_IF_ERROR'
   ```cpp
           if (UNLIKELY(!_status_.ok())) { \
           ^
   ```
   **be/src/pipeline/exec/exchange_sink_operator.cpp:259:** +1, nesting level 
increased to 1
   ```cpp
       } else if (_part_type == 
TPartitionType::TABLET_SINK_SHUFFLE_PARTITIONED) {
              ^
   ```
   **be/src/pipeline/exec/exchange_sink_operator.cpp:260:** +2, including 
nesting penalty of 1, nesting level increased to 2
   ```cpp
           RETURN_IF_ERROR(_row_distribution.open(_tablet_sink_row_desc));
           ^
   ```
   **be/src/common/status.h:629:** expanded from macro 'RETURN_IF_ERROR'
   ```cpp
       do {                                \
       ^
   ```
   **be/src/pipeline/exec/exchange_sink_operator.cpp:260:** +3, including 
nesting penalty of 2, nesting level increased to 3
   ```cpp
           RETURN_IF_ERROR(_row_distribution.open(_tablet_sink_row_desc));
           ^
   ```
   **be/src/common/status.h:631:** expanded from macro 'RETURN_IF_ERROR'
   ```cpp
           if (UNLIKELY(!_status_.ok())) { \
           ^
   ```
   
   </details>
   



##########
be/src/vec/sink/vdata_stream_sender.h:
##########
@@ -119,106 +131,98 @@
     Status init(RuntimeState* state);
     Status open(RuntimeState* state);
 
-    Status send_local_block(Block* block, bool eos, bool can_be_moved);
-    // Flush buffered rows and close channel. This function don't wait the response
-    // of close operation, client should call close_wait() to finish channel's close.
-    // We split one close operation into two phases in order to make multiple channels
-    // can run parallel.
-    Status close(RuntimeState* state);
+    Status send_local_block(Status exec_status, bool eos = false);
+
+    Status send_local_block(Block* block, bool can_be_moved);
+
+    // Get close wait's response, to finish channel close operation.
+    Status close_wait(RuntimeState* state);
+
+    int64_t num_data_bytes_sent() const { return _num_data_bytes_sent; }
+
+    PBlock* ch_cur_pb_block() { return _ch_cur_pb_block; }
 
     std::string get_fragment_instance_id_str() {
-        UniqueId uid(_fragment_instance_id);
+        UniqueId uid(_dest_fragment_instance_id);
         return uid.to_string();
     }
 
     bool is_local() const { return _is_local; }
 
+    virtual void ch_roll_pb_block();
+
     bool is_receiver_eof() const { return 
_receiver_status.is<ErrorCode::END_OF_FILE>(); }
 
     void set_receiver_eof(Status st) { _receiver_status = st; }
 
-    int64_t mem_usage() const;
-
-    // Asynchronously sends a block
-    // Returns the status of the most recently finished transmit_data
-    // rpc (or OK if there wasn't one that hasn't been reported yet).
-    // if batch is nullptr, send the eof packet
-    Status send_remote_block(std::unique_ptr<PBlock>&& block, bool eos = 
false);
-    Status send_broadcast_block(std::shared_ptr<BroadcastPBlockHolder>& block, 
bool eos = false);
-
-    Status add_rows(Block* block, const std::vector<uint32_t>& rows, bool eos) 
{
-        if (_fragment_instance_id.lo == -1) {
-            return Status::OK();
-        }
-
-        bool serialized = false;
-        if (_pblock == nullptr) {
-            _pblock = std::make_unique<PBlock>();
-        }
-        RETURN_IF_ERROR(_serializer.next_serialized_block(block, 
_pblock.get(), 1, &serialized, eos,
-                                                          &rows));
-        if (serialized) {
-            RETURN_IF_ERROR(_send_current_block(eos));
-        }
-
-        return Status::OK();
-    }
-
-    void register_exchange_buffer(pipeline::ExchangeSinkBuffer* buffer) {
-        _buffer = buffer;
-        _buffer->register_sink(_fragment_instance_id);
-    }
-
-    std::shared_ptr<pipeline::ExchangeSendCallback<PTransmitDataResult>> 
get_send_callback(
-            InstanceLoId id, bool eos) {
-        if (!_send_callback) {
-            _send_callback = 
pipeline::ExchangeSendCallback<PTransmitDataResult>::create_shared();
-        } else {
-            _send_callback->cntl_->Reset();
+protected:
+    bool _recvr_is_valid() {
+        if (_local_recvr && !_local_recvr->is_closed()) {
+            return true;
         }
-        _send_callback->init(id, eos);
-        return _send_callback;
+        _receiver_status = Status::EndOfFile(
+                "local data stream receiver closed"); // local data stream receiver closed
+        return false;
     }
 
-    std::shared_ptr<pipeline::Dependency> get_local_channel_dependency();
-
-protected:
-    Status _send_local_block(bool eos);
-    Status _send_current_block(bool eos);
-
-    Status _recvr_status() const {
-        if (_local_recvr && !_local_recvr->is_closed()) {
+    Status _wait_last_brpc() {
+        SCOPED_TIMER(_parent->brpc_wait_timer());
+        if (_send_remote_block_callback == nullptr) {
             return Status::OK();
         }
-        return Status::EndOfFile(
-                "local data stream receiver closed"); // local data stream receiver closed
+        _send_remote_block_callback->join();
+        if (_send_remote_block_callback->cntl_->Failed()) {
+            std::string err = fmt::format(
+                    "failed to send brpc batch, error={}, error_text={}, client: {}, "
+                    "latency = {}",
+                    berror(_send_remote_block_callback->cntl_->ErrorCode()),
+                    _send_remote_block_callback->cntl_->ErrorText(),
+                    BackendOptions::get_localhost(),
+                    _send_remote_block_callback->cntl_->latency_us());
+            LOG(WARNING) << err;
+            return Status::RpcError(err);
+        }
+        _receiver_status = 
Status::create(_send_remote_block_callback->response_->status());
+        return _receiver_status;
     }
 
-    pipeline::ExchangeSinkLocalState* _parent = nullptr;
+    Parent* _parent = nullptr;
 
-    const TUniqueId _fragment_instance_id;
+    const RowDescriptor& _row_desc;
+    const TUniqueId _dest_fragment_instance_id;
     PlanNodeId _dest_node_id;
-    bool _closed {false};
-    bool _need_close {false};
+
+    // the number of RowBatch.data bytes sent successfully
+    int64_t _num_data_bytes_sent {};
+    int64_t _packet_seq {};
+
+    bool _need_close;

Review Comment:
   warning: use default member initializer for '_need_close' 
[modernize-use-default-member-init]
   
   be/src/vec/sink/vdata_stream_sender.h:114:
   ```diff
   -               _need_close(false),
   ```
   
   ```suggestion
       bool _need_close{false};
   ```
   



##########
be/src/pipeline/exec/exchange_sink_operator.cpp:
##########
@@ -383,7 +365,7 @@
                                                 Status st) {
     channel->set_receiver_eof(st);
     // Chanel will not send RPC to the downstream when eof, so close chanel by OK status.
-    static_cast<void>(channel->close(state));
+    static_cast<void>(channel->close(state, Status::OK()));
 }
 
 Status ExchangeSinkOperatorX::sink(RuntimeState* state, vectorized::Block* 
block, bool eos) {

Review Comment:
   warning: function 'sink' has cognitive complexity of 232 (threshold 50) 
[readability-function-cognitive-complexity]
   ```cpp
   Status ExchangeSinkOperatorX::sink(RuntimeState* state, vectorized::Block* 
block, bool eos) {
                                 ^
   ```
   <details>
   <summary>Additional context</summary>
   
   **be/src/pipeline/exec/exchange_sink_operator.cpp:382:** +1, including 
nesting penalty of 0, nesting level increased to 1
   ```cpp
       if (all_receiver_eof) {
       ^
   ```
   **be/src/pipeline/exec/exchange_sink_operator.cpp:390:** +1, including 
nesting penalty of 0, nesting level increased to 1
   ```cpp
       if (_part_type == TPartitionType::UNPARTITIONED || 
local_state.channels.size() == 1) {
       ^
   ```
   **be/src/pipeline/exec/exchange_sink_operator.cpp:394:** +2, including 
nesting penalty of 1, nesting level increased to 2
   ```cpp
           if (local_state.only_local_exchange) {
           ^
   ```
   **be/src/pipeline/exec/exchange_sink_operator.cpp:395:** +3, including 
nesting penalty of 2, nesting level increased to 3
   ```cpp
               if (!block->empty()) {
               ^
   ```
   **be/src/pipeline/exec/exchange_sink_operator.cpp:410:** +1, nesting level 
increased to 2
   ```cpp
           } else {
             ^
   ```
   **be/src/pipeline/exec/exchange_sink_operator.cpp:414:** +3, including 
nesting penalty of 2, nesting level increased to 3
   ```cpp
                   
RETURN_IF_ERROR(local_state._serializer.next_serialized_block(
                   ^
   ```
   **be/src/common/status.h:629:** expanded from macro 'RETURN_IF_ERROR'
   ```cpp
       do {                                \
       ^
   ```
   **be/src/pipeline/exec/exchange_sink_operator.cpp:414:** +4, including 
nesting penalty of 3, nesting level increased to 4
   ```cpp
                   
RETURN_IF_ERROR(local_state._serializer.next_serialized_block(
                   ^
   ```
   **be/src/common/status.h:631:** expanded from macro 'RETURN_IF_ERROR'
   ```cpp
           if (UNLIKELY(!_status_.ok())) { \
           ^
   ```
   **be/src/pipeline/exec/exchange_sink_operator.cpp:417:** +3, including 
nesting penalty of 2, nesting level increased to 3
   ```cpp
                   if (serialized) {
                   ^
   ```
   **be/src/pipeline/exec/exchange_sink_operator.cpp:419:** +4, including 
nesting penalty of 3, nesting level increased to 4
   ```cpp
                       if (!cur_block.empty()) {
                       ^
   ```
   **be/src/pipeline/exec/exchange_sink_operator.cpp:420:** +5, including 
nesting penalty of 4, nesting level increased to 5
   ```cpp
                           
RETURN_IF_ERROR(local_state._serializer.serialize_block(
                           ^
   ```
   **be/src/common/status.h:629:** expanded from macro 'RETURN_IF_ERROR'
   ```cpp
       do {                                \
       ^
   ```
   **be/src/pipeline/exec/exchange_sink_operator.cpp:420:** +6, including 
nesting penalty of 5, nesting level increased to 6
   ```cpp
                           
RETURN_IF_ERROR(local_state._serializer.serialize_block(
                           ^
   ```
   **be/src/common/status.h:631:** expanded from macro 'RETURN_IF_ERROR'
   ```cpp
           if (UNLIKELY(!_status_.ok())) { \
           ^
   ```
   **be/src/pipeline/exec/exchange_sink_operator.cpp:423:** +1, nesting level 
increased to 4
   ```cpp
                       } else {
                         ^
   ```
   **be/src/pipeline/exec/exchange_sink_operator.cpp:448:** +4, including 
nesting penalty of 3, nesting level increased to 4
   ```cpp
                       if (moved) {
                       ^
   ```
   **be/src/pipeline/exec/exchange_sink_operator.cpp:450:** +1, nesting level 
increased to 4
   ```cpp
                       } else {
                         ^
   ```
   **be/src/pipeline/exec/exchange_sink_operator.cpp:458:** +1, nesting level 
increased to 1
   ```cpp
       } else if (_part_type == TPartitionType::RANDOM) {
              ^
   ```
   **be/src/pipeline/exec/exchange_sink_operator.cpp:462:** +2, including 
nesting penalty of 1, nesting level increased to 2
   ```cpp
           if (!current_channel->is_receiver_eof()) {
           ^
   ```
   **be/src/pipeline/exec/exchange_sink_operator.cpp:464:** +3, including 
nesting penalty of 2, nesting level increased to 3
   ```cpp
               if (current_channel->is_local()) {
               ^
   ```
   **be/src/pipeline/exec/exchange_sink_operator.cpp:466:** +4, including 
nesting penalty of 3, nesting level increased to 4
   ```cpp
                   HANDLE_CHANNEL_STATUS(state, current_channel, status);
                   ^
   ```
   **be/src/vec/sink/vdata_stream_sender.h:228:** expanded from macro 
'HANDLE_CHANNEL_STATUS'
   ```cpp
       do {                                                 \
       ^
   ```
   **be/src/pipeline/exec/exchange_sink_operator.cpp:466:** +5, including 
nesting penalty of 4, nesting level increased to 5
   ```cpp
                   HANDLE_CHANNEL_STATUS(state, current_channel, status);
                   ^
   ```
   **be/src/vec/sink/vdata_stream_sender.h:229:** expanded from macro 
'HANDLE_CHANNEL_STATUS'
   ```cpp
           if (status.is<ErrorCode::END_OF_FILE>()) {       \
           ^
   ```
   **be/src/pipeline/exec/exchange_sink_operator.cpp:466:** +1, nesting level 
increased to 5
   ```cpp
                   HANDLE_CHANNEL_STATUS(state, current_channel, status);
                   ^
   ```
   **be/src/vec/sink/vdata_stream_sender.h:231:** expanded from macro 
'HANDLE_CHANNEL_STATUS'
   ```cpp
           } else {                                         \
             ^
   ```
   **be/src/pipeline/exec/exchange_sink_operator.cpp:466:** +6, including 
nesting penalty of 5, nesting level increased to 6
   ```cpp
                   HANDLE_CHANNEL_STATUS(state, current_channel, status);
                   ^
   ```
   **be/src/vec/sink/vdata_stream_sender.h:232:** expanded from macro 
'HANDLE_CHANNEL_STATUS'
   ```cpp
               RETURN_IF_ERROR(status);                     \
               ^
   ```
   **be/src/common/status.h:629:** expanded from macro 'RETURN_IF_ERROR'
   ```cpp
       do {                                \
       ^
   ```
   **be/src/pipeline/exec/exchange_sink_operator.cpp:466:** +7, including 
nesting penalty of 6, nesting level increased to 7
   ```cpp
                   HANDLE_CHANNEL_STATUS(state, current_channel, status);
                   ^
   ```
   **be/src/vec/sink/vdata_stream_sender.h:232:** expanded from macro 
'HANDLE_CHANNEL_STATUS'
   ```cpp
               RETURN_IF_ERROR(status);                     \
               ^
   ```
   **be/src/common/status.h:631:** expanded from macro 'RETURN_IF_ERROR'
   ```cpp
           if (UNLIKELY(!_status_.ok())) { \
           ^
   ```
   **be/src/pipeline/exec/exchange_sink_operator.cpp:467:** +1, nesting level 
increased to 3
   ```cpp
               } else {
                 ^
   ```
   **be/src/pipeline/exec/exchange_sink_operator.cpp:468:** +4, including 
nesting penalty of 3, nesting level increased to 4
   ```cpp
                   RETURN_IF_ERROR(local_state._serializer.serialize_block(
                   ^
   ```
   **be/src/common/status.h:629:** expanded from macro 'RETURN_IF_ERROR'
   ```cpp
       do {                                \
       ^
   ```
   **be/src/pipeline/exec/exchange_sink_operator.cpp:468:** +5, including 
nesting penalty of 4, nesting level increased to 5
   ```cpp
                   RETURN_IF_ERROR(local_state._serializer.serialize_block(
                   ^
   ```
   **be/src/common/status.h:631:** expanded from macro 'RETURN_IF_ERROR'
   ```cpp
           if (UNLIKELY(!_status_.ok())) { \
           ^
   ```
   **be/src/pipeline/exec/exchange_sink_operator.cpp:472:** +4, including 
nesting penalty of 3, nesting level increased to 4
   ```cpp
                   HANDLE_CHANNEL_STATUS(state, current_channel, status);
                   ^
   ```
   **be/src/vec/sink/vdata_stream_sender.h:228:** expanded from macro 
'HANDLE_CHANNEL_STATUS'
   ```cpp
       do {                                                 \
       ^
   ```
   **be/src/pipeline/exec/exchange_sink_operator.cpp:472:** +5, including 
nesting penalty of 4, nesting level increased to 5
   ```cpp
                   HANDLE_CHANNEL_STATUS(state, current_channel, status);
                   ^
   ```
   **be/src/vec/sink/vdata_stream_sender.h:229:** expanded from macro 
'HANDLE_CHANNEL_STATUS'
   ```cpp
           if (status.is<ErrorCode::END_OF_FILE>()) {       \
           ^
   ```
   **be/src/pipeline/exec/exchange_sink_operator.cpp:472:** +1, nesting level 
increased to 5
   ```cpp
                   HANDLE_CHANNEL_STATUS(state, current_channel, status);
                   ^
   ```
   **be/src/vec/sink/vdata_stream_sender.h:231:** expanded from macro 
'HANDLE_CHANNEL_STATUS'
   ```cpp
           } else {                                         \
             ^
   ```
   **be/src/pipeline/exec/exchange_sink_operator.cpp:472:** +6, including 
nesting penalty of 5, nesting level increased to 6
   ```cpp
                   HANDLE_CHANNEL_STATUS(state, current_channel, status);
                   ^
   ```
   **be/src/vec/sink/vdata_stream_sender.h:232:** expanded from macro 
'HANDLE_CHANNEL_STATUS'
   ```cpp
               RETURN_IF_ERROR(status);                     \
               ^
   ```
   **be/src/common/status.h:629:** expanded from macro 'RETURN_IF_ERROR'
   ```cpp
       do {                                \
       ^
   ```
   **be/src/pipeline/exec/exchange_sink_operator.cpp:472:** +7, including 
nesting penalty of 6, nesting level increased to 7
   ```cpp
                   HANDLE_CHANNEL_STATUS(state, current_channel, status);
                   ^
   ```
   **be/src/vec/sink/vdata_stream_sender.h:232:** expanded from macro 
'HANDLE_CHANNEL_STATUS'
   ```cpp
               RETURN_IF_ERROR(status);                     \
               ^
   ```
   **be/src/common/status.h:631:** expanded from macro 'RETURN_IF_ERROR'
   ```cpp
           if (UNLIKELY(!_status_.ok())) { \
           ^
   ```
   **be/src/pipeline/exec/exchange_sink_operator.cpp:478:** +1, nesting level 
increased to 1
   ```cpp
       } else if (_part_type == TPartitionType::HASH_PARTITIONED ||
              ^
   ```
   **be/src/pipeline/exec/exchange_sink_operator.cpp:483:** +2, including 
nesting penalty of 1, nesting level increased to 2
   ```cpp
               RETURN_IF_ERROR(local_state._partitioner->do_partitioning(state, 
block));
               ^
   ```
   **be/src/common/status.h:629:** expanded from macro 'RETURN_IF_ERROR'
   ```cpp
       do {                                \
       ^
   ```
   **be/src/pipeline/exec/exchange_sink_operator.cpp:483:** +3, including 
nesting penalty of 2, nesting level increased to 3
   ```cpp
               RETURN_IF_ERROR(local_state._partitioner->do_partitioning(state, 
block));
               ^
   ```
   **be/src/common/status.h:631:** expanded from macro 'RETURN_IF_ERROR'
   ```cpp
           if (UNLIKELY(!_status_.ok())) { \
           ^
   ```
   **be/src/pipeline/exec/exchange_sink_operator.cpp:490:** +2, including 
nesting penalty of 1, nesting level increased to 2
   ```cpp
           RETURN_IF_ERROR(channel_add_rows(
           ^
   ```
   **be/src/common/status.h:629:** expanded from macro 'RETURN_IF_ERROR'
   ```cpp
       do {                                \
       ^
   ```
   **be/src/pipeline/exec/exchange_sink_operator.cpp:490:** +3, including 
nesting penalty of 2, nesting level increased to 3
   ```cpp
           RETURN_IF_ERROR(channel_add_rows(
           ^
   ```
   **be/src/common/status.h:631:** expanded from macro 'RETURN_IF_ERROR'
   ```cpp
           if (UNLIKELY(!_status_.ok())) { \
           ^
   ```
   **be/src/pipeline/exec/exchange_sink_operator.cpp:502:** +1, nesting level 
increased to 1
   ```cpp
       } else if (_part_type == 
TPartitionType::TABLET_SINK_SHUFFLE_PARTITIONED) {
              ^
   ```
   **be/src/pipeline/exec/exchange_sink_operator.cpp:508:** +2, including 
nesting penalty of 1, nesting level increased to 2
   ```cpp
           RETURN_IF_ERROR(local_state._send_new_partition_batch());
           ^
   ```
   **be/src/common/status.h:629:** expanded from macro 'RETURN_IF_ERROR'
   ```cpp
       do {                                \
       ^
   ```
   **be/src/pipeline/exec/exchange_sink_operator.cpp:508:** +3, including 
nesting penalty of 2, nesting level increased to 3
   ```cpp
           RETURN_IF_ERROR(local_state._send_new_partition_batch());
           ^
   ```
   **be/src/common/status.h:631:** expanded from macro 'RETURN_IF_ERROR'
   ```cpp
           if (UNLIKELY(!_status_.ok())) { \
           ^
   ```
   **be/src/pipeline/exec/exchange_sink_operator.cpp:515:** +2, including 
nesting penalty of 1, nesting level increased to 2
   ```cpp
           if (input_rows > 0) {
           ^
   ```
   **be/src/pipeline/exec/exchange_sink_operator.cpp:520:** +3, including 
nesting penalty of 2, nesting level increased to 3
   ```cpp
               
RETURN_IF_ERROR(local_state._row_distribution.generate_rows_distribution(
               ^
   ```
   **be/src/common/status.h:629:** expanded from macro 'RETURN_IF_ERROR'
   ```cpp
       do {                                \
       ^
   ```
   **be/src/pipeline/exec/exchange_sink_operator.cpp:520:** +4, including 
nesting penalty of 3, nesting level increased to 4
   ```cpp
               RETURN_IF_ERROR(local_state._row_distribution.generate_rows_distribution(
               ^
   ```
   **be/src/common/status.h:631:** expanded from macro 'RETURN_IF_ERROR'
   ```cpp
           if (UNLIKELY(!_status_.ok())) { \
           ^
   ```
   **be/src/pipeline/exec/exchange_sink_operator.cpp:526:** +3, including 
nesting penalty of 2, nesting level increased to 3
   ```cpp
               for (int idx = 0; idx < row_ids.size(); ++idx) {
               ^
   ```
   **be/src/pipeline/exec/exchange_sink_operator.cpp:534:** +2, including 
nesting penalty of 1, nesting level increased to 2
   ```cpp
           if (eos) {
           ^
   ```
   **be/src/pipeline/exec/exchange_sink_operator.cpp:536:** +3, including 
nesting penalty of 2, nesting level increased to 3
   ```cpp
               RETURN_IF_ERROR(local_state._send_new_partition_batch());
               ^
   ```
   **be/src/common/status.h:629:** expanded from macro 'RETURN_IF_ERROR'
   ```cpp
       do {                                \
       ^
   ```
   **be/src/pipeline/exec/exchange_sink_operator.cpp:536:** +4, including 
nesting penalty of 3, nesting level increased to 4
   ```cpp
               RETURN_IF_ERROR(local_state._send_new_partition_batch());
               ^
   ```
   **be/src/common/status.h:631:** expanded from macro 'RETURN_IF_ERROR'
   ```cpp
           if (UNLIKELY(!_status_.ok())) { \
           ^
   ```
   **be/src/pipeline/exec/exchange_sink_operator.cpp:540:** +2, including 
nesting penalty of 1, nesting level increased to 2
   ```cpp
           RETURN_IF_ERROR(channel_add_rows_with_idx(state, local_state.channels, num_channels,
           ^
   ```
   **be/src/common/status.h:629:** expanded from macro 'RETURN_IF_ERROR'
   ```cpp
       do {                                \
       ^
   ```
   **be/src/pipeline/exec/exchange_sink_operator.cpp:540:** +3, including 
nesting penalty of 2, nesting level increased to 3
   ```cpp
           RETURN_IF_ERROR(channel_add_rows_with_idx(state, local_state.channels, num_channels,
           ^
   ```
   **be/src/common/status.h:631:** expanded from macro 'RETURN_IF_ERROR'
   ```cpp
           if (UNLIKELY(!_status_.ok())) { \
           ^
   ```
   **be/src/pipeline/exec/exchange_sink_operator.cpp:550:** +1, nesting level 
increased to 1
   ```cpp
       } else if (_part_type == TPartitionType::TABLE_SINK_HASH_PARTITIONED) {
              ^
   ```
   **be/src/pipeline/exec/exchange_sink_operator.cpp:557:** +2, including 
nesting penalty of 1, nesting level increased to 2
   ```cpp
               RETURN_IF_ERROR(local_state._partitioner->do_partitioning(state, block));
               ^
   ```
   **be/src/common/status.h:629:** expanded from macro 'RETURN_IF_ERROR'
   ```cpp
       do {                                \
       ^
   ```
   **be/src/pipeline/exec/exchange_sink_operator.cpp:557:** +3, including 
nesting penalty of 2, nesting level increased to 3
   ```cpp
               RETURN_IF_ERROR(local_state._partitioner->do_partitioning(state, block));
               ^
   ```
   **be/src/common/status.h:631:** expanded from macro 'RETURN_IF_ERROR'
   ```cpp
           if (UNLIKELY(!_status_.ok())) { \
           ^
   ```
   **be/src/pipeline/exec/exchange_sink_operator.cpp:561:** +2, including 
nesting penalty of 1, nesting level increased to 2
   ```cpp
           RETURN_IF_ERROR(channel_add_rows_with_idx(
           ^
   ```
   **be/src/common/status.h:629:** expanded from macro 'RETURN_IF_ERROR'
   ```cpp
       do {                                \
       ^
   ```
   **be/src/pipeline/exec/exchange_sink_operator.cpp:561:** +3, including 
nesting penalty of 2, nesting level increased to 3
   ```cpp
           RETURN_IF_ERROR(channel_add_rows_with_idx(
           ^
   ```
   **be/src/common/status.h:631:** expanded from macro 'RETURN_IF_ERROR'
   ```cpp
           if (UNLIKELY(!_status_.ok())) { \
           ^
   ```
   **be/src/pipeline/exec/exchange_sink_operator.cpp:572:** +1, nesting level 
increased to 1
   ```cpp
       } else if (_part_type == TPartitionType::TABLE_SINK_RANDOM_PARTITIONED) {
              ^
   ```
   **be/src/pipeline/exec/exchange_sink_operator.cpp:577:** +2, including 
nesting penalty of 1, nesting level increased to 2
   ```cpp
           if (!current_channel->is_receiver_eof()) {
           ^
   ```
   **be/src/pipeline/exec/exchange_sink_operator.cpp:579:** +3, including 
nesting penalty of 2, nesting level increased to 3
   ```cpp
               if (current_channel->is_local()) {
               ^
   ```
   **be/src/pipeline/exec/exchange_sink_operator.cpp:581:** +4, including 
nesting penalty of 3, nesting level increased to 4
   ```cpp
                   HANDLE_CHANNEL_STATUS(state, current_channel, status);
                   ^
   ```
   **be/src/vec/sink/vdata_stream_sender.h:228:** expanded from macro 
'HANDLE_CHANNEL_STATUS'
   ```cpp
       do {                                                 \
       ^
   ```
   **be/src/pipeline/exec/exchange_sink_operator.cpp:581:** +5, including 
nesting penalty of 4, nesting level increased to 5
   ```cpp
                   HANDLE_CHANNEL_STATUS(state, current_channel, status);
                   ^
   ```
   **be/src/vec/sink/vdata_stream_sender.h:229:** expanded from macro 
'HANDLE_CHANNEL_STATUS'
   ```cpp
           if (status.is<ErrorCode::END_OF_FILE>()) {       \
           ^
   ```
   **be/src/pipeline/exec/exchange_sink_operator.cpp:581:** +1, nesting level 
increased to 5
   ```cpp
                   HANDLE_CHANNEL_STATUS(state, current_channel, status);
                   ^
   ```
   **be/src/vec/sink/vdata_stream_sender.h:231:** expanded from macro 
'HANDLE_CHANNEL_STATUS'
   ```cpp
           } else {                                         \
             ^
   ```
   **be/src/pipeline/exec/exchange_sink_operator.cpp:581:** +6, including 
nesting penalty of 5, nesting level increased to 6
   ```cpp
                   HANDLE_CHANNEL_STATUS(state, current_channel, status);
                   ^
   ```
   **be/src/vec/sink/vdata_stream_sender.h:232:** expanded from macro 
'HANDLE_CHANNEL_STATUS'
   ```cpp
               RETURN_IF_ERROR(status);                     \
               ^
   ```
   **be/src/common/status.h:629:** expanded from macro 'RETURN_IF_ERROR'
   ```cpp
       do {                                \
       ^
   ```
   **be/src/pipeline/exec/exchange_sink_operator.cpp:581:** +7, including 
nesting penalty of 6, nesting level increased to 7
   ```cpp
                   HANDLE_CHANNEL_STATUS(state, current_channel, status);
                   ^
   ```
   **be/src/vec/sink/vdata_stream_sender.h:232:** expanded from macro 
'HANDLE_CHANNEL_STATUS'
   ```cpp
               RETURN_IF_ERROR(status);                     \
               ^
   ```
   **be/src/common/status.h:631:** expanded from macro 'RETURN_IF_ERROR'
   ```cpp
           if (UNLIKELY(!_status_.ok())) { \
           ^
   ```
   **be/src/pipeline/exec/exchange_sink_operator.cpp:582:** +1, nesting level 
increased to 3
   ```cpp
               } else {
                 ^
   ```
   **be/src/pipeline/exec/exchange_sink_operator.cpp:583:** +4, including 
nesting penalty of 3, nesting level increased to 4
   ```cpp
                   RETURN_IF_ERROR(local_state._serializer.serialize_block(
                   ^
   ```
   **be/src/common/status.h:629:** expanded from macro 'RETURN_IF_ERROR'
   ```cpp
       do {                                \
       ^
   ```
   **be/src/pipeline/exec/exchange_sink_operator.cpp:583:** +5, including 
nesting penalty of 4, nesting level increased to 5
   ```cpp
                   RETURN_IF_ERROR(local_state._serializer.serialize_block(
                   ^
   ```
   **be/src/common/status.h:631:** expanded from macro 'RETURN_IF_ERROR'
   ```cpp
           if (UNLIKELY(!_status_.ok())) { \
           ^
   ```
   **be/src/pipeline/exec/exchange_sink_operator.cpp:587:** +4, including 
nesting penalty of 3, nesting level increased to 4
   ```cpp
                   HANDLE_CHANNEL_STATUS(state, current_channel, status);
                   ^
   ```
   **be/src/vec/sink/vdata_stream_sender.h:228:** expanded from macro 
'HANDLE_CHANNEL_STATUS'
   ```cpp
       do {                                                 \
       ^
   ```
   **be/src/pipeline/exec/exchange_sink_operator.cpp:587:** +5, including 
nesting penalty of 4, nesting level increased to 5
   ```cpp
                   HANDLE_CHANNEL_STATUS(state, current_channel, status);
                   ^
   ```
   **be/src/vec/sink/vdata_stream_sender.h:229:** expanded from macro 
'HANDLE_CHANNEL_STATUS'
   ```cpp
           if (status.is<ErrorCode::END_OF_FILE>()) {       \
           ^
   ```
   **be/src/pipeline/exec/exchange_sink_operator.cpp:587:** +1, nesting level 
increased to 5
   ```cpp
                   HANDLE_CHANNEL_STATUS(state, current_channel, status);
                   ^
   ```
   **be/src/vec/sink/vdata_stream_sender.h:231:** expanded from macro 
'HANDLE_CHANNEL_STATUS'
   ```cpp
           } else {                                         \
             ^
   ```
   **be/src/pipeline/exec/exchange_sink_operator.cpp:587:** +6, including 
nesting penalty of 5, nesting level increased to 6
   ```cpp
                   HANDLE_CHANNEL_STATUS(state, current_channel, status);
                   ^
   ```
   **be/src/vec/sink/vdata_stream_sender.h:232:** expanded from macro 
'HANDLE_CHANNEL_STATUS'
   ```cpp
               RETURN_IF_ERROR(status);                     \
               ^
   ```
   **be/src/common/status.h:629:** expanded from macro 'RETURN_IF_ERROR'
   ```cpp
       do {                                \
       ^
   ```
   **be/src/pipeline/exec/exchange_sink_operator.cpp:587:** +7, including 
nesting penalty of 6, nesting level increased to 7
   ```cpp
                   HANDLE_CHANNEL_STATUS(state, current_channel, status);
                   ^
   ```
   **be/src/vec/sink/vdata_stream_sender.h:232:** expanded from macro 
'HANDLE_CHANNEL_STATUS'
   ```cpp
               RETURN_IF_ERROR(status);                     \
               ^
   ```
   **be/src/common/status.h:631:** expanded from macro 'RETURN_IF_ERROR'
   ```cpp
           if (UNLIKELY(!_status_.ok())) { \
           ^
   ```
   **be/src/pipeline/exec/exchange_sink_operator.cpp:593:** +2, including 
nesting penalty of 1, nesting level increased to 2
   ```cpp
           if (_writer_count < local_state.channels.size()) {
           ^
   ```
   **be/src/pipeline/exec/exchange_sink_operator.cpp:594:** +3, including 
nesting penalty of 2, nesting level increased to 3
   ```cpp
               if (_data_processed >=
               ^
   ```
   **be/src/pipeline/exec/exchange_sink_operator.cpp:601:** +1, nesting level 
increased to 1
   ```cpp
       } else {
         ^
   ```
   **be/src/pipeline/exec/exchange_sink_operator.cpp:608:** +1, including 
nesting penalty of 0, nesting level increased to 1
   ```cpp
       if (eos) {
       ^
   ```
   **be/src/pipeline/exec/exchange_sink_operator.cpp:610:** +2, including 
nesting penalty of 1, nesting level increased to 2
   ```cpp
           for (int i = 0; i < local_state.channels.size(); ++i) {
           ^
   ```
   **be/src/pipeline/exec/exchange_sink_operator.cpp:612:** +3, including 
nesting penalty of 2, nesting level increased to 3
   ```cpp
               if (!st.ok() && final_st.ok()) {
               ^
   ```
   **be/src/pipeline/exec/exchange_sink_operator.cpp:612:** +1
   ```cpp
               if (!st.ok() && final_st.ok()) {
                            ^
   ```
   **be/src/pipeline/exec/exchange_sink_operator.cpp:616:** +2, including 
nesting penalty of 1, nesting level increased to 2
   ```cpp
           if (local_state._sink_buffer) {
           ^
   ```
   
   </details>
   



##########
be/src/pipeline/exec/exchange_sink_operator.h:
##########
@@ -103,30 +106,30 @@ class ExchangeSinkLocalState final : public 
PipelineXSinkLocalState<> {
     void set_reach_limit() { _reach_limit = true; };
 
     [[nodiscard]] int sender_id() const { return _sender_id; }
-
+    [[nodiscard]] int be_number() const { return _state->be_number(); }
     std::string name_suffix() override;
     segment_v2::CompressionTypePB compression_type() const;
     std::string debug_string(int indentation_level) const override;
     static Status empty_callback_function(void* sender, 
TCreatePartitionResult* result) {
         return Status::OK();
     }
     Status _send_new_partition_batch();
-    std::vector<std::shared_ptr<vectorized::Channel>> channels;
-    int current_channel_idx {0}; // index of current channel to send to if 
_random == true
-    bool only_local_exchange {false};
-
-    void on_channel_finished(InstanceLoId channel_id);
+    std::vector<vectorized::PipChannel*> channels;
+    std::vector<std::shared_ptr<vectorized::PipChannel>> channel_shared_ptrs;
+    int current_channel_idx; // index of current channel to send to if _random == true

Review Comment:
   warning: use default member initializer for 'current_channel_idx' 
[modernize-use-default-member-init]
   
   be/src/pipeline/exec/exchange_sink_operator.h:56:
   ```diff
   -               current_channel_idx(0),
   +               ,
   ```
   
   ```suggestion
       int current_channel_idx{0}; // index of current channel to send to if _random == true
   ```
   



##########
be/src/pipeline/exec/exchange_sink_operator.h:
##########
@@ -103,30 +106,30 @@
     void set_reach_limit() { _reach_limit = true; };
 
     [[nodiscard]] int sender_id() const { return _sender_id; }
-
+    [[nodiscard]] int be_number() const { return _state->be_number(); }
     std::string name_suffix() override;
     segment_v2::CompressionTypePB compression_type() const;
     std::string debug_string(int indentation_level) const override;
     static Status empty_callback_function(void* sender, 
TCreatePartitionResult* result) {
         return Status::OK();
     }
     Status _send_new_partition_batch();
-    std::vector<std::shared_ptr<vectorized::Channel>> channels;
-    int current_channel_idx {0}; // index of current channel to send to if 
_random == true
-    bool only_local_exchange {false};
-
-    void on_channel_finished(InstanceLoId channel_id);
+    std::vector<vectorized::PipChannel*> channels;
+    std::vector<std::shared_ptr<vectorized::PipChannel>> channel_shared_ptrs;
+    int current_channel_idx; // index of current channel to send to if _random == true
+    bool only_local_exchange;

Review Comment:
   warning: use default member initializer for 'only_local_exchange' 
[modernize-use-default-member-init]
   
   be/src/pipeline/exec/exchange_sink_operator.h:57:
   ```diff
   -               only_local_exchange(false),
   +               ,
   ```
   
   ```suggestion
       bool only_local_exchange{false};
   ```
   



##########
be/src/pipeline/exec/exchange_sink_buffer.cpp:
##########
@@ -209,30 +222,40 @@ Status 
ExchangeSinkBuffer::add_block(BroadcastTransmitInfo&& request) {
 Status ExchangeSinkBuffer::_send_rpc(InstanceLoId id) {

Review Comment:
   warning: function '_send_rpc' has cognitive complexity of 76 (threshold 50) 
[readability-function-cognitive-complexity]
   ```cpp
   Status ExchangeSinkBuffer::_send_rpc(InstanceLoId id) {
                              ^
   ```
   <details>
   <summary>Additional context</summary>
   
   **be/src/pipeline/exec/exchange_sink_buffer.cpp:228:** +1, including nesting 
penalty of 0, nesting level increased to 1
   ```cpp
       if (_is_finishing) {
       ^
   ```
   **be/src/pipeline/exec/exchange_sink_buffer.cpp:235:** +1, including nesting 
penalty of 0, nesting level increased to 1
   ```cpp
       while (!q.empty()) {
       ^
   ```
   **be/src/pipeline/exec/exchange_sink_buffer.cpp:250:** +2, including nesting 
penalty of 1, nesting level increased to 2
   ```cpp
           if (request.block && !request.block->column_metas().empty()) {
           ^
   ```
   **be/src/pipeline/exec/exchange_sink_buffer.cpp:250:** +1
   ```cpp
           if (request.block && !request.block->column_metas().empty()) {
                             ^
   ```
   **be/src/pipeline/exec/exchange_sink_buffer.cpp:253:** +2, including nesting 
penalty of 1, nesting level increased to 2
   ```cpp
           if (!request.exec_status.ok()) {
           ^
   ```
   **be/src/pipeline/exec/exchange_sink_buffer.cpp:260:** +2, including nesting 
penalty of 1, nesting level increased to 2
   ```cpp
           if (config::exchange_sink_ignore_eovercrowded) {
           ^
   ```
   **be/src/pipeline/exec/exchange_sink_buffer.cpp:263:** nesting level 
increased to 2
   ```cpp
           send_callback->addFailedHandler([&, weak_task_ctx = weak_task_exec_ctx()](
                                           ^
   ```
   **be/src/pipeline/exec/exchange_sink_buffer.cpp:266:** +3, including nesting 
penalty of 2, nesting level increased to 3
   ```cpp
               if (task_lock == nullptr) {
               ^
   ```
   **be/src/pipeline/exec/exchange_sink_buffer.cpp:274:** nesting level 
increased to 2
   ```cpp
           send_callback->addSuccessHandler([&, weak_task_ctx = weak_task_exec_ctx()](
                                            ^
   ```
   **be/src/pipeline/exec/exchange_sink_buffer.cpp:279:** +3, including nesting 
penalty of 2, nesting level increased to 3
   ```cpp
               if (task_lock == nullptr) {
               ^
   ```
   **be/src/pipeline/exec/exchange_sink_buffer.cpp:287:** +3, including nesting 
penalty of 2, nesting level increased to 3
   ```cpp
               if (s.is<ErrorCode::END_OF_FILE>()) {
               ^
   ```
   **be/src/pipeline/exec/exchange_sink_buffer.cpp:289:** +1, nesting level 
increased to 3
   ```cpp
               } else if (!s.ok()) {
                      ^
   ```
   **be/src/pipeline/exec/exchange_sink_buffer.cpp:292:** +1, nesting level 
increased to 3
   ```cpp
               } else {
                 ^
   ```
   **be/src/pipeline/exec/exchange_sink_buffer.cpp:294:** +4, including nesting 
penalty of 3, nesting level increased to 4
   ```cpp
                   if (!s) {
                   ^
   ```
   **be/src/pipeline/exec/exchange_sink_buffer.cpp:305:** +2, including nesting 
penalty of 1, nesting level increased to 2
   ```cpp
               if (enable_http_send_block(*brpc_request)) {
               ^
   ```
   **be/src/pipeline/exec/exchange_sink_buffer.cpp:306:** +3, including nesting 
penalty of 2, nesting level increased to 3
   ```cpp
                   RETURN_IF_ERROR(transmit_block_httpv2(_context->exec_env(),
                   ^
   ```
   **be/src/common/status.h:629:** expanded from macro 'RETURN_IF_ERROR'
   ```cpp
       do {                                \
       ^
   ```
   **be/src/pipeline/exec/exchange_sink_buffer.cpp:306:** +4, including nesting 
penalty of 3, nesting level increased to 4
   ```cpp
                   RETURN_IF_ERROR(transmit_block_httpv2(_context->exec_env(),
                   ^
   ```
   **be/src/common/status.h:631:** expanded from macro 'RETURN_IF_ERROR'
   ```cpp
           if (UNLIKELY(!_status_.ok())) { \
           ^
   ```
   **be/src/pipeline/exec/exchange_sink_buffer.cpp:309:** +1, nesting level 
increased to 2
   ```cpp
               } else {
                 ^
   ```
   **be/src/pipeline/exec/exchange_sink_buffer.cpp:314:** +2, including nesting 
penalty of 1, nesting level increased to 2
   ```cpp
           if (request.block) {
           ^
   ```
   **be/src/pipeline/exec/exchange_sink_buffer.cpp:319:** +2, including nesting 
penalty of 1, nesting level increased to 2
   ```cpp
           if (_total_queue_size <= _queue_capacity) {
           ^
   ```
   **be/src/pipeline/exec/exchange_sink_buffer.cpp:324:** +2, including nesting 
penalty of 1, nesting level increased to 2
   ```cpp
           if (_keep_order) {
           ^
   ```
   **be/src/pipeline/exec/exchange_sink_buffer.cpp:329:** +1, including nesting 
penalty of 0, nesting level increased to 1
   ```cpp
       while (!broadcast_q.empty()) {
       ^
   ```
   **be/src/pipeline/exec/exchange_sink_buffer.cpp:344:** +2, including nesting 
penalty of 1, nesting level increased to 2
   ```cpp
           if (request.block_holder->get_block() &&
           ^
   ```
   **be/src/pipeline/exec/exchange_sink_buffer.cpp:344:** +1
   ```cpp
           if (request.block_holder->get_block() &&
                                                 ^
   ```
   **be/src/pipeline/exec/exchange_sink_buffer.cpp:352:** +2, including nesting 
penalty of 1, nesting level increased to 2
   ```cpp
           if (config::exchange_sink_ignore_eovercrowded) {
           ^
   ```
   **be/src/pipeline/exec/exchange_sink_buffer.cpp:355:** nesting level 
increased to 2
   ```cpp
           send_callback->addFailedHandler([&, weak_task_ctx = weak_task_exec_ctx()](
                                           ^
   ```
   **be/src/pipeline/exec/exchange_sink_buffer.cpp:358:** +3, including nesting 
penalty of 2, nesting level increased to 3
   ```cpp
               if (task_lock == nullptr) {
               ^
   ```
   **be/src/pipeline/exec/exchange_sink_buffer.cpp:366:** nesting level 
increased to 2
   ```cpp
           send_callback->addSuccessHandler([&, weak_task_ctx = weak_task_exec_ctx()](
                                            ^
   ```
   **be/src/pipeline/exec/exchange_sink_buffer.cpp:371:** +3, including nesting 
penalty of 2, nesting level increased to 3
   ```cpp
               if (task_lock == nullptr) {
               ^
   ```
   **be/src/pipeline/exec/exchange_sink_buffer.cpp:379:** +3, including nesting 
penalty of 2, nesting level increased to 3
   ```cpp
               if (s.is<ErrorCode::END_OF_FILE>()) {
               ^
   ```
   **be/src/pipeline/exec/exchange_sink_buffer.cpp:381:** +1, nesting level 
increased to 3
   ```cpp
               } else if (!s.ok()) {
                      ^
   ```
   **be/src/pipeline/exec/exchange_sink_buffer.cpp:384:** +1, nesting level 
increased to 3
   ```cpp
               } else {
                 ^
   ```
   **be/src/pipeline/exec/exchange_sink_buffer.cpp:386:** +4, including nesting 
penalty of 3, nesting level increased to 4
   ```cpp
                   if (!s) {
                   ^
   ```
   **be/src/pipeline/exec/exchange_sink_buffer.cpp:397:** +2, including nesting 
penalty of 1, nesting level increased to 2
   ```cpp
               if (enable_http_send_block(*brpc_request)) {
               ^
   ```
   **be/src/pipeline/exec/exchange_sink_buffer.cpp:398:** +3, including nesting 
penalty of 2, nesting level increased to 3
   ```cpp
                   RETURN_IF_ERROR(transmit_block_httpv2(_context->exec_env(),
                   ^
   ```
   **be/src/common/status.h:629:** expanded from macro 'RETURN_IF_ERROR'
   ```cpp
       do {                                \
       ^
   ```
   **be/src/pipeline/exec/exchange_sink_buffer.cpp:398:** +4, including nesting 
penalty of 3, nesting level increased to 4
   ```cpp
                   RETURN_IF_ERROR(transmit_block_httpv2(_context->exec_env(),
                   ^
   ```
   **be/src/common/status.h:631:** expanded from macro 'RETURN_IF_ERROR'
   ```cpp
           if (UNLIKELY(!_status_.ok())) { \
           ^
   ```
   **be/src/pipeline/exec/exchange_sink_buffer.cpp:401:** +1, nesting level 
increased to 2
   ```cpp
               } else {
                 ^
   ```
   **be/src/pipeline/exec/exchange_sink_buffer.cpp:406:** +2, including nesting 
penalty of 1, nesting level increased to 2
   ```cpp
           if (request.block_holder->get_block()) {
           ^
   ```
   **be/src/pipeline/exec/exchange_sink_buffer.cpp:410:** +2, including nesting 
penalty of 1, nesting level increased to 2
   ```cpp
           if (_keep_order) {
           ^
   ```
   **be/src/pipeline/exec/exchange_sink_buffer.cpp:414:** +1, including nesting 
penalty of 0, nesting level increased to 1
   ```cpp
       if (is_empty) {
       ^
   ```
   
   </details>
   



##########
be/src/vec/sink/vdata_stream_sender.h:
##########
@@ -119,106 +131,98 @@ class Channel {
     Status init(RuntimeState* state);
     Status open(RuntimeState* state);
 
-    Status send_local_block(Block* block, bool eos, bool can_be_moved);
-    // Flush buffered rows and close channel. This function don't wait the 
response
-    // of close operation, client should call close_wait() to finish channel's 
close.
-    // We split one close operation into two phases in order to make multiple 
channels
-    // can run parallel.
-    Status close(RuntimeState* state);
+    Status send_local_block(Status exec_status, bool eos = false);
+
+    Status send_local_block(Block* block, bool can_be_moved);
+
+    // Get close wait's response, to finish channel close operation.
+    Status close_wait(RuntimeState* state);
+
+    int64_t num_data_bytes_sent() const { return _num_data_bytes_sent; }
+
+    PBlock* ch_cur_pb_block() { return _ch_cur_pb_block; }
 
     std::string get_fragment_instance_id_str() {
-        UniqueId uid(_fragment_instance_id);
+        UniqueId uid(_dest_fragment_instance_id);
         return uid.to_string();
     }
 
     bool is_local() const { return _is_local; }
 
+    virtual void ch_roll_pb_block();
+
     bool is_receiver_eof() const { return 
_receiver_status.is<ErrorCode::END_OF_FILE>(); }
 
     void set_receiver_eof(Status st) { _receiver_status = st; }
 
-    int64_t mem_usage() const;
-
-    // Asynchronously sends a block
-    // Returns the status of the most recently finished transmit_data
-    // rpc (or OK if there wasn't one that hasn't been reported yet).
-    // if batch is nullptr, send the eof packet
-    Status send_remote_block(std::unique_ptr<PBlock>&& block, bool eos = 
false);
-    Status send_broadcast_block(std::shared_ptr<BroadcastPBlockHolder>& block, 
bool eos = false);
-
-    Status add_rows(Block* block, const std::vector<uint32_t>& rows, bool eos) 
{
-        if (_fragment_instance_id.lo == -1) {
-            return Status::OK();
-        }
-
-        bool serialized = false;
-        if (_pblock == nullptr) {
-            _pblock = std::make_unique<PBlock>();
-        }
-        RETURN_IF_ERROR(_serializer.next_serialized_block(block, 
_pblock.get(), 1, &serialized, eos,
-                                                          &rows));
-        if (serialized) {
-            RETURN_IF_ERROR(_send_current_block(eos));
-        }
-
-        return Status::OK();
-    }
-
-    void register_exchange_buffer(pipeline::ExchangeSinkBuffer* buffer) {
-        _buffer = buffer;
-        _buffer->register_sink(_fragment_instance_id);
-    }
-
-    std::shared_ptr<pipeline::ExchangeSendCallback<PTransmitDataResult>> 
get_send_callback(
-            InstanceLoId id, bool eos) {
-        if (!_send_callback) {
-            _send_callback = 
pipeline::ExchangeSendCallback<PTransmitDataResult>::create_shared();
-        } else {
-            _send_callback->cntl_->Reset();
+protected:
+    bool _recvr_is_valid() {
+        if (_local_recvr && !_local_recvr->is_closed()) {
+            return true;

Review Comment:
   warning: redundant boolean literal in conditional return statement 
[readability-simplify-boolean-expr]
   ```cpp
               return true;
                      ^
   ```
   



##########
be/src/vec/sink/vdata_stream_sender.h:
##########
@@ -119,106 +131,98 @@
     Status init(RuntimeState* state);
     Status open(RuntimeState* state);
 
-    Status send_local_block(Block* block, bool eos, bool can_be_moved);
-    // Flush buffered rows and close channel. This function don't wait the 
response
-    // of close operation, client should call close_wait() to finish channel's 
close.
-    // We split one close operation into two phases in order to make multiple 
channels
-    // can run parallel.
-    Status close(RuntimeState* state);
+    Status send_local_block(Status exec_status, bool eos = false);
+
+    Status send_local_block(Block* block, bool can_be_moved);
+
+    // Get close wait's response, to finish channel close operation.
+    Status close_wait(RuntimeState* state);
+
+    int64_t num_data_bytes_sent() const { return _num_data_bytes_sent; }
+
+    PBlock* ch_cur_pb_block() { return _ch_cur_pb_block; }
 
     std::string get_fragment_instance_id_str() {
-        UniqueId uid(_fragment_instance_id);
+        UniqueId uid(_dest_fragment_instance_id);
         return uid.to_string();
     }
 
     bool is_local() const { return _is_local; }
 
+    virtual void ch_roll_pb_block();
+
     bool is_receiver_eof() const { return 
_receiver_status.is<ErrorCode::END_OF_FILE>(); }
 
     void set_receiver_eof(Status st) { _receiver_status = st; }
 
-    int64_t mem_usage() const;
-
-    // Asynchronously sends a block
-    // Returns the status of the most recently finished transmit_data
-    // rpc (or OK if there wasn't one that hasn't been reported yet).
-    // if batch is nullptr, send the eof packet
-    Status send_remote_block(std::unique_ptr<PBlock>&& block, bool eos = 
false);
-    Status send_broadcast_block(std::shared_ptr<BroadcastPBlockHolder>& block, 
bool eos = false);
-
-    Status add_rows(Block* block, const std::vector<uint32_t>& rows, bool eos) 
{
-        if (_fragment_instance_id.lo == -1) {
-            return Status::OK();
-        }
-
-        bool serialized = false;
-        if (_pblock == nullptr) {
-            _pblock = std::make_unique<PBlock>();
-        }
-        RETURN_IF_ERROR(_serializer.next_serialized_block(block, 
_pblock.get(), 1, &serialized, eos,
-                                                          &rows));
-        if (serialized) {
-            RETURN_IF_ERROR(_send_current_block(eos));
-        }
-
-        return Status::OK();
-    }
-
-    void register_exchange_buffer(pipeline::ExchangeSinkBuffer* buffer) {
-        _buffer = buffer;
-        _buffer->register_sink(_fragment_instance_id);
-    }
-
-    std::shared_ptr<pipeline::ExchangeSendCallback<PTransmitDataResult>> 
get_send_callback(
-            InstanceLoId id, bool eos) {
-        if (!_send_callback) {
-            _send_callback = 
pipeline::ExchangeSendCallback<PTransmitDataResult>::create_shared();
-        } else {
-            _send_callback->cntl_->Reset();
+protected:
+    bool _recvr_is_valid() {
+        if (_local_recvr && !_local_recvr->is_closed()) {
+            return true;
         }
-        _send_callback->init(id, eos);
-        return _send_callback;
+        _receiver_status = Status::EndOfFile(
+                "local data stream receiver closed"); // local data stream 
receiver closed
+        return false;
     }
 
-    std::shared_ptr<pipeline::Dependency> get_local_channel_dependency();
-
-protected:
-    Status _send_local_block(bool eos);
-    Status _send_current_block(bool eos);
-
-    Status _recvr_status() const {
-        if (_local_recvr && !_local_recvr->is_closed()) {
+    Status _wait_last_brpc() {
+        SCOPED_TIMER(_parent->brpc_wait_timer());
+        if (_send_remote_block_callback == nullptr) {
             return Status::OK();
         }
-        return Status::EndOfFile(
-                "local data stream receiver closed"); // local data stream 
receiver closed
+        _send_remote_block_callback->join();
+        if (_send_remote_block_callback->cntl_->Failed()) {
+            std::string err = fmt::format(
+                    "failed to send brpc batch, error={}, error_text={}, 
client: {}, "
+                    "latency = {}",
+                    berror(_send_remote_block_callback->cntl_->ErrorCode()),
+                    _send_remote_block_callback->cntl_->ErrorText(),
+                    BackendOptions::get_localhost(),
+                    _send_remote_block_callback->cntl_->latency_us());
+            LOG(WARNING) << err;
+            return Status::RpcError(err);
+        }
+        _receiver_status = 
Status::create(_send_remote_block_callback->response_->status());
+        return _receiver_status;
     }
 
-    pipeline::ExchangeSinkLocalState* _parent = nullptr;
+    Parent* _parent = nullptr;
 
-    const TUniqueId _fragment_instance_id;
+    const RowDescriptor& _row_desc;
+    const TUniqueId _dest_fragment_instance_id;
     PlanNodeId _dest_node_id;
-    bool _closed {false};
-    bool _need_close {false};
+
+    // the number of RowBatch.data bytes sent successfully
+    int64_t _num_data_bytes_sent {};
+    int64_t _packet_seq {};
+
+    bool _need_close;
+    bool _closed;

Review Comment:
   warning: use default member initializer for '_closed' 
[modernize-use-default-member-init]
   
   be/src/vec/sink/vdata_stream_sender.h:115:
   ```diff
   -               _closed(false),
   +               ,
   ```
   
   ```suggestion
       bool _closed{false};
   ```
   



##########
be/src/vec/sink/vdata_stream_sender.h:
##########
@@ -230,5 +234,88 @@
         }                                                \
     } while (0)
 
+class PipChannel final : public Channel<pipeline::ExchangeSinkLocalState> {
+public:
+    PipChannel(pipeline::ExchangeSinkLocalState* parent, const RowDescriptor& 
row_desc,
+               const TNetworkAddress& brpc_dest, const TUniqueId& 
dest_fragment_instance_id,
+               PlanNodeId dest_node_id)
+            : Channel<pipeline::ExchangeSinkLocalState>(parent, row_desc, 
brpc_dest,
+                                                        
dest_fragment_instance_id, dest_node_id) {
+        ch_roll_pb_block();
+    }
+
+    ~PipChannel() override { delete 
Channel<pipeline::ExchangeSinkLocalState>::_ch_cur_pb_block; }

Review Comment:
   warning: use '= default' to define a trivial destructor 
[modernize-use-equals-default]
   ```cpp
       ~PipChannel() override { delete Channel<pipeline::ExchangeSinkLocalState>::_ch_cur_pb_block; }
       ^
   ```
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to