This is an automated email from the ASF dual-hosted git repository. kxiao pushed a commit to branch branch-2.0 in repository https://gitbox.apache.org/repos/asf/doris.git
commit 10d9625a8eb3ecb2f61293dff029c411539ae4af Author: walter <[email protected]> AuthorDate: Mon Oct 23 11:32:24 2023 +0800 [fix](rpc) Rebuild failed channel to avoid connection refused (#25688) --- .../org/apache/doris/rpc/BackendServiceClient.java | 9 +++++++++ .../org/apache/doris/rpc/BackendServiceProxy.java | 19 +++++++++++++++++-- 2 files changed, 26 insertions(+), 2 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/rpc/BackendServiceClient.java b/fe/fe-core/src/main/java/org/apache/doris/rpc/BackendServiceClient.java index 8df33927fd1..e4ece51146b 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/rpc/BackendServiceClient.java +++ b/fe/fe-core/src/main/java/org/apache/doris/rpc/BackendServiceClient.java @@ -27,6 +27,7 @@ import io.grpc.CallOptions; import io.grpc.Channel; import io.grpc.ClientCall; import io.grpc.ClientInterceptor; +import io.grpc.ConnectivityState; import io.grpc.ForwardingClientCall; import io.grpc.ManagedChannel; import io.grpc.Metadata; @@ -64,6 +65,14 @@ public class BackendServiceClient { execPlanTimeout = Config.remote_fragment_exec_timeout_ms + 5000; } + // Is the underlying channel in a normal state? 
(That means the RPC call will not fail immediately) + public boolean isNormalState() { + ConnectivityState state = channel.getState(false); + return state == ConnectivityState.CONNECTING + || state == ConnectivityState.IDLE + || state == ConnectivityState.READY; + } + public Future<InternalService.PExecPlanFragmentResult> execPlanFragmentAsync( InternalService.PExecPlanFragmentRequest request) { return stub.withDeadlineAfter(execPlanTimeout, TimeUnit.MILLISECONDS) diff --git a/fe/fe-core/src/main/java/org/apache/doris/rpc/BackendServiceProxy.java b/fe/fe-core/src/main/java/org/apache/doris/rpc/BackendServiceProxy.java index 5fc3ef2815c..9b5c491df69 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/rpc/BackendServiceProxy.java +++ b/fe/fe-core/src/main/java/org/apache/doris/rpc/BackendServiceProxy.java @@ -109,18 +109,30 @@ public class BackendServiceProxy { private BackendServiceClient getProxy(TNetworkAddress address) throws UnknownHostException { String realIp = NetUtils.getIpByHost(address.getHostname()); BackendServiceClientExtIp serviceClientExtIp = serviceMap.get(address); - if (serviceClientExtIp != null && serviceClientExtIp.realIp.equals(realIp)) { + if (serviceClientExtIp != null && serviceClientExtIp.realIp.equals(realIp) + && serviceClientExtIp.client.isNormalState()) { return serviceClientExtIp.client; } + // not exist, create one and return. + BackendServiceClient removedClient = null; lock.lock(); try { serviceClientExtIp = serviceMap.get(address); if (serviceClientExtIp != null && !serviceClientExtIp.realIp.equals(realIp)) { LOG.warn("Cached ip changed ,before ip: {}, curIp: {}", serviceClientExtIp.realIp, realIp); serviceMap.remove(address); + removedClient = serviceClientExtIp.client; + serviceClientExtIp = null; + } + if (serviceClientExtIp != null && !serviceClientExtIp.client.isNormalState()) { + // At this point we cannot judge the progress of reconnecting the underlying channel. + // In the worst case, it may take two minutes. 
We cannot tolerate "connection refused" errors + // for that long, so rebuild the channel directly. + serviceMap.remove(address); + removedClient = serviceClientExtIp.client; + serviceClientExtIp = null; } - serviceClientExtIp = serviceMap.get(address); if (serviceClientExtIp == null) { BackendServiceClient client = new BackendServiceClient(address, grpcThreadPool); serviceMap.put(address, new BackendServiceClientExtIp(realIp, client)); @@ -128,6 +140,9 @@ public class BackendServiceProxy { return serviceMap.get(address).client; } finally { lock.unlock(); + if (removedClient != null) { + removedClient.shutdown(); + } } } --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
