This is an automated email from the ASF dual-hosted git repository. kxiao pushed a commit to branch branch-2.0 in repository https://gitbox.apache.org/repos/asf/doris.git
commit 3c6676ce2ebc09783d6710943d202efa66a912cd Author: Pxl <pxl...@qq.com> AuthorDate: Mon Jul 24 09:25:32 2023 +0800 [Improvement](pipeline) support send eos on local exchange and remove some unused code (#22086) support send eos on local exchange and remove some unused code --- be/src/pipeline/exec/exchange_source_operator.cpp | 1 - be/src/runtime/fragment_mgr.cpp | 2 +- be/src/service/brpc_service.cpp | 2 +- be/src/service/internal_service.cpp | 17 ++----- be/src/service/internal_service.h | 2 +- be/src/util/proto_util.h | 56 ----------------------- be/src/vec/exec/vexchange_node.h | 1 - be/src/vec/sink/vdata_stream_sender.cpp | 16 +++++-- be/src/vec/sink/vdata_stream_sender.h | 12 ++--- be/test/vec/exec/vtablet_sink_test.cpp | 2 - be/test/vec/runtime/vdata_stream_test.cpp | 2 - 11 files changed, 24 insertions(+), 89 deletions(-) diff --git a/be/src/pipeline/exec/exchange_source_operator.cpp b/be/src/pipeline/exec/exchange_source_operator.cpp index 1dd20f929c..84e4288dce 100644 --- a/be/src/pipeline/exec/exchange_source_operator.cpp +++ b/be/src/pipeline/exec/exchange_source_operator.cpp @@ -32,7 +32,6 @@ bool ExchangeSourceOperator::can_read() { } bool ExchangeSourceOperator::is_pending_finish() const { - // TODO HappenLee return false; } } // namespace doris::pipeline diff --git a/be/src/runtime/fragment_mgr.cpp b/be/src/runtime/fragment_mgr.cpp index f8e7750151..cfb682b75d 100644 --- a/be/src/runtime/fragment_mgr.cpp +++ b/be/src/runtime/fragment_mgr.cpp @@ -138,7 +138,7 @@ public: } // Update status of this fragment execute - Status update_status(Status status) { + Status update_status(const Status& status) { std::lock_guard<std::mutex> l(_status_lock); if (!status.ok() && _exec_status.ok()) { _exec_status = status; diff --git a/be/src/service/brpc_service.cpp b/be/src/service/brpc_service.cpp index 80138b99ee..2406d08b1e 100644 --- a/be/src/service/brpc_service.cpp +++ b/be/src/service/brpc_service.cpp @@ -47,7 +47,7 @@ BRpcService::BRpcService(ExecEnv* exec_env) : _exec_env(exec_env), _server(new b brpc::FLAGS_socket_max_unwritten_bytes = config::brpc_socket_max_unwritten_bytes; } -BRpcService::~BRpcService() {} +BRpcService::~BRpcService() = default; Status BRpcService::start(int port, int num_threads) { // Add service diff --git a/be/src/service/internal_service.cpp b/be/src/service/internal_service.cpp index c38416a593..924e829539 100644 --- a/be/src/service/internal_service.cpp +++ b/be/src/service/internal_service.cpp @@ -328,12 +328,7 @@ void PInternalServiceImpl::tablet_writer_add_block(google::protobuf::RpcControll PTabletWriterAddBlockResult* response, google::protobuf::Closure* done) { bool ret = _heavy_work_pool.try_offer([this, controller, request, response, done]() { - // TODO(zxy) delete in 1.2 version - google::protobuf::Closure* new_done = new NewHttpClosure<PTransmitDataParams>(done); - brpc::Controller* cntl = static_cast<brpc::Controller*>(controller); - attachment_transfer_request_block<PTabletWriterAddBlockRequest>(request, cntl); - - _tablet_writer_add_block(controller, request, response, new_done); + _tablet_writer_add_block(controller, request, response, done); }); if (!ret) { LOG(WARNING) << "fail to offer request to the work pool"; @@ -1024,13 +1019,9 @@ void PInternalServiceImpl::transmit_block(google::protobuf::RpcController* contr google::protobuf::Closure* done) { int64_t receive_time = GetCurrentTimeNanos(); response->set_receive_time(receive_time); - bool ret = _heavy_work_pool.try_offer([this, controller, request, response, done]() { - // TODO(zxy) delete in 1.2 version - google::protobuf::Closure* new_done = new NewHttpClosure<PTransmitDataParams>(done); - brpc::Controller* cntl = static_cast<brpc::Controller*>(controller); - attachment_transfer_request_block<PTransmitDataParams>(request, cntl); - - _transmit_block(controller, request, response, new_done, Status::OK()); + PriorityThreadPool& pool = request->has_block() ? _heavy_work_pool : _light_work_pool; + bool ret = pool.try_offer([this, controller, request, response, done]() { + _transmit_block(controller, request, response, done, Status::OK()); }); if (!ret) { LOG(WARNING) << "fail to offer request to the work pool"; diff --git a/be/src/service/internal_service.h b/be/src/service/internal_service.h index 0d568e2b84..823f29504b 100644 --- a/be/src/service/internal_service.h +++ b/be/src/service/internal_service.h @@ -41,7 +41,7 @@ class PHandShakeResponse; class PInternalServiceImpl : public PBackendService { public: PInternalServiceImpl(ExecEnv* exec_env); - virtual ~PInternalServiceImpl(); + ~PInternalServiceImpl() override; void transmit_data(::google::protobuf::RpcController* controller, const ::doris::PTransmitDataParams* request, diff --git a/be/src/util/proto_util.h b/be/src/util/proto_util.h index 2741092d80..3c583b867c 100644 --- a/be/src/util/proto_util.h +++ b/be/src/util/proto_util.h @@ -86,62 +86,6 @@ Status transmit_block_http(RuntimeState* state, Closure* closure, PTransmitDataP return Status::OK(); } -// TODO(zxy) delete in v1.3 version -// Transfer RowBatch in ProtoBuf Request to Controller Attachment. -// This can avoid reaching the upper limit of the ProtoBuf Request length (2G), -// and it is expected that performance can be improved. -template <typename Params, typename Closure> -void request_row_batch_transfer_attachment(Params* brpc_request, const std::string& tuple_data, - Closure* closure) { - auto row_batch = brpc_request->mutable_row_batch(); - row_batch->set_tuple_data(""); - brpc_request->set_transfer_by_attachment(true); - butil::IOBuf attachment; - attachment.append(tuple_data); - closure->cntl.request_attachment().swap(attachment); -} - -// TODO(zxy) delete in v1.3 version -// Transfer Block in ProtoBuf Request to Controller Attachment. -// This can avoid reaching the upper limit of the ProtoBuf Request length (2G), -// and it is expected that performance can be improved. -template <typename Params, typename Closure> -void request_block_transfer_attachment(Params* brpc_request, const std::string& column_values, - Closure* closure) { - auto block = brpc_request->mutable_block(); - block->set_column_values(""); - brpc_request->set_transfer_by_attachment(true); - butil::IOBuf attachment; - attachment.append(column_values); - closure->cntl.request_attachment().swap(attachment); -} - -// TODO(zxy) delete in v1.3 version -// Controller Attachment transferred to RowBatch in ProtoBuf Request. -template <typename Params> -void attachment_transfer_request_row_batch(const Params* brpc_request, brpc::Controller* cntl) { - Params* req = const_cast<Params*>(brpc_request); - if (req->has_row_batch() && req->transfer_by_attachment()) { - auto rb = req->mutable_row_batch(); - const butil::IOBuf& io_buf = cntl->request_attachment(); - CHECK(io_buf.size() > 0) << io_buf.size() << ", row num: " << req->row_batch().num_rows(); - io_buf.copy_to(rb->mutable_tuple_data(), io_buf.size(), 0); - } -} - -// TODO(zxy) delete in v1.3 version -// Controller Attachment transferred to Block in ProtoBuf Request. -template <typename Params> -void attachment_transfer_request_block(const Params* brpc_request, brpc::Controller* cntl) { - Params* req = const_cast<Params*>(brpc_request); - if (req->has_block() && req->transfer_by_attachment()) { - auto block = req->mutable_block(); - const butil::IOBuf& io_buf = cntl->request_attachment(); - CHECK(io_buf.size() > 0) << io_buf.size(); - io_buf.copy_to(block->mutable_column_values(), io_buf.size(), 0); - } -} - template <typename Params, typename Closure> Status request_embed_attachment(Params* brpc_request, const std::string& data, Closure* closure) { butil::IOBuf attachment; diff --git a/be/src/vec/exec/vexchange_node.h b/be/src/vec/exec/vexchange_node.h index 58b61dc9af..c4f083dda4 100644 --- a/be/src/vec/exec/vexchange_node.h +++ b/be/src/vec/exec/vexchange_node.h @@ -58,7 +58,6 @@ public: Status collect_query_statistics(QueryStatistics* statistics) override; Status close(RuntimeState* state) override; - // Status collect_query_statistics(QueryStatistics* statistics) override; void set_num_senders(int num_senders) { _num_senders = num_senders; } private: diff --git a/be/src/vec/sink/vdata_stream_sender.cpp b/be/src/vec/sink/vdata_stream_sender.cpp index b49c6ec92e..82050276d2 100644 --- a/be/src/vec/sink/vdata_stream_sender.cpp +++ b/be/src/vec/sink/vdata_stream_sender.cpp @@ -134,7 +134,7 @@ Status Channel::send_local_block(bool eos) { return Status::OK(); } else { _mutable_block.reset(); - return receiver_status_; + return _receiver_status; } } @@ -147,7 +147,7 @@ Status Channel::send_local_block(Block* block) { _local_recvr->add_block(block, _parent->_sender_id, false); return Status::OK(); } else { - return receiver_status_; + return _receiver_status; } } @@ -256,8 +256,8 @@ Status Channel::close_internal() { VLOG_RPC << "Channel::close() instance_id=" << _fragment_instance_id << " dest_node=" << _dest_node_id << " #rows= " << ((_mutable_block == nullptr) ? 0 : _mutable_block->rows()) - << " receiver status: " << receiver_status_; - if (receiver_status_.is<ErrorCode::END_OF_FILE>()) { + << " receiver status: " << _receiver_status; + if (is_receiver_eof()) { _mutable_block.reset(); return Status::OK(); } @@ -266,7 +266,13 @@ Status Channel::close_internal() { status = send_current_block(true); } else { SCOPED_CONSUME_MEM_TRACKER(_parent->_mem_tracker.get()); - status = send_block((PBlock*)nullptr, true); + if (is_local()) { + if (_recvr_is_valid()) { + _local_recvr->remove_sender(_parent->_sender_id, _be_number); + } + } else { + status = send_block((PBlock*)nullptr, true); + } } // Don't wait for the last packet to finish, left it to close_wait. if (status.is<ErrorCode::END_OF_FILE>()) { diff --git a/be/src/vec/sink/vdata_stream_sender.h b/be/src/vec/sink/vdata_stream_sender.h index 2fdecac6cb..2512a21475 100644 --- a/be/src/vec/sink/vdata_stream_sender.h +++ b/be/src/vec/sink/vdata_stream_sender.h @@ -294,16 +294,16 @@ public: _local_recvr->sender_queue_empty(_parent->_sender_id); } - bool is_receiver_eof() const { return receiver_status_.is<ErrorCode::END_OF_FILE>(); } + bool is_receiver_eof() const { return _receiver_status.is<ErrorCode::END_OF_FILE>(); } - void set_receiver_eof(Status st) { receiver_status_ = st; } + void set_receiver_eof(Status st) { _receiver_status = st; } protected: bool _recvr_is_valid() { if (_local_recvr && !_local_recvr->is_closed()) { return true; } - receiver_status_ = Status::EndOfFile("local data stream receiver closed"); + _receiver_status = Status::EndOfFile("local data stream receiver closed"); return false; } @@ -315,7 +315,7 @@ protected: auto cntl = &_closure->cntl; auto call_id = _closure->cntl.call_id(); brpc::Join(call_id); - receiver_status_ = Status::create(_closure->result.status()); + _receiver_status = Status::create(_closure->result.status()); if (cntl->Failed()) { std::string err = fmt::format( "failed to send brpc batch, error={}, error_text={}, client: {}, " @@ -325,7 +325,7 @@ protected: LOG(WARNING) << err; return Status::RpcError(err); } - return receiver_status_; + return _receiver_status; } // Serialize _batch into _thrift_batch and send via send_batch(). @@ -358,7 +358,7 @@ protected: PTransmitDataParams _brpc_request; std::shared_ptr<PBackendService_Stub> _brpc_stub = nullptr; RefCountClosure<PTransmitDataResult>* _closure = nullptr; - Status receiver_status_; + Status _receiver_status; int32_t _brpc_timeout_ms = 500; // whether the dest can be treated as query statistics transfer chain. bool _is_transfer_chain; diff --git a/be/test/vec/exec/vtablet_sink_test.cpp b/be/test/vec/exec/vtablet_sink_test.cpp index 15ee6b79a2..5e60181c8f 100644 --- a/be/test/vec/exec/vtablet_sink_test.cpp +++ b/be/test/vec/exec/vtablet_sink_test.cpp @@ -316,8 +316,6 @@ public: k_add_batch_status.to_protobuf(response->mutable_status()); if (request->has_block() && _row_desc != nullptr) { - brpc::Controller* cntl = static_cast<brpc::Controller*>(controller); - attachment_transfer_request_block<PTabletWriterAddBlockRequest>(request, cntl); vectorized::Block block(request->block()); for (size_t row_num = 0; row_num < block.rows(); ++row_num) { diff --git a/be/test/vec/runtime/vdata_stream_test.cpp b/be/test/vec/runtime/vdata_stream_test.cpp index 4c42bde016..954bf8f194 100644 --- a/be/test/vec/runtime/vdata_stream_test.cpp +++ b/be/test/vec/runtime/vdata_stream_test.cpp @@ -74,8 +74,6 @@ public: const ::doris::PTransmitDataParams* request, ::doris::PTransmitDataResult* response, ::google::protobuf::Closure* done) { // stream_mgr->transmit_block(request, &done); - brpc::Controller* cntl = static_cast<brpc::Controller*>(controller); - attachment_transfer_request_block<PTransmitDataParams>(request, cntl); // The response is accessed when done->Run is called in transmit_block(), // give response a default value to avoid null pointers in high concurrency. Status st; --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org