This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git

commit ad2f7fc3165e5afc0498cd3c69f4d509354386db
Author: yiguolei <676222...@qq.com>
AuthorDate: Wed Mar 6 21:38:14 2024 +0800

    [bugfix](coordinator) should use fragment id not profile fragment id to cancel fragment (#31852)
---
 .../main/java/org/apache/doris/qe/Coordinator.java | 70 ++--------------------
 .../org/apache/doris/rpc/BackendServiceProxy.java  |  6 +-
 2 files changed, 10 insertions(+), 66 deletions(-)

diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
index b91fcd15e3c..64f32ed2e24 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
@@ -136,7 +136,6 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
-import java.util.Objects;
 import java.util.Optional;
 import java.util.Random;
 import java.util.Set;
@@ -1253,7 +1252,7 @@ public class Coordinator implements CoordInterface {
         }
     }
 
-    private void updateStatus(Status status, TUniqueId instanceId) {
+    private void updateStatus(Status status) {
         lock.lock();
         try {
             // The query is done and we are just waiting for remote fragments 
to clean up.
@@ -1272,10 +1271,6 @@ public class Coordinator implements CoordInterface {
             }
 
             queryStatus.setStatus(status);
-            LOG.warn("one instance report fail throw updateStatus(), need 
cancel. job id: {},"
-                            + " query id: {}, instance id: {}, error message: 
{}",
-                    jobId, DebugUtil.printId(queryId), instanceId != null ? 
DebugUtil.printId(instanceId) : "NaN",
-                    status.getErrorMsg());
             if (status.getErrorCode() == TStatusCode.TIMEOUT) {
                 cancelInternal(Types.PPlanFragmentCancelReason.TIMEOUT);
             } else {
@@ -1286,38 +1281,6 @@ public class Coordinator implements CoordInterface {
         }
     }
 
-    private void updateStatus(Status status, long backendId) {
-        lock.lock();
-        try {
-            // The query is done and we are just waiting for remote fragments 
to clean up.
-            // Ignore their cancelled updates.
-            if (returnedAllResults && status.isCancelled()) {
-                return;
-            }
-            // nothing to update
-            if (status.ok()) {
-                return;
-            }
-
-            // don't override an error status; also, cancellation has already 
started
-            if (!queryStatus.ok()) {
-                return;
-            }
-
-            queryStatus.setStatus(status);
-            LOG.warn("one instance report fail throw updateStatus(), need 
cancel. job id: {},"
-                            + " query id: {}, error message: {}",
-                    jobId, DebugUtil.printId(queryId), status.getErrorMsg());
-            if (status.getErrorCode() == TStatusCode.TIMEOUT) {
-                cancelInternal(Types.PPlanFragmentCancelReason.TIMEOUT, 
backendId);
-            } else {
-                cancelInternal(Types.PPlanFragmentCancelReason.INTERNAL_ERROR, 
backendId);
-            }
-        } finally {
-            lock.unlock();
-        }
-    }
-
     @Override
     public RowBatch getNext() throws Exception {
         if (receiver == null) {
@@ -1332,7 +1295,7 @@ public class Coordinator implements CoordInterface {
                     DebugUtil.printId(queryId), status.toString());
         }
 
-        updateStatus(status, null /* no instance id */);
+        updateStatus(status);
 
         Status copyStatus = null;
         lock();
@@ -1487,18 +1450,6 @@ public class Coordinator implements CoordInterface {
         executionProfile.onCancel();
     }
 
-    private void cancelInternal(Types.PPlanFragmentCancelReason cancelReason, 
long backendId) {
-        if (null != receiver) {
-            receiver.cancel(cancelReason.toString());
-        }
-        if (null != pointExec) {
-            pointExec.cancel();
-            return;
-        }
-        cancelRemoteFragmentsAsync(cancelReason, backendId);
-        executionProfile.onCancel();
-    }
-
     private void cancelRemoteFragmentsAsync(Types.PPlanFragmentCancelReason 
cancelReason) {
         if (enablePipelineEngine) {
             for (PipelineExecContext ctx : pipelineExecContexts.values()) {
@@ -1511,15 +1462,6 @@ public class Coordinator implements CoordInterface {
         }
     }
 
-    private void cancelRemoteFragmentsAsync(Types.PPlanFragmentCancelReason 
cancelReason, long backendId) {
-        Preconditions.checkArgument(enablePipelineXEngine);
-        for (PipelineExecContext ctx : pipelineExecContexts.values()) {
-            if (!Objects.equals(idToBackend.get(backendId), ctx.backend)) {
-                ctx.cancelFragmentInstance(cancelReason);
-            }
-        }
-    }
-
     private void computeFragmentExecParams() throws Exception {
         // fill hosts field in fragmentExecParams
         computeFragmentHosts();
@@ -2489,7 +2431,7 @@ public class Coordinator implements CoordInterface {
                 LOG.warn("one instance report fail, query_id={} 
instance_id={}, error message: {}",
                         DebugUtil.printId(queryId), 
DebugUtil.printId(params.getFragmentInstanceId()),
                         status.getErrorMsg());
-                updateStatus(status, params.backend_id);
+                updateStatus(status);
             }
             if (params.isSetDeltaUrls()) {
                 updateDeltas(params.getDeltaUrls());
@@ -2546,7 +2488,7 @@ public class Coordinator implements CoordInterface {
                         DebugUtil.printId(queryId), params.getFragmentId(),
                         DebugUtil.printId(params.getFragmentInstanceId()),
                         params.getBackendId(), status.getErrorMsg());
-                updateStatus(status, params.getFragmentInstanceId());
+                updateStatus(status);
             }
 
             // params.isDone() should be promised.
@@ -2622,7 +2564,7 @@ public class Coordinator implements CoordInterface {
                     LOG.warn("Instance {} of query {} report failed status, 
error msg: {}",
                             DebugUtil.printId(queryId), 
DebugUtil.printId(params.getFragmentInstanceId()),
                             status.getErrorMsg());
-                    updateStatus(status, params.getFragmentInstanceId());
+                    updateStatus(status);
                 }
             }
 
@@ -3336,7 +3278,7 @@ public class Coordinator implements CoordInterface {
             try {
                 try {
                     
BackendServiceProxy.getInstance().cancelPipelineXPlanFragmentAsync(brpcAddress,
-                            this.profileFragmentId, queryId, cancelReason);
+                            this.fragmentId, queryId, cancelReason);
                 } catch (RpcException e) {
                     LOG.warn("cancel plan fragment get a exception, 
address={}:{}", brpcAddress.getHostname(),
                             brpcAddress.getPort());
diff --git a/fe/fe-core/src/main/java/org/apache/doris/rpc/BackendServiceProxy.java b/fe/fe-core/src/main/java/org/apache/doris/rpc/BackendServiceProxy.java
index 52350d805bb..df9a90433db 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/rpc/BackendServiceProxy.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/rpc/BackendServiceProxy.java
@@ -21,6 +21,7 @@ import org.apache.doris.common.Config;
 import org.apache.doris.common.ThreadPoolManager;
 import org.apache.doris.common.util.NetUtils;
 import org.apache.doris.metric.MetricRepo;
+import org.apache.doris.planner.PlanFragmentId;
 import org.apache.doris.proto.InternalService;
 import org.apache.doris.proto.InternalService.PExecPlanFragmentStartRequest;
 import org.apache.doris.proto.InternalService.PGetWalQueueSizeRequest;
@@ -241,12 +242,13 @@ public class BackendServiceProxy {
     }
 
     public Future<InternalService.PCancelPlanFragmentResult> 
cancelPipelineXPlanFragmentAsync(TNetworkAddress address,
-            int fragmentId, TUniqueId queryId, Types.PPlanFragmentCancelReason 
cancelReason) throws RpcException {
+            PlanFragmentId fragmentId, TUniqueId queryId,
+            Types.PPlanFragmentCancelReason cancelReason) throws RpcException {
         final InternalService.PCancelPlanFragmentRequest pRequest = 
InternalService.PCancelPlanFragmentRequest
                 .newBuilder()
                 
.setFinstId(Types.PUniqueId.newBuilder().setHi(0).setLo(0).build())
                 .setCancelReason(cancelReason)
-                .setFragmentId(fragmentId)
+                .setFragmentId(fragmentId.asInt())
                 
.setQueryId(Types.PUniqueId.newBuilder().setHi(queryId.hi).setLo(queryId.lo).build()).build();
         try {
             final BackendServiceClient client = getProxy(address);


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to