This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new a66003a5a8a [bugfix](wg) should set task group down after thread pool 
stopped (#31377)
a66003a5a8a is described below

commit a66003a5a8a32962cb4e246b30231778a55f107e
Author: yiguolei <676222...@qq.com>
AuthorDate: Sun Feb 25 18:12:26 2024 +0800

    [bugfix](wg) should set task group down after thread pool stopped (#31377)
    
    Co-authored-by: yiguolei <yiguo...@gmail.com>
---
 be/src/agent/topic_subscriber.cpp                | 2 +-
 be/src/pipeline/task_scheduler.cpp               | 6 +++++-
 be/src/runtime/task_group/task_group_manager.cpp | 1 +
 3 files changed, 7 insertions(+), 2 deletions(-)

diff --git a/be/src/agent/topic_subscriber.cpp 
b/be/src/agent/topic_subscriber.cpp
index c29533bf617..7f7cffd8840 100644
--- a/be/src/agent/topic_subscriber.cpp
+++ b/be/src/agent/topic_subscriber.cpp
@@ -45,7 +45,7 @@ void TopicSubscriber::handle_topic_info(const 
TPublishTopicRequest& topic_reques
         if (topic_request.topic_map.find(listener_pair.first) != 
topic_request.topic_map.end()) {
             listener_pair.second->handle_topic_info(
                     topic_request.topic_map.at(listener_pair.first));
-            LOG(INFO) << "handle topic " << listener_pair.first << " succ";
+            LOG(INFO) << "handle topic " << listener_pair.first << " 
successfully";
         }
     }
 }
diff --git a/be/src/pipeline/task_scheduler.cpp 
b/be/src/pipeline/task_scheduler.cpp
index e9350084c14..521519f5d9a 100644
--- a/be/src/pipeline/task_scheduler.cpp
+++ b/be/src/pipeline/task_scheduler.cpp
@@ -421,7 +421,6 @@ void TaskScheduler::_do_work(size_t index) {
 
 void TaskScheduler::stop() {
     if (!this->_shutdown.load()) {
-        this->_shutdown.store(true);
         if (_task_queue) {
             _task_queue->close();
         }
@@ -432,6 +431,11 @@ void TaskScheduler::stop() {
             _fix_thread_pool->shutdown();
             _fix_thread_pool->wait();
         }
+        // Should set at the ending of the stop to ensure that the
+        // pool is stopped. For example, if there are 2 threads call stop
+        // then if one thread set shutdown = false, then another thread will
+        // not check it and will free task scheduler.
+        this->_shutdown.store(true);
     }
 }
 
diff --git a/be/src/runtime/task_group/task_group_manager.cpp 
b/be/src/runtime/task_group/task_group_manager.cpp
index 718d69021e7..b0b84a0eb89 100644
--- a/be/src/runtime/task_group/task_group_manager.cpp
+++ b/be/src/runtime/task_group/task_group_manager.cpp
@@ -201,6 +201,7 @@ void 
TaskGroupManager::delete_task_group_by_ids(std::set<uint64_t> used_wg_id) {
                 task_group_ptr->shutdown();
                 // only when no query running in task group, its resource can 
be released in BE
                 if (task_group_ptr->query_num() == 0) {
+                    LOG(INFO) << "There is no query in wg " << tg_id << ", 
delete it.";
                     deleted_tg_ids.insert(tg_id);
                 }
             }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to