This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-2.1 by this push:
     new 5dfc5d2c77c [enhancement](querycancel) print detail message when query is cancelled (#38859)
5dfc5d2c77c is described below

commit 5dfc5d2c77ce0d5feaf1a67abd2669407ed19ac8
Author: yiguolei <676222...@qq.com>
AuthorDate: Mon Aug 5 14:47:03 2024 +0800

    [enhancement](querycancel) print detail message when query is cancelled (#38859)
    
    ## Proposed changes
    
    Issue Number: close #xxx
    
    <!--Describe your changes.-->
    
    ---------
    
    Co-authored-by: yiguolei <yiguo...@gmail.com>
---
 .../main/java/org/apache/doris/common/Status.java  |  7 ++++
 .../java/org/apache/doris/qe/CoordInterface.java   |  2 +-
 .../main/java/org/apache/doris/qe/Coordinator.java | 43 +++++++++++++---------
 .../java/org/apache/doris/qe/PointQueryExec.java   |  2 +-
 .../org/apache/doris/qe/PointQueryExecutor.java    |  2 +-
 .../org/apache/doris/qe/QueryCancelWorker.java     |  6 ++-
 .../java/org/apache/doris/qe/StmtExecutor.java     | 12 ++++--
 7 files changed, 47 insertions(+), 27 deletions(-)

diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/Status.java b/fe/fe-core/src/main/java/org/apache/doris/common/Status.java
index 18852d8c04c..638c1123820 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/common/Status.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/common/Status.java
@@ -21,6 +21,8 @@ import org.apache.doris.proto.Types.PStatus;
 import org.apache.doris.thrift.TStatus;
 import org.apache.doris.thrift.TStatusCode;
 
+import org.apache.logging.log4j.message.ParameterizedMessage;
+
 public class Status {
     public static final Status OK = new Status();
     public static final Status CANCELLED = new Status(TStatusCode.CANCELLED, "Cancelled");
@@ -43,6 +45,11 @@ public class Status {
         this.errorMsg = errorMsg;
     }
 
+    public Status(TStatusCode code, final String errorMsg, final Object...params) {
+        this.errorCode = code;
+        this.errorMsg = ParameterizedMessage.format(errorMsg, params);
+    }
+
     public Status(final TStatus status) {
         this.errorCode = status.status_code;
         if (status.isSetErrorMsgs()) {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/CoordInterface.java b/fe/fe-core/src/main/java/org/apache/doris/qe/CoordInterface.java
index 5718e68c6b0..baf6d922e4e 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/CoordInterface.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/CoordInterface.java
@@ -25,7 +25,7 @@ public interface CoordInterface {
 
     public RowBatch getNext() throws Exception;
 
-    public void cancel(Types.PPlanFragmentCancelReason cancelReason);
+    public void cancel(Types.PPlanFragmentCancelReason cancelReason, String errorMsg);
 
     // When call exec or get next data finished, should call this method to release
     // some resource.
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
index 9e22f853c73..dac9be06b9f 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
@@ -1443,7 +1443,7 @@ public class Coordinator implements CoordInterface {
     // We use a very conservative cancel strategy.
     // 0. If backends has zero process epoch, do not cancel. Zero process epoch usually arises in cluster upgrading.
     // 1. If process epoch is same, do not cancel. Means backends does not restart or die.
-    public boolean shouldCancel(List<Backend> currentBackends) {
+    public Status shouldCancel(List<Backend> currentBackends) {
         Map<Long, Backend> curBeMap = Maps.newHashMap();
         for (Backend be : currentBackends) {
             curBeMap.put(be.getId(), be);
@@ -1456,21 +1456,24 @@ public class Coordinator implements CoordInterface {
                 for (PipelineExecContext pipelineExecContext : pipelineExecContexts.values()) {
                     Backend be = curBeMap.get(pipelineExecContext.backend.getId());
                     if (be == null || !be.isAlive()) {
-                        LOG.warn("Backend {} not exists or dead, query {} should be cancelled",
+                        Status errorStatus = new Status(TStatusCode.CANCELLED,
+                                "Backend {} not exists or dead, query {} should be cancelled",
                                 pipelineExecContext.backend.toString(), DebugUtil.printId(queryId));
-                        return true;
+                        LOG.warn(errorStatus.getErrorMsg());
+                        return errorStatus;
                     }
 
                     // Backend process epoch changed, indicates that this be restarts, query should be cancelled.
                     // Check zero since during upgrading, older version oplog will not persistent be start time
                     // so newer version follower will get zero epoch when replaying oplog or snapshot
                     if (pipelineExecContext.beProcessEpoch != be.getProcessEpoch() && be.getProcessEpoch() != 0) {
-                        LOG.warn("Backend process epoch changed, previous {} now {}, "
-                                        + "means this be has already restarted, should cancel this coordinator,"
-                                        + " query id {}",
-                                        pipelineExecContext.beProcessEpoch, be.getProcessEpoch(),
-                                        DebugUtil.printId(queryId));
-                        return true;
+                        Status errorStatus = new Status(TStatusCode.CANCELLED,
+                                "Backend process epoch changed, previous {} now {}, "
+                                + "means this be has already restarted, should cancel this coordinator,"
+                                + "query id {}", pipelineExecContext.beProcessEpoch, be.getProcessEpoch(),
+                                DebugUtil.printId(queryId));
+                        LOG.warn(errorStatus.getErrorMsg());
+                        return errorStatus;
                     } else if (be.getProcessEpoch() == 0) {
                         LOG.warn("Backend {} has zero process epoch, maybe we are upgrading cluster?",
                                 be.toString());
@@ -1481,23 +1484,27 @@ public class Coordinator implements CoordInterface {
                 for (BackendExecStates beExecState : beToExecStates.values()) {
                     Backend be = curBeMap.get(beExecState.beId);
                     if (be == null || !be.isAlive()) {
-                        LOG.warn("Backend {} not exists or dead, query {} should be cancelled.",
+                        Status errorStatus = new Status(TStatusCode.CANCELLED,
+                                "Backend {} not exists or dead, query {} should be cancelled.",
                                 beExecState.beId, DebugUtil.printId(queryId));
-                        return true;
+                        LOG.warn(errorStatus.getErrorMsg());
+                        return errorStatus;
                     }
 
                     if (beExecState.beProcessEpoch != be.getProcessEpoch() && be.getProcessEpoch() != 0) {
-                        LOG.warn("Process epoch changed, previous {} now {}, means this be has already restarted, "
-                                        + "should cancel this coordinator, query id {}",
+                        Status errorStatus = new Status(TStatusCode.CANCELLED,
+                                "Process epoch changed, previous {} now {}, means this be has already restarted,"
+                                + "should cancel this coordinator, query id {}",
                                 beExecState.beProcessEpoch, be.getProcessEpoch(), DebugUtil.printId(queryId));
-                        return true;
+                        LOG.warn(errorStatus.getErrorMsg());
+                        return errorStatus;
                     } else if (be.getProcessEpoch() == 0) {
                         LOG.warn("Backend {} has zero process epoch, maybe we are upgrading cluster?", be.toString());
                     }
                 }
             }
 
-            return false;
+            return Status.OK;
         } finally {
             unlock();
         }
@@ -1507,14 +1514,14 @@ public class Coordinator implements CoordInterface {
     // fragment,
     // if any, as well as all plan fragments on remote nodes.
     public void cancel() {
-        cancel(Types.PPlanFragmentCancelReason.USER_CANCEL);
+        cancel(Types.PPlanFragmentCancelReason.USER_CANCEL, "user cancel");
         if (queueToken != null) {
             queueToken.signalForCancel();
         }
     }
 
     @Override
-    public void cancel(Types.PPlanFragmentCancelReason cancelReason) {
+    public void cancel(Types.PPlanFragmentCancelReason cancelReason, String errorMsg) {
         for (ScanNode scanNode : scanNodes) {
             scanNode.stop();
         }
@@ -1527,7 +1534,7 @@ public class Coordinator implements CoordInterface {
                         DebugUtil.printId(queryId), queryStatus.toString(),
                         new Exception("cancel failed"));
             } else {
-                queryStatus.updateStatus(TStatusCode.CANCELLED, "cancelled");
+                queryStatus.updateStatus(TStatusCode.CANCELLED, errorMsg);
             }
             LOG.warn("Cancel execution of query {}, this is a outside invoke, cancelReason {}",
                     DebugUtil.printId(queryId), cancelReason.toString());
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/PointQueryExec.java b/fe/fe-core/src/main/java/org/apache/doris/qe/PointQueryExec.java
index f31251dae17..b4812280896 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/PointQueryExec.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/PointQueryExec.java
@@ -169,7 +169,7 @@ public class PointQueryExec implements CoordInterface {
     }
 
     @Override
-    public void cancel(Types.PPlanFragmentCancelReason cancelReason) {
+    public void cancel(Types.PPlanFragmentCancelReason cancelReason, String errorMsg) {
         // Do nothing
     }
 
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/PointQueryExecutor.java b/fe/fe-core/src/main/java/org/apache/doris/qe/PointQueryExecutor.java
index 572367fa33b..b0af8431471 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/PointQueryExecutor.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/PointQueryExecutor.java
@@ -163,7 +163,7 @@ public class PointQueryExecutor implements CoordInterface {
     }
 
     @Override
-    public void cancel(Types.PPlanFragmentCancelReason cancelReason) {
+    public void cancel(Types.PPlanFragmentCancelReason cancelReason, String errorMsg) {
         // Do nothing
     }
 
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/QueryCancelWorker.java b/fe/fe-core/src/main/java/org/apache/doris/qe/QueryCancelWorker.java
index 500cad0f28d..cf1b4fec286 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/QueryCancelWorker.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/QueryCancelWorker.java
@@ -17,6 +17,7 @@
 
 package org.apache.doris.qe;
 
+import org.apache.doris.common.Status;
 import org.apache.doris.common.util.MasterDaemon;
 import org.apache.doris.proto.Types;
 import org.apache.doris.system.Backend;
@@ -36,10 +37,11 @@ public class QueryCancelWorker extends MasterDaemon {
         List<Backend> allBackends = systemInfoService.getAllBackends();
 
         for (Coordinator co : QeProcessorImpl.INSTANCE.getAllCoordinators()) {
-            if (co.shouldCancel(allBackends)) {
+            Status status = co.shouldCancel(allBackends);
+            if (!status.ok()) {
                 // TODO(zhiqiang): We need more clear cancel message, so that user can figure out what happened
                 //  by searching log.
-                co.cancel(Types.PPlanFragmentCancelReason.INTERNAL_ERROR);
+                co.cancel(Types.PPlanFragmentCancelReason.INTERNAL_ERROR, status.getErrorMsg());
             }
         }
     }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
index 30dbedbf3de..dbab3ab3957 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
@@ -103,6 +103,7 @@ import org.apache.doris.common.FormatOptions;
 import org.apache.doris.common.MetaNotFoundException;
 import org.apache.doris.common.NereidsException;
 import org.apache.doris.common.NereidsSqlCacheManager;
+import org.apache.doris.common.Status;
 import org.apache.doris.common.UserException;
 import org.apache.doris.common.Version;
 import org.apache.doris.common.profile.Profile;
@@ -1453,7 +1454,7 @@ public class StmtExecutor {
     public void cancel(Types.PPlanFragmentCancelReason cancelReason) {
         Coordinator coordRef = coord;
         if (coordRef != null) {
-            coordRef.cancel(cancelReason);
+            coordRef.cancel(cancelReason, "");
         }
         if (mysqlLoadId != null) {
             Env.getCurrentEnv().getLoadManager().getMysqlLoadManager().cancelMySqlLoad(mysqlLoadId);
@@ -1874,8 +1875,11 @@ public class StmtExecutor {
             // notify all be cancel running fragment
             // in some case may block all fragment handle threads
             // details see issue https://github.com/apache/doris/issues/16203
-            LOG.warn("cancel fragment query_id:{} cause {}", DebugUtil.printId(context.queryId()), e.getMessage());
-            coordBase.cancel(Types.PPlanFragmentCancelReason.INTERNAL_ERROR);
+            Status internalErrorSt = new Status(TStatusCode.INTERNAL_ERROR,
+                    "cancel fragment query_id:{} cause {}",
+                    DebugUtil.printId(context.queryId()), e.getMessage());
+            LOG.warn(internalErrorSt.getErrorMsg());
+            coordBase.cancel(Types.PPlanFragmentCancelReason.INTERNAL_ERROR, internalErrorSt.getErrorMsg());
             throw e;
         } finally {
             coordBase.close();
@@ -2257,7 +2261,7 @@ public class StmtExecutor {
                 }
                 boolean notTimeout = coord.join(execTimeout);
                 if (!coord.isDone()) {
-                    coord.cancel(Types.PPlanFragmentCancelReason.TIMEOUT);
+                    coord.cancel(Types.PPlanFragmentCancelReason.TIMEOUT, "timeout");
                     if (notTimeout) {
                         errMsg = coord.getExecStatus().getErrorMsg();
                         ErrorReport.reportDdlException("There exists unhealthy backend. "


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to