This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-2.1 by this push:
     new 4386a71709f [bugfix](wgcore) map at only get reference and it will core in multithread
4386a71709f is described below

commit 4386a71709f535f62bad6ed6a9188736d7e55d1f
Author: yiguolei <yiguo...@gmail.com>
AuthorDate: Sun Mar 3 13:38:41 2024 +0800

    [bugfix](wgcore) map at only get reference and it will core in multithread
    
    f
---
 be/src/runtime/task_group/task_group_manager.cpp | 18 ++++++++++--------
 1 file changed, 10 insertions(+), 8 deletions(-)

diff --git a/be/src/runtime/task_group/task_group_manager.cpp 
b/be/src/runtime/task_group/task_group_manager.cpp
index 819e63c855d..a336cccd3d2 100644
--- a/be/src/runtime/task_group/task_group_manager.cpp
+++ b/be/src/runtime/task_group/task_group_manager.cpp
@@ -72,33 +72,35 @@ TaskGroupPtr 
TaskGroupManager::get_task_group_by_id(uint64_t tg_id) {
 void TaskGroupManager::delete_task_group_by_ids(std::set<uint64_t> used_wg_id) 
{
     int64_t begin_time = MonotonicMillis();
     // 1 get delete group without running queries
-    std::set<uint64_t> deleted_tg_ids;
+    std::vector<TaskGroupPtr> deleted_task_groups;
     {
         std::lock_guard<std::shared_mutex> write_lock(_group_mutex);
         for (auto iter = _task_groups.begin(); iter != _task_groups.end(); 
iter++) {
             uint64_t tg_id = iter->first;
-            auto* task_group_ptr = iter->second.get();
+            auto task_group_ptr = iter->second;
             if (used_wg_id.find(tg_id) == used_wg_id.end()) {
                 task_group_ptr->shutdown();
                 // only when no query running in task group, its resource can 
be released in BE
                 if (task_group_ptr->query_num() == 0) {
                     LOG(INFO) << "There is no query in wg " << tg_id << ", 
delete it.";
-                    deleted_tg_ids.insert(tg_id);
+                    deleted_task_groups.push_back(task_group_ptr);
                 }
             }
         }
     }
 
     // 2 stop active thread
-    for (uint64_t tg_id : deleted_tg_ids) {
-        _task_groups.at(tg_id)->try_stop_schedulers();
+    for (auto& tg : deleted_task_groups) {
+        // There is not lock here, but the tg may be released by another
+        // thread, so that we should use shared ptr here, not use tg_id
+        tg->try_stop_schedulers();
     }
 
     // 3 release resource in memory
     {
         std::lock_guard<std::shared_mutex> write_lock(_group_mutex);
-        for (uint64_t tg_id : deleted_tg_ids) {
-            _task_groups.erase(tg_id);
+        for (auto& tg : deleted_task_groups) {
+            _task_groups.erase(tg->id());
         }
     }
 
@@ -129,7 +131,7 @@ void 
TaskGroupManager::delete_task_group_by_ids(std::set<uint64_t> used_wg_id) {
     }
     int64_t time_cost_ms = MonotonicMillis() - begin_time;
     LOG(INFO) << "finish clear unused task group, time cost: " << time_cost_ms
-              << "ms, deleted group size:" << deleted_tg_ids.size();
+              << "ms, deleted group size:" << deleted_task_groups.size();
 }
 
 void TaskGroupManager::stop() {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to