This is an automated email from the ASF dual-hosted git repository. yiguolei pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push: new e042e9f3d91 [enhancement](brpc) remove client from brpc cache if the underlying channel has error (#47487) e042e9f3d91 is described below commit e042e9f3d91c69ec3b00e7415ecdeb5aafe5531f Author: yiguolei <guo...@selectdb.com> AuthorDate: Wed Feb 5 12:14:30 2025 +0800 [enhancement](brpc) remove client from brpc cache if the underlying channel has error (#47487) ### What problem does this PR solve? If the channel in the stub has error, it should not be reused any more, it should be removed from the cache. --- be/src/util/brpc_client_cache.h | 87 ++++++++++++++++++++++++++++++++- be/test/util/brpc_client_cache_test.cpp | 47 ++++++++++++++++++ 2 files changed, 132 insertions(+), 2 deletions(-) diff --git a/be/src/util/brpc_client_cache.h b/be/src/util/brpc_client_cache.h index 24bd284f302..58d544dd5a9 100644 --- a/be/src/util/brpc_client_cache.h +++ b/be/src/util/brpc_client_cache.h @@ -40,7 +40,9 @@ #include "common/compiler_util.h" // IWYU pragma: keep #include "common/config.h" +#include "common/status.h" #include "runtime/exec_env.h" +#include "service/backend_options.h" #include "util/dns_cache.h" #include "util/network_util.h" @@ -56,6 +58,80 @@ using StubMap = phmap::parallel_flat_hash_map< namespace doris { +class FailureDetectClosure : public ::google::protobuf::Closure { +public: + FailureDetectClosure(std::shared_ptr<AtomicStatus>& channel_st, + ::google::protobuf::RpcController* controller, + ::google::protobuf::Closure* done) + : _channel_st(channel_st), _controller(controller), _done(done) {} + + void Run() override { + Defer defer {[&]() { delete this; }}; + // All brpc related API will use brpc::Controller, so that it is safe + // to do static cast here. 
+ auto* cntl = static_cast<brpc::Controller*>(_controller); + if (cntl->Failed() && cntl->ErrorCode() == EHOSTDOWN) { + Status error_st = Status::NetworkError( + "Failed to send brpc, error={}, error_text={}, client: {}, latency = {}", + berror(cntl->ErrorCode()), cntl->ErrorText(), BackendOptions::get_localhost(), + cntl->latency_us()); + LOG(WARNING) << error_st; + _channel_st->update(error_st); + } + // Sometimes done == nullptr, for example in the hand_shake API. + if (_done != nullptr) { + _done->Run(); + } + // _done->Run() may throw an exception, so `delete this` was moved into the Defer above. + // delete this; + } + +private: + std::shared_ptr<AtomicStatus> _channel_st; + ::google::protobuf::RpcController* _controller; + ::google::protobuf::Closure* _done; +}; + +// This channel uses FailureDetectClosure to wrap the original closure. +// If a non-recoverable rpc failure happens, it saves the error status in +// _channel_st. +// The brpc client cache depends on it to detect whether the client is healthy. +class FailureDetectChannel : public ::brpc::Channel { +public: + FailureDetectChannel() : ::brpc::Channel() { + _channel_st = std::make_shared<AtomicStatus>(); // default OK + } + void CallMethod(const google::protobuf::MethodDescriptor* method, + google::protobuf::RpcController* controller, + const google::protobuf::Message* request, google::protobuf::Message* response, + google::protobuf::Closure* done) override { + FailureDetectClosure* failure_detect_closure = nullptr; + if (done != nullptr) { + // If done == nullptr, the call is a sync call, so we should not + // create a failure detect closure for it. Otherwise it will core dump. + failure_detect_closure = new FailureDetectClosure(_channel_st, controller, done); + } + ::brpc::Channel::CallMethod(method, controller, request, response, failure_detect_closure); + // done == nullptr means it is a sync call; the bad channel should also be handled here. 
+ if (done == nullptr) { + auto* cntl = static_cast<brpc::Controller*>(controller); + if (cntl->Failed() && cntl->ErrorCode() == EHOSTDOWN) { + Status error_st = Status::NetworkError( + "Failed to send brpc, error={}, error_text={}, client: {}, latency = {}", + berror(cntl->ErrorCode()), cntl->ErrorText(), + BackendOptions::get_localhost(), cntl->latency_us()); + LOG(WARNING) << error_st; + _channel_st->update(error_st); + } + } + } + + std::shared_ptr<AtomicStatus> channel_status() { return _channel_st; } + +private: + std::shared_ptr<AtomicStatus> _channel_st; +}; + template <class T> class BrpcClientCache { public: @@ -99,7 +175,14 @@ public: auto get_value = [&stub_ptr](const auto& v) { stub_ptr = v.second; }; if (LIKELY(_stub_map.if_contains(host_port, get_value))) { DCHECK(stub_ptr != nullptr); - return stub_ptr; + // All client created from this cache will use FailureDetectChannel, so it is + // safe to do static cast here. + // Check if the base channel is OK, if not ignore the stub and create new one. 
+ if (static_cast<FailureDetectChannel*>(stub_ptr->channel())->channel_status()->ok()) { + return stub_ptr; + } else { + _stub_map.erase(host_port); + } } // new one stub and insert into map @@ -148,7 +231,7 @@ public: options.timeout_ms = 2000; options.max_retry = 10; - std::unique_ptr<brpc::Channel> channel(new brpc::Channel()); + std::unique_ptr<FailureDetectChannel> channel(new FailureDetectChannel()); int ret_code = 0; if (host_port.find("://") == std::string::npos) { ret_code = channel->Init(host_port.c_str(), &options); diff --git a/be/test/util/brpc_client_cache_test.cpp b/be/test/util/brpc_client_cache_test.cpp index 5377ce7eeb9..5ae2aa0e7f9 100644 --- a/be/test/util/brpc_client_cache_test.cpp +++ b/be/test/util/brpc_client_cache_test.cpp @@ -56,4 +56,51 @@ TEST_F(BrpcClientCacheTest, invalid) { EXPECT_EQ(nullptr, stub1); } +TEST_F(BrpcClientCacheTest, failure) { + BrpcClientCache<PBackendService_Stub> cache; + TNetworkAddress address; + address.hostname = "127.0.0.1"; + address.port = 123; + std::shared_ptr<PBackendService_Stub> stub1 = cache.get_client(address); + EXPECT_NE(nullptr, stub1); + std::shared_ptr<PBackendService_Stub> stub2 = cache.get_client(address); + EXPECT_NE(nullptr, stub2); + // The channel is ok, so that the stub is the same + EXPECT_EQ(stub1, stub2); + EXPECT_TRUE(static_cast<FailureDetectChannel*>(stub1->channel())->channel_status()->ok()); + + // update channel st to error, will get a new stub + static_cast<FailureDetectChannel*>(stub1->channel()) + ->channel_status() + ->update(Status::NetworkError("test brpc error")); + std::shared_ptr<PBackendService_Stub> stub3 = cache.get_client(address); + EXPECT_NE(nullptr, stub3); + EXPECT_TRUE(static_cast<FailureDetectChannel*>(stub3->channel())->channel_status()->ok()); + // Then will get a new brpc stub not the previous one. + EXPECT_NE(stub2, stub3); + // The previous channel is not ok. 
+ EXPECT_FALSE(static_cast<FailureDetectChannel*>(stub2->channel())->channel_status()->ok()); + + // Call the handshake method; it will trigger a host is down error. It is a sync call, so it does not use a closure. + cache.available(stub3, address.hostname, address.port); + EXPECT_FALSE(static_cast<FailureDetectChannel*>(stub3->channel())->channel_status()->ok()); + + std::shared_ptr<PBackendService_Stub> stub4 = cache.get_client(address); + EXPECT_NE(nullptr, stub4); + EXPECT_TRUE(static_cast<FailureDetectChannel*>(stub4->channel())->channel_status()->ok()); + + // Call the handshake method; it will trigger a host is down error. It is an async call, so it will use the closure. + std::string message = "hello doris!"; + PHandShakeRequest request; + request.set_hello(message); + PHandShakeResponse response; + brpc::Controller cntl4; + stub4->hand_shake(&cntl4, &request, &response, brpc::DoNothing()); + brpc::Join(cntl4.call_id()); + EXPECT_FALSE(static_cast<FailureDetectChannel*>(stub4->channel())->channel_status()->ok()); + + // Check map size is 1 + EXPECT_EQ(1, cache.size()); +} + } // namespace doris --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org