This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch branch-1.1-lts in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-1.1-lts by this push: new 2559414bd5 [fix](rpc) The proxy removed when rpc exception occurs is not an abnormal proxy (#13836) (#13943) 2559414bd5 is described below commit 2559414bd5db267aea6fd161d92ae51b094741d6 Author: ZenoYang <cookie...@qq.com> AuthorDate: Thu Nov 3 19:02:10 2022 +0800 [fix](rpc) The proxy removed when rpc exception occurs is not an abnormal proxy (#13836) (#13943) --- .../main/java/org/apache/doris/qe/Coordinator.java | 35 ++++++++++++---------- .../org/apache/doris/rpc/BackendServiceProxy.java | 2 +- 2 files changed, 20 insertions(+), 17 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java index 7353a1a529..ec8cf79cc4 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java @@ -17,6 +17,8 @@ package org.apache.doris.qe; +import org.apache.commons.lang3.tuple.ImmutableTriple; +import org.apache.commons.lang3.tuple.Triple; import org.apache.doris.analysis.Analyzer; import org.apache.doris.analysis.DescriptorTable; import org.apache.doris.analysis.StorageBackend; @@ -640,10 +642,11 @@ public class Coordinator { } // end for fragments // 4. 
send and wait fragments rpc - List<Pair<BackendExecStates, Future<InternalService.PExecPlanFragmentResult>>> futures = Lists.newArrayList(); + List<Triple<BackendExecStates, BackendServiceProxy, Future<InternalService.PExecPlanFragmentResult>>> futures = Lists.newArrayList(); for (BackendExecStates states : beToExecStates.values()) { states.unsetFields(); - futures.add(Pair.create(states, states.execRemoteFragmentsAsync())); + BackendServiceProxy proxy = BackendServiceProxy.getInstance(); + futures.add(ImmutableTriple.of(states, proxy, states.execRemoteFragmentsAsync(proxy))); } waitRpc(futures, this.timeoutDeadline - System.currentTimeMillis(), "send fragments"); @@ -651,7 +654,8 @@ public class Coordinator { // 5. send and wait execution start rpc futures.clear(); for (BackendExecStates states : beToExecStates.values()) { - futures.add(Pair.create(states, states.execPlanFragmentStartAsync())); + BackendServiceProxy proxy = BackendServiceProxy.getInstance(); + futures.add(ImmutableTriple.of(states, proxy, states.execPlanFragmentStartAsync(proxy))); } waitRpc(futures, this.timeoutDeadline - System.currentTimeMillis(), "send execution start"); } @@ -662,7 +666,7 @@ public class Coordinator { } } - private void waitRpc(List<Pair<BackendExecStates, Future<PExecPlanFragmentResult>>> futures, long leftTimeMs, + private void waitRpc(List<Triple<BackendExecStates, BackendServiceProxy, Future<PExecPlanFragmentResult>>> futures, long leftTimeMs, String operation) throws RpcException, UserException { if (leftTimeMs <= 0) { throw new UserException("timeout before waiting for " + operation + " RPC. 
Elapse(sec): " + ( @@ -670,24 +674,24 @@ public class Coordinator { } long timeoutMs = Math.min(leftTimeMs, Config.remote_fragment_exec_timeout_ms); - for (Pair<BackendExecStates, Future<PExecPlanFragmentResult>> pair : futures) { + for (Triple<BackendExecStates, BackendServiceProxy, Future<PExecPlanFragmentResult>> triple : futures) { TStatusCode code; String errMsg = null; Exception exception = null; try { - PExecPlanFragmentResult result = pair.second.get(timeoutMs, TimeUnit.MILLISECONDS); + PExecPlanFragmentResult result = triple.getRight().get(timeoutMs, TimeUnit.MILLISECONDS); code = TStatusCode.findByValue(result.getStatus().getStatusCode()); if (code != TStatusCode.OK) { if (!result.getStatus().getErrorMsgsList().isEmpty()) { errMsg = result.getStatus().getErrorMsgsList().get(0); } else { - errMsg = operation + " failed. backend id: " + pair.first.beId; + errMsg = operation + " failed. backend id: " + triple.getLeft().beId; } } } catch (ExecutionException e) { exception = e; code = TStatusCode.THRIFT_RPC_ERROR; - BackendServiceProxy.getInstance().removeProxy(pair.first.brpcAddr); + triple.getMiddle().removeProxy(triple.getLeft().brpcAddr); } catch (InterruptedException e) { exception = e; code = TStatusCode.INTERNAL_ERROR; @@ -705,10 +709,10 @@ public class Coordinator { cancelInternal(Types.PPlanFragmentCancelReason.INTERNAL_ERROR); switch (code) { case TIMEOUT: - throw new RpcException(pair.first.brpcAddr.hostname, errMsg, exception); + throw new RpcException(triple.getLeft().brpcAddr.hostname, errMsg, exception); case THRIFT_RPC_ERROR: - SimpleScheduler.addToBlacklist(pair.first.beId, errMsg); - throw new RpcException(pair.first.brpcAddr.hostname, errMsg, exception); + SimpleScheduler.addToBlacklist(triple.getLeft().beId, errMsg); + throw new RpcException(triple.getLeft().brpcAddr.hostname, errMsg, exception); default: throw new UserException(errMsg, exception); } @@ -2121,15 +2125,14 @@ public class Coordinator { } } - public 
Future<InternalService.PExecPlanFragmentResult> execRemoteFragmentsAsync() throws TException { + public Future<InternalService.PExecPlanFragmentResult> execRemoteFragmentsAsync(BackendServiceProxy proxy) throws TException { try { TExecPlanFragmentParamsList paramsList = new TExecPlanFragmentParamsList(); for (BackendExecState state : states) { state.initiated = true; paramsList.addToParamsList(state.rpcParams); } - return BackendServiceProxy.getInstance() - .execPlanFragmentsAsync(brpcAddr, paramsList, twoPhaseExecution); + return proxy.execPlanFragmentsAsync(brpcAddr, paramsList, twoPhaseExecution); } catch (RpcException e) { // DO NOT throw exception here, return a complete future with error code, // so that the following logic will cancel the fragment. @@ -2137,12 +2140,12 @@ public class Coordinator { } } - public Future<InternalService.PExecPlanFragmentResult> execPlanFragmentStartAsync() throws TException { + public Future<InternalService.PExecPlanFragmentResult> execPlanFragmentStartAsync(BackendServiceProxy proxy) throws TException { try { PExecPlanFragmentStartRequest.Builder builder = PExecPlanFragmentStartRequest.newBuilder(); PUniqueId qid = PUniqueId.newBuilder().setHi(queryId.hi).setLo(queryId.lo).build(); builder.setQueryId(qid); - return BackendServiceProxy.getInstance().execPlanFragmentStartAsync(brpcAddr, builder.build()); + return proxy.execPlanFragmentStartAsync(brpcAddr, builder.build()); } catch (RpcException e) { // DO NOT throw exception here, return a complete future with error code, // so that the following logic will cancel the fragment. 
diff --git a/fe/fe-core/src/main/java/org/apache/doris/rpc/BackendServiceProxy.java b/fe/fe-core/src/main/java/org/apache/doris/rpc/BackendServiceProxy.java index f9f8a83ce9..20eaba1a9a 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/rpc/BackendServiceProxy.java +++ b/fe/fe-core/src/main/java/org/apache/doris/rpc/BackendServiceProxy.java @@ -64,7 +64,7 @@ public class BackendServiceProxy { } static BackendServiceProxy get() { - return proxies[count.addAndGet(1) % PROXY_NUM]; + return proxies[Math.abs(count.addAndGet(1) % PROXY_NUM)]; } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org