This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch branch-4.0
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-4.0 by this push:
     new 88e9a70349b branch-4.0: [fix](executor) Fix rare self-deadlock that can cause the time-sharing task executor to hang.  (#60089)
88e9a70349b is described below

commit 88e9a70349b6d7a6e50b6358ad717fac2e18ef5c
Author: Qi Chen <[email protected]>
AuthorDate: Wed Jan 21 17:45:36 2026 +0800

    branch-4.0: [fix](executor) Fix rare self-deadlock that can cause the time-sharing task executor to hang.  (#60089)
    
    bp #58273
---
 be/src/vec/exec/executor/task_executor.h              |  1 +
 .../time_sharing/time_sharing_task_executor.cpp       | 19 +++++++++++--------
 .../time_sharing/time_sharing_task_executor.h         |  3 +--
 be/src/vec/exec/scan/scanner_scheduler.h              |  1 +
 4 files changed, 14 insertions(+), 10 deletions(-)

diff --git a/be/src/vec/exec/executor/task_executor.h b/be/src/vec/exec/executor/task_executor.h
index 59ea00d460e..98526b1b81c 100644
--- a/be/src/vec/exec/executor/task_executor.h
+++ b/be/src/vec/exec/executor/task_executor.h
@@ -37,6 +37,7 @@ public:
     virtual Status init() = 0;
     virtual Status start() = 0;
     virtual void stop() = 0;
+    virtual void wait() = 0;
 
     virtual Result<std::shared_ptr<TaskHandle>> create_task(
             const TaskId& task_id, std::function<double()> utilization_supplier,
diff --git a/be/src/vec/exec/executor/time_sharing/time_sharing_task_executor.cpp b/be/src/vec/exec/executor/time_sharing/time_sharing_task_executor.cpp
index 5d104290177..4459f20037b 100644
--- a/be/src/vec/exec/executor/time_sharing/time_sharing_task_executor.cpp
+++ b/be/src/vec/exec/executor/time_sharing/time_sharing_task_executor.cpp
@@ -562,10 +562,6 @@ void TimeSharingTaskExecutor::_dispatch_thread() {
             std::lock_guard<std::mutex> guard(_mutex);
             _running_splits.insert(split);
         }
-        Defer defer {[&]() {
-            std::lock_guard<std::mutex> guard(_mutex);
-            _running_splits.erase(split);
-        }};
 
         Result<SharedListenableFuture<Void>> blocked_future_result = split->process();
 
@@ -577,10 +573,6 @@ void TimeSharingTaskExecutor::_dispatch_thread() {
             auto blocked_future = blocked_future_result.value();
 
             if (split->is_finished()) {
-                {
-                    std::ostringstream _oss;
-                    _oss << std::this_thread::get_id();
-                }
                 _split_finished(split, split->finished_status());
             } else {
                 if (split->is_auto_reschedule()) {
@@ -625,6 +617,17 @@ void TimeSharingTaskExecutor::_dispatch_thread() {
         // In the worst case, the destructor might even try to do something
         // with this SplitThreadPool, and produce a deadlock.
         // task.runnable.reset();
+
+        // IMPORTANT: We must explicitly release 'split' BEFORE acquiring _lock to avoid
+        // self-deadlock. The destructor chain (PrioritizedSplitRunner -> ScannerSplitRunner
+        // -> _scan_func lambda -> captured ScannerContext) may call remove_task() which
+        // tries to acquire _lock. Since _lock is not a recursive mutex, this would deadlock.
+        {
+            std::lock_guard<std::mutex> guard(_mutex);
+            _running_splits.erase(split);
+        }
+        split.reset();
+
         l.lock();
         thread_pool_task_execution_time_ns_total->increment(
                 task_execution_time_watch.elapsed_time());
diff --git a/be/src/vec/exec/executor/time_sharing/time_sharing_task_executor.h b/be/src/vec/exec/executor/time_sharing/time_sharing_task_executor.h
index f40cb88b424..3550449beb7 100644
--- a/be/src/vec/exec/executor/time_sharing/time_sharing_task_executor.h
+++ b/be/src/vec/exec/executor/time_sharing/time_sharing_task_executor.h
@@ -97,6 +97,7 @@ public:
 
     Status start() override;
     void stop() override;
+    void wait() override;
 
     Result<std::shared_ptr<TaskHandle>> create_task(
             const TaskId& task_id, std::function<double()> utilization_supplier,
@@ -245,8 +246,6 @@ private:
             std::unique_lock<std::mutex>& lock);
     void _record_leaf_splits_size(std::unique_lock<std::mutex>& lock);
     void _split_finished(std::shared_ptr<PrioritizedSplitRunner> split, const Status& status);
-    // Waits until all the tasks are completed.
-    void wait();
 
     int64_t _get_running_tasks_for_level(int level) const;
 
diff --git a/be/src/vec/exec/scan/scanner_scheduler.h b/be/src/vec/exec/scan/scanner_scheduler.h
index 21fa4aefa5c..089f3e1e5b7 100644
--- a/be/src/vec/exec/scan/scanner_scheduler.h
+++ b/be/src/vec/exec/scan/scanner_scheduler.h
@@ -268,6 +268,7 @@ public:
     void stop() override {
         _is_stop.store(true);
         _task_executor->stop();
+        _task_executor->wait();
     }
 
     Status start(int max_thread_num, int min_thread_num, int queue_size,


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to