This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch dev-1.1.2
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/dev-1.1.2 by this push:
     new faf2d9097c use pooled stub to call rpc on be instead of one stub 
(#12459)
faf2d9097c is described below

commit faf2d9097cd560125e390d76899cc1ef1b3e3a77
Author: Yongqiang YANG <98214048+dataroar...@users.noreply.github.com>
AuthorDate: Thu Sep 8 14:39:16 2022 +0800

    use pooled stub to call rpc on be instead of one stub (#12459)
    
    * use pooled stub to call rpc on be instead of one stub
    
    A channel is closed when a timedout or exception happens, if only
    one stub is used, then all query would fail.
    
    If we dont close the channel, sometimes grpc-java stuck without sending
    any rpc.
    
    * enable retry on grpc to be and keep connection without call
---
 .../org/apache/doris/rpc/BackendServiceClient.java    |  3 ++-
 .../org/apache/doris/rpc/BackendServiceProxy.java     | 19 ++++++++++++++++---
 2 files changed, 18 insertions(+), 4 deletions(-)

diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/rpc/BackendServiceClient.java 
b/fe/fe-core/src/main/java/org/apache/doris/rpc/BackendServiceClient.java
index 82bd1508bd..704190c7c7 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/rpc/BackendServiceClient.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/rpc/BackendServiceClient.java
@@ -34,7 +34,7 @@ import io.grpc.netty.shaded.io.grpc.netty.NettyChannelBuilder;
 public class BackendServiceClient {
     public static final Logger LOG = 
LogManager.getLogger(BackendServiceClient.class);
 
-    private static final int MAX_RETRY_NUM = 0;
+    private static final int MAX_RETRY_NUM = 10;
     private final TNetworkAddress address;
     private final PBackendServiceGrpc.PBackendServiceFutureStub stub;
     private final PBackendServiceGrpc.PBackendServiceBlockingStub blockingStub;
@@ -44,6 +44,7 @@ public class BackendServiceClient {
         this.address = address;
         channel = NettyChannelBuilder.forAddress(address.getHostname(), 
address.getPort())
                 .flowControlWindow(Config.grpc_max_message_size_bytes)
+                .keepAliveWithoutCalls(true)
                 
.maxInboundMessageSize(Config.grpc_max_message_size_bytes).enableRetry().maxRetryAttempts(MAX_RETRY_NUM)
                 .usePlaintext().build();
         stub = PBackendServiceGrpc.newFutureStub(channel);
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/rpc/BackendServiceProxy.java 
b/fe/fe-core/src/main/java/org/apache/doris/rpc/BackendServiceProxy.java
index 95148aa8e1..f9f8a83ce9 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/rpc/BackendServiceProxy.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/rpc/BackendServiceProxy.java
@@ -38,6 +38,7 @@ import org.apache.thrift.protocol.TCompactProtocol;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.Future;
+import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.locks.ReentrantLock;
 
 public class BackendServiceProxy {
@@ -51,12 +52,24 @@ public class BackendServiceProxy {
         serviceMap = Maps.newConcurrentMap();
     }
 
-    private static class SingletonHolder {
-        private static final BackendServiceProxy INSTANCE = new 
BackendServiceProxy();
+    private static class Holder {
+        private static final int PROXY_NUM = 20;
+        private static BackendServiceProxy[] proxies = new 
BackendServiceProxy[PROXY_NUM];
+        private static AtomicInteger count = new AtomicInteger();
+
+        static {
+            for (int i = 0; i < proxies.length; i++) {
+                proxies[i] = new BackendServiceProxy();
+            }
+        }
+
+        static BackendServiceProxy get() {
+            return proxies[count.addAndGet(1) % PROXY_NUM];
+        }
     }
 
     public static BackendServiceProxy getInstance() {
-        return SingletonHolder.INSTANCE;
+        return Holder.get();
     }
 
     public void removeProxy(TNetworkAddress address) {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to