This is an automated email from the ASF dual-hosted git repository.

panxiaolei pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push: new 24eff502145 [Chore](exchange) remove some unused code of Channel (#42095) 24eff502145 is described below commit 24eff50214594d886e9a6a7c7f5bfeaf9665546c Author: Pxl <pxl...@qq.com> AuthorDate: Mon Oct 21 14:03:10 2024 +0800 [Chore](exchange) remove some unused code of Channel (#42095) ## Proposed changes remove some unused code of Channel --- be/src/vec/sink/vdata_stream_sender.cpp | 86 +-------------------------------- be/src/vec/sink/vdata_stream_sender.h | 41 +++++----------- 2 files changed, 14 insertions(+), 113 deletions(-) diff --git a/be/src/vec/sink/vdata_stream_sender.cpp b/be/src/vec/sink/vdata_stream_sender.cpp index 496e68c97f0..b139c503c9a 100644 --- a/be/src/vec/sink/vdata_stream_sender.cpp +++ b/be/src/vec/sink/vdata_stream_sender.cpp @@ -174,21 +174,6 @@ Status PipChannel::send_current_block(bool eos, Status exec_status) { return Status::OK(); } -template <typename Parent> -Status Channel<Parent>::send_current_block(bool eos, Status exec_status) { - // FIXME: Now, local exchange will cause the performance problem is in a multi-threaded scenario - // so this feature is turned off here by default. 
We need to re-examine this logic - if (is_local()) { - return send_local_block(exec_status, eos); - } - if (eos) { - RETURN_IF_ERROR(_serializer.serialize_block(_ch_cur_pb_block, 1)); - } - RETURN_IF_ERROR(send_remote_block(_ch_cur_pb_block, eos, exec_status)); - ch_roll_pb_block(); - return Status::OK(); -} - template <typename Parent> Status Channel<Parent>::send_local_block(Status exec_status, bool eos) { SCOPED_TIMER(_parent->local_send_timer()); @@ -228,71 +213,6 @@ Status Channel<Parent>::send_local_block(Block* block, bool can_be_moved) { } } -template <typename Parent> -Status Channel<Parent>::send_remote_block(PBlock* block, bool eos, Status exec_status) { - if constexpr (!std::is_same_v<pipeline::ResultFileSinkLocalState, Parent>) { - COUNTER_UPDATE(_parent->blocks_sent_counter(), 1); - } - SCOPED_TIMER(_parent->brpc_send_timer()); - - if (_send_remote_block_callback == nullptr) { - _send_remote_block_callback = DummyBrpcCallback<PTransmitDataResult>::create_shared(); - } else { - RETURN_IF_ERROR(_wait_last_brpc()); - _send_remote_block_callback->cntl_->Reset(); - } - VLOG_ROW << "Channel<Parent>::send_batch() instance_id=" << print_id(_fragment_instance_id) - << " dest_node=" << _dest_node_id << " to_host=" << _brpc_dest_addr.hostname - << " _packet_seq=" << _packet_seq << " row_desc=" << _row_desc.debug_string(); - - _brpc_request->set_eos(eos); - if (!exec_status.ok()) { - exec_status.to_protobuf(_brpc_request->mutable_exec_status()); - } - if (block != nullptr && !block->column_metas().empty()) { - _brpc_request->set_allocated_block(block); - } - _brpc_request->set_packet_seq(_packet_seq++); - - _send_remote_block_callback->cntl_->set_timeout_ms(_brpc_timeout_ms); - if (config::exchange_sink_ignore_eovercrowded) { - _send_remote_block_callback->cntl_->ignore_eovercrowded(); - } - - { - auto send_remote_block_closure = - AutoReleaseClosure<PTransmitDataParams, DummyBrpcCallback<PTransmitDataResult>>:: - create_unique(_brpc_request, 
_send_remote_block_callback); - if (enable_http_send_block(*_brpc_request)) { - RETURN_IF_ERROR(transmit_block_httpv2( - _state->exec_env(), std::move(send_remote_block_closure), _brpc_dest_addr)); - } else { - transmit_blockv2(*_brpc_stub, std::move(send_remote_block_closure)); - } - } - - if (block != nullptr) { - static_cast<void>(_brpc_request->release_block()); - } - return Status::OK(); -} - -template <typename Parent> -Status Channel<Parent>::add_rows(Block* block, const std::vector<uint32_t>& rows, bool eos) { - if (_fragment_instance_id.lo == -1) { - return Status::OK(); - } - - bool serialized = false; - RETURN_IF_ERROR( - _serializer.next_serialized_block(block, _ch_cur_pb_block, 1, &serialized, eos, &rows)); - if (serialized) { - RETURN_IF_ERROR(send_current_block(false, Status::OK())); - } - - return Status::OK(); -} - template <typename Parent> Status Channel<Parent>::close_wait(RuntimeState* state) { if (_need_close) { @@ -309,8 +229,7 @@ Status Channel<Parent>::close_wait(RuntimeState* state) { return Status::OK(); } -template <typename Parent> -Status Channel<Parent>::close_internal(Status exec_status) { +Status PipChannel::close_internal(Status exec_status) { if (!_need_close) { return Status::OK(); } @@ -343,8 +262,7 @@ Status Channel<Parent>::close_internal(Status exec_status) { } } -template <typename Parent> -Status Channel<Parent>::close(RuntimeState* state, Status exec_status) { +Status PipChannel::close(RuntimeState* state, Status exec_status) { if (_closed) { return Status::OK(); } diff --git a/be/src/vec/sink/vdata_stream_sender.h b/be/src/vec/sink/vdata_stream_sender.h index 43d00b0164a..b0b0e0dc182 100644 --- a/be/src/vec/sink/vdata_stream_sender.h +++ b/be/src/vec/sink/vdata_stream_sender.h @@ -138,30 +138,9 @@ public: Status init(RuntimeState* state); Status open(RuntimeState* state); - // Asynchronously sends a row batch. 
- // Returns the status of the most recently finished transmit_data - // rpc (or OK if there wasn't one that hasn't been reported yet). - // if batch is nullptr, send the eof packet - virtual Status send_remote_block(PBlock* block, bool eos = false, - Status exec_status = Status::OK()); - - virtual Status send_broadcast_block(std::shared_ptr<BroadcastPBlockHolder>& block, - bool eos = false) { - return Status::InternalError("Send BroadcastPBlockHolder is not allowed!"); - } - - virtual Status add_rows(Block* block, const std::vector<uint32_t>& row, bool eos); - - virtual Status send_current_block(bool eos, Status exec_status); - Status send_local_block(Status exec_status, bool eos = false); Status send_local_block(Block* block, bool can_be_moved); - // Flush buffered rows and close channel. This function don't wait the response - // of close operation, client should call close_wait() to finish channel's close. - // We split one close operation into two phases in order to make multiple channels - // can run parallel. - Status close(RuntimeState* state, Status exec_status); // Get close wait's response, to finish channel close operation. Status close_wait(RuntimeState* state); @@ -214,8 +193,6 @@ protected: return _receiver_status; } - Status close_internal(Status exec_status); - Parent* _parent = nullptr; const RowDescriptor& _row_desc; @@ -288,17 +265,23 @@ public: Channel<pipeline::ExchangeSinkLocalState>::_ch_cur_pb_block = new PBlock(); } + // Flush buffered rows and close channel. This function don't wait the response + // of close operation, client should call close_wait() to finish channel's close. + // We split one close operation into two phases in order to make multiple channels + // can run parallel. 
+ Status close(RuntimeState* state, Status exec_status); + + Status close_internal(Status exec_status); + // Asynchronously sends a block // Returns the status of the most recently finished transmit_data // rpc (or OK if there wasn't one that hasn't been reported yet). // if batch is nullptr, send the eof packet - Status send_remote_block(PBlock* block, bool eos = false, - Status exec_status = Status::OK()) override; + Status send_remote_block(PBlock* block, bool eos = false, Status exec_status = Status::OK()); - Status send_broadcast_block(std::shared_ptr<BroadcastPBlockHolder>& block, - bool eos = false) override; + Status send_broadcast_block(std::shared_ptr<BroadcastPBlockHolder>& block, bool eos = false); - Status add_rows(Block* block, const std::vector<uint32_t>& rows, bool eos) override { + Status add_rows(Block* block, const std::vector<uint32_t>& rows, bool eos) { if (Channel<pipeline::ExchangeSinkLocalState>::_fragment_instance_id.lo == -1) { return Status::OK(); } @@ -317,7 +300,7 @@ public: } // send _mutable_block - Status send_current_block(bool eos, Status exec_status) override; + Status send_current_block(bool eos, Status exec_status); void register_exchange_buffer(pipeline::ExchangeSinkBuffer* buffer) { _buffer = buffer; --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org