This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch branch-4.0
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-4.0 by this push:
new e70e618e0ee branch-4.0: [fix](rpc) Use resolved IP address for rpc
connections instead of hostname to avoid DNS resolution failures #59904 (#60221)
e70e618e0ee is described below
commit e70e618e0ee64d35dfc009e087f0ab5150d8fb98
Author: github-actions[bot]
<41898282+github-actions[bot]@users.noreply.github.com>
AuthorDate: Mon Jan 26 17:10:59 2026 +0800
branch-4.0: [fix](rpc) Use resolved IP address for rpc connections instead
of hostname to avoid DNS resolution failures #59904 (#60221)
Cherry-picked from #59904
Co-authored-by: Mingyu Chen (Rayner) <[email protected]>
---
be/src/runtime/client_cache.cpp | 107 +++++--
be/src/runtime/client_cache.h | 28 +-
be/src/util/brpc_client_cache.h | 62 +++-
be/src/util/dns_cache.cpp | 48 ++-
be/src/util/dns_cache.h | 5 +
.../java/org/apache/doris/common/DNSCache.java | 21 +-
.../org/apache/doris/rpc/BackendServiceClient.java | 7 +-
.../org/apache/doris/rpc/BackendServiceProxy.java | 14 +-
.../java/org/apache/doris/common/DNSCacheTest.java | 356 +++++++++++++++++++++
.../apache/doris/rpc/BackendServiceClientTest.java | 227 +++++++++++++
.../apache/doris/rpc/BackendServiceProxyTest.java | 303 ++++++++++++++++++
11 files changed, 1128 insertions(+), 50 deletions(-)
diff --git a/be/src/runtime/client_cache.cpp b/be/src/runtime/client_cache.cpp
index f15717be735..e83c5fbc6cc 100644
--- a/be/src/runtime/client_cache.cpp
+++ b/be/src/runtime/client_cache.cpp
@@ -21,6 +21,8 @@
#include <utility>
#include "common/logging.h"
+#include "runtime/exec_env.h"
+#include "util/dns_cache.h"
#include "util/doris_metrics.h"
#include "util/network_util.h"
@@ -37,30 +39,66 @@ ClientCacheHelper::~ClientCacheHelper() {
}
}
-void ClientCacheHelper::_get_client_from_cache(const TNetworkAddress&
hostport, void** client_key) {
+void ClientCacheHelper::_get_client_from_cache(const TNetworkAddress& hostport,
+ const std::string& resolved_ip,
void** client_key) {
*client_key = nullptr;
std::lock_guard<std::mutex> lock(_lock);
//VLOG_RPC << "get_client(" << hostport << ")";
auto cache_entry = _client_cache.find(hostport);
if (cache_entry == _client_cache.end()) {
- cache_entry = _client_cache.insert(std::make_pair(hostport,
std::list<void*>())).first;
+ cache_entry =
+ _client_cache.insert(std::make_pair(hostport,
std::list<CachedClient>())).first;
DCHECK(cache_entry != _client_cache.end());
}
- std::list<void*>& info_list = cache_entry->second;
- if (!info_list.empty()) {
- *client_key = info_list.front();
- VLOG_RPC << "get_client(): cached client for " << hostport;
- info_list.pop_front();
+ std::list<CachedClient>& info_list = cache_entry->second;
+ // Find a cached client with matching resolved IP
+ for (auto it = info_list.begin(); it != info_list.end(); ++it) {
+ if (it->resolved_ip == resolved_ip) {
+ *client_key = it->client_key;
+ VLOG_RPC << "get_client(): cached client for " << hostport << "
with ip "
+ << resolved_ip;
+ info_list.erase(it);
+ return;
+ }
+ }
+
+ // No matching client found. Clear all cached clients with stale IPs for
this hostport.
+ // These clients were created with old resolved IPs and should be closed.
+ for (auto& cached_client : info_list) {
+ auto client_map_entry = _client_map.find(cached_client.client_key);
+ if (client_map_entry != _client_map.end()) {
+ ThriftClientImpl* client_to_close = client_map_entry->second;
+ client_to_close->close();
+ delete client_to_close;
+ _client_map.erase(client_map_entry);
+ _client_hostport_map.erase(cached_client.client_key);
+ if (_metrics_enabled) {
+ thrift_opened_clients->increment(-1);
+ }
+ }
}
+ info_list.clear();
}
Status ClientCacheHelper::get_client(const TNetworkAddress& hostport,
ClientFactory& factory_method,
void** client_key, int timeout_ms) {
- _get_client_from_cache(hostport, client_key);
+ // Resolve hostname to IP address via DNS cache.
+ // If the hostname is already an IP address, DNS cache will return it
directly.
+ std::string resolved_ip;
+ Status dns_status =
ExecEnv::GetInstance()->dns_cache()->get(hostport.hostname, &resolved_ip);
+ if (!dns_status.ok() || resolved_ip.empty()) {
+ return Status::InternalError("Failed to resolve hostname {} to IP
address: {}",
+ hostport.hostname,
dns_status.to_string());
+ }
+
+ // Try to get a cached client with matching resolved IP
+ _get_client_from_cache(hostport, resolved_ip, client_key);
if (*client_key == nullptr) {
- RETURN_IF_ERROR(_create_client(hostport, factory_method, client_key,
timeout_ms));
+ // No cached client with matching IP, create a new one using the
resolved IP
+ RETURN_IF_ERROR(
+ _create_client(hostport, resolved_ip, factory_method,
client_key, timeout_ms));
}
if (_metrics_enabled) {
@@ -74,14 +112,18 @@ Status ClientCacheHelper::reopen_client(ClientFactory&
factory_method, void** cl
int timeout_ms) {
DCHECK(*client_key != nullptr) << "Trying to reopen nullptr client";
ThriftClientImpl* client_to_close = nullptr;
+ TNetworkAddress hostport;
{
std::lock_guard<std::mutex> lock(_lock);
auto client_map_entry = _client_map.find(*client_key);
DCHECK(client_map_entry != _client_map.end());
client_to_close = client_map_entry->second;
+
+ // Get the original hostport (with hostname) for this client
+ auto hostport_entry = _client_hostport_map.find(*client_key);
+ DCHECK(hostport_entry != _client_hostport_map.end());
+ hostport = hostport_entry->second;
}
- const std::string ipaddress = client_to_close->ipaddress();
- int port = client_to_close->port();
client_to_close->close();
@@ -91,6 +133,7 @@ Status ClientCacheHelper::reopen_client(ClientFactory&
factory_method, void** cl
{
std::lock_guard<std::mutex> lock(_lock);
_client_map.erase(*client_key);
+ _client_hostport_map.erase(*client_key);
}
delete client_to_close;
*client_key = nullptr;
@@ -99,16 +142,30 @@ Status ClientCacheHelper::reopen_client(ClientFactory&
factory_method, void** cl
thrift_opened_clients->increment(-1);
}
- RETURN_IF_ERROR(_create_client(make_network_address(ipaddress, port),
factory_method,
- client_key, timeout_ms));
+ // Re-resolve hostname to IP address
+ std::string resolved_ip;
+ Status dns_status =
ExecEnv::GetInstance()->dns_cache()->get(hostport.hostname, &resolved_ip);
+ if (!dns_status.ok() || resolved_ip.empty()) {
+ return Status::InternalError("Failed to resolve hostname {} to IP
address: {}",
+ hostport.hostname,
dns_status.to_string());
+ }
+
+ RETURN_IF_ERROR(_create_client(hostport, resolved_ip, factory_method,
client_key, timeout_ms));
return Status::OK();
}
Status ClientCacheHelper::_create_client(const TNetworkAddress& hostport,
+ const std::string& resolved_ip,
ClientFactory& factory_method, void**
client_key,
int timeout_ms) {
- std::unique_ptr<ThriftClientImpl> client_impl(factory_method(hostport,
client_key));
+ // Create a new TNetworkAddress with the resolved IP instead of hostname.
+ // This ensures that all client connections are made using IP addresses.
+ TNetworkAddress addr_with_ip;
+ addr_with_ip.hostname = resolved_ip;
+ addr_with_ip.port = hostport.port;
+
+ std::unique_ptr<ThriftClientImpl> client_impl(factory_method(addr_with_ip,
client_key));
//VLOG_CONNECTION << "create_client(): adding new client for "
// << client_impl->ipaddress() << ":" <<
client_impl->port();
@@ -135,6 +192,8 @@ Status ClientCacheHelper::_create_client(const
TNetworkAddress& hostport,
// Because the client starts life 'checked out', we don't add it to
the cache map
DCHECK(_client_map.count(*client_key) == 0);
_client_map[*client_key] = client_impl.release();
+ // Store the original hostport (with hostname) for this client
+ _client_hostport_map[*client_key] = hostport;
}
if (_metrics_enabled) {
@@ -151,17 +210,27 @@ void ClientCacheHelper::release_client(void** client_key)
{
std::lock_guard<std::mutex> lock(_lock);
auto client_map_entry = _client_map.find(*client_key);
DCHECK(client_map_entry != _client_map.end());
- client_to_close = client_map_entry->second;
+ ThriftClientImpl* client = client_map_entry->second;
+
+ // Get the original hostport (with hostname) for this client
+ auto hostport_entry = _client_hostport_map.find(*client_key);
+ DCHECK(hostport_entry != _client_hostport_map.end());
+ const TNetworkAddress& hostport = hostport_entry->second;
- auto cache_list = _client_cache.find(
- make_network_address(client_to_close->ipaddress(),
client_to_close->port()));
+ auto cache_list = _client_cache.find(hostport);
DCHECK(cache_list != _client_cache.end());
if (_max_cache_size_per_host >= 0 &&
cache_list->second.size() >= _max_cache_size_per_host) {
- // cache of this host is full, close this client connection and
remove if from _client_map
+ // cache of this host is full, close this client connection and
remove from maps
_client_map.erase(*client_key);
+ _client_hostport_map.erase(*client_key);
+ client_to_close = client;
} else {
- cache_list->second.push_back(*client_key);
+ // Store the client with its resolved IP address
+ CachedClient cached_client;
+ cached_client.client_key = *client_key;
+ cached_client.resolved_ip = client->ipaddress();
+ cache_list->second.push_back(cached_client);
// There is no need to close client if we put it to cache list.
client_to_close = nullptr;
}
diff --git a/be/src/runtime/client_cache.h b/be/src/runtime/client_cache.h
index c5f44f55e09..712878ffef1 100644
--- a/be/src/runtime/client_cache.h
+++ b/be/src/runtime/client_cache.h
@@ -106,17 +106,30 @@ private:
// this isn't going to scale for a high request rate
std::mutex _lock;
- // map from (host, port) to list of client keys for that address
- using ClientCacheMap = std::unordered_map<TNetworkAddress,
std::list<void*>>;
+ // Cached client entry with resolved IP address
+ struct CachedClient {
+ void* client_key;
+ std::string resolved_ip; // The IP address when this client was cached
+ };
+
+ // map from (host, port) to list of cached client entries for that address
+ using ClientCacheMap = std::unordered_map<TNetworkAddress,
std::list<CachedClient>>;
ClientCacheMap _client_cache;
- // if cache not found, set client_key as nullptr
- void _get_client_from_cache(const TNetworkAddress& hostport, void**
client_key);
+ // Get a client from cache that matches the resolved IP.
+ // If cache not found or IP doesn't match, set client_key as nullptr.
+ void _get_client_from_cache(const TNetworkAddress& hostport, const
std::string& resolved_ip,
+ void** client_key);
// Map from client key back to its associated ThriftClientImpl transport
using ClientMap = std::unordered_map<void*, ThriftClientImpl*>;
ClientMap _client_map;
+ // Map from client key to the original hostport (with hostname, not
resolved IP).
+ // This is needed to correctly return clients to the cache by hostname.
+ using ClientHostportMap = std::unordered_map<void*, TNetworkAddress>;
+ ClientHostportMap _client_hostport_map;
+
bool _metrics_enabled;
// max connections per host in this cache, -1 means unlimited
@@ -130,9 +143,10 @@ private:
// Total clients in the cache, including those in use
IntGauge* thrift_opened_clients = nullptr;
- // Create a new client for specific host/port in 'client' and put it in
_client_map
- Status _create_client(const TNetworkAddress& hostport, ClientFactory&
factory_method,
- void** client_key, int timeout_ms);
+ // Create a new client for specific host/port in 'client' and put it in
_client_map.
+ // The resolved_ip is the actual IP address to connect to (resolved from
hostname).
+ Status _create_client(const TNetworkAddress& hostport, const std::string&
resolved_ip,
+ ClientFactory& factory_method, void** client_key,
int timeout_ms);
};
template <class T>
diff --git a/be/src/util/brpc_client_cache.h b/be/src/util/brpc_client_cache.h
index 9f8898c4825..7b0fe99cbf8 100644
--- a/be/src/util/brpc_client_cache.h
+++ b/be/src/util/brpc_client_cache.h
@@ -51,10 +51,17 @@ class PBackendService_Stub;
class PFunctionService_Stub;
} // namespace doris
+// Entry that holds both resolved IP and stub, similar to Java's
BackendServiceClientExtIp
+template <typename T>
+struct StubEntry {
+ std::string real_ip;
+ std::shared_ptr<T> stub;
+};
+
template <typename T>
using StubMap = phmap::parallel_flat_hash_map<
- std::string, std::shared_ptr<T>, std::hash<std::string>,
std::equal_to<std::string>,
- std::allocator<std::pair<const std::string, std::shared_ptr<T>>>, 8,
std::mutex>;
+ std::string, StubEntry<T>, std::hash<std::string>,
std::equal_to<std::string>,
+ std::allocator<std::pair<const std::string, StubEntry<T>>>, 8,
std::mutex>;
namespace doris {
#include "common/compile_check_begin.h"
@@ -170,26 +177,53 @@ public:
return nullptr;
}
}
- std::string host_port = get_host_port(realhost, port);
+
+ // Use original host:port as key (like Java's TNetworkAddress address)
+ // This allows us to detect IP changes when DNS resolution changes
+ std::string host_port = fmt::format("{}:{}", host, port);
+
std::shared_ptr<T> stub_ptr;
- auto get_value = [&stub_ptr](const auto& v) { stub_ptr = v.second; };
- if (LIKELY(_stub_map.if_contains(host_port, get_value))) {
- DCHECK(stub_ptr != nullptr);
- // All client created from this cache will use
FailureDetectChannel, so it is
- // safe to do static cast here.
- // Check if the base channel is OK, if not ignore the stub and
create new one.
- if
(static_cast<FailureDetectChannel*>(stub_ptr->channel())->channel_status()->ok())
{
- return stub_ptr;
+ bool need_remove = false;
+
+ auto check_entry = [&](const auto& v) {
+ const StubEntry<T>& entry = v.second;
+ // Check if cached IP matches current resolved IP
+ if (entry.real_ip != realhost) {
+ // IP changed (DNS resolution changed)
+ LOG(WARNING) << "Cached ip changed for " << host << ", before
ip: " << entry.real_ip
+ << ", current ip: " << realhost;
+ need_remove = true;
+ } else if
(!static_cast<FailureDetectChannel*>(entry.stub->channel())
+ ->channel_status()
+ ->ok()) {
+ // Client is not in normal state, need to recreate
+ // At this point we cannot judge the progress of reconnecting
the underlying channel.
+ // In the worst case, it may take two minutes. But we can't
stand the connection refused
+ // for two minutes, so rebuild the channel directly.
+ need_remove = true;
} else {
+ // Cache hit: IP matches and client is healthy
+ stub_ptr = entry.stub;
+ }
+ };
+
+ if (LIKELY(_stub_map.if_contains(host_port, check_entry))) {
+ if (stub_ptr != nullptr) {
+ return stub_ptr;
+ }
+ // IP changed or client unhealthy, need to remove old entry
+ if (need_remove) {
_stub_map.erase(host_port);
}
}
- // new one stub and insert into map
- auto stub = get_new_client_no_cache(host_port);
+ // Create new stub using resolved IP for actual connection
+ std::string real_host_port = get_host_port(realhost, port);
+ auto stub = get_new_client_no_cache(real_host_port);
if (stub != nullptr) {
+ StubEntry<T> entry {realhost, stub};
_stub_map.try_emplace_l(
- host_port, [&stub](const auto& v) { stub = v.second; },
stub);
+ host_port, [&stub](const auto& v) { stub = v.second.stub;
}, entry);
}
return stub;
}
diff --git a/be/src/util/dns_cache.cpp b/be/src/util/dns_cache.cpp
index fc7e88b3c1c..0ea794872c1 100644
--- a/be/src/util/dns_cache.cpp
+++ b/be/src/util/dns_cache.cpp
@@ -51,9 +51,46 @@ Status DNSCache::get(const std::string& hostname,
std::string* ip) {
}
}
+// Resolve hostname to IP address, similar to Java's DNSCache.resolveHostname.
+// If resolution fails, falls back to cached IP if available.
+// Returns the resolved IP, or cached IP on failure, or empty string if no
cache available.
+std::string DNSCache::_resolve_hostname(const std::string& hostname) {
+ // Get cached IP first (if any)
+ std::string cached_ip;
+ {
+ std::shared_lock<std::shared_mutex> lock(mutex);
+ auto it = cache.find(hostname);
+ if (it != cache.end()) {
+ cached_ip = it->second;
+ }
+ }
+
+ // Try to resolve hostname
+ std::string resolved_ip;
+ Status status = hostname_to_ip(hostname, resolved_ip,
BackendOptions::is_bind_ipv6());
+
+ if (!status.ok() || resolved_ip.empty()) {
+ // Resolution failed
+ if (!cached_ip.empty()) {
+ LOG(WARNING) << "Failed to resolve hostname " << hostname
+ << ", use cached ip: " << cached_ip;
+ return cached_ip;
+ } else {
+ LOG(WARNING) << "Failed to resolve hostname " << hostname << ", no
cached ip available";
+ return "";
+ }
+ }
+
+ return resolved_ip;
+}
+
Status DNSCache::_update(const std::string& hostname) {
- std::string real_ip = "";
- RETURN_IF_ERROR(hostname_to_ip(hostname, real_ip,
BackendOptions::is_bind_ipv6()));
+ std::string real_ip = _resolve_hostname(hostname);
+ if (real_ip.empty()) {
+ return Status::InternalError("Failed to resolve hostname {} and no
cached ip available",
+ hostname);
+ }
+
std::unique_lock<std::shared_mutex> lock(mutex);
auto it = cache.find(hostname);
if (it == cache.end() || it->second != real_ip) {
@@ -73,9 +110,12 @@ void DNSCache::_refresh_cache() {
std::transform(cache.begin(), cache.end(), std::inserter(keys,
keys.end()),
[](const auto& pair) { return pair.first; });
}
- Status st;
for (auto& key : keys) {
- st = _update(key);
+ Status st = _update(key);
+ if (!st.ok()) {
+ LOG(WARNING) << "Failed to update DNS cache for hostname " <<
key << ": "
+ << st.to_string();
+ }
}
}
}
diff --git a/be/src/util/dns_cache.h b/be/src/util/dns_cache.h
index 5dc413c53e2..51ffb6567ec 100644
--- a/be/src/util/dns_cache.h
+++ b/be/src/util/dns_cache.h
@@ -39,6 +39,11 @@ public:
Status get(const std::string& hostname, std::string* ip);
private:
+ // Resolve hostname to IP address.
+ // If resolution fails, falls back to cached IP if available.
+ // Returns the resolved IP, or cached IP on failure, or empty string if no
cache available.
+ std::string _resolve_hostname(const std::string& hostname);
+
// update the ip of hostname in cache
Status _update(const std::string& hostname);
diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/DNSCache.java
b/fe/fe-core/src/main/java/org/apache/doris/common/DNSCache.java
index 1fe96eba20f..395130e2659 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/common/DNSCache.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/common/DNSCache.java
@@ -68,10 +68,27 @@ public class DNSCache {
* @return The IP address for the given hostname, or an empty string if
the hostname cannot be resolved.
*/
private String resolveHostname(String hostname) {
+ String cachedIp = cache.get(hostname);
try {
- return NetUtils.getIpByHost(hostname, 0);
+ String ip = NetUtils.getIpByHost(hostname, 0);
+ if (ip == null || ip.isEmpty()) {
+ if (cachedIp != null && !cachedIp.isEmpty()) {
+ LOG.warn("Failed to resolve hostname {}, use cached ip:
{}", hostname, cachedIp);
+ return cachedIp;
+ } else {
+ LOG.warn("Failed to resolve hostname {}, no cached ip
available", hostname);
+ return "";
+ }
+ }
+ return ip;
} catch (UnknownHostException e) {
- return "";
+ if (cachedIp != null && !cachedIp.isEmpty()) {
+ LOG.warn("Failed to resolve hostname {}, use cached ip: {}",
hostname, cachedIp);
+ return cachedIp;
+ } else {
+ LOG.warn("Failed to resolve hostname {}, no cached ip
available", hostname);
+ return "";
+ }
}
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/rpc/BackendServiceClient.java
b/fe/fe-core/src/main/java/org/apache/doris/rpc/BackendServiceClient.java
index eff777c98d1..fc3dac0c214 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/rpc/BackendServiceClient.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/rpc/BackendServiceClient.java
@@ -43,9 +43,12 @@ public class BackendServiceClient {
private final ManagedChannel channel;
private final long execPlanTimeout;
- public BackendServiceClient(TNetworkAddress address, Executor executor) {
+ public BackendServiceClient(TNetworkAddress address, String resolvedIp,
Executor executor) {
this.address = address;
- channel = NettyChannelBuilder.forAddress(address.getHostname(),
address.getPort())
+ // Use resolved IP address instead of hostname to avoid DNS resolution
issues
+ // If resolvedIp is empty or null, fallback to hostname
+ String targetHost = (resolvedIp != null && !resolvedIp.isEmpty()) ?
resolvedIp : address.getHostname();
+ channel = NettyChannelBuilder.forAddress(targetHost, address.getPort())
.executor(executor).keepAliveTime(Config.grpc_keep_alive_second,
TimeUnit.SECONDS)
.flowControlWindow(Config.grpc_max_message_size_bytes)
.keepAliveWithoutCalls(true)
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/rpc/BackendServiceProxy.java
b/fe/fe-core/src/main/java/org/apache/doris/rpc/BackendServiceProxy.java
index 68fa927f0f2..fbdfb3cf223 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/rpc/BackendServiceProxy.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/rpc/BackendServiceProxy.java
@@ -119,6 +119,15 @@ public class BackendServiceProxy {
private BackendServiceClient getProxy(TNetworkAddress address) throws
UnknownHostException {
String realIp =
Env.getCurrentEnv().getDnsCache().get(address.hostname);
+
+ // Check if DNS resolution failed (returns empty string)
+ if (realIp.isEmpty() && Config.enable_fqdn_mode) {
+ String errorMsg = String.format("Failed to resolve hostname: %s.
DNS cache returned empty IP address.",
+ address.hostname);
+ LOG.warn(errorMsg);
+ throw new UnknownHostException(errorMsg);
+ }
+
BackendServiceClientExtIp serviceClientExtIp = serviceMap.get(address);
if (serviceClientExtIp != null &&
serviceClientExtIp.realIp.equals(realIp)
&& serviceClientExtIp.client.isNormalState()) {
@@ -131,7 +140,7 @@ public class BackendServiceProxy {
try {
serviceClientExtIp = serviceMap.get(address);
if (serviceClientExtIp != null &&
!serviceClientExtIp.realIp.equals(realIp)) {
- LOG.warn("Cached ip changed ,before ip: {}, curIp: {}",
serviceClientExtIp.realIp, realIp);
+ LOG.warn("Cached ip changed, before ip: {}, curIp: {}",
serviceClientExtIp.realIp, realIp);
serviceMap.remove(address);
removedClient = serviceClientExtIp.client;
serviceClientExtIp = null;
@@ -145,7 +154,8 @@ public class BackendServiceProxy {
serviceClientExtIp = null;
}
if (serviceClientExtIp == null) {
- BackendServiceClient client = new
BackendServiceClient(address, grpcThreadPool);
+ // Pass resolved IP to BackendServiceClient to avoid DNS
resolution at gRPC layer
+ BackendServiceClient client = new
BackendServiceClient(address, realIp, grpcThreadPool);
serviceMap.put(address, new BackendServiceClientExtIp(realIp,
client));
}
return serviceMap.get(address).client;
diff --git a/fe/fe-core/src/test/java/org/apache/doris/common/DNSCacheTest.java
b/fe/fe-core/src/test/java/org/apache/doris/common/DNSCacheTest.java
new file mode 100644
index 00000000000..6d2afa59d9a
--- /dev/null
+++ b/fe/fe-core/src/test/java/org/apache/doris/common/DNSCacheTest.java
@@ -0,0 +1,356 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.common;
+
+import org.apache.doris.common.util.NetUtils;
+
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.MockedStatic;
+import org.mockito.Mockito;
+
+import java.lang.reflect.Method;
+import java.net.UnknownHostException;
+
+/**
+ * Unit tests for DNSCache to verify DNS caching and refresh functionality.
+ */
+public class DNSCacheTest {
+ private DNSCache dnsCache;
+ private MockedStatic<NetUtils> netUtilsMockedStatic;
+ private boolean originalFqdnMode;
+
+ @Before
+ public void setUp() {
+ // Save original config
+ originalFqdnMode = Config.enable_fqdn_mode;
+
+ // Create DNS cache instance
+ dnsCache = new DNSCache();
+
+ // Mock NetUtils static methods
+ netUtilsMockedStatic = Mockito.mockStatic(NetUtils.class);
+ }
+
+ @After
+ public void tearDown() {
+ // Restore original config
+ Config.enable_fqdn_mode = originalFqdnMode;
+
+ // Close mocked static
+ if (netUtilsMockedStatic != null) {
+ netUtilsMockedStatic.close();
+ }
+ }
+
+ /**
+ * Test that get() successfully resolves a hostname and caches the result.
+ */
+ @Test
+ public void testGetResolvesAndCachesHostname() throws UnknownHostException
{
+ String hostname = "backend.example.com";
+ String expectedIp = "10.0.0.1";
+
+ // Mock NetUtils to return expected IP
+ netUtilsMockedStatic.when(() -> NetUtils.getIpByHost(hostname, 0))
+ .thenReturn(expectedIp);
+
+ // First call - should resolve and cache
+ String ip1 = dnsCache.get(hostname);
+ Assert.assertEquals(expectedIp, ip1);
+
+ // Second call - should return cached value without calling NetUtils
again
+ String ip2 = dnsCache.get(hostname);
+ Assert.assertEquals(expectedIp, ip2);
+
+ // Verify NetUtils.getIpByHost was called only once (cached on
subsequent calls)
+ netUtilsMockedStatic.verify(() -> NetUtils.getIpByHost(hostname, 0),
Mockito.times(1));
+ }
+
+ /**
+ * Test that when DNS resolution fails, get() returns an empty string.
+ */
+ @Test
+ public void testGetReturnsEmptyStringOnResolutionFailure() throws
UnknownHostException {
+ String hostname = "non-existent.example.com";
+
+ // Mock NetUtils to throw UnknownHostException
+ netUtilsMockedStatic.when(() -> NetUtils.getIpByHost(hostname, 0))
+ .thenThrow(new UnknownHostException("Host not found"));
+
+ // Should return empty string instead of throwing exception
+ String ip = dnsCache.get(hostname);
+ Assert.assertEquals("", ip);
+
+ // Verify the result is cached (subsequent calls don't resolve again)
+ String ip2 = dnsCache.get(hostname);
+ Assert.assertEquals("", ip2);
+
+ // Should only attempt resolution once
+ netUtilsMockedStatic.verify(() -> NetUtils.getIpByHost(hostname, 0),
Mockito.times(1));
+ }
+
+ /**
+ * Test that multiple different hostnames can be cached simultaneously.
+ */
+ @Test
+ public void testMultipleHostnamesCached() throws UnknownHostException {
+ String hostname1 = "backend1.example.com";
+ String hostname2 = "backend2.example.com";
+ String ip1 = "10.0.0.1";
+ String ip2 = "10.0.0.2";
+
+ // Mock NetUtils for both hostnames
+ netUtilsMockedStatic.when(() -> NetUtils.getIpByHost(hostname1, 0))
+ .thenReturn(ip1);
+ netUtilsMockedStatic.when(() -> NetUtils.getIpByHost(hostname2, 0))
+ .thenReturn(ip2);
+
+ // Get both hostnames
+ String result1 = dnsCache.get(hostname1);
+ String result2 = dnsCache.get(hostname2);
+
+ // Verify both are cached correctly
+ Assert.assertEquals(ip1, result1);
+ Assert.assertEquals(ip2, result2);
+
+ // Verify each was resolved once
+ netUtilsMockedStatic.verify(() -> NetUtils.getIpByHost(hostname1, 0),
Mockito.times(1));
+ netUtilsMockedStatic.verify(() -> NetUtils.getIpByHost(hostname2, 0),
Mockito.times(1));
+ }
+
+ /**
+ * Test that localhost resolves to 127.0.0.1 (using real DNS resolution).
+ */
+ @Test
+ public void testLocalhostResolution() {
+ // Don't mock NetUtils for this test - use real resolution
+ netUtilsMockedStatic.close();
+ netUtilsMockedStatic = null;
+
+ DNSCache realDnsCache = new DNSCache();
+
+ String ip = realDnsCache.get("localhost");
+
+ // localhost should resolve to 127.0.0.1 or ::1
+ Assert.assertTrue("localhost should resolve to an IP",
+ ip.equals("127.0.0.1") || ip.contains(":"));
+ }
+
+ /**
+ * Test that the cache can handle IP addresses as input (should return the
IP as-is).
+ * Note: This depends on the implementation of NetUtils.getIpByHost.
+ */
+ @Test
+ public void testIpAddressInput() throws UnknownHostException {
+ String ipAddress = "10.0.0.1";
+
+ // Mock NetUtils to return the IP as-is
+ netUtilsMockedStatic.when(() -> NetUtils.getIpByHost(ipAddress, 0))
+ .thenReturn(ipAddress);
+
+ String result = dnsCache.get(ipAddress);
+
+ Assert.assertEquals(ipAddress, result);
+ }
+
+ /**
+ * Test that start() only schedules refresh task when enable_fqdn_mode is
true.
+ * Note: This is a behavior test and doesn't verify actual scheduling,
+ * just that start() completes without errors.
+ */
+ @Test
+ public void testStartWithFqdnModeEnabled() {
+ Config.enable_fqdn_mode = true;
+
+ DNSCache cache = new DNSCache();
+
+ // Should not throw any exception
+ cache.start();
+
+ // Verify it completes successfully
+ Assert.assertNotNull(cache);
+ }
+
+ /**
+ * Test that start() does not schedule refresh when enable_fqdn_mode is
false.
+ */
+ @Test
+ public void testStartWithFqdnModeDisabled() {
+ Config.enable_fqdn_mode = false;
+
+ DNSCache cache = new DNSCache();
+
+ // Should not throw any exception
+ cache.start();
+
+ // Verify it completes successfully
+ Assert.assertNotNull(cache);
+ }
+
+ /**
+ * Test thread safety - multiple threads accessing cache concurrently.
+ */
+ @Test
+ public void testConcurrentAccess() throws InterruptedException,
UnknownHostException {
+ String hostname = "backend.example.com";
+ String expectedIp = "10.0.0.1";
+
+ netUtilsMockedStatic.when(() -> NetUtils.getIpByHost(hostname, 0))
+ .thenReturn(expectedIp);
+
+ // Pre-populate the cache by calling get() once before concurrent
access
+ // This ensures the cache is initialized and subsequent calls will hit
the cache
+ String initialIp = dnsCache.get(hostname);
+ Assert.assertEquals(expectedIp, initialIp);
+
+ int threadCount = 10;
+ Thread[] threads = new Thread[threadCount];
+
+ // Create multiple threads that access the cache concurrently
+ for (int i = 0; i < threadCount; i++) {
+ threads[i] = new Thread(() -> {
+ String ip = dnsCache.get(hostname);
+ Assert.assertEquals(expectedIp, ip);
+ });
+ }
+
+ // Start all threads
+ for (Thread thread : threads) {
+ thread.start();
+ }
+
+ // Wait for all threads to complete
+ for (Thread thread : threads) {
+ thread.join();
+ }
+
+ // Despite concurrent access, NetUtils.getIpByHost should be called
only once
+ // (during the initial pre-population) due to caching
+ netUtilsMockedStatic.verify(() -> NetUtils.getIpByHost(hostname, 0),
Mockito.times(1));
+ }
+
+ /**
+ * Test that concurrent access with race condition still resolves
correctly.
+ * This test uses localhost to avoid mock issues in multi-threaded
scenarios.
+ */
+ @Test
+ public void testConcurrentAccessWithRealDns() throws InterruptedException {
+ // Don't mock NetUtils for this test - use real resolution
+ netUtilsMockedStatic.close();
+ netUtilsMockedStatic = null;
+
+ DNSCache realDnsCache = new DNSCache();
+ String hostname = "localhost";
+
+ int threadCount = 20;
+ Thread[] threads = new Thread[threadCount];
+ String[] results = new String[threadCount];
+
+ // Create multiple threads that try to resolve the same hostname
concurrently
+ for (int i = 0; i < threadCount; i++) {
+ final int index = i;
+ threads[i] = new Thread(() -> {
+ results[index] = realDnsCache.get(hostname);
+ });
+ }
+
+ // Start all threads
+ for (Thread thread : threads) {
+ thread.start();
+ }
+
+ // Wait for all threads to complete
+ for (Thread thread : threads) {
+ thread.join();
+ }
+
+ // All threads should get the same result
+ String expectedResult = results[0];
+ Assert.assertNotNull("Result should not be null", expectedResult);
+ Assert.assertFalse("Result should not be empty",
expectedResult.isEmpty());
+
+ for (int i = 1; i < threadCount; i++) {
+ Assert.assertEquals("All threads should get the same result",
expectedResult, results[i]);
+ }
+ }
+
+ /**
+ * Test that refresh() keeps the cached IP when DNS resolution fails.
+ * The refresh method should log a warning and continue using the cached
value.
+ */
+ @Test
+ public void testRefreshKeepsCachedIpOnResolutionFailure() throws Exception
{
+ String hostname = "backend.example.com";
+ String cachedIp = "10.0.0.1";
+
+ // First, mock successful resolution to populate the cache
+ netUtilsMockedStatic.when(() -> NetUtils.getIpByHost(hostname, 0))
+ .thenReturn(cachedIp);
+
+ // Populate the cache
+ String ip = dnsCache.get(hostname);
+ Assert.assertEquals(cachedIp, ip);
+
+ // Now mock resolution failure
+ netUtilsMockedStatic.when(() -> NetUtils.getIpByHost(hostname, 0))
+ .thenThrow(new UnknownHostException("Host not found"));
+
+ // Call refresh() using reflection since it's private
+ Method refreshMethod = DNSCache.class.getDeclaredMethod("refresh");
+ refreshMethod.setAccessible(true);
+ refreshMethod.invoke(dnsCache);
+
+ // The cached IP should remain unchanged after refresh failure
+ String ipAfterRefresh = dnsCache.get(hostname);
+ Assert.assertEquals("Cached IP should remain unchanged after refresh
failure", cachedIp, ipAfterRefresh);
+ }
+
+ /**
+ * Test that refresh() updates the cached IP when DNS resolution succeeds
with a new IP.
+ */
+ @Test
+ public void testRefreshUpdatesCachedIpOnSuccess() throws Exception {
+ String hostname = "backend.example.com";
+ String originalIp = "10.0.0.1";
+ String newIp = "10.0.0.2";
+
+ // First, mock successful resolution to populate the cache
+ netUtilsMockedStatic.when(() -> NetUtils.getIpByHost(hostname, 0))
+ .thenReturn(originalIp);
+
+ // Populate the cache
+ String ip = dnsCache.get(hostname);
+ Assert.assertEquals(originalIp, ip);
+
+ // Now mock resolution with new IP
+ netUtilsMockedStatic.when(() -> NetUtils.getIpByHost(hostname, 0))
+ .thenReturn(newIp);
+
+ // Call refresh() using reflection since it's private
+ Method refreshMethod = DNSCache.class.getDeclaredMethod("refresh");
+ refreshMethod.setAccessible(true);
+ refreshMethod.invoke(dnsCache);
+
+ // The cached IP should be updated to the new IP
+ String ipAfterRefresh = dnsCache.get(hostname);
+ Assert.assertEquals("Cached IP should be updated after successful
refresh", newIp, ipAfterRefresh);
+ }
+}
diff --git
a/fe/fe-core/src/test/java/org/apache/doris/rpc/BackendServiceClientTest.java
b/fe/fe-core/src/test/java/org/apache/doris/rpc/BackendServiceClientTest.java
new file mode 100644
index 00000000000..68aa36eee74
--- /dev/null
+++
b/fe/fe-core/src/test/java/org/apache/doris/rpc/BackendServiceClientTest.java
@@ -0,0 +1,227 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.rpc;
+
+import org.apache.doris.common.Config;
+import org.apache.doris.common.jmockit.Deencapsulation;
+import org.apache.doris.thrift.TNetworkAddress;
+
+import io.grpc.ManagedChannel;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.concurrent.Executor;
+import java.util.concurrent.Executors;
+
+/**
+ * Unit tests for BackendServiceClient to verify that it uses
+ * resolved IP addresses instead of hostnames for gRPC connections.
+ */
+public class BackendServiceClientTest {
+ private Executor executor;
+ private int originalGrpcKeepAliveSeconds;
+ private int originalGrpcMaxMessageSize;
+ private long originalRemoteFragmentExecTimeout;
+
+ @Before
+ public void setUp() {
+ // Create executor for tests
+ executor = Executors.newCachedThreadPool();
+
+ // Save original config values
+ originalGrpcKeepAliveSeconds = Config.grpc_keep_alive_second;
+ originalGrpcMaxMessageSize = Config.grpc_max_message_size_bytes;
+ originalRemoteFragmentExecTimeout =
Config.remote_fragment_exec_timeout_ms;
+
+ // Set test config values to reasonable defaults
+ Config.grpc_keep_alive_second = 60;
+ Config.grpc_max_message_size_bytes = 1024 * 1024 * 100; // 100MB
+ Config.remote_fragment_exec_timeout_ms = 5000;
+ }
+
+ @After
+ public void tearDown() {
+ // Restore original config
+ Config.grpc_keep_alive_second = originalGrpcKeepAliveSeconds;
+ Config.grpc_max_message_size_bytes = originalGrpcMaxMessageSize;
+ Config.remote_fragment_exec_timeout_ms =
originalRemoteFragmentExecTimeout;
+ }
+
+ /**
+ * Test that BackendServiceClient uses the resolved IP address
+ * when creating the gRPC channel.
+ */
+ @Test
+ public void testClientUsesResolvedIp() {
+ String hostname = "backend.example.com";
+ String resolvedIp = "10.0.0.1";
+ int port = 9060;
+
+ TNetworkAddress address = new TNetworkAddress(hostname, port);
+
+ // Create client with resolved IP
+ BackendServiceClient client = new BackendServiceClient(address,
resolvedIp, executor);
+
+ // Verify client was created
+ Assert.assertNotNull(client);
+
+ // Verify the address is stored
+ TNetworkAddress storedAddress = Deencapsulation.getField(client,
"address");
+ Assert.assertEquals(address, storedAddress);
+
+ // Verify the channel was created (non-null)
+ ManagedChannel channel = Deencapsulation.getField(client, "channel");
+ Assert.assertNotNull(channel);
+
+ // Note: We cannot easily verify that the channel uses the IP instead
of hostname
+ // without inspecting the channel's internal state, which is
implementation-dependent.
+ // In a real scenario, you would use integration tests or network
monitoring to verify.
+
+ // Cleanup
+ client.shutdown();
+ }
+
+ /**
+ * Test that when resolved IP is empty, the client falls back to using
hostname.
+ */
+ @Test
+ public void testClientFallsBackToHostnameWhenIpIsEmpty() {
+ String hostname = "localhost"; // Use localhost to ensure it resolves
+ String emptyIp = "";
+ int port = 9060;
+
+ TNetworkAddress address = new TNetworkAddress(hostname, port);
+
+ // Create client with empty IP - should fallback to hostname
+ BackendServiceClient client = new BackendServiceClient(address,
emptyIp, executor);
+
+ // Verify client was created
+ Assert.assertNotNull(client);
+
+ // Verify channel was created
+ ManagedChannel channel = Deencapsulation.getField(client, "channel");
+ Assert.assertNotNull(channel);
+
+ // Cleanup
+ client.shutdown();
+ }
+
+ /**
+ * Test that when resolved IP is null, the client falls back to using
hostname.
+ */
+ @Test
+ public void testClientFallsBackToHostnameWhenIpIsNull() {
+ String hostname = "localhost"; // Use localhost to ensure it resolves
+ String nullIp = null;
+ int port = 9060;
+
+ TNetworkAddress address = new TNetworkAddress(hostname, port);
+
+ // Create client with null IP - should fallback to hostname
+ BackendServiceClient client = new BackendServiceClient(address,
nullIp, executor);
+
+ // Verify client was created
+ Assert.assertNotNull(client);
+
+ // Verify channel was created
+ ManagedChannel channel = Deencapsulation.getField(client, "channel");
+ Assert.assertNotNull(channel);
+
+ // Cleanup
+ client.shutdown();
+ }
+
+ /**
+ * Test that the client's isNormalState() method works correctly
+ * after creation.
+ */
+ @Test
+ public void testIsNormalState() {
+ String hostname = "localhost";
+ String resolvedIp = "127.0.0.1";
+ int port = 9060;
+
+ TNetworkAddress address = new TNetworkAddress(hostname, port);
+
+ // Create client
+ BackendServiceClient client = new BackendServiceClient(address,
resolvedIp, executor);
+
+ // Verify client is in normal state initially
+ // (IDLE or CONNECTING state is considered normal)
+ Assert.assertTrue("Client should be in normal state after creation",
+ client.isNormalState());
+
+ // Cleanup
+ client.shutdown();
+
+ // After shutdown, state should no longer be normal
+ // Note: This might be racy, so we don't assert on it strictly
+ }
+
+ /**
+ * Test that shutdown() properly closes the channel.
+ */
+ @Test
+ public void testShutdown() throws InterruptedException {
+ String hostname = "localhost";
+ String resolvedIp = "127.0.0.1";
+ int port = 9060;
+
+ TNetworkAddress address = new TNetworkAddress(hostname, port);
+
+ // Create client
+ BackendServiceClient client = new BackendServiceClient(address,
resolvedIp, executor);
+
+ // Verify channel is not shutdown initially
+ ManagedChannel channel = Deencapsulation.getField(client, "channel");
+ Assert.assertFalse("Channel should not be shutdown initially",
channel.isShutdown());
+
+ // Shutdown client
+ client.shutdown();
+
+ // Give it a moment to shutdown
+ Thread.sleep(100);
+
+ // Verify channel is shutdown or terminated
+ Assert.assertTrue("Channel should be shutdown or terminated",
+ channel.isShutdown() || channel.isTerminated());
+ }
+
+ /**
+ * Test that multiple clients can be created with different addresses.
+ */
+ @Test
+ public void testMultipleClients() {
+ TNetworkAddress address1 = new TNetworkAddress("localhost", 9060);
+ TNetworkAddress address2 = new TNetworkAddress("localhost", 9061);
+
+ BackendServiceClient client1 = new BackendServiceClient(address1,
"127.0.0.1", executor);
+ BackendServiceClient client2 = new BackendServiceClient(address2,
"127.0.0.1", executor);
+
+ Assert.assertNotNull(client1);
+ Assert.assertNotNull(client2);
+ Assert.assertTrue(client1.isNormalState());
+ Assert.assertTrue(client2.isNormalState());
+
+ // Cleanup
+ client1.shutdown();
+ client2.shutdown();
+ }
+}
diff --git
a/fe/fe-core/src/test/java/org/apache/doris/rpc/BackendServiceProxyTest.java
b/fe/fe-core/src/test/java/org/apache/doris/rpc/BackendServiceProxyTest.java
new file mode 100644
index 00000000000..2a897f0e400
--- /dev/null
+++ b/fe/fe-core/src/test/java/org/apache/doris/rpc/BackendServiceProxyTest.java
@@ -0,0 +1,303 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.rpc;
+
+import org.apache.doris.catalog.Env;
+import org.apache.doris.common.Config;
+import org.apache.doris.common.DNSCache;
+import org.apache.doris.common.jmockit.Deencapsulation;
+import org.apache.doris.thrift.TNetworkAddress;
+
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.MockedStatic;
+import org.mockito.Mockito;
+
+import java.net.UnknownHostException;
+import java.util.Map;
+
+/**
+ * Unit tests for BackendServiceProxy to verify DNS cache integration
+ * and IP address change handling.
+ */
+public class BackendServiceProxyTest {
+ private BackendServiceProxy proxy;
+ private DNSCache mockDnsCache;
+ private Env mockEnv;
+ private MockedStatic<Env> envMockedStatic;
+ private boolean originalFqdnMode;
+ private int originalProxyNum;
+
+ @Before
+ public void setUp() {
+ // Save original config values
+ originalFqdnMode = Config.enable_fqdn_mode;
+ originalProxyNum = Config.backend_proxy_num;
+
+ // Set test config
+ Config.enable_fqdn_mode = true;
+ Config.backend_proxy_num = 1;
+
+ // Create mock DNS cache
+ mockDnsCache = Mockito.mock(DNSCache.class);
+
+ // Create mock Env
+ mockEnv = Mockito.mock(Env.class);
+ Mockito.when(mockEnv.getDnsCache()).thenReturn(mockDnsCache);
+
+ // Mock static Env.getCurrentEnv()
+ envMockedStatic = Mockito.mockStatic(Env.class);
+ envMockedStatic.when(Env::getCurrentEnv).thenReturn(mockEnv);
+
+ // Create proxy instance
+ proxy = new BackendServiceProxy();
+ }
+
+ @After
+ public void tearDown() {
+ // Restore original config
+ Config.enable_fqdn_mode = originalFqdnMode;
+ Config.backend_proxy_num = originalProxyNum;
+
+ // Close mocked static
+ if (envMockedStatic != null) {
+ envMockedStatic.close();
+ }
+ }
+
+ /**
+ * Test that when DNS cache returns a valid IP, the client is created
successfully
+ * and uses the resolved IP address.
+ */
+ @Test
+ public void testGetProxyWithValidIp() throws Exception {
+ String hostname = "backend-host.example.com";
+ String resolvedIp = "10.0.0.1";
+ int port = 9060;
+
+ TNetworkAddress address = new TNetworkAddress(hostname, port);
+
+ // Mock DNS cache to return valid IP
+ Mockito.when(mockDnsCache.get(hostname)).thenReturn(resolvedIp);
+
+ // Get proxy - should create client successfully
+ BackendServiceClient client = Deencapsulation.invoke(proxy,
"getProxy", address);
+
+ // Verify client was created
+ Assert.assertNotNull(client);
+
+ // Verify DNS cache was called
+ Mockito.verify(mockDnsCache, Mockito.times(1)).get(hostname);
+
+ // Verify the client is stored in serviceMap
+ Map<TNetworkAddress, Object> serviceMap =
Deencapsulation.getField(proxy, "serviceMap");
+ Assert.assertEquals(1, serviceMap.size());
+ Assert.assertTrue(serviceMap.containsKey(address));
+ }
+
+ /**
+ * Test that when DNS cache returns empty string (DNS resolution failed)
+ * and FQDN mode is enabled, UnknownHostException is thrown.
+ */
+ @Test
+ public void testGetProxyWithDnsResolutionFailure() {
+ String hostname = "non-existent-host.example.com";
+ int port = 9060;
+
+ TNetworkAddress address = new TNetworkAddress(hostname, port);
+
+ // Mock DNS cache to return empty string (resolution failed)
+ Mockito.when(mockDnsCache.get(hostname)).thenReturn("");
+
+ // Should throw UnknownHostException
+ try {
+ Deencapsulation.invoke(proxy, "getProxy", address);
+ Assert.fail("Expected UnknownHostException to be thrown");
+ } catch (Exception e) {
+ Assert.assertTrue("Expected UnknownHostException", e instanceof
UnknownHostException);
+ Assert.assertTrue("Exception message should contain hostname",
+ e.getMessage().contains(hostname));
+ Assert.assertTrue("Exception message should mention DNS cache",
+ e.getMessage().contains("DNS cache returned empty IP
address"));
+ }
+
+ // Verify DNS cache was called
+ Mockito.verify(mockDnsCache, Mockito.times(1)).get(hostname);
+ }
+
+ /**
+ * Test that when DNS cache returns empty string but FQDN mode is disabled,
+ * client creation proceeds with hostname (fallback behavior).
+ */
+ @Test
+ public void testGetProxyWithDnsFailureAndFqdnModeDisabled() throws
Exception {
+ Config.enable_fqdn_mode = false;
+
+ String hostname = "backend-host.example.com";
+ int port = 9060;
+
+ TNetworkAddress address = new TNetworkAddress(hostname, port);
+
+ // Mock DNS cache to return empty string
+ Mockito.when(mockDnsCache.get(hostname)).thenReturn("");
+
+ // Should create client with hostname as fallback
+ BackendServiceClient client = Deencapsulation.invoke(proxy,
"getProxy", address);
+
+ // Verify client was created
+ Assert.assertNotNull(client);
+
+ // Verify DNS cache was called
+ Mockito.verify(mockDnsCache, Mockito.times(1)).get(hostname);
+ }
+
+ /**
+ * Test that when IP address changes, the old client is shutdown and
+ * a new client is created with the new IP.
+ */
+ @Test
+ public void testGetProxyWithIpChange() throws Exception {
+ String hostname = "backend-host.example.com";
+ String oldIp = "10.0.0.1";
+ String newIp = "10.0.0.2";
+ int port = 9060;
+
+ TNetworkAddress address = new TNetworkAddress(hostname, port);
+
+ // First call - create client with old IP
+ Mockito.when(mockDnsCache.get(hostname)).thenReturn(oldIp);
+ BackendServiceClient client1 = Deencapsulation.invoke(proxy,
"getProxy", address);
+ Assert.assertNotNull(client1);
+
+ // Verify serviceMap contains the client
+ Map<TNetworkAddress, Object> serviceMap =
Deencapsulation.getField(proxy, "serviceMap");
+ Assert.assertEquals(1, serviceMap.size());
+
+ // Second call - IP changed
+ Mockito.when(mockDnsCache.get(hostname)).thenReturn(newIp);
+ BackendServiceClient client2 = Deencapsulation.invoke(proxy,
"getProxy", address);
+
+ // Verify a new client was created
+ Assert.assertNotNull(client2);
+
+ // Verify DNS cache was called twice
+ Mockito.verify(mockDnsCache, Mockito.times(2)).get(hostname);
+
+ // Verify serviceMap still has one entry but with new client
+ Assert.assertEquals(1, serviceMap.size());
+
+ // Note: We cannot easily verify client1.shutdown() was called because
+ // the client is created as a real object, not a mock. In a real test
+ // environment, you would verify the channel state or use dependency
injection.
+ }
+
+ /**
+ * Test that when the same IP is returned, the existing client is reused
+ * (no new client is created).
+ */
+ @Test
+ public void testGetProxyReusesClientWithSameIp() throws Exception {
+ String hostname = "backend-host.example.com";
+ String resolvedIp = "10.0.0.1";
+ int port = 9060;
+
+ TNetworkAddress address = new TNetworkAddress(hostname, port);
+
+ // Mock DNS cache to return same IP
+ Mockito.when(mockDnsCache.get(hostname)).thenReturn(resolvedIp);
+
+ // First call
+ BackendServiceClient client1 = Deencapsulation.invoke(proxy,
"getProxy", address);
+ Assert.assertNotNull(client1);
+
+ // Second call with same IP
+ BackendServiceClient client2 = Deencapsulation.invoke(proxy,
"getProxy", address);
+
+ // Should reuse the same client
+ Assert.assertSame("Client should be reused when IP hasn't changed",
client1, client2);
+
+ // DNS cache should be called twice (once per getProxy call)
+ Mockito.verify(mockDnsCache, Mockito.times(2)).get(hostname);
+ }
+
+ /**
+ * Test that removeProxy properly removes and shuts down the client.
+ */
+ @Test
+ public void testRemoveProxy() throws Exception {
+ String hostname = "backend-host.example.com";
+ String resolvedIp = "10.0.0.1";
+ int port = 9060;
+
+ TNetworkAddress address = new TNetworkAddress(hostname, port);
+
+ // Mock DNS cache
+ Mockito.when(mockDnsCache.get(hostname)).thenReturn(resolvedIp);
+
+ // Create client
+ BackendServiceClient client = Deencapsulation.invoke(proxy,
"getProxy", address);
+ Assert.assertNotNull(client);
+
+ // Verify serviceMap contains the client
+ Map<TNetworkAddress, Object> serviceMap =
Deencapsulation.getField(proxy, "serviceMap");
+ Assert.assertEquals(1, serviceMap.size());
+
+ // Remove proxy
+ proxy.removeProxy(address);
+
+ // Verify serviceMap is now empty
+ Assert.assertEquals(0, serviceMap.size());
+
+ // Note: In a real test, you would verify client.shutdown() was called
+ // This would require mocking the client creation process
+ }
+
+ /**
+ * Test that multiple different backends can coexist in the serviceMap.
+ */
+ @Test
+ public void testMultipleBackends() throws Exception {
+ String hostname1 = "backend1.example.com";
+ String hostname2 = "backend2.example.com";
+ String ip1 = "10.0.0.1";
+ String ip2 = "10.0.0.2";
+ int port = 9060;
+
+ TNetworkAddress address1 = new TNetworkAddress(hostname1, port);
+ TNetworkAddress address2 = new TNetworkAddress(hostname2, port);
+
+ // Mock DNS cache for both hostnames
+ Mockito.when(mockDnsCache.get(hostname1)).thenReturn(ip1);
+ Mockito.when(mockDnsCache.get(hostname2)).thenReturn(ip2);
+
+ // Create clients for both backends
+ BackendServiceClient client1 = Deencapsulation.invoke(proxy,
"getProxy", address1);
+ BackendServiceClient client2 = Deencapsulation.invoke(proxy,
"getProxy", address2);
+
+ Assert.assertNotNull(client1);
+ Assert.assertNotNull(client2);
+
+ // Verify serviceMap contains both clients
+ Map<TNetworkAddress, Object> serviceMap =
Deencapsulation.getField(proxy, "serviceMap");
+ Assert.assertEquals(2, serviceMap.size());
+ Assert.assertTrue(serviceMap.containsKey(address1));
+ Assert.assertTrue(serviceMap.containsKey(address2));
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]