This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-2.1 by this push:
     new db2721915e1 [Bug](runtime-filter) release dependency when rf rpc 
failed or meet error status (#36297)
db2721915e1 is described below

commit db2721915e1eb8b94a65fec11e6728b4b7ab08aa
Author: Pxl <pxl...@qq.com>
AuthorDate: Fri Jun 14 23:44:08 2024 +0800

    [Bug](runtime-filter) release dependency when rf rpc failed or meet error 
status (#36297)
    
    pick from #36126
---
 be/src/exprs/runtime_filter.cpp | 30 ++++++++++++++++++++++++++----
 be/src/util/ref_count_closure.h | 32 ++++++++++++++++++++------------
 2 files changed, 46 insertions(+), 16 deletions(-)

diff --git a/be/src/exprs/runtime_filter.cpp b/be/src/exprs/runtime_filter.cpp
index 3e07943c45e..39eb814bbea 100644
--- a/be/src/exprs/runtime_filter.cpp
+++ b/be/src/exprs/runtime_filter.cpp
@@ -31,6 +31,7 @@
 #include <memory>
 #include <mutex>
 #include <ostream>
+#include <utility>
 
 #include "agent/be_exec_version_manager.h"
 #include "common/logging.h"
@@ -1029,6 +1030,30 @@ Status IRuntimeFilter::publish(bool publish_local) {
     return Status::OK();
 }
 
+class SyncSizeClosure : public AutoReleaseClosure<PSendFilterSizeRequest,
+                                                  
DummyBrpcCallback<PSendFilterSizeResponse>> {
+    std::shared_ptr<pipeline::Dependency> _dependency;
+    using Base =
+            AutoReleaseClosure<PSendFilterSizeRequest, 
DummyBrpcCallback<PSendFilterSizeResponse>>;
+    ENABLE_FACTORY_CREATOR(SyncSizeClosure);
+
+    void _process_if_rpc_failed() override {
+        ((pipeline::CountedFinishDependency*)_dependency.get())->sub();
+        Base::_process_if_rpc_failed();
+    }
+
+    void _process_if_meet_error_status(const Status& status) override {
+        ((pipeline::CountedFinishDependency*)_dependency.get())->sub();
+        Base::_process_if_meet_error_status(status);
+    }
+
+public:
+    SyncSizeClosure(std::shared_ptr<PSendFilterSizeRequest> req,
+                    
std::shared_ptr<DummyBrpcCallback<PSendFilterSizeResponse>> callback,
+                    std::shared_ptr<pipeline::Dependency> dependency)
+            : Base(req, callback), _dependency(std::move(dependency)) {}
+};
+
 Status IRuntimeFilter::send_filter_size(uint64_t local_filter_size) {
     DCHECK(is_producer());
 
@@ -1069,10 +1094,7 @@ Status IRuntimeFilter::send_filter_size(uint64_t 
local_filter_size) {
 
     auto request = std::make_shared<PSendFilterSizeRequest>();
     auto callback = 
DummyBrpcCallback<PSendFilterSizeResponse>::create_shared();
-    auto closure =
-            AutoReleaseClosure<PSendFilterSizeRequest,
-                               
DummyBrpcCallback<PSendFilterSizeResponse>>::create_unique(request,
-                                                                               
           callback);
+    auto closure = SyncSizeClosure::create_unique(request, callback, 
_dependency);
     auto* pquery_id = request->mutable_query_id();
     pquery_id->set_hi(_state->query_id.hi());
     pquery_id->set_lo(_state->query_id.lo());
diff --git a/be/src/util/ref_count_closure.h b/be/src/util/ref_count_closure.h
index 7bbcfb7da39..01e523d9b9a 100644
--- a/be/src/util/ref_count_closure.h
+++ b/be/src/util/ref_count_closure.h
@@ -71,16 +71,6 @@ public:
 template <typename T>
 concept HasStatus = requires(T* response) { response->status(); };
 
-template <typename Response>
-void process_status(Response* response) {}
-
-template <HasStatus Response>
-void process_status(Response* response) {
-    if (auto status = Status::create(response->status()); !status) {
-        LOG(WARNING) << "RPC meet error status: " << status;
-    }
-}
-
 template <typename Request, typename Callback>
 class AutoReleaseClosure : public google::protobuf::Closure {
     using Weak = typename std::shared_ptr<Callback>::weak_type;
@@ -105,9 +95,9 @@ public:
             tmp->call();
         }
         if (cntl_->Failed()) {
-            LOG(WARNING) << "RPC meet failed: " << cntl_->ErrorText();
+            _process_if_rpc_failed();
         } else {
-            process_status<ResponseType>(response_.get());
+            _process_status<ResponseType>(response_.get());
         }
     }
 
@@ -120,7 +110,25 @@ public:
     std::shared_ptr<Request> request_;
     std::shared_ptr<ResponseType> response_;
 
+protected:
+    virtual void _process_if_rpc_failed() {
+        LOG(WARNING) << "RPC meet failed: " << cntl_->ErrorText();
+    }
+
+    virtual void _process_if_meet_error_status(const Status& status) {
+        LOG(WARNING) << "RPC meet error status: " << status;
+    }
+
 private:
+    template <typename Response>
+    void _process_status(Response* response) {}
+
+    template <HasStatus Response>
+    void _process_status(Response* response) {
+        if (auto status = Status::create(response->status()); !status) {
+            _process_if_meet_error_status(status);
+        }
+    }
     // Use a weak ptr to keep the callback, so that the callback can be 
deleted if the main
     // thread is freed.
     Weak callback_;


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to