This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git

commit f4deb42a804000d93148d0faaf4d1919221b0169
Author: Gabriel <gabrielleeb...@gmail.com>
AuthorDate: Wed Apr 24 21:49:11 2024 +0800

    [pipeline](fix) Prevent re-cancel pipeline tasks (#34073)
---
 be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp | 10 ++++++++++
 1 file changed, 10 insertions(+)

diff --git a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp
index c62ed04e580..53d3a1bc5d0 100644
--- a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp
+++ b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp
@@ -130,6 +130,13 @@ PipelineXFragmentContext::~PipelineXFragmentContext() {
 
 void PipelineXFragmentContext::cancel(const PPlanFragmentCancelReason& reason,
                                       const std::string& msg) {
+    {
+        std::lock_guard<std::mutex> l(_task_mutex);
+        if (_closed_tasks == _total_tasks) {
+            // All tasks in this PipelineXFragmentContext already closed.
+            return;
+        }
+    }
     LOG_INFO("PipelineXFragmentContext::cancel")
             .tag("query_id", print_id(_query_id))
             .tag("fragment_id", _fragment_id)
@@ -158,6 +165,9 @@ void PipelineXFragmentContext::cancel(const PPlanFragmentCancelReason& reason,
     // _exec_env->result_queue_mgr()->update_queue_status(id, Status::Aborted(msg));
     for (auto& tasks : _tasks) {
         for (auto& task : tasks) {
+            if (task->is_finished()) {
+                continue;
+            }
             task->clear_blocking_state();
         }
     }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to