This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-2.1 by this push:
     new abcba778ff5 [fix](cancel) Fix cancel msg on branch-2.1 (#41798)
abcba778ff5 is described below

commit abcba778ff566355bd92b8f1914a058a2ae634e1
Author: zhiqiang <seuhezhiqi...@163.com>
AuthorDate: Tue Oct 15 17:15:05 2024 +0800

    [fix](cancel) Fix cancel msg on branch-2.1 (#41798)
    
    Make sure we can tell cancel reason from:
    1. user cancel
    2. timeout
    3. others
    
    ```text
    mysql [demo]>set query_timeout=1;
    --------------
    set query_timeout=1
    --------------
    
    Query OK, 0 rows affected (0.00 sec)
    
    mysql [demo]>select sleep(5);
    --------------
    select sleep(5)
    --------------
    
    ERROR 1105 (HY000): errCode = 2, detailMessage = Timeout
    
    mysql [demo]>select sleep(5);
    --------------
    select sleep(5)
    --------------
    
    ^C^C -- sending "KILL QUERY 0" to server ...
    ^C -- query aborted
    ERROR 1105 (HY000): errCode = 2, detailMessage = cancel query by user from 
127.0.0.1:64208
    ```
---
 .../pipeline_x/pipeline_x_fragment_context.cpp     |  2 +-
 be/src/runtime/buffer_control_block.cpp            | 10 ++++----
 be/src/runtime/buffer_control_block.h              |  6 ++---
 be/src/runtime/fragment_mgr.cpp                    | 12 +++++-----
 be/src/runtime/result_buffer_mgr.cpp               |  7 +++---
 be/src/runtime/result_buffer_mgr.h                 |  2 +-
 .../httpv2/rest/manager/QueryProfileAction.java    |  2 +-
 .../doris/job/extensions/insert/InsertTask.java    |  2 +-
 .../apache/doris/job/extensions/mtmv/MTMVTask.java |  2 +-
 .../org/apache/doris/load/ExportTaskExecutor.java  |  2 +-
 .../java/org/apache/doris/load/loadv2/LoadJob.java |  2 +-
 .../commands/insert/AbstractInsertExecutor.java    |  2 +-
 .../java/org/apache/doris/qe/ConnectContext.java   |  8 +++----
 .../java/org/apache/doris/qe/ConnectScheduler.java |  4 ++--
 .../main/java/org/apache/doris/qe/Coordinator.java | 28 ++++++++++------------
 .../java/org/apache/doris/qe/ResultReceiver.java   | 11 +++++----
 .../java/org/apache/doris/qe/StmtExecutor.java     |  4 ++--
 .../WorkloadActionCancelQuery.java                 |  2 +-
 .../sessions/FlightSqlConnectContext.java          |  2 +-
 .../apache/doris/statistics/BaseAnalysisTask.java  |  2 +-
 20 files changed, 55 insertions(+), 57 deletions(-)

diff --git a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp 
b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp
index a3dff107f1b..53dae142a6d 100644
--- a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp
+++ b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp
@@ -145,7 +145,7 @@ void PipelineXFragmentContext::cancel(const 
PPlanFragmentCancelReason& reason,
     LOG_INFO("PipelineXFragmentContext::cancel")
             .tag("query_id", print_id(_query_id))
             .tag("fragment_id", _fragment_id)
-            .tag("reason", reason)
+            .tag("reason", PPlanFragmentCancelReason_Name(reason))
             .tag("error message", msg);
     if (reason == PPlanFragmentCancelReason::TIMEOUT) {
         LOG(WARNING) << "PipelineXFragmentContext is cancelled due to timeout 
: " << debug_string();
diff --git a/be/src/runtime/buffer_control_block.cpp 
b/be/src/runtime/buffer_control_block.cpp
index 4b9fa57ce64..c61c98a324b 100644
--- a/be/src/runtime/buffer_control_block.cpp
+++ b/be/src/runtime/buffer_control_block.cpp
@@ -104,7 +104,7 @@ BufferControlBlock::BufferControlBlock(const TUniqueId& id, 
int buffer_size)
 }
 
 BufferControlBlock::~BufferControlBlock() {
-    cancel();
+    cancel(Status::Cancelled("Cancelled"));
 }
 
 Status BufferControlBlock::init() {
@@ -266,13 +266,13 @@ Status BufferControlBlock::close(Status exec_status) {
     return Status::OK();
 }
 
-void BufferControlBlock::cancel() {
+void BufferControlBlock::cancel(const Status& reason) {
     std::unique_lock<std::mutex> l(_lock);
     _is_cancelled = true;
     _data_removal.notify_all();
     _data_arrival.notify_all();
     for (auto& ctx : _waiting_rpc) {
-        ctx->on_failure(Status::Cancelled("Cancelled"));
+        ctx->on_failure(reason);
     }
     _waiting_rpc.clear();
 }
@@ -301,8 +301,8 @@ Status 
PipBufferControlBlock::get_arrow_batch(std::shared_ptr<arrow::RecordBatch
     return Status::OK();
 }
 
-void PipBufferControlBlock::cancel() {
-    BufferControlBlock::cancel();
+void PipBufferControlBlock::cancel(const Status& reason) {
+    BufferControlBlock::cancel(reason);
     _update_dependency();
 }
 
diff --git a/be/src/runtime/buffer_control_block.h 
b/be/src/runtime/buffer_control_block.h
index b8b3f3d163e..9e991613f2e 100644
--- a/be/src/runtime/buffer_control_block.h
+++ b/be/src/runtime/buffer_control_block.h
@@ -86,8 +86,8 @@ public:
     // close buffer block, set _status to exec_status and set _is_close to 
true;
     // called because data has been read or error happened.
     Status close(Status exec_status);
-    // this is called by RPC, called from coordinator
-    virtual void cancel();
+
+    virtual void cancel(const Status& reason);
 
     [[nodiscard]] const TUniqueId& fragment_id() const { return _fragment_id; }
 
@@ -152,7 +152,7 @@ public:
 
     Status get_arrow_batch(std::shared_ptr<arrow::RecordBatch>* result) 
override;
 
-    void cancel() override;
+    void cancel(const Status& reason) override;
 
     void set_dependency(std::shared_ptr<pipeline::Dependency> 
result_sink_dependency);
 
diff --git a/be/src/runtime/fragment_mgr.cpp b/be/src/runtime/fragment_mgr.cpp
index acf622b4196..4696814e55d 100644
--- a/be/src/runtime/fragment_mgr.cpp
+++ b/be/src/runtime/fragment_mgr.cpp
@@ -1249,7 +1249,7 @@ void FragmentMgr::cancel_worker() {
     clock_gettime(CLOCK_MONOTONIC, &check_invalid_query_last_timestamp);
 
     do {
-        std::vector<TUniqueId> to_cancel;
+        std::vector<TUniqueId> queries_timeout;
         std::vector<TUniqueId> queries_to_cancel;
         std::vector<TUniqueId> queries_pipeline_task_leak;
         // Fe process uuid -> set<QueryId>
@@ -1274,7 +1274,7 @@ void FragmentMgr::cancel_worker() {
             std::lock_guard<std::mutex> lock(_lock);
             for (auto& fragment_instance_itr : _fragment_instance_map) {
                 if (fragment_instance_itr.second->is_timeout(now)) {
-                    
to_cancel.push_back(fragment_instance_itr.second->fragment_instance_id());
+                    
queries_timeout.push_back(fragment_instance_itr.second->fragment_instance_id());
                 }
             }
             for (auto& pipeline_itr : _pipeline_map) {
@@ -1283,7 +1283,7 @@ void FragmentMgr::cancel_worker() {
                     
reinterpret_cast<pipeline::PipelineXFragmentContext*>(pipeline_itr.second.get())
                             ->instance_ids(ins_ids);
                     for (auto& ins_id : ins_ids) {
-                        to_cancel.push_back(ins_id);
+                        queries_timeout.push_back(ins_id);
                     }
                 } else {
                     pipeline_itr.second->clear_finished_tasks();
@@ -1393,9 +1393,9 @@ void FragmentMgr::cancel_worker() {
 
         // TODO(zhiqiang): It seems that timeout_canceled_fragment_count is
         // designed to count canceled fragment of non-pipeline query.
-        timeout_canceled_fragment_count->increment(to_cancel.size());
-        for (auto& id : to_cancel) {
-            cancel_instance(id, PPlanFragmentCancelReason::TIMEOUT);
+        timeout_canceled_fragment_count->increment(queries_timeout.size());
+        for (auto& id : queries_timeout) {
+            cancel_instance(id, PPlanFragmentCancelReason::TIMEOUT, "Query 
timeout");
             LOG(INFO) << "FragmentMgr cancel worker going to cancel timeout 
instance "
                       << print_id(id);
         }
diff --git a/be/src/runtime/result_buffer_mgr.cpp 
b/be/src/runtime/result_buffer_mgr.cpp
index 3d96c1871b9..f81c9b1094f 100644
--- a/be/src/runtime/result_buffer_mgr.cpp
+++ b/be/src/runtime/result_buffer_mgr.cpp
@@ -32,6 +32,7 @@
 
 #include "arrow/record_batch.h"
 #include "arrow/type_fwd.h"
+#include "common/status.h"
 #include "runtime/buffer_control_block.h"
 #include "util/doris_metrics.h"
 #include "util/metrics.h"
@@ -150,13 +151,13 @@ Status ResultBufferMgr::fetch_arrow_data(const TUniqueId& 
finst_id,
     return Status::OK();
 }
 
-void ResultBufferMgr::cancel(const TUniqueId& query_id) {
+void ResultBufferMgr::cancel(const TUniqueId& query_id, const Status& reason) {
     {
         std::unique_lock<std::shared_mutex> wlock(_buffer_map_lock);
         BufferMap::iterator iter = _buffer_map.find(query_id);
 
         if (_buffer_map.end() != iter) {
-            iter->second->cancel();
+            iter->second->cancel(reason);
             _buffer_map.erase(iter);
         }
     }
@@ -206,7 +207,7 @@ void ResultBufferMgr::cancel_thread() {
 
         // cancel query
         for (int i = 0; i < query_to_cancel.size(); ++i) {
-            cancel(query_to_cancel[i]);
+            cancel(query_to_cancel[i], Status::TimedOut("Query tiemout"));
         }
     } while 
(!_stop_background_threads_latch.wait_for(std::chrono::seconds(1)));
 
diff --git a/be/src/runtime/result_buffer_mgr.h 
b/be/src/runtime/result_buffer_mgr.h
index e6ae0cc1042..18846684233 100644
--- a/be/src/runtime/result_buffer_mgr.h
+++ b/be/src/runtime/result_buffer_mgr.h
@@ -71,7 +71,7 @@ public:
     std::shared_ptr<arrow::Schema> find_arrow_schema(const TUniqueId& 
query_id);
 
     // cancel
-    void cancel(const TUniqueId& fragment_id);
+    void cancel(const TUniqueId& query_id, const Status& reason);
 
     // cancel one query at a future time.
     void cancel_at_time(time_t cancel_time, const TUniqueId& query_id);
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/manager/QueryProfileAction.java
 
b/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/manager/QueryProfileAction.java
index 0623932bc9d..d6bb25e9533 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/manager/QueryProfileAction.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/manager/QueryProfileAction.java
@@ -576,7 +576,7 @@ public class QueryProfileAction extends RestBaseController {
         }
 
         ExecuteEnv env = ExecuteEnv.getInstance();
-        env.getScheduler().cancelQuery(queryId);
+        env.getScheduler().cancelQuery(queryId, "cancel query by rest api");
         return ResponseEntityBuilder.ok();
     }
 }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/InsertTask.java
 
b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/InsertTask.java
index ee5abed8392..b4f52808f4b 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/InsertTask.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/InsertTask.java
@@ -221,7 +221,7 @@ public class InsertTask extends AbstractTask {
         }
         isCanceled.getAndSet(true);
         if (null != stmtExecutor) {
-            stmtExecutor.cancel();
+            stmtExecutor.cancel("insert task cancelled");
         }
     }
 
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/mtmv/MTMVTask.java 
b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/mtmv/MTMVTask.java
index 59a421509d9..966291bd7aa 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/mtmv/MTMVTask.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/mtmv/MTMVTask.java
@@ -258,7 +258,7 @@ public class MTMVTask extends AbstractTask {
     protected synchronized void executeCancelLogic() {
         LOG.info("mtmv task cancel, taskId: {}", super.getTaskId());
         if (executor != null) {
-            executor.cancel();
+            executor.cancel("mtmv task cancelled");
         }
         after();
     }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/load/ExportTaskExecutor.java 
b/fe/fe-core/src/main/java/org/apache/doris/load/ExportTaskExecutor.java
index 1424f3bc301..0e434b0b820 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/ExportTaskExecutor.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/ExportTaskExecutor.java
@@ -162,7 +162,7 @@ public class ExportTaskExecutor implements 
TransientTaskExecutor {
         }
         isCanceled.getAndSet(true);
         if (stmtExecutor != null) {
-            stmtExecutor.cancel();
+            stmtExecutor.cancel("export task cancelled");
         }
     }
 
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadJob.java 
b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadJob.java
index 737eb33b584..f02c0b289b8 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadJob.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadJob.java
@@ -600,7 +600,7 @@ public abstract class LoadJob extends 
AbstractTxnStateChangeCallback implements
         for (TUniqueId loadId : loadIds) {
             Coordinator coordinator = 
QeProcessorImpl.INSTANCE.getCoordinator(loadId);
             if (coordinator != null) {
-                coordinator.cancel();
+                coordinator.cancel(failMsg.getMsg());
             }
         }
 
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/AbstractInsertExecutor.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/AbstractInsertExecutor.java
index 064b9abfcac..58a45031ffa 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/AbstractInsertExecutor.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/AbstractInsertExecutor.java
@@ -143,7 +143,7 @@ public abstract class AbstractInsertExecutor {
         }
         boolean notTimeout = coordinator.join(execTimeout);
         if (!coordinator.isDone()) {
-            coordinator.cancel();
+            coordinator.cancel("insert timeout");
             if (notTimeout) {
                 errMsg = coordinator.getExecStatus().getErrorMsg();
                 ErrorReport.reportDdlException("there exists unhealthy 
backend. "
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectContext.java 
b/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectContext.java
index ba53c14e1dc..c5622d54a14 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectContext.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectContext.java
@@ -935,7 +935,7 @@ public class ConnectContext {
             closeChannel();
         }
         // Now, cancel running query.
-        cancelQuery();
+        cancelQuery("cancel query by user from " + getRemoteHostPortString());
     }
 
     // kill operation with no protect by timeout.
@@ -956,10 +956,10 @@ public class ConnectContext {
         }
     }
 
-    public void cancelQuery() {
+    public void cancelQuery(String cancelMessage) {
         StmtExecutor executorRef = executor;
         if (executorRef != null) {
-            executorRef.cancel();
+            executorRef.cancel(cancelMessage);
         }
     }
 
@@ -990,7 +990,7 @@ public class ConnectContext {
             long timeout = getExecTimeout() * 1000L;
             if (delta > timeout) {
                 LOG.warn("kill {} timeout, remote: {}, query timeout: {}, 
query id: {}",
-                        timeoutTag, getRemoteHostPortString(), timeout, 
queryId);
+                        timeoutTag, getRemoteHostPortString(), timeout, 
DebugUtil.printId(queryId));
                 killFlag = true;
             }
         }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectScheduler.java 
b/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectScheduler.java
index 01b41ec3e96..5da6bb1ba95 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectScheduler.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectScheduler.java
@@ -145,11 +145,11 @@ public class ConnectScheduler {
         return null;
     }
 
-    public void cancelQuery(String queryId) {
+    public void cancelQuery(String queryId, String cancelReason) {
         for (ConnectContext ctx : connectionMap.values()) {
             TUniqueId qid = ctx.queryId();
             if (qid != null && DebugUtil.printId(qid).equals(queryId)) {
-                ctx.cancelQuery();
+                ctx.cancelQuery(cancelReason);
                 break;
             }
         }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java 
b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
index b0d1cbaba05..7d9c8243c04 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
@@ -1164,7 +1164,7 @@ public class Coordinator implements CoordInterface {
                     errMsg = operation + " failed. " + exception.getMessage();
                 }
                 queryStatus.updateStatus(TStatusCode.INTERNAL_ERROR, errMsg);
-                cancelInternal(Types.PPlanFragmentCancelReason.INTERNAL_ERROR);
+                cancelInternal(Types.PPlanFragmentCancelReason.INTERNAL_ERROR, 
errMsg);
                 switch (code) {
                     case TIMEOUT:
                         
MetricRepo.BE_COUNTER_QUERY_RPC_FAILED.getOrAdd(triple.getLeft().brpcAddr.hostname)
@@ -1259,7 +1259,7 @@ public class Coordinator implements CoordInterface {
                     errMsg = operation + " failed. " + exception.getMessage();
                 }
                 queryStatus.updateStatus(TStatusCode.INTERNAL_ERROR, errMsg);
-                cancelInternal(Types.PPlanFragmentCancelReason.INTERNAL_ERROR);
+                cancelInternal(Types.PPlanFragmentCancelReason.INTERNAL_ERROR, 
errMsg);
                 switch (code) {
                     case TIMEOUT:
                         
MetricRepo.BE_COUNTER_QUERY_RPC_FAILED.getOrAdd(triple.getLeft().brpcAddr.hostname)
@@ -1385,9 +1385,9 @@ public class Coordinator implements CoordInterface {
 
             queryStatus.updateStatus(status.getErrorCode(), 
status.getErrorMsg());
             if (status.getErrorCode() == TStatusCode.TIMEOUT) {
-                cancelInternal(Types.PPlanFragmentCancelReason.TIMEOUT);
+                cancelInternal(Types.PPlanFragmentCancelReason.TIMEOUT, 
status.getErrorMsg());
             } else {
-                cancelInternal(Types.PPlanFragmentCancelReason.INTERNAL_ERROR);
+                cancelInternal(Types.PPlanFragmentCancelReason.INTERNAL_ERROR, 
status.getErrorMsg());
             }
         } finally {
             lock.unlock();
@@ -1426,7 +1426,7 @@ public class Coordinator implements CoordInterface {
                 throw new RpcException(null, copyStatus.getErrorMsg());
             } else {
                 String errMsg = copyStatus.getErrorMsg();
-                LOG.warn("query failed: {}", errMsg);
+                LOG.warn("Query {} failed: {}", DebugUtil.printId(queryId), 
errMsg);
                 throw new UserException(errMsg);
             }
         }
@@ -1441,7 +1441,7 @@ public class Coordinator implements CoordInterface {
                 if (LOG.isDebugEnabled()) {
                     LOG.debug("no block query, return num >= limit rows, need 
cancel");
                 }
-                cancelInternal(Types.PPlanFragmentCancelReason.LIMIT_REACH);
+                cancelInternal(Types.PPlanFragmentCancelReason.LIMIT_REACH, 
"query reach limit");
             }
             if (ConnectContext.get() != null && 
ConnectContext.get().getSessionVariable().dryRunQuery) {
                 numReceivedRows = 0;
@@ -1528,8 +1528,8 @@ public class Coordinator implements CoordInterface {
     // Cancel execution of query. This includes the execution of the local plan
     // fragment,
     // if any, as well as all plan fragments on remote nodes.
-    public void cancel() {
-        cancel(Types.PPlanFragmentCancelReason.USER_CANCEL, "user cancel");
+    public void cancel(String errorMsg) {
+        cancel(Types.PPlanFragmentCancelReason.USER_CANCEL, errorMsg);
         if (queueToken != null) {
             queueToken.cancel();
         }
@@ -1552,8 +1552,8 @@ public class Coordinator implements CoordInterface {
                 queryStatus.updateStatus(TStatusCode.CANCELLED, errorMsg);
             }
             LOG.warn("Cancel execution of query {}, this is a outside invoke, 
cancelReason {}",
-                    DebugUtil.printId(queryId), cancelReason.toString());
-            cancelInternal(cancelReason);
+                    DebugUtil.printId(queryId), errorMsg);
+            cancelInternal(cancelReason, errorMsg);
         } finally {
             unlock();
         }
@@ -1577,9 +1577,9 @@ public class Coordinator implements CoordInterface {
         }
     }
 
-    private void cancelInternal(Types.PPlanFragmentCancelReason cancelReason) {
+    private void cancelInternal(Types.PPlanFragmentCancelReason cancelReason, 
String cancelMessage) {
         if (null != receiver) {
-            receiver.cancel(cancelReason);
+            receiver.cancel(cancelReason, cancelMessage);
         }
         if (null != pointExec) {
             pointExec.cancel();
@@ -3307,10 +3307,6 @@ public class Coordinator implements CoordInterface {
                                             
DebugUtil.printId(fragmentInstanceId()), status.toString());
                                 }
                             }
-                            LOG.warn("Failed to cancel query {} instance 
initiated={} done={} backend: {},"
-                                    + "fragment instance id={}, reason: {}",
-                                    DebugUtil.printId(queryId), initiated, 
done, backend.getId(),
-                                    DebugUtil.printId(fragmentInstanceId()), 
"without status");
                         }
 
                         public void onFailure(Throwable t) {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/ResultReceiver.java 
b/fe/fe-core/src/main/java/org/apache/doris/qe/ResultReceiver.java
index 981d720e8b9..43ad573bf79 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/ResultReceiver.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/ResultReceiver.java
@@ -111,8 +111,8 @@ public class ResultReceiver {
                         LOG.warn("Query {} get result timeout, get result 
duration {} ms",
                                 DebugUtil.printId(this.queryId), (timeoutTs - 
currentTs) / 1000);
                         setRunStatus(Status.TIMEOUT);
-                        status.updateStatus(TStatusCode.TIMEOUT, "");
-                        updateCancelReason("fetch data timeout");
+                        status.updateStatus(TStatusCode.TIMEOUT, "Query 
timeout");
+                        updateCancelReason("Query timeout");
                         return null;
                     } catch (InterruptedException e) {
                         // continue to get result
@@ -183,7 +183,7 @@ public class ResultReceiver {
             }
         } catch (TimeoutException e) {
             LOG.warn("fetch result timeout, finstId={}", 
DebugUtil.printId(finstId), e);
-            status.updateStatus(TStatusCode.TIMEOUT, "query timeout");
+            status.updateStatus(TStatusCode.TIMEOUT, "Query timeout");
         } finally {
             synchronized (this) {
                 currentThread = null;
@@ -205,13 +205,14 @@ public class ResultReceiver {
         }
     }
 
-    public void cancel(Types.PPlanFragmentCancelReason reason) {
+    public void cancel(Types.PPlanFragmentCancelReason reason, String 
cancelMessage) {
         if (reason == Types.PPlanFragmentCancelReason.TIMEOUT) {
             setRunStatus(Status.TIMEOUT);
         } else {
             setRunStatus(Status.CANCELLED);
         }
-        updateCancelReason(reason.toString());
+
+        updateCancelReason(cancelMessage);
         synchronized (this) {
             if (currentThread != null) {
                 // TODO(cmy): we cannot interrupt this thread, or we may throw
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java 
b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
index 3efed4b7650..84aed148106 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
@@ -1475,7 +1475,7 @@ public class StmtExecutor {
     }
 
     // Because this is called by other thread
-    public void cancel() {
+    public void cancel(String message) {
         Optional<InsertOverwriteTableCommand> insertOverwriteTableCommand = 
getInsertOverwriteTableCommand();
         if (insertOverwriteTableCommand.isPresent()) {
             // If the be scheduling has not been triggered yet, cancel the 
scheduling first
@@ -1483,7 +1483,7 @@ public class StmtExecutor {
         }
         Coordinator coordRef = coord;
         if (coordRef != null) {
-            coordRef.cancel();
+            coordRef.cancel(message);
         }
         if (mysqlLoadId != null) {
             
Env.getCurrentEnv().getLoadManager().getMysqlLoadManager().cancelMySqlLoad(mysqlLoadId);
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadActionCancelQuery.java
 
b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadActionCancelQuery.java
index 2dcff6075f4..d512bcfb489 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadActionCancelQuery.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadActionCancelQuery.java
@@ -32,7 +32,7 @@ public class WorkloadActionCancelQuery implements 
WorkloadAction {
                 && queryInfo.tUniqueId != null
                 && 
QeProcessorImpl.INSTANCE.getCoordinator(queryInfo.tUniqueId) != null) {
             LOG.info("cancel query {} triggered by query schedule policy.", 
queryInfo.queryId);
-            queryInfo.context.cancelQuery();
+            queryInfo.context.cancelQuery("cancel query by workload policy");
         }
     }
 
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/sessions/FlightSqlConnectContext.java
 
b/fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/sessions/FlightSqlConnectContext.java
index 9f703dff92b..406497c77db 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/sessions/FlightSqlConnectContext.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/sessions/FlightSqlConnectContext.java
@@ -74,7 +74,7 @@ public class FlightSqlConnectContext extends ConnectContext {
             connectScheduler.unregisterConnection(this);
         }
         // Now, cancel running query.
-        cancelQuery();
+        cancelQuery("arrow flight query killed by user");
     }
 
     @Override
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/statistics/BaseAnalysisTask.java 
b/fe/fe-core/src/main/java/org/apache/doris/statistics/BaseAnalysisTask.java
index 31fae23284b..f2919afbb5a 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/statistics/BaseAnalysisTask.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/BaseAnalysisTask.java
@@ -192,7 +192,7 @@ public abstract class BaseAnalysisTask {
     public void cancel() {
         killed = true;
         if (stmtExecutor != null) {
-            stmtExecutor.cancel();
+            stmtExecutor.cancel("analysis task cancelled");
         }
         Env.getCurrentEnv().getAnalysisManager()
                 .updateTaskStatus(info, AnalysisState.FAILED,


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to