This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch branch-1.1-lts
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-1.1-lts by this push:
     new 2559414bd5 [fix](rpc) The proxy removed when rpc exception occurs is not an abnormal proxy (#13836) (#13943)
2559414bd5 is described below

commit 2559414bd5db267aea6fd161d92ae51b094741d6
Author: ZenoYang <cookie...@qq.com>
AuthorDate: Thu Nov 3 19:02:10 2022 +0800

    [fix](rpc) The proxy removed when rpc exception occurs is not an abnormal proxy (#13836) (#13943)
---
 .../main/java/org/apache/doris/qe/Coordinator.java | 35 ++++++++++++----------
 .../org/apache/doris/rpc/BackendServiceProxy.java  |  2 +-
 2 files changed, 20 insertions(+), 17 deletions(-)

diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
index 7353a1a529..ec8cf79cc4 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
@@ -17,6 +17,8 @@
 
 package org.apache.doris.qe;
 
+import org.apache.commons.lang3.tuple.ImmutableTriple;
+import org.apache.commons.lang3.tuple.Triple;
 import org.apache.doris.analysis.Analyzer;
 import org.apache.doris.analysis.DescriptorTable;
 import org.apache.doris.analysis.StorageBackend;
@@ -640,10 +642,11 @@ public class Coordinator {
             } // end for fragments
 
             // 4. send and wait fragments rpc
-            List<Pair<BackendExecStates, 
Future<InternalService.PExecPlanFragmentResult>>> futures = 
Lists.newArrayList();
+            List<Triple<BackendExecStates, BackendServiceProxy, 
Future<InternalService.PExecPlanFragmentResult>>> futures = 
Lists.newArrayList();
             for (BackendExecStates states : beToExecStates.values()) {
                 states.unsetFields();
-                futures.add(Pair.create(states, 
states.execRemoteFragmentsAsync()));
+                BackendServiceProxy proxy = BackendServiceProxy.getInstance();
+                futures.add(ImmutableTriple.of(states, proxy, 
states.execRemoteFragmentsAsync(proxy)));
             }
             waitRpc(futures, this.timeoutDeadline - 
System.currentTimeMillis(), "send fragments");
 
@@ -651,7 +654,8 @@ public class Coordinator {
                 // 5. send and wait execution start rpc
                 futures.clear();
                 for (BackendExecStates states : beToExecStates.values()) {
-                    futures.add(Pair.create(states, 
states.execPlanFragmentStartAsync()));
+                    BackendServiceProxy proxy = 
BackendServiceProxy.getInstance();
+                    futures.add(ImmutableTriple.of(states, proxy, 
states.execPlanFragmentStartAsync(proxy)));
                 }
                 waitRpc(futures, this.timeoutDeadline - 
System.currentTimeMillis(), "send execution start");
             }
@@ -662,7 +666,7 @@ public class Coordinator {
         }
     }
 
-    private void waitRpc(List<Pair<BackendExecStates, 
Future<PExecPlanFragmentResult>>> futures, long leftTimeMs,
+    private void waitRpc(List<Triple<BackendExecStates, BackendServiceProxy, 
Future<PExecPlanFragmentResult>>> futures, long leftTimeMs,
             String operation) throws RpcException, UserException {
         if (leftTimeMs <= 0) {
             throw new UserException("timeout before waiting for " + operation 
+ " RPC. Elapse(sec): " + (
@@ -670,24 +674,24 @@ public class Coordinator {
         }
         
         long timeoutMs = Math.min(leftTimeMs, 
Config.remote_fragment_exec_timeout_ms);
-        for (Pair<BackendExecStates, Future<PExecPlanFragmentResult>> pair : 
futures) {
+        for (Triple<BackendExecStates, BackendServiceProxy, 
Future<PExecPlanFragmentResult>> triple : futures) {
             TStatusCode code;
             String errMsg = null;
             Exception exception = null;
             try {
-                PExecPlanFragmentResult result = pair.second.get(timeoutMs, 
TimeUnit.MILLISECONDS);
+                PExecPlanFragmentResult result = 
triple.getRight().get(timeoutMs, TimeUnit.MILLISECONDS);
                 code = 
TStatusCode.findByValue(result.getStatus().getStatusCode());
                 if (code != TStatusCode.OK) {
                     if (!result.getStatus().getErrorMsgsList().isEmpty()) {
                         errMsg = result.getStatus().getErrorMsgsList().get(0);
                     } else {
-                        errMsg = operation + " failed. backend id: " + 
pair.first.beId;
+                        errMsg = operation + " failed. backend id: " + 
triple.getLeft().beId;
                     }
                 }
             } catch (ExecutionException e) {
                 exception = e;
                 code = TStatusCode.THRIFT_RPC_ERROR;
-                
BackendServiceProxy.getInstance().removeProxy(pair.first.brpcAddr);
+                triple.getMiddle().removeProxy(triple.getLeft().brpcAddr);
             } catch (InterruptedException e) {
                 exception = e;
                 code = TStatusCode.INTERNAL_ERROR;
@@ -705,10 +709,10 @@ public class Coordinator {
                 cancelInternal(Types.PPlanFragmentCancelReason.INTERNAL_ERROR);
                 switch (code) {
                     case TIMEOUT:
-                        throw new RpcException(pair.first.brpcAddr.hostname, 
errMsg, exception);
+                        throw new 
RpcException(triple.getLeft().brpcAddr.hostname, errMsg, exception);
                     case THRIFT_RPC_ERROR:
-                        SimpleScheduler.addToBlacklist(pair.first.beId, 
errMsg);
-                        throw new RpcException(pair.first.brpcAddr.hostname, 
errMsg, exception);
+                        SimpleScheduler.addToBlacklist(triple.getLeft().beId, 
errMsg);
+                        throw new 
RpcException(triple.getLeft().brpcAddr.hostname, errMsg, exception);
                     default:
                         throw new UserException(errMsg, exception);
                 }
@@ -2121,15 +2125,14 @@ public class Coordinator {
             }
         }
 
-        public Future<InternalService.PExecPlanFragmentResult> 
execRemoteFragmentsAsync() throws TException {
+        public Future<InternalService.PExecPlanFragmentResult> 
execRemoteFragmentsAsync(BackendServiceProxy proxy) throws TException {
             try {
                 TExecPlanFragmentParamsList paramsList = new 
TExecPlanFragmentParamsList();
                 for (BackendExecState state : states) {
                     state.initiated = true;
                     paramsList.addToParamsList(state.rpcParams);
                 }
-                return BackendServiceProxy.getInstance()
-                        .execPlanFragmentsAsync(brpcAddr, paramsList, 
twoPhaseExecution);
+                return proxy.execPlanFragmentsAsync(brpcAddr, paramsList, 
twoPhaseExecution);
             } catch (RpcException e) {
                 // DO NOT throw exception here, return a complete future with 
error code,
                 // so that the following logic will cancel the fragment.
@@ -2137,12 +2140,12 @@ public class Coordinator {
             }
         }
 
-        public Future<InternalService.PExecPlanFragmentResult> 
execPlanFragmentStartAsync() throws TException {
+        public Future<InternalService.PExecPlanFragmentResult> 
execPlanFragmentStartAsync(BackendServiceProxy proxy) throws TException {
             try {
                 PExecPlanFragmentStartRequest.Builder builder = 
PExecPlanFragmentStartRequest.newBuilder();
                 PUniqueId qid = 
PUniqueId.newBuilder().setHi(queryId.hi).setLo(queryId.lo).build();
                 builder.setQueryId(qid);
-                return 
BackendServiceProxy.getInstance().execPlanFragmentStartAsync(brpcAddr, 
builder.build());
+                return proxy.execPlanFragmentStartAsync(brpcAddr, 
builder.build());
             } catch (RpcException e) {
                 // DO NOT throw exception here, return a complete future with 
error code,
                 // so that the following logic will cancel the fragment.
diff --git a/fe/fe-core/src/main/java/org/apache/doris/rpc/BackendServiceProxy.java b/fe/fe-core/src/main/java/org/apache/doris/rpc/BackendServiceProxy.java
index f9f8a83ce9..20eaba1a9a 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/rpc/BackendServiceProxy.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/rpc/BackendServiceProxy.java
@@ -64,7 +64,7 @@ public class BackendServiceProxy {
         }
 
         static BackendServiceProxy get() {
-            return proxies[count.addAndGet(1) % PROXY_NUM];
+            return proxies[Math.abs(count.addAndGet(1) % PROXY_NUM)];
         }
     }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to