This is an automated email from the ASF dual-hosted git repository. yiguolei pushed a commit to branch dev-1.1.2 in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/dev-1.1.2 by this push: new faf2d9097c use pooled stub to call rpc on be instead of one stub (#12459) faf2d9097c is described below commit faf2d9097cd560125e390d76899cc1ef1b3e3a77 Author: Yongqiang YANG <98214048+dataroar...@users.noreply.github.com> AuthorDate: Thu Sep 8 14:39:16 2022 +0800 use pooled stub to call rpc on be instead of one stub (#12459) * use pooled stub to call rpc on be instead of one stub A channel is closed when a timeout or exception happens; if only one stub is used, then all queries would fail. If we don't close the channel, grpc-java sometimes gets stuck without sending any rpc. * enable retry on grpc to BE and keep connection alive without calls --- .../org/apache/doris/rpc/BackendServiceClient.java | 3 ++- .../org/apache/doris/rpc/BackendServiceProxy.java | 19 ++++++++++++++++--- 2 files changed, 18 insertions(+), 4 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/rpc/BackendServiceClient.java b/fe/fe-core/src/main/java/org/apache/doris/rpc/BackendServiceClient.java index 82bd1508bd..704190c7c7 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/rpc/BackendServiceClient.java +++ b/fe/fe-core/src/main/java/org/apache/doris/rpc/BackendServiceClient.java @@ -34,7 +34,7 @@ import io.grpc.netty.shaded.io.grpc.netty.NettyChannelBuilder; public class BackendServiceClient { public static final Logger LOG = LogManager.getLogger(BackendServiceClient.class); - private static final int MAX_RETRY_NUM = 0; + private static final int MAX_RETRY_NUM = 10; private final TNetworkAddress address; private final PBackendServiceGrpc.PBackendServiceFutureStub stub; private final PBackendServiceGrpc.PBackendServiceBlockingStub blockingStub; @@ -44,6 +44,7 @@ public class BackendServiceClient { this.address = address; channel = NettyChannelBuilder.forAddress(address.getHostname(), address.getPort()) .flowControlWindow(Config.grpc_max_message_size_bytes) + .keepAliveWithoutCalls(true) 
.maxInboundMessageSize(Config.grpc_max_message_size_bytes).enableRetry().maxRetryAttempts(MAX_RETRY_NUM) .usePlaintext().build(); stub = PBackendServiceGrpc.newFutureStub(channel); diff --git a/fe/fe-core/src/main/java/org/apache/doris/rpc/BackendServiceProxy.java b/fe/fe-core/src/main/java/org/apache/doris/rpc/BackendServiceProxy.java index 95148aa8e1..f9f8a83ce9 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/rpc/BackendServiceProxy.java +++ b/fe/fe-core/src/main/java/org/apache/doris/rpc/BackendServiceProxy.java @@ -38,6 +38,7 @@ import org.apache.thrift.protocol.TCompactProtocol; import java.util.List; import java.util.Map; import java.util.concurrent.Future; +import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.locks.ReentrantLock; public class BackendServiceProxy { @@ -51,12 +52,24 @@ public class BackendServiceProxy { serviceMap = Maps.newConcurrentMap(); } - private static class SingletonHolder { - private static final BackendServiceProxy INSTANCE = new BackendServiceProxy(); + private static class Holder { + private static final int PROXY_NUM = 20; + private static BackendServiceProxy[] proxies = new BackendServiceProxy[PROXY_NUM]; + private static AtomicInteger count = new AtomicInteger(); + + static { + for (int i = 0; i < proxies.length; i++) { + proxies[i] = new BackendServiceProxy(); + } + } + + static BackendServiceProxy get() { + return proxies[count.addAndGet(1) % PROXY_NUM]; + } } public static BackendServiceProxy getInstance() { - return SingletonHolder.INSTANCE; + return Holder.get(); } public void removeProxy(TNetworkAddress address) { --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org