This is an automated email from the ASF dual-hosted git repository.

panxiaolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 24eff502145 [Chore](exchange) remove some unused code of Channel (#42095)
24eff502145 is described below

commit 24eff50214594d886e9a6a7c7f5bfeaf9665546c
Author: Pxl <pxl...@qq.com>
AuthorDate: Mon Oct 21 14:03:10 2024 +0800

    [Chore](exchange) remove some unused code of Channel (#42095)
    
    ## Proposed changes
    remove some unused code of Channel
---
 be/src/vec/sink/vdata_stream_sender.cpp | 86 +--------------------------------
 be/src/vec/sink/vdata_stream_sender.h   | 41 +++++-----------
 2 files changed, 14 insertions(+), 113 deletions(-)

diff --git a/be/src/vec/sink/vdata_stream_sender.cpp b/be/src/vec/sink/vdata_stream_sender.cpp
index 496e68c97f0..b139c503c9a 100644
--- a/be/src/vec/sink/vdata_stream_sender.cpp
+++ b/be/src/vec/sink/vdata_stream_sender.cpp
@@ -174,21 +174,6 @@ Status PipChannel::send_current_block(bool eos, Status exec_status) {
     return Status::OK();
 }
 
-template <typename Parent>
-Status Channel<Parent>::send_current_block(bool eos, Status exec_status) {
-    // FIXME: Now, local exchange will cause the performance problem is in a multi-threaded scenario
-    // so this feature is turned off here by default. We need to re-examine this logic
-    if (is_local()) {
-        return send_local_block(exec_status, eos);
-    }
-    if (eos) {
-        RETURN_IF_ERROR(_serializer.serialize_block(_ch_cur_pb_block, 1));
-    }
-    RETURN_IF_ERROR(send_remote_block(_ch_cur_pb_block, eos, exec_status));
-    ch_roll_pb_block();
-    return Status::OK();
-}
-
 template <typename Parent>
 Status Channel<Parent>::send_local_block(Status exec_status, bool eos) {
     SCOPED_TIMER(_parent->local_send_timer());
@@ -228,71 +213,6 @@ Status Channel<Parent>::send_local_block(Block* block, bool can_be_moved) {
     }
 }
 
-template <typename Parent>
-Status Channel<Parent>::send_remote_block(PBlock* block, bool eos, Status exec_status) {
-    if constexpr (!std::is_same_v<pipeline::ResultFileSinkLocalState, Parent>) {
-        COUNTER_UPDATE(_parent->blocks_sent_counter(), 1);
-    }
-    SCOPED_TIMER(_parent->brpc_send_timer());
-
-    if (_send_remote_block_callback == nullptr) {
-        _send_remote_block_callback = DummyBrpcCallback<PTransmitDataResult>::create_shared();
-    } else {
-        RETURN_IF_ERROR(_wait_last_brpc());
-        _send_remote_block_callback->cntl_->Reset();
-    }
-    VLOG_ROW << "Channel<Parent>::send_batch() instance_id=" << print_id(_fragment_instance_id)
-             << " dest_node=" << _dest_node_id << " to_host=" << _brpc_dest_addr.hostname
-             << " _packet_seq=" << _packet_seq << " row_desc=" << _row_desc.debug_string();
-
-    _brpc_request->set_eos(eos);
-    if (!exec_status.ok()) {
-        exec_status.to_protobuf(_brpc_request->mutable_exec_status());
-    }
-    if (block != nullptr && !block->column_metas().empty()) {
-        _brpc_request->set_allocated_block(block);
-    }
-    _brpc_request->set_packet_seq(_packet_seq++);
-
-    _send_remote_block_callback->cntl_->set_timeout_ms(_brpc_timeout_ms);
-    if (config::exchange_sink_ignore_eovercrowded) {
-        _send_remote_block_callback->cntl_->ignore_eovercrowded();
-    }
-
-    {
-        auto send_remote_block_closure =
-                AutoReleaseClosure<PTransmitDataParams, DummyBrpcCallback<PTransmitDataResult>>::
-                        create_unique(_brpc_request, _send_remote_block_callback);
-        if (enable_http_send_block(*_brpc_request)) {
-            RETURN_IF_ERROR(transmit_block_httpv2(
-                    _state->exec_env(), std::move(send_remote_block_closure), _brpc_dest_addr));
-        } else {
-            transmit_blockv2(*_brpc_stub, std::move(send_remote_block_closure));
-        }
-    }
-
-    if (block != nullptr) {
-        static_cast<void>(_brpc_request->release_block());
-    }
-    return Status::OK();
-}
-
-template <typename Parent>
-Status Channel<Parent>::add_rows(Block* block, const std::vector<uint32_t>& rows, bool eos) {
-    if (_fragment_instance_id.lo == -1) {
-        return Status::OK();
-    }
-
-    bool serialized = false;
-    RETURN_IF_ERROR(
-            _serializer.next_serialized_block(block, _ch_cur_pb_block, 1, &serialized, eos, &rows));
-    if (serialized) {
-        RETURN_IF_ERROR(send_current_block(false, Status::OK()));
-    }
-
-    return Status::OK();
-}
-
 template <typename Parent>
 Status Channel<Parent>::close_wait(RuntimeState* state) {
     if (_need_close) {
@@ -309,8 +229,7 @@ Status Channel<Parent>::close_wait(RuntimeState* state) {
     return Status::OK();
 }
 
-template <typename Parent>
-Status Channel<Parent>::close_internal(Status exec_status) {
+Status PipChannel::close_internal(Status exec_status) {
     if (!_need_close) {
         return Status::OK();
     }
@@ -343,8 +262,7 @@ Status Channel<Parent>::close_internal(Status exec_status) {
     }
 }
 
-template <typename Parent>
-Status Channel<Parent>::close(RuntimeState* state, Status exec_status) {
+Status PipChannel::close(RuntimeState* state, Status exec_status) {
     if (_closed) {
         return Status::OK();
     }
diff --git a/be/src/vec/sink/vdata_stream_sender.h b/be/src/vec/sink/vdata_stream_sender.h
index 43d00b0164a..b0b0e0dc182 100644
--- a/be/src/vec/sink/vdata_stream_sender.h
+++ b/be/src/vec/sink/vdata_stream_sender.h
@@ -138,30 +138,9 @@ public:
     Status init(RuntimeState* state);
     Status open(RuntimeState* state);
 
-    // Asynchronously sends a row batch.
-    // Returns the status of the most recently finished transmit_data
-    // rpc (or OK if there wasn't one that hasn't been reported yet).
-    // if batch is nullptr, send the eof packet
-    virtual Status send_remote_block(PBlock* block, bool eos = false,
-                                     Status exec_status = Status::OK());
-
-    virtual Status send_broadcast_block(std::shared_ptr<BroadcastPBlockHolder>& block,
-                                        bool eos = false) {
-        return Status::InternalError("Send BroadcastPBlockHolder is not allowed!");
-    }
-
-    virtual Status add_rows(Block* block, const std::vector<uint32_t>& row, bool eos);
-
-    virtual Status send_current_block(bool eos, Status exec_status);
-
     Status send_local_block(Status exec_status, bool eos = false);
 
     Status send_local_block(Block* block, bool can_be_moved);
-    // Flush buffered rows and close channel. This function don't wait the response
-    // of close operation, client should call close_wait() to finish channel's close.
-    // We split one close operation into two phases in order to make multiple channels
-    // can run parallel.
-    Status close(RuntimeState* state, Status exec_status);
 
     // Get close wait's response, to finish channel close operation.
     Status close_wait(RuntimeState* state);
@@ -214,8 +193,6 @@ protected:
         return _receiver_status;
     }
 
-    Status close_internal(Status exec_status);
-
     Parent* _parent = nullptr;
 
     const RowDescriptor& _row_desc;
@@ -288,17 +265,23 @@ public:
         Channel<pipeline::ExchangeSinkLocalState>::_ch_cur_pb_block = new PBlock();
     }
 
+    // Flush buffered rows and close channel. This function don't wait the response
+    // of close operation, client should call close_wait() to finish channel's close.
+    // We split one close operation into two phases in order to make multiple channels
+    // can run parallel.
+    Status close(RuntimeState* state, Status exec_status);
+
+    Status close_internal(Status exec_status);
+
     // Asynchronously sends a block
     // Returns the status of the most recently finished transmit_data
     // rpc (or OK if there wasn't one that hasn't been reported yet).
     // if batch is nullptr, send the eof packet
-    Status send_remote_block(PBlock* block, bool eos = false,
-                             Status exec_status = Status::OK()) override;
+    Status send_remote_block(PBlock* block, bool eos = false, Status exec_status = Status::OK());
 
-    Status send_broadcast_block(std::shared_ptr<BroadcastPBlockHolder>& block,
-                                bool eos = false) override;
+    Status send_broadcast_block(std::shared_ptr<BroadcastPBlockHolder>& block, bool eos = false);
 
-    Status add_rows(Block* block, const std::vector<uint32_t>& rows, bool eos) override {
+    Status add_rows(Block* block, const std::vector<uint32_t>& rows, bool eos) {
         if (Channel<pipeline::ExchangeSinkLocalState>::_fragment_instance_id.lo == -1) {
             return Status::OK();
         }
@@ -317,7 +300,7 @@ public:
     }
 
     // send _mutable_block
-    Status send_current_block(bool eos, Status exec_status) override;
+    Status send_current_block(bool eos, Status exec_status);
 
     void register_exchange_buffer(pipeline::ExchangeSinkBuffer* buffer) {
         _buffer = buffer;


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to