This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new e042e9f3d91 [enhancement](brpc) remove client from brpc cache if the 
underlying channel has error (#47487)
e042e9f3d91 is described below

commit e042e9f3d91c69ec3b00e7415ecdeb5aafe5531f
Author: yiguolei <guo...@selectdb.com>
AuthorDate: Wed Feb 5 12:14:30 2025 +0800

    [enhancement](brpc) remove client from brpc cache if the underlying channel 
has error (#47487)
    
    ### What problem does this PR solve?
    
    If the channel underlying a cached stub has encountered an error, the stub
    should no longer be reused; it should be removed from the cache.
---
 be/src/util/brpc_client_cache.h         | 87 ++++++++++++++++++++++++++++++++-
 be/test/util/brpc_client_cache_test.cpp | 47 ++++++++++++++++++
 2 files changed, 132 insertions(+), 2 deletions(-)

diff --git a/be/src/util/brpc_client_cache.h b/be/src/util/brpc_client_cache.h
index 24bd284f302..58d544dd5a9 100644
--- a/be/src/util/brpc_client_cache.h
+++ b/be/src/util/brpc_client_cache.h
@@ -40,7 +40,9 @@
 
 #include "common/compiler_util.h" // IWYU pragma: keep
 #include "common/config.h"
+#include "common/status.h"
 #include "runtime/exec_env.h"
+#include "service/backend_options.h"
 #include "util/dns_cache.h"
 #include "util/network_util.h"
 
@@ -56,6 +58,80 @@ using StubMap = phmap::parallel_flat_hash_map<
 
 namespace doris {
 
+class FailureDetectClosure : public ::google::protobuf::Closure {
+public:
+    FailureDetectClosure(std::shared_ptr<AtomicStatus>& channel_st,
+                         ::google::protobuf::RpcController* controller,
+                         ::google::protobuf::Closure* done)
+            : _channel_st(channel_st), _controller(controller), _done(done) {}
+
+    void Run() override {
+        Defer defer {[&]() { delete this; }};
+        // All brpc related APIs are expected to pass a brpc::Controller, so the
+        // static_cast below is considered safe. Only EHOSTDOWN failures are recorded.
+        auto* cntl = static_cast<brpc::Controller*>(_controller);
+        if (cntl->Failed() && cntl->ErrorCode() == EHOSTDOWN) {
+            Status error_st = Status::NetworkError(
+                    "Failed to send brpc, error={}, error_text={}, client: {}, 
latency = {}",
+                    berror(cntl->ErrorCode()), cntl->ErrorText(), 
BackendOptions::get_localhost(),
+                    cntl->latency_us());
+            LOG(WARNING) << error_st;
+            _channel_st->update(error_st);
+        }
+        // The wrapped closure may be nullptr; forward the completion only when one was supplied.
+        if (_done != nullptr) {
+            _done->Run();
+        }
+        // _done->Run() may throw, so `delete this` is issued from the Defer declared at the
+        // top of Run() instead of here (Defer is expected to fire when Run() exits).
+    }
+
+private:
+    std::shared_ptr<AtomicStatus> _channel_st;
+    ::google::protobuf::RpcController* _controller;
+    ::google::protobuf::Closure* _done;
+};
+
+// This channel uses FailureDetectClosure to wrap the original closure.
+// If some non-recoverable rpc failure happens, it saves the error status in
+// _channel_st.
+// The brpc client cache depends on it to detect whether the client is healthy.
+class FailureDetectChannel : public ::brpc::Channel {
+public:
+    FailureDetectChannel() : ::brpc::Channel() {
+        _channel_st = std::make_shared<AtomicStatus>(); // starts out as OK by default
+    }
+    void CallMethod(const google::protobuf::MethodDescriptor* method,
+                    google::protobuf::RpcController* controller,
+                    const google::protobuf::Message* request, 
google::protobuf::Message* response,
+                    google::protobuf::Closure* done) override {
+        FailureDetectClosure* failure_detect_closure = nullptr;
+        if (done != nullptr) {
+            // Async call: wrap done. A null done means the call is a sync call, so 
that should not
+            // get a failure detect closure generated for it, or it will core.
+            failure_detect_closure = new FailureDetectClosure(_channel_st, 
controller, done);
+        }
+        ::brpc::Channel::CallMethod(method, controller, request, response, 
failure_detect_closure);
+        // done == nullptr: this was a sync call, so inspect its result here to detect a bad 
channel.
+        if (done == nullptr) {
+            auto* cntl = static_cast<brpc::Controller*>(controller);
+            if (cntl->Failed() && cntl->ErrorCode() == EHOSTDOWN) {
+                Status error_st = Status::NetworkError(
+                        "Failed to send brpc, error={}, error_text={}, client: 
{}, latency = {}",
+                        berror(cntl->ErrorCode()), cntl->ErrorText(),
+                        BackendOptions::get_localhost(), cntl->latency_us());
+                LOG(WARNING) << error_st;
+                _channel_st->update(error_st);
+            }
+        }
+    }
+
+    std::shared_ptr<AtomicStatus> channel_status() { return _channel_st; }
+
+private:
+    std::shared_ptr<AtomicStatus> _channel_st;
+};
+
 template <class T>
 class BrpcClientCache {
 public:
@@ -99,7 +175,14 @@ public:
         auto get_value = [&stub_ptr](const auto& v) { stub_ptr = v.second; };
         if (LIKELY(_stub_map.if_contains(host_port, get_value))) {
             DCHECK(stub_ptr != nullptr);
-            return stub_ptr;
+            // All client created from this cache will use 
FailureDetectChannel, so it is
+            // safe to do static cast here.
+            // Check if the base channel is OK, if not ignore the stub and 
create new one.
+            if 
(static_cast<FailureDetectChannel*>(stub_ptr->channel())->channel_status()->ok())
 {
+                return stub_ptr;
+            } else {
+                _stub_map.erase(host_port);
+            }
         }
 
         // new one stub and insert into map
@@ -148,7 +231,7 @@ public:
         options.timeout_ms = 2000;
         options.max_retry = 10;
 
-        std::unique_ptr<brpc::Channel> channel(new brpc::Channel());
+        std::unique_ptr<FailureDetectChannel> channel(new 
FailureDetectChannel());
         int ret_code = 0;
         if (host_port.find("://") == std::string::npos) {
             ret_code = channel->Init(host_port.c_str(), &options);
diff --git a/be/test/util/brpc_client_cache_test.cpp 
b/be/test/util/brpc_client_cache_test.cpp
index 5377ce7eeb9..5ae2aa0e7f9 100644
--- a/be/test/util/brpc_client_cache_test.cpp
+++ b/be/test/util/brpc_client_cache_test.cpp
@@ -56,4 +56,51 @@ TEST_F(BrpcClientCacheTest, invalid) {
     EXPECT_EQ(nullptr, stub1);
 }
 
+TEST_F(BrpcClientCacheTest, failure) {
+    BrpcClientCache<PBackendService_Stub> cache;
+    TNetworkAddress address;
+    address.hostname = "127.0.0.1";
+    address.port = 123;
+    std::shared_ptr<PBackendService_Stub> stub1 = cache.get_client(address);
+    EXPECT_NE(nullptr, stub1);
+    std::shared_ptr<PBackendService_Stub> stub2 = cache.get_client(address);
+    EXPECT_NE(nullptr, stub2);
+    // The channel is still OK, so the cache hands back the same stub.
+    EXPECT_EQ(stub1, stub2);
    
EXPECT_TRUE(static_cast<FailureDetectChannel*>(stub1->channel())->channel_status()->ok());
+
+    // Force the channel status to an error; the next lookup must return a new stub.
+    static_cast<FailureDetectChannel*>(stub1->channel())
+            ->channel_status()
+            ->update(Status::NetworkError("test brpc error"));
+    std::shared_ptr<PBackendService_Stub> stub3 = cache.get_client(address);
+    EXPECT_NE(nullptr, stub3);
    
EXPECT_TRUE(static_cast<FailureDetectChannel*>(stub3->channel())->channel_status()->ok());
+    // The cache returned a fresh brpc stub, not the previous one.
+    EXPECT_NE(stub2, stub3);
+    // The previous stub's channel remains marked as not OK.
    
EXPECT_FALSE(static_cast<FailureDetectChannel*>(stub2->channel())->channel_status()->ok());
+
+    // Call handshake method, it will trigger host is down error. It is a sync 
call, not use closure.
+    cache.available(stub3, address.hostname, address.port);
    
EXPECT_FALSE(static_cast<FailureDetectChannel*>(stub3->channel())->channel_status()->ok());
+
+    std::shared_ptr<PBackendService_Stub> stub4 = cache.get_client(address);
+    EXPECT_NE(nullptr, stub4);
    
EXPECT_TRUE(static_cast<FailureDetectChannel*>(stub4->channel())->channel_status()->ok());
+
+    // Call handshake method, it will trigger host is down error. It is a 
async all, will use closure.
+    std::string message = "hello doris!";
+    PHandShakeRequest request;
+    request.set_hello(message);
+    PHandShakeResponse response;
+    brpc::Controller cntl4;
+    stub4->hand_shake(&cntl4, &request, &response, brpc::DoNothing());
+    brpc::Join(cntl4.call_id());
    
EXPECT_FALSE(static_cast<FailureDetectChannel*>(stub4->channel())->channel_status()->ok());
+
+    // Each failed stub was replaced in place, so the cache should hold exactly one entry.
+    EXPECT_EQ(1, cache.size());
+}
+
 } // namespace doris


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to