This is an automated email from the ASF dual-hosted git repository. yiguolei pushed a commit to branch branch-2.1 in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.1 by this push: new abcba778ff5 [fix](cancel) Fix cancel msg on branch-2.1 (#41798) abcba778ff5 is described below commit abcba778ff566355bd92b8f1914a058a2ae634e1 Author: zhiqiang <seuhezhiqi...@163.com> AuthorDate: Tue Oct 15 17:15:05 2024 +0800 [fix](cancel) Fix cancel msg on branch-2.1 (#41798) Make sure we can tell cancel reason from: 1. user cancel 2. timeout 3. others ```text mysql [demo]>set query_timeout=1; -------------- set query_timeout=1 -------------- Query OK, 0 rows affected (0.00 sec) mysql [demo]>select sleep(5); -------------- select sleep(5) -------------- ERROR 1105 (HY000): errCode = 2, detailMessage = Timeout mysql [demo]>select sleep(5); -------------- select sleep(5) -------------- ^C^C -- sending "KILL QUERY 0" to server ... ^C -- query aborted ERROR 1105 (HY000): errCode = 2, detailMessage = cancel query by user from 127.0.0.1:64208 ``` --- .../pipeline_x/pipeline_x_fragment_context.cpp | 2 +- be/src/runtime/buffer_control_block.cpp | 10 ++++---- be/src/runtime/buffer_control_block.h | 6 ++--- be/src/runtime/fragment_mgr.cpp | 12 +++++----- be/src/runtime/result_buffer_mgr.cpp | 7 +++--- be/src/runtime/result_buffer_mgr.h | 2 +- .../httpv2/rest/manager/QueryProfileAction.java | 2 +- .../doris/job/extensions/insert/InsertTask.java | 2 +- .../apache/doris/job/extensions/mtmv/MTMVTask.java | 2 +- .../org/apache/doris/load/ExportTaskExecutor.java | 2 +- .../java/org/apache/doris/load/loadv2/LoadJob.java | 2 +- .../commands/insert/AbstractInsertExecutor.java | 2 +- .../java/org/apache/doris/qe/ConnectContext.java | 8 +++---- .../java/org/apache/doris/qe/ConnectScheduler.java | 4 ++-- .../main/java/org/apache/doris/qe/Coordinator.java | 28 ++++++++++------------ .../java/org/apache/doris/qe/ResultReceiver.java | 11 +++++---- .../java/org/apache/doris/qe/StmtExecutor.java | 4 ++-- .../WorkloadActionCancelQuery.java | 2 +- .../sessions/FlightSqlConnectContext.java | 2 +- .../apache/doris/statistics/BaseAnalysisTask.java | 2 +- 20 files changed, 55 insertions(+), 57 deletions(-) diff --git a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp index a3dff107f1b..53dae142a6d 100644 --- a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp +++ b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp @@ -145,7 +145,7 @@ void PipelineXFragmentContext::cancel(const PPlanFragmentCancelReason& reason, LOG_INFO("PipelineXFragmentContext::cancel") .tag("query_id", print_id(_query_id)) .tag("fragment_id", _fragment_id) - .tag("reason", reason) + .tag("reason", PPlanFragmentCancelReason_Name(reason)) .tag("error message", msg); if (reason == PPlanFragmentCancelReason::TIMEOUT) { LOG(WARNING) << "PipelineXFragmentContext is cancelled due to timeout : " << debug_string(); diff --git a/be/src/runtime/buffer_control_block.cpp b/be/src/runtime/buffer_control_block.cpp index 4b9fa57ce64..c61c98a324b 100644 --- a/be/src/runtime/buffer_control_block.cpp +++ b/be/src/runtime/buffer_control_block.cpp @@ -104,7 +104,7 @@ BufferControlBlock::BufferControlBlock(const TUniqueId& id, int buffer_size) } BufferControlBlock::~BufferControlBlock() { - cancel(); + cancel(Status::Cancelled("Cancelled")); } Status BufferControlBlock::init() { @@ -266,13 +266,13 @@ Status BufferControlBlock::close(Status exec_status) { return Status::OK(); } -void BufferControlBlock::cancel() { +void BufferControlBlock::cancel(const Status& reason) { std::unique_lock<std::mutex> l(_lock); _is_cancelled = true; _data_removal.notify_all(); _data_arrival.notify_all(); for (auto& ctx : _waiting_rpc) { - ctx->on_failure(Status::Cancelled("Cancelled")); + ctx->on_failure(reason); } _waiting_rpc.clear(); } @@ -301,8 +301,8 @@ Status PipBufferControlBlock::get_arrow_batch(std::shared_ptr<arrow::RecordBatch return Status::OK(); } -void PipBufferControlBlock::cancel() { - BufferControlBlock::cancel(); +void PipBufferControlBlock::cancel(const Status& reason) { + BufferControlBlock::cancel(reason); _update_dependency(); } diff --git a/be/src/runtime/buffer_control_block.h b/be/src/runtime/buffer_control_block.h index b8b3f3d163e..9e991613f2e 100644 --- a/be/src/runtime/buffer_control_block.h +++ b/be/src/runtime/buffer_control_block.h @@ -86,8 +86,8 @@ public: // close buffer block, set _status to exec_status and set _is_close to true; // called because data has been read or error happened. Status close(Status exec_status); - // this is called by RPC, called from coordinator - virtual void cancel(); + + virtual void cancel(const Status& reason); [[nodiscard]] const TUniqueId& fragment_id() const { return _fragment_id; } @@ -152,7 +152,7 @@ public: Status get_arrow_batch(std::shared_ptr<arrow::RecordBatch>* result) override; - void cancel() override; + void cancel(const Status& reason) override; void set_dependency(std::shared_ptr<pipeline::Dependency> result_sink_dependency); diff --git a/be/src/runtime/fragment_mgr.cpp b/be/src/runtime/fragment_mgr.cpp index acf622b4196..4696814e55d 100644 --- a/be/src/runtime/fragment_mgr.cpp +++ b/be/src/runtime/fragment_mgr.cpp @@ -1249,7 +1249,7 @@ void FragmentMgr::cancel_worker() { clock_gettime(CLOCK_MONOTONIC, &check_invalid_query_last_timestamp); do { - std::vector<TUniqueId> to_cancel; + std::vector<TUniqueId> queries_timeout; std::vector<TUniqueId> queries_to_cancel; std::vector<TUniqueId> queries_pipeline_task_leak; // Fe process uuid -> set<QueryId> @@ -1274,7 +1274,7 @@ void FragmentMgr::cancel_worker() { std::lock_guard<std::mutex> lock(_lock); for (auto& fragment_instance_itr : _fragment_instance_map) { if (fragment_instance_itr.second->is_timeout(now)) { - to_cancel.push_back(fragment_instance_itr.second->fragment_instance_id()); + queries_timeout.push_back(fragment_instance_itr.second->fragment_instance_id()); } } for (auto& pipeline_itr : _pipeline_map) { @@ -1283,7 +1283,7 @@ void FragmentMgr::cancel_worker() { reinterpret_cast<pipeline::PipelineXFragmentContext*>(pipeline_itr.second.get()) ->instance_ids(ins_ids); for (auto& ins_id : ins_ids) { - to_cancel.push_back(ins_id); + queries_timeout.push_back(ins_id); } } else { pipeline_itr.second->clear_finished_tasks(); @@ -1393,9 +1393,9 @@ void FragmentMgr::cancel_worker() { // TODO(zhiqiang): It seems that timeout_canceled_fragment_count is // designed to count canceled fragment of non-pipeline query. - timeout_canceled_fragment_count->increment(to_cancel.size()); - for (auto& id : to_cancel) { - cancel_instance(id, PPlanFragmentCancelReason::TIMEOUT); + timeout_canceled_fragment_count->increment(queries_timeout.size()); + for (auto& id : queries_timeout) { + cancel_instance(id, PPlanFragmentCancelReason::TIMEOUT, "Query timeout"); LOG(INFO) << "FragmentMgr cancel worker going to cancel timeout instance " << print_id(id); } diff --git a/be/src/runtime/result_buffer_mgr.cpp b/be/src/runtime/result_buffer_mgr.cpp index 3d96c1871b9..f81c9b1094f 100644 --- a/be/src/runtime/result_buffer_mgr.cpp +++ b/be/src/runtime/result_buffer_mgr.cpp @@ -32,6 +32,7 @@ #include "arrow/record_batch.h" #include "arrow/type_fwd.h" +#include "common/status.h" #include "runtime/buffer_control_block.h" #include "util/doris_metrics.h" #include "util/metrics.h" @@ -150,13 +151,13 @@ Status ResultBufferMgr::fetch_arrow_data(const TUniqueId& finst_id, return Status::OK(); } -void ResultBufferMgr::cancel(const TUniqueId& query_id) { +void ResultBufferMgr::cancel(const TUniqueId& query_id, const Status& reason) { { std::unique_lock<std::shared_mutex> wlock(_buffer_map_lock); BufferMap::iterator iter = _buffer_map.find(query_id); if (_buffer_map.end() != iter) { - iter->second->cancel(); + iter->second->cancel(reason); _buffer_map.erase(iter); } } @@ -206,7 +207,7 @@ void ResultBufferMgr::cancel_thread() { // cancel query for (int i = 0; i < query_to_cancel.size(); ++i) { - cancel(query_to_cancel[i]); + cancel(query_to_cancel[i], Status::TimedOut("Query tiemout")); } } while (!_stop_background_threads_latch.wait_for(std::chrono::seconds(1))); diff --git a/be/src/runtime/result_buffer_mgr.h b/be/src/runtime/result_buffer_mgr.h index e6ae0cc1042..18846684233 100644 --- a/be/src/runtime/result_buffer_mgr.h +++ b/be/src/runtime/result_buffer_mgr.h @@ -71,7 +71,7 @@ public: std::shared_ptr<arrow::Schema> find_arrow_schema(const TUniqueId& query_id); // cancel - void cancel(const TUniqueId& fragment_id); + void cancel(const TUniqueId& query_id, const Status& reason); // cancel one query at a future time. void cancel_at_time(time_t cancel_time, const TUniqueId& query_id); diff --git a/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/manager/QueryProfileAction.java b/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/manager/QueryProfileAction.java index 0623932bc9d..d6bb25e9533 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/manager/QueryProfileAction.java +++ b/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/manager/QueryProfileAction.java @@ -576,7 +576,7 @@ public class QueryProfileAction extends RestBaseController { } ExecuteEnv env = ExecuteEnv.getInstance(); - env.getScheduler().cancelQuery(queryId); + env.getScheduler().cancelQuery(queryId, "cancel query by rest api"); return ResponseEntityBuilder.ok(); } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/InsertTask.java b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/InsertTask.java index ee5abed8392..b4f52808f4b 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/InsertTask.java +++ b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/InsertTask.java @@ -221,7 +221,7 @@ public class InsertTask extends AbstractTask { } isCanceled.getAndSet(true); if (null != stmtExecutor) { - stmtExecutor.cancel(); + stmtExecutor.cancel("insert task cancelled"); } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/mtmv/MTMVTask.java b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/mtmv/MTMVTask.java index 59a421509d9..966291bd7aa 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/mtmv/MTMVTask.java +++ b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/mtmv/MTMVTask.java @@ -258,7 +258,7 @@ public class MTMVTask extends AbstractTask { protected synchronized void executeCancelLogic() { LOG.info("mtmv task cancel, taskId: {}", super.getTaskId()); if (executor != null) { - executor.cancel(); + executor.cancel("mtmv task cancelled"); } after(); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/ExportTaskExecutor.java b/fe/fe-core/src/main/java/org/apache/doris/load/ExportTaskExecutor.java index 1424f3bc301..0e434b0b820 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/ExportTaskExecutor.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/ExportTaskExecutor.java @@ -162,7 +162,7 @@ public class ExportTaskExecutor implements TransientTaskExecutor { } isCanceled.getAndSet(true); if (stmtExecutor != null) { - stmtExecutor.cancel(); + stmtExecutor.cancel("export task cancelled"); } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadJob.java b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadJob.java index 737eb33b584..f02c0b289b8 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadJob.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadJob.java @@ -600,7 +600,7 @@ public abstract class LoadJob extends AbstractTxnStateChangeCallback implements for (TUniqueId loadId : loadIds) { Coordinator coordinator = QeProcessorImpl.INSTANCE.getCoordinator(loadId); if (coordinator != null) { - coordinator.cancel(); + coordinator.cancel(failMsg.getMsg()); } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/AbstractInsertExecutor.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/AbstractInsertExecutor.java index 064b9abfcac..58a45031ffa 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/AbstractInsertExecutor.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/AbstractInsertExecutor.java @@ -143,7 +143,7 @@ public abstract class AbstractInsertExecutor { } boolean notTimeout = coordinator.join(execTimeout); if (!coordinator.isDone()) { - coordinator.cancel(); + coordinator.cancel("insert timeout"); if (notTimeout) { errMsg = coordinator.getExecStatus().getErrorMsg(); ErrorReport.reportDdlException("there exists unhealthy backend. " diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectContext.java b/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectContext.java index ba53c14e1dc..c5622d54a14 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectContext.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectContext.java @@ -935,7 +935,7 @@ public class ConnectContext { closeChannel(); } // Now, cancel running query. - cancelQuery(); + cancelQuery("cancel query by user from " + getRemoteHostPortString()); } // kill operation with no protect by timeout. @@ -956,10 +956,10 @@ public class ConnectContext { } } - public void cancelQuery() { + public void cancelQuery(String cancelMessage) { StmtExecutor executorRef = executor; if (executorRef != null) { - executorRef.cancel(); + executorRef.cancel(cancelMessage); } } @@ -990,7 +990,7 @@ public class ConnectContext { long timeout = getExecTimeout() * 1000L; if (delta > timeout) { LOG.warn("kill {} timeout, remote: {}, query timeout: {}, query id: {}", - timeoutTag, getRemoteHostPortString(), timeout, queryId); + timeoutTag, getRemoteHostPortString(), timeout, DebugUtil.printId(queryId)); killFlag = true; } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectScheduler.java b/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectScheduler.java index 01b41ec3e96..5da6bb1ba95 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectScheduler.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectScheduler.java @@ -145,11 +145,11 @@ public class ConnectScheduler { return null; } - public void cancelQuery(String queryId) { + public void cancelQuery(String queryId, String cancelReason) { for (ConnectContext ctx : connectionMap.values()) { TUniqueId qid = ctx.queryId(); if (qid != null && DebugUtil.printId(qid).equals(queryId)) { - ctx.cancelQuery(); + ctx.cancelQuery(cancelReason); break; } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java index b0d1cbaba05..7d9c8243c04 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java @@ -1164,7 +1164,7 @@ public class Coordinator implements CoordInterface { errMsg = operation + " failed. " + exception.getMessage(); } queryStatus.updateStatus(TStatusCode.INTERNAL_ERROR, errMsg); - cancelInternal(Types.PPlanFragmentCancelReason.INTERNAL_ERROR); + cancelInternal(Types.PPlanFragmentCancelReason.INTERNAL_ERROR, errMsg); switch (code) { case TIMEOUT: MetricRepo.BE_COUNTER_QUERY_RPC_FAILED.getOrAdd(triple.getLeft().brpcAddr.hostname) @@ -1259,7 +1259,7 @@ public class Coordinator implements CoordInterface { errMsg = operation + " failed. " + exception.getMessage(); } queryStatus.updateStatus(TStatusCode.INTERNAL_ERROR, errMsg); - cancelInternal(Types.PPlanFragmentCancelReason.INTERNAL_ERROR); + cancelInternal(Types.PPlanFragmentCancelReason.INTERNAL_ERROR, errMsg); switch (code) { case TIMEOUT: MetricRepo.BE_COUNTER_QUERY_RPC_FAILED.getOrAdd(triple.getLeft().brpcAddr.hostname) @@ -1385,9 +1385,9 @@ public class Coordinator implements CoordInterface { queryStatus.updateStatus(status.getErrorCode(), status.getErrorMsg()); if (status.getErrorCode() == TStatusCode.TIMEOUT) { - cancelInternal(Types.PPlanFragmentCancelReason.TIMEOUT); + cancelInternal(Types.PPlanFragmentCancelReason.TIMEOUT, status.getErrorMsg()); } else { - cancelInternal(Types.PPlanFragmentCancelReason.INTERNAL_ERROR); + cancelInternal(Types.PPlanFragmentCancelReason.INTERNAL_ERROR, status.getErrorMsg()); } } finally { lock.unlock(); @@ -1426,7 +1426,7 @@ public class Coordinator implements CoordInterface { throw new RpcException(null, copyStatus.getErrorMsg()); } else { String errMsg = copyStatus.getErrorMsg(); - LOG.warn("query failed: {}", errMsg); + LOG.warn("Query {} failed: {}", DebugUtil.printId(queryId), errMsg); throw new UserException(errMsg); } } @@ -1441,7 +1441,7 @@ public class Coordinator implements CoordInterface { if (LOG.isDebugEnabled()) { LOG.debug("no block query, return num >= limit rows, need cancel"); } - cancelInternal(Types.PPlanFragmentCancelReason.LIMIT_REACH); + cancelInternal(Types.PPlanFragmentCancelReason.LIMIT_REACH, "query reach limit"); } if (ConnectContext.get() != null && ConnectContext.get().getSessionVariable().dryRunQuery) { numReceivedRows = 0; @@ -1528,8 +1528,8 @@ public class Coordinator implements CoordInterface { // Cancel execution of query. This includes the execution of the local plan // fragment, // if any, as well as all plan fragments on remote nodes. - public void cancel() { - cancel(Types.PPlanFragmentCancelReason.USER_CANCEL, "user cancel"); + public void cancel(String errorMsg) { + cancel(Types.PPlanFragmentCancelReason.USER_CANCEL, errorMsg); if (queueToken != null) { queueToken.cancel(); } @@ -1552,8 +1552,8 @@ public class Coordinator implements CoordInterface { queryStatus.updateStatus(TStatusCode.CANCELLED, errorMsg); } LOG.warn("Cancel execution of query {}, this is a outside invoke, cancelReason {}", - DebugUtil.printId(queryId), cancelReason.toString()); - cancelInternal(cancelReason); + DebugUtil.printId(queryId), errorMsg); + cancelInternal(cancelReason, errorMsg); } finally { unlock(); } @@ -1577,9 +1577,9 @@ public class Coordinator implements CoordInterface { } } - private void cancelInternal(Types.PPlanFragmentCancelReason cancelReason) { + private void cancelInternal(Types.PPlanFragmentCancelReason cancelReason, String cancelMessage) { if (null != receiver) { - receiver.cancel(cancelReason); + receiver.cancel(cancelReason, cancelMessage); } if (null != pointExec) { pointExec.cancel(); @@ -3307,10 +3307,6 @@ public class Coordinator implements CoordInterface { DebugUtil.printId(fragmentInstanceId()), status.toString()); } } - LOG.warn("Failed to cancel query {} instance initiated={} done={} backend: {}," - + "fragment instance id={}, reason: {}", - DebugUtil.printId(queryId), initiated, done, backend.getId(), - DebugUtil.printId(fragmentInstanceId()), "without status"); } public void onFailure(Throwable t) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/ResultReceiver.java b/fe/fe-core/src/main/java/org/apache/doris/qe/ResultReceiver.java index 981d720e8b9..43ad573bf79 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/ResultReceiver.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/ResultReceiver.java @@ -111,8 +111,8 @@ public class ResultReceiver { LOG.warn("Query {} get result timeout, get result duration {} ms", DebugUtil.printId(this.queryId), (timeoutTs - currentTs) / 1000); setRunStatus(Status.TIMEOUT); - status.updateStatus(TStatusCode.TIMEOUT, ""); - updateCancelReason("fetch data timeout"); + status.updateStatus(TStatusCode.TIMEOUT, "Query timeout"); + updateCancelReason("Query timeout"); return null; } catch (InterruptedException e) { // continue to get result @@ -183,7 +183,7 @@ public class ResultReceiver { } } catch (TimeoutException e) { LOG.warn("fetch result timeout, finstId={}", DebugUtil.printId(finstId), e); - status.updateStatus(TStatusCode.TIMEOUT, "query timeout"); + status.updateStatus(TStatusCode.TIMEOUT, "Query timeout"); } finally { synchronized (this) { currentThread = null; @@ -205,13 +205,14 @@ public class ResultReceiver { } } - public void cancel(Types.PPlanFragmentCancelReason reason) { + public void cancel(Types.PPlanFragmentCancelReason reason, String cancelMessage) { if (reason == Types.PPlanFragmentCancelReason.TIMEOUT) { setRunStatus(Status.TIMEOUT); } else { setRunStatus(Status.CANCELLED); } - updateCancelReason(reason.toString()); + + updateCancelReason(cancelMessage); synchronized (this) { if (currentThread != null) { // TODO(cmy): we cannot interrupt this thread, or we may throw diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java index 3efed4b7650..84aed148106 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java @@ -1475,7 +1475,7 @@ public class StmtExecutor { } // Because this is called by other thread - public void cancel() { + public void cancel(String message) { Optional<InsertOverwriteTableCommand> insertOverwriteTableCommand = getInsertOverwriteTableCommand(); if (insertOverwriteTableCommand.isPresent()) { // If the be scheduling has not been triggered yet, cancel the scheduling first @@ -1483,7 +1483,7 @@ public class StmtExecutor { } Coordinator coordRef = coord; if (coordRef != null) { - coordRef.cancel(); + coordRef.cancel(message); } if (mysqlLoadId != null) { Env.getCurrentEnv().getLoadManager().getMysqlLoadManager().cancelMySqlLoad(mysqlLoadId); diff --git a/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadActionCancelQuery.java b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadActionCancelQuery.java index 2dcff6075f4..d512bcfb489 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadActionCancelQuery.java +++ b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadActionCancelQuery.java @@ -32,7 +32,7 @@ public class WorkloadActionCancelQuery implements WorkloadAction { && queryInfo.tUniqueId != null && QeProcessorImpl.INSTANCE.getCoordinator(queryInfo.tUniqueId) != null) { LOG.info("cancel query {} triggered by query schedule policy.", queryInfo.queryId); - queryInfo.context.cancelQuery(); + queryInfo.context.cancelQuery("cancel query by workload policy"); } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/sessions/FlightSqlConnectContext.java b/fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/sessions/FlightSqlConnectContext.java index 9f703dff92b..406497c77db 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/sessions/FlightSqlConnectContext.java +++ b/fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/sessions/FlightSqlConnectContext.java @@ -74,7 +74,7 @@ public class FlightSqlConnectContext extends ConnectContext { connectScheduler.unregisterConnection(this); } // Now, cancel running query. - cancelQuery(); + cancelQuery("arrow flight query killed by user"); } @Override diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/BaseAnalysisTask.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/BaseAnalysisTask.java index 31fae23284b..f2919afbb5a 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/statistics/BaseAnalysisTask.java +++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/BaseAnalysisTask.java @@ -192,7 +192,7 @@ public abstract class BaseAnalysisTask { public void cancel() { killed = true; if (stmtExecutor != null) { - stmtExecutor.cancel(); + stmtExecutor.cancel("analysis task cancelled"); } Env.getCurrentEnv().getAnalysisManager() .updateTaskStatus(info, AnalysisState.FAILED, --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org