This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch branch-2.1 in repository https://gitbox.apache.org/repos/asf/doris.git
commit ad2f7fc3165e5afc0498cd3c69f4d509354386db Author: yiguolei <676222...@qq.com> AuthorDate: Wed Mar 6 21:38:14 2024 +0800 [bugfix](coordinator) should use fragment id not profile fragment id to cancel fragment (#31852) --- .../main/java/org/apache/doris/qe/Coordinator.java | 70 ++-------------------- .../org/apache/doris/rpc/BackendServiceProxy.java | 6 +- 2 files changed, 10 insertions(+), 66 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java index b91fcd15e3c..64f32ed2e24 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java @@ -136,7 +136,6 @@ import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Map.Entry; -import java.util.Objects; import java.util.Optional; import java.util.Random; import java.util.Set; @@ -1253,7 +1252,7 @@ public class Coordinator implements CoordInterface { } } - private void updateStatus(Status status, TUniqueId instanceId) { + private void updateStatus(Status status) { lock.lock(); try { // The query is done and we are just waiting for remote fragments to clean up. @@ -1272,10 +1271,6 @@ public class Coordinator implements CoordInterface { } queryStatus.setStatus(status); - LOG.warn("one instance report fail throw updateStatus(), need cancel. job id: {}," - + " query id: {}, instance id: {}, error message: {}", - jobId, DebugUtil.printId(queryId), instanceId != null ? DebugUtil.printId(instanceId) : "NaN", - status.getErrorMsg()); if (status.getErrorCode() == TStatusCode.TIMEOUT) { cancelInternal(Types.PPlanFragmentCancelReason.TIMEOUT); } else { @@ -1286,38 +1281,6 @@ public class Coordinator implements CoordInterface { } } - private void updateStatus(Status status, long backendId) { - lock.lock(); - try { - // The query is done and we are just waiting for remote fragments to clean up. 
- // Ignore their cancelled updates. - if (returnedAllResults && status.isCancelled()) { - return; - } - // nothing to update - if (status.ok()) { - return; - } - - // don't override an error status; also, cancellation has already started - if (!queryStatus.ok()) { - return; - } - - queryStatus.setStatus(status); - LOG.warn("one instance report fail throw updateStatus(), need cancel. job id: {}," - + " query id: {}, error message: {}", - jobId, DebugUtil.printId(queryId), status.getErrorMsg()); - if (status.getErrorCode() == TStatusCode.TIMEOUT) { - cancelInternal(Types.PPlanFragmentCancelReason.TIMEOUT, backendId); - } else { - cancelInternal(Types.PPlanFragmentCancelReason.INTERNAL_ERROR, backendId); - } - } finally { - lock.unlock(); - } - } - @Override public RowBatch getNext() throws Exception { if (receiver == null) { @@ -1332,7 +1295,7 @@ public class Coordinator implements CoordInterface { DebugUtil.printId(queryId), status.toString()); } - updateStatus(status, null /* no instance id */); + updateStatus(status); Status copyStatus = null; lock(); @@ -1487,18 +1450,6 @@ public class Coordinator implements CoordInterface { executionProfile.onCancel(); } - private void cancelInternal(Types.PPlanFragmentCancelReason cancelReason, long backendId) { - if (null != receiver) { - receiver.cancel(cancelReason.toString()); - } - if (null != pointExec) { - pointExec.cancel(); - return; - } - cancelRemoteFragmentsAsync(cancelReason, backendId); - executionProfile.onCancel(); - } - private void cancelRemoteFragmentsAsync(Types.PPlanFragmentCancelReason cancelReason) { if (enablePipelineEngine) { for (PipelineExecContext ctx : pipelineExecContexts.values()) { @@ -1511,15 +1462,6 @@ public class Coordinator implements CoordInterface { } } - private void cancelRemoteFragmentsAsync(Types.PPlanFragmentCancelReason cancelReason, long backendId) { - Preconditions.checkArgument(enablePipelineXEngine); - for (PipelineExecContext ctx : pipelineExecContexts.values()) { - if 
(!Objects.equals(idToBackend.get(backendId), ctx.backend)) { - ctx.cancelFragmentInstance(cancelReason); - } - } - } - private void computeFragmentExecParams() throws Exception { // fill hosts field in fragmentExecParams computeFragmentHosts(); @@ -2489,7 +2431,7 @@ public class Coordinator implements CoordInterface { LOG.warn("one instance report fail, query_id={} instance_id={}, error message: {}", DebugUtil.printId(queryId), DebugUtil.printId(params.getFragmentInstanceId()), status.getErrorMsg()); - updateStatus(status, params.backend_id); + updateStatus(status); } if (params.isSetDeltaUrls()) { updateDeltas(params.getDeltaUrls()); @@ -2546,7 +2488,7 @@ public class Coordinator implements CoordInterface { DebugUtil.printId(queryId), params.getFragmentId(), DebugUtil.printId(params.getFragmentInstanceId()), params.getBackendId(), status.getErrorMsg()); - updateStatus(status, params.getFragmentInstanceId()); + updateStatus(status); } // params.isDone() should be promised. @@ -2622,7 +2564,7 @@ public class Coordinator implements CoordInterface { LOG.warn("Instance {} of query {} report failed status, error msg: {}", DebugUtil.printId(queryId), DebugUtil.printId(params.getFragmentInstanceId()), status.getErrorMsg()); - updateStatus(status, params.getFragmentInstanceId()); + updateStatus(status); } } @@ -3336,7 +3278,7 @@ public class Coordinator implements CoordInterface { try { try { BackendServiceProxy.getInstance().cancelPipelineXPlanFragmentAsync(brpcAddress, - this.profileFragmentId, queryId, cancelReason); + this.fragmentId, queryId, cancelReason); } catch (RpcException e) { LOG.warn("cancel plan fragment get a exception, address={}:{}", brpcAddress.getHostname(), brpcAddress.getPort()); diff --git a/fe/fe-core/src/main/java/org/apache/doris/rpc/BackendServiceProxy.java b/fe/fe-core/src/main/java/org/apache/doris/rpc/BackendServiceProxy.java index 52350d805bb..df9a90433db 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/rpc/BackendServiceProxy.java +++ 
b/fe/fe-core/src/main/java/org/apache/doris/rpc/BackendServiceProxy.java @@ -21,6 +21,7 @@ import org.apache.doris.common.Config; import org.apache.doris.common.ThreadPoolManager; import org.apache.doris.common.util.NetUtils; import org.apache.doris.metric.MetricRepo; +import org.apache.doris.planner.PlanFragmentId; import org.apache.doris.proto.InternalService; import org.apache.doris.proto.InternalService.PExecPlanFragmentStartRequest; import org.apache.doris.proto.InternalService.PGetWalQueueSizeRequest; @@ -241,12 +242,13 @@ public class BackendServiceProxy { } public Future<InternalService.PCancelPlanFragmentResult> cancelPipelineXPlanFragmentAsync(TNetworkAddress address, - int fragmentId, TUniqueId queryId, Types.PPlanFragmentCancelReason cancelReason) throws RpcException { + PlanFragmentId fragmentId, TUniqueId queryId, + Types.PPlanFragmentCancelReason cancelReason) throws RpcException { final InternalService.PCancelPlanFragmentRequest pRequest = InternalService.PCancelPlanFragmentRequest .newBuilder() .setFinstId(Types.PUniqueId.newBuilder().setHi(0).setLo(0).build()) .setCancelReason(cancelReason) - .setFragmentId(fragmentId) + .setFragmentId(fragmentId.asInt()) .setQueryId(Types.PUniqueId.newBuilder().setHi(queryId.hi).setLo(queryId.lo).build()).build(); try { final BackendServiceClient client = getProxy(address); --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org