This is an automated email from the ASF dual-hosted git repository. morningman pushed a commit to branch branch-2.1 in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.1 by this push: new db2721915e1 [Bug](runtime-filter) release dependency when rf rpc failed or meet error status (#36297) db2721915e1 is described below commit db2721915e1eb8b94a65fec11e6728b4b7ab08aa Author: Pxl <pxl...@qq.com> AuthorDate: Fri Jun 14 23:44:08 2024 +0800 [Bug](runtime-filter) release dependency when rf rpc failed or meet error status (#36297) pick from #36126 --- be/src/exprs/runtime_filter.cpp | 30 ++++++++++++++++++++++++++---- be/src/util/ref_count_closure.h | 32 ++++++++++++++++++++------------ 2 files changed, 46 insertions(+), 16 deletions(-) diff --git a/be/src/exprs/runtime_filter.cpp b/be/src/exprs/runtime_filter.cpp index 3e07943c45e..39eb814bbea 100644 --- a/be/src/exprs/runtime_filter.cpp +++ b/be/src/exprs/runtime_filter.cpp @@ -31,6 +31,7 @@ #include <memory> #include <mutex> #include <ostream> +#include <utility> #include "agent/be_exec_version_manager.h" #include "common/logging.h" @@ -1029,6 +1030,30 @@ Status IRuntimeFilter::publish(bool publish_local) { return Status::OK(); } +class SyncSizeClosure : public AutoReleaseClosure<PSendFilterSizeRequest, + DummyBrpcCallback<PSendFilterSizeResponse>> { + std::shared_ptr<pipeline::Dependency> _dependency; + using Base = + AutoReleaseClosure<PSendFilterSizeRequest, DummyBrpcCallback<PSendFilterSizeResponse>>; + ENABLE_FACTORY_CREATOR(SyncSizeClosure); + + void _process_if_rpc_failed() override { + ((pipeline::CountedFinishDependency*)_dependency.get())->sub(); + Base::_process_if_rpc_failed(); + } + + void _process_if_meet_error_status(const Status& status) override { + ((pipeline::CountedFinishDependency*)_dependency.get())->sub(); + Base::_process_if_meet_error_status(status); + } + +public: + SyncSizeClosure(std::shared_ptr<PSendFilterSizeRequest> req, + std::shared_ptr<DummyBrpcCallback<PSendFilterSizeResponse>> callback, + std::shared_ptr<pipeline::Dependency> dependency) + : Base(req, callback), _dependency(std::move(dependency)) {} +}; + Status IRuntimeFilter::send_filter_size(uint64_t local_filter_size) { DCHECK(is_producer()); @@ -1069,10 +1094,7 @@ Status IRuntimeFilter::send_filter_size(uint64_t local_filter_size) { auto request = std::make_shared<PSendFilterSizeRequest>(); auto callback = DummyBrpcCallback<PSendFilterSizeResponse>::create_shared(); - auto closure = - AutoReleaseClosure<PSendFilterSizeRequest, - DummyBrpcCallback<PSendFilterSizeResponse>>::create_unique(request, - callback); + auto closure = SyncSizeClosure::create_unique(request, callback, _dependency); auto* pquery_id = request->mutable_query_id(); pquery_id->set_hi(_state->query_id.hi()); pquery_id->set_lo(_state->query_id.lo()); diff --git a/be/src/util/ref_count_closure.h b/be/src/util/ref_count_closure.h index 7bbcfb7da39..01e523d9b9a 100644 --- a/be/src/util/ref_count_closure.h +++ b/be/src/util/ref_count_closure.h @@ -71,16 +71,6 @@ public: template <typename T> concept HasStatus = requires(T* response) { response->status(); }; -template <typename Response> -void process_status(Response* response) {} - -template <HasStatus Response> -void process_status(Response* response) { - if (auto status = Status::create(response->status()); !status) { - LOG(WARNING) << "RPC meet error status: " << status; - } -} - template <typename Request, typename Callback> class AutoReleaseClosure : public google::protobuf::Closure { using Weak = typename std::shared_ptr<Callback>::weak_type; @@ -105,9 +95,9 @@ public: tmp->call(); } if (cntl_->Failed()) { - LOG(WARNING) << "RPC meet failed: " << cntl_->ErrorText(); + _process_if_rpc_failed(); } else { - process_status<ResponseType>(response_.get()); + _process_status<ResponseType>(response_.get()); } } @@ -120,7 +110,25 @@ public: std::shared_ptr<Request> request_; std::shared_ptr<ResponseType> response_; +protected: + virtual void _process_if_rpc_failed() { + LOG(WARNING) << "RPC meet failed: " << cntl_->ErrorText(); + } + + virtual void _process_if_meet_error_status(const Status& status) { + LOG(WARNING) << "RPC meet error status: " << status; + } + private: + template <typename Response> + void _process_status(Response* response) {} + + template <HasStatus Response> + void _process_status(Response* response) { + if (auto status = Status::create(response->status()); !status) { + _process_if_meet_error_status(status); + } + } // Use a weak ptr to keep the callback, so that the callback can be deleted if the main // thread is freed. Weak callback_; --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org