This is an automated email from the ASF dual-hosted git repository. yiguolei pushed a commit to branch branch-2.1 in repository https://gitbox.apache.org/repos/asf/doris.git
commit f4deb42a804000d93148d0faaf4d1919221b0169 Author: Gabriel <gabrielleeb...@gmail.com> AuthorDate: Wed Apr 24 21:49:11 2024 +0800 [pipeline](fix) Prevent re-cancel pipeline tasks (#34073) --- be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp index c62ed04e580..53d3a1bc5d0 100644 --- a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp +++ b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp @@ -130,6 +130,13 @@ PipelineXFragmentContext::~PipelineXFragmentContext() { void PipelineXFragmentContext::cancel(const PPlanFragmentCancelReason& reason, const std::string& msg) { + { + std::lock_guard<std::mutex> l(_task_mutex); + if (_closed_tasks == _total_tasks) { + // All tasks in this PipelineXFragmentContext already closed. + return; + } + } LOG_INFO("PipelineXFragmentContext::cancel") .tag("query_id", print_id(_query_id)) .tag("fragment_id", _fragment_id) @@ -158,6 +165,9 @@ void PipelineXFragmentContext::cancel(const PPlanFragmentCancelReason& reason, // _exec_env->result_queue_mgr()->update_queue_status(id, Status::Aborted(msg)); for (auto& tasks : _tasks) { for (auto& task : tasks) { + if (task->is_finished()) { + continue; + } task->clear_blocking_state(); } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org