This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-2.1 by this push:
     new 7851563829b [fix](brpc_client_cache) resolve hostname in DNS cache before passing to brpc (#40074) (#40786)
7851563829b is described below

commit 7851563829b6c5f6f78fbb4f61ff03459d45fefc
Author: Kaijie Chen <c...@apache.org>
AuthorDate: Fri Sep 13 14:28:01 2024 +0800

    [fix](brpc_client_cache) resolve hostname in DNS cache before passing to brpc (#40074) (#40786)
    
    backport #40074
---
 be/src/exec/rowid_fetcher.cpp             |  3 ++-
 be/src/exprs/runtime_filter.cpp           |  7 +++----
 be/src/olap/single_replica_compaction.cpp |  2 +-
 be/src/runtime/runtime_filter_mgr.cpp     | 12 +++++++++--
 be/src/service/internal_service.cpp       |  5 +++++
 be/src/util/brpc_client_cache.h           | 35 ++++++++++++++++++++++---------
 be/src/util/proto_util.h                  | 15 +++++++++++++
 be/src/vec/functions/function_rpc.cpp     |  5 +++++
 be/src/vec/sink/load_stream_stub.cpp      | 10 ++++++---
 be/src/vec/sink/writer/vtablet_writer.cpp | 21 ++++++++++++++++++-
 10 files changed, 93 insertions(+), 22 deletions(-)

diff --git a/be/src/exec/rowid_fetcher.cpp b/be/src/exec/rowid_fetcher.cpp
index c9aa200e1d6..b51e263d86b 100644
--- a/be/src/exec/rowid_fetcher.cpp
+++ b/be/src/exec/rowid_fetcher.cpp
@@ -71,7 +71,8 @@ Status RowIDFetcher::init() {
         if (!client) {
             LOG(WARNING) << "Get rpc stub failed, host=" << node_info.host
                          << ", port=" << node_info.brpc_port;
-            return Status::InternalError("RowIDFetcher failed to init rpc 
client");
+            return Status::InternalError("RowIDFetcher failed to init rpc 
client, host={}, port={}",
+                                         node_info.host, node_info.brpc_port);
         }
         _stubs.push_back(client);
     }
diff --git a/be/src/exprs/runtime_filter.cpp b/be/src/exprs/runtime_filter.cpp
index 1c0cdffc0f5..4e9a12b5bc5 100644
--- a/be/src/exprs/runtime_filter.cpp
+++ b/be/src/exprs/runtime_filter.cpp
@@ -1100,9 +1100,8 @@ Status IRuntimeFilter::send_filter_size(RuntimeState* 
state, uint64_t local_filt
     std::shared_ptr<PBackendService_Stub> stub(
             _state->exec_env->brpc_internal_client_cache()->get_client(addr));
     if (!stub) {
-        std::string msg =
-                fmt::format("Get rpc stub failed, host={},  port=", 
addr.hostname, addr.port);
-        return Status::InternalError(msg);
+        return Status::InternalError("Get rpc stub failed, host={}, port={}", 
addr.hostname,
+                                     addr.port);
     }
 
     auto request = std::make_shared<PSendFilterSizeRequest>();
@@ -1135,7 +1134,7 @@ Status IRuntimeFilter::push_to_remote(const 
TNetworkAddress* addr, bool opt_remo
             _state->exec_env->brpc_internal_client_cache()->get_client(*addr));
     if (!stub) {
         return Status::InternalError(
-                fmt::format("Get rpc stub failed, host={},  port=", 
addr->hostname, addr->port));
+                fmt::format("Get rpc stub failed, host={}, port={}", 
addr->hostname, addr->port));
     }
 
     auto merge_filter_request = std::make_shared<PMergeFilterRequest>();
diff --git a/be/src/olap/single_replica_compaction.cpp b/be/src/olap/single_replica_compaction.cpp
index 393bfb99f7b..8257a8182a0 100644
--- a/be/src/olap/single_replica_compaction.cpp
+++ b/be/src/olap/single_replica_compaction.cpp
@@ -175,7 +175,7 @@ Status 
SingleReplicaCompaction::_get_rowset_verisons_from_peer(
             
ExecEnv::GetInstance()->brpc_internal_client_cache()->get_client(addr.host,
                                                                              
addr.brpc_port);
     if (stub == nullptr) {
-        return Status::Aborted("get rpc stub failed");
+        return Status::Aborted("get rpc stub failed, host={}, port={}", 
addr.host, addr.brpc_port);
     }
 
     brpc::Controller cntl;
diff --git a/be/src/runtime/runtime_filter_mgr.cpp b/be/src/runtime/runtime_filter_mgr.cpp
index 5b61cc87361..24baf9b6c97 100644
--- a/be/src/runtime/runtime_filter_mgr.cpp
+++ b/be/src/runtime/runtime_filter_mgr.cpp
@@ -342,10 +342,17 @@ Status 
RuntimeFilterMergeControllerEntity::send_filter_size(const PSendFilterSiz
     cnt_val->global_size += request->filter_size();
     cnt_val->source_addrs.push_back(request->source_addr());
 
+    Status st = Status::OK();
     if (cnt_val->source_addrs.size() == cnt_val->producer_size) {
         for (auto addr : cnt_val->source_addrs) {
             std::shared_ptr<PBackendService_Stub> stub(
                     
ExecEnv::GetInstance()->brpc_internal_client_cache()->get_client(addr));
+            if (stub == nullptr) {
+                LOG(WARNING) << "Failed to init rpc to " << addr.hostname() << 
":" << addr.port();
+                st = Status::InternalError("Failed to init rpc to {}:{}", 
addr.hostname(),
+                                           addr.port());
+                continue;
+            }
 
             auto closure = AutoReleaseClosure<PSyncFilterSizeRequest,
                                               
DummyBrpcCallback<PSyncFilterSizeResponse>>::
@@ -365,7 +372,7 @@ Status 
RuntimeFilterMergeControllerEntity::send_filter_size(const PSendFilterSiz
             closure.release();
         }
     }
-    return Status::OK();
+    return st;
 }
 
 Status RuntimeFilterMgr::sync_filter_size(const PSyncFilterSizeRequest* 
request) {
@@ -395,6 +402,7 @@ Status RuntimeFilterMergeControllerEntity::merge(const 
PMergeFilterRequest* requ
     int64_t start_merge = MonotonicMillis();
     auto filter_id = request->filter_id();
     std::map<int, CntlValwithLock>::iterator iter;
+    Status st = Status::OK();
     {
         std::shared_lock<std::shared_mutex> guard(_filter_map_mutex);
         iter = _filter_map.find(filter_id);
@@ -587,7 +595,7 @@ Status RuntimeFilterMergeControllerEntity::merge(const 
PMergeFilterRequest* requ
             }
         }
     }
-    return Status::OK();
+    return st;
 }
 
 Status RuntimeFilterMergeController::acquire(
diff --git a/be/src/service/internal_service.cpp b/be/src/service/internal_service.cpp
index 0801f30fb2e..13faed18e61 100644
--- a/be/src/service/internal_service.cpp
+++ b/be/src/service/internal_service.cpp
@@ -1110,6 +1110,11 @@ void 
PInternalServiceImpl::fetch_remote_tablet_schema(google::protobuf::RpcContr
                 std::shared_ptr<PBackendService_Stub> stub(
                         
ExecEnv::GetInstance()->brpc_internal_client_cache()->get_client(
                                 host, brpc_port));
+                if (stub == nullptr) {
+                    LOG(WARNING) << "Failed to init rpc to " << host << ":" << 
brpc_port;
+                    st = Status::InternalError("Failed to init rpc to {}:{}", 
host, brpc_port);
+                    continue;
+                }
                 rpc_contexts[i].cid = rpc_contexts[i].cntl.call_id();
                 
rpc_contexts[i].cntl.set_timeout_ms(config::fetch_remote_schema_rpc_timeout_ms);
                 stub->fetch_remote_tablet_schema(&rpc_contexts[i].cntl, 
&remote_request,
diff --git a/be/src/util/brpc_client_cache.h b/be/src/util/brpc_client_cache.h
index 09c92fb398e..24bd284f302 100644
--- a/be/src/util/brpc_client_cache.h
+++ b/be/src/util/brpc_client_cache.h
@@ -83,33 +83,47 @@ public:
     }
 
     std::shared_ptr<T> get_client(const std::string& host, int port) {
-        std::string realhost;
-        realhost = host;
-        if (!is_valid_ip(host)) {
-            Status status = ExecEnv::GetInstance()->dns_cache()->get(host, 
&realhost);
+        std::string realhost = host;
+        auto dns_cache = ExecEnv::GetInstance()->dns_cache();
+        if (dns_cache == nullptr) {
+            LOG(WARNING) << "DNS cache is not initialized, skipping hostname 
resolve";
+        } else if (!is_valid_ip(host)) {
+            Status status = dns_cache->get(host, &realhost);
             if (!status.ok()) {
                 LOG(WARNING) << "failed to get ip from host:" << 
status.to_string();
                 return nullptr;
             }
         }
         std::string host_port = get_host_port(realhost, port);
-        return get_client(host_port);
-    }
-
-    std::shared_ptr<T> get_client(const std::string& host_port) {
         std::shared_ptr<T> stub_ptr;
         auto get_value = [&stub_ptr](const auto& v) { stub_ptr = v.second; };
         if (LIKELY(_stub_map.if_contains(host_port, get_value))) {
+            DCHECK(stub_ptr != nullptr);
             return stub_ptr;
         }
 
         // new one stub and insert into map
         auto stub = get_new_client_no_cache(host_port);
-        _stub_map.try_emplace_l(
-                host_port, [&stub](const auto& v) { stub = v.second; }, stub);
+        if (stub != nullptr) {
+            _stub_map.try_emplace_l(
+                    host_port, [&stub](const auto& v) { stub = v.second; }, 
stub);
+        }
         return stub;
     }
 
+    std::shared_ptr<T> get_client(const std::string& host_port) {
+        int pos = host_port.rfind(':');
+        std::string host = host_port.substr(0, pos);
+        int port = 0;
+        try {
+            port = stoi(host_port.substr(pos + 1));
+        } catch (const std::exception& err) {
+            LOG(WARNING) << "failed to parse port from " << host_port << ": " 
<< err.what();
+            return nullptr;
+        }
+        return get_client(host, port);
+    }
+
     std::shared_ptr<T> get_new_client_no_cache(const std::string& host_port,
                                                const std::string& protocol = 
"",
                                                const std::string& 
connection_type = "",
@@ -143,6 +157,7 @@ public:
                     channel->Init(host_port.c_str(), 
config::rpc_load_balancer.c_str(), &options);
         }
         if (ret_code) {
+            LOG(WARNING) << "Failed to initialize brpc Channel to " << 
host_port;
             return nullptr;
         }
         return std::make_shared<T>(channel.release(), 
google::protobuf::Service::STUB_OWNS_CHANNEL);
diff --git a/be/src/util/proto_util.h b/be/src/util/proto_util.h
index f77a1f637f3..c1994d1feeb 100644
--- a/be/src/util/proto_util.h
+++ b/be/src/util/proto_util.h
@@ -71,11 +71,26 @@ Status transmit_block_httpv2(ExecEnv* exec_env, 
std::unique_ptr<Closure> closure
                              TNetworkAddress brpc_dest_addr) {
     
RETURN_IF_ERROR(request_embed_attachment_contain_blockv2(closure->request_.get(),
 closure));
 
+    std::string host = brpc_dest_addr.hostname;
+    auto dns_cache = ExecEnv::GetInstance()->dns_cache();
+    if (dns_cache == nullptr) {
+        LOG(WARNING) << "DNS cache is not initialized, skipping hostname 
resolve";
+    } else if (!is_valid_ip(brpc_dest_addr.hostname)) {
+        Status status = dns_cache->get(brpc_dest_addr.hostname, &host);
+        if (!status.ok()) {
+            LOG(WARNING) << "failed to get ip from host " << 
brpc_dest_addr.hostname << ": "
+                         << status.to_string();
+            return Status::InternalError("failed to get ip from host {}", 
brpc_dest_addr.hostname);
+        }
+    }
     //format an ipv6 address
     std::string brpc_url = get_brpc_http_url(brpc_dest_addr.hostname, 
brpc_dest_addr.port);
 
     std::shared_ptr<PBackendService_Stub> brpc_http_stub =
             
exec_env->brpc_internal_client_cache()->get_new_client_no_cache(brpc_url, 
"http");
+    if (brpc_http_stub == nullptr) {
+        return Status::InternalError("failed to open brpc http client to {}", 
brpc_url);
+    }
     closure->cntl_->http_request().uri() =
             brpc_url + "/PInternalServiceImpl/transmit_block_by_http";
     closure->cntl_->http_request().set_method(brpc::HTTP_METHOD_POST);
diff --git a/be/src/vec/functions/function_rpc.cpp b/be/src/vec/functions/function_rpc.cpp
index ba171ffbbc9..c27383dac62 100644
--- a/be/src/vec/functions/function_rpc.cpp
+++ b/be/src/vec/functions/function_rpc.cpp
@@ -46,6 +46,11 @@ Status RPCFnImpl::vec_call(FunctionContext* context, Block& 
block, const ColumnN
                            size_t result, size_t input_rows_count) {
     PFunctionCallRequest request;
     PFunctionCallResponse response;
+    if (_client == nullptr) {
+        return Status::InternalError(
+                "call to rpc function {} failed: init rpc error, server addr = 
{}", _signature,
+                _server_addr);
+    }
     request.set_function_name(_function_name);
     RETURN_IF_ERROR(_convert_block_to_proto(block, arguments, 
input_rows_count, &request));
     brpc::Controller cntl;
diff --git a/be/src/vec/sink/load_stream_stub.cpp b/be/src/vec/sink/load_stream_stub.cpp
index e899486e854..e29d64118b9 100644
--- a/be/src/vec/sink/load_stream_stub.cpp
+++ b/be/src/vec/sink/load_stream_stub.cpp
@@ -152,7 +152,6 @@ Status 
LoadStreamStub::open(BrpcClientCache<PBackendService_Stub>* client_cache,
         return _init_st;
     }
     _dst_id = node_info.id;
-    std::string host_port = get_host_port(node_info.host, node_info.brpc_port);
     brpc::StreamOptions opt;
     opt.max_buf_size = config::load_stream_max_buf_size;
     opt.idle_timeout_ms = idle_timeout_ms;
@@ -185,7 +184,11 @@ Status 
LoadStreamStub::open(BrpcClientCache<PBackendService_Stub>* client_cache,
     }
     POpenLoadStreamResponse response;
     // set connection_group "streaming" to distinguish with non-streaming 
connections
-    const auto& stub = client_cache->get_client(host_port);
+    const auto& stub = client_cache->get_client(node_info.host, 
node_info.brpc_port);
+    if (stub == nullptr) {
+        return Status::InternalError("failed to init brpc client to {}:{}", 
node_info.host,
+                                     node_info.brpc_port);
+    }
     stub->open_load_stream(&cntl, &request, &response, nullptr);
     for (const auto& resp : response.tablet_schemas()) {
         auto tablet_schema = std::make_unique<TabletSchema>();
@@ -200,7 +203,8 @@ Status 
LoadStreamStub::open(BrpcClientCache<PBackendService_Stub>* client_cache,
                                          cntl.ErrorText());
         return _init_st;
     }
-    LOG(INFO) << "open load stream to " << host_port << ", " << *this;
+    LOG(INFO) << "open load stream to host=" << node_info.host << ", port=" << 
node_info.brpc_port
+              << ", " << *this;
     _is_init.store(true);
     return Status::OK();
 }
diff --git a/be/src/vec/sink/writer/vtablet_writer.cpp b/be/src/vec/sink/writer/vtablet_writer.cpp
index 1e6b8f7b868..e946a73bfed 100644
--- a/be/src/vec/sink/writer/vtablet_writer.cpp
+++ b/be/src/vec/sink/writer/vtablet_writer.cpp
@@ -700,11 +700,30 @@ void VNodeChannel::try_send_pending_block(RuntimeState* 
state) {
             return;
         }
 
+        std::string host = _node_info.host;
+        auto dns_cache = ExecEnv::GetInstance()->dns_cache();
+        if (dns_cache == nullptr) {
+            LOG(WARNING) << "DNS cache is not initialized, skipping hostname 
resolve";
+        } else if (!is_valid_ip(_node_info.host)) {
+            Status status = dns_cache->get(_node_info.host, &host);
+            if (!status.ok()) {
+                LOG(WARNING) << "failed to get ip from host " << 
_node_info.host << ": "
+                             << status.to_string();
+                _send_block_callback->clear_in_flight();
+                return;
+            }
+        }
         //format an ipv6 address
-        std::string brpc_url = get_brpc_http_url(_node_info.host, 
_node_info.brpc_port);
+        std::string brpc_url = get_brpc_http_url(host, _node_info.brpc_port);
         std::shared_ptr<PBackendService_Stub> _brpc_http_stub =
                 
_state->exec_env()->brpc_internal_client_cache()->get_new_client_no_cache(brpc_url,
                                                                                
           "http");
+        if (_brpc_http_stub == nullptr) {
+            cancel(fmt::format("{}, failed to open brpc http client to {}", 
channel_info(),
+                               brpc_url));
+            _send_block_callback->clear_in_flight();
+            return;
+        }
         _send_block_callback->cntl_->http_request().uri() =
                 brpc_url + 
"/PInternalServiceImpl/tablet_writer_add_block_by_http";
         
_send_block_callback->cntl_->http_request().set_method(brpc::HTTP_METHOD_POST);


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to