This is an automated email from the ASF dual-hosted git repository. yiguolei pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push: new 66e591f7f25 [enhancement](brpc) add a auto release closure to ensure the closue safety (#26567) 66e591f7f25 is described below commit 66e591f7f25464b51f43ab4c57f9889b6afb44eb Author: yiguolei <676222...@qq.com> AuthorDate: Thu Nov 9 08:50:42 2023 +0800 [enhancement](brpc) add a auto release closure to ensure the closue safety (#26567) --- be/src/olap/delta_writer.cpp | 56 +++++++++++++--------------- be/src/util/ref_count_closure.h | 81 +++++++++++++++++++++++++++++++++++++++++ 2 files changed, 107 insertions(+), 30 deletions(-) diff --git a/be/src/olap/delta_writer.cpp b/be/src/olap/delta_writer.cpp index 23e1718cb7d..3e91e5efb06 100644 --- a/be/src/olap/delta_writer.cpp +++ b/be/src/olap/delta_writer.cpp @@ -243,21 +243,20 @@ void DeltaWriter::_request_slave_tablet_pull_rowset(PNodeInfo node_info) { } } - PTabletWriteSlaveRequest request; - RowsetMetaPB rowset_meta_pb = cur_rowset->rowset_meta()->get_rowset_pb(); - request.set_allocated_rowset_meta(&rowset_meta_pb); - request.set_host(BackendOptions::get_localhost()); - request.set_http_port(config::webserver_port); + auto request = std::make_shared<PTabletWriteSlaveRequest>(); + *(request->mutable_rowset_meta()) = cur_rowset->rowset_meta()->get_rowset_pb(); + request->set_host(BackendOptions::get_localhost()); + request->set_http_port(config::webserver_port); string tablet_path = _rowset_builder.tablet()->tablet_path(); - request.set_rowset_path(tablet_path); - request.set_token(ExecEnv::GetInstance()->token()); - request.set_brpc_port(config::brpc_port); - request.set_node_id(node_info.id()); + request->set_rowset_path(tablet_path); + request->set_token(ExecEnv::GetInstance()->token()); + request->set_brpc_port(config::brpc_port); + request->set_node_id(node_info.id()); for (int segment_id = 0; segment_id < cur_rowset->rowset_meta()->num_segments(); segment_id++) { std::stringstream segment_name; segment_name << cur_rowset->rowset_id() << "_" << segment_id << ".dat"; int64_t segment_size = std::filesystem::file_size(tablet_path + "/" + segment_name.str()); - request.mutable_segments_size()->insert({segment_id, segment_size}); + request->mutable_segments_size()->insert({segment_id, segment_size}); if (!indices_ids.empty()) { for (auto index_id : indices_ids) { @@ -269,41 +268,38 @@ void DeltaWriter::_request_slave_tablet_pull_rowset(PNodeInfo node_info) { index_size.set_size(size); // Fetch the map value for the current segment_id. // If it doesn't exist, this will insert a new default-constructed IndexSizeMapValue - auto& index_size_map_value = (*request.mutable_inverted_indices_size())[segment_id]; + auto& index_size_map_value = + (*(request->mutable_inverted_indices_size()))[segment_id]; // Add the new index size to the map value. *index_size_map_value.mutable_index_sizes()->Add() = std::move(index_size); } } } - RefCountClosure<PTabletWriteSlaveResult>* closure = - new RefCountClosure<PTabletWriteSlaveResult>(); - closure->ref(); - closure->ref(); - closure->cntl.set_timeout_ms(config::slave_replica_writer_rpc_timeout_sec * 1000); - closure->cntl.ignore_eovercrowded(); - stub->request_slave_tablet_pull_rowset(&closure->cntl, &request, &closure->result, closure); - static_cast<void>(request.release_rowset_meta()); - - closure->join(); - if (closure->cntl.Failed()) { + + auto pull_callback = DummyBrpcCallback<PTabletWriteSlaveResult>::create_shared(); + auto closure = AutoReleaseClosure< + PTabletWriteSlaveRequest, + DummyBrpcCallback<PTabletWriteSlaveResult>>::create_unique(request, pull_callback); + closure->cntl_->set_timeout_ms(config::slave_replica_writer_rpc_timeout_sec * 1000); + closure->cntl_->ignore_eovercrowded(); + stub->request_slave_tablet_pull_rowset(closure->cntl_.get(), closure->request_.get(), + closure->response_.get(), closure.release()); + + pull_callback->join(); + if (pull_callback->cntl_->Failed()) { if (!ExecEnv::GetInstance()->brpc_internal_client_cache()->available( stub, node_info.host(), node_info.async_internal_port())) { ExecEnv::GetInstance()->brpc_internal_client_cache()->erase( - closure->cntl.remote_side()); + pull_callback->cntl_->remote_side()); } LOG(WARNING) << "failed to send pull rowset request to slave replica, error=" - << berror(closure->cntl.ErrorCode()) - << ", error_text=" << closure->cntl.ErrorText() + << berror(pull_callback->cntl_->ErrorCode()) + << ", error_text=" << pull_callback->cntl_->ErrorText() << ". slave host: " << node_info.host() << ", tablet_id=" << _req.tablet_id << ", txn_id=" << _req.txn_id; std::lock_guard<std::shared_mutex> lock(_slave_node_lock); _unfinished_slave_node.erase(node_info.id()); } - - if (closure->unref()) { - delete closure; - } - closure = nullptr; } void DeltaWriter::finish_slave_tablet_pull_rowset(int64_t node_id, bool is_succeed) { diff --git a/be/src/util/ref_count_closure.h b/be/src/util/ref_count_closure.h index d2fbd2fd14e..d844a7fc820 100644 --- a/be/src/util/ref_count_closure.h +++ b/be/src/util/ref_count_closure.h @@ -54,4 +54,85 @@ private: std::atomic<int> _refs; }; +template <typename Response> +class DummyBrpcCallback { + ENABLE_FACTORY_CREATOR(DummyBrpcCallback); + +public: + using ResponseType = Response; + DummyBrpcCallback() { + cntl_ = std::make_shared<brpc::Controller>(); + response_ = std::make_shared<Response>(); + } + + void call() {} + + void join() { brpc::Join(cntl_->call_id()); } + + // controller has to be the same lifecycle with the closure, because brpc may use + // it in any stage of the rpc. + std::shared_ptr<brpc::Controller> cntl_; + // We do not know if brpc will use request or response after brpc method returns. + // So that we need keep a shared ptr here to ensure that brpc could use req/rep + // at any stage. + std::shared_ptr<Response> response_; +}; + +// The closure will be deleted after callback. +// It could only be created by using shared ptr or unique ptr. +// It will hold a weak ptr of T and call run of T +// Callback() { +// xxxx; +// public +// void run() { +// logxxx +// } +// } +// +// std::shared_ptr<Callback> b; +// +// std::unique_ptr<AutoReleaseClosure> a(b); +// brpc_call(a.release()); + +template <typename Request, typename Callback> +class AutoReleaseClosure : public google::protobuf::Closure { + using Weak = typename std::shared_ptr<Callback>::weak_type; + using ResponseType = typename Callback::ResponseType; + ENABLE_FACTORY_CREATOR(AutoReleaseClosure); + +public: + AutoReleaseClosure(std::shared_ptr<Request> req, std::shared_ptr<Callback> callback) + : callback_(callback) { + this->cntl_ = callback->cntl_; + this->response_ = callback->response_; + } + + ~AutoReleaseClosure() override = default; + + // Will delete itself + void Run() override { + SCOPED_TRACK_MEMORY_TO_UNKNOWN(); + Defer defer {[&]() { delete this; }}; + // If lock failed, it means the callback object is deconstructed, then no need + // to deal with the callback any more. + if (auto tmp = callback_.lock()) { + tmp->call(); + } + } + + // controller has to be the same lifecycle with the closure, because brpc may use + // it in any stage of the rpc. + std::shared_ptr<brpc::Controller> cntl_; + // We do not know if brpc will use request or response after brpc method returns. + // So that we need keep a shared ptr here to ensure that brpc could use req/rep + // at any stage. + std::shared_ptr<Request> request_; + std::shared_ptr<ResponseType> response_; + +private: + // Use a weak ptr to keep the callback, so that the callback can be deleted if the main + // thread is freed. + Weak callback_; +}; + } // namespace doris --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org