This is an automated email from the ASF dual-hosted git repository. yiguolei pushed a commit to branch branch-2.1 in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.1 by this push: new 4386a71709f [bugfix](wgcore) map at only get reference and it will core in multithread 4386a71709f is described below commit 4386a71709f535f62bad6ed6a9188736d7e55d1f Author: yiguolei <yiguo...@gmail.com> AuthorDate: Sun Mar 3 13:38:41 2024 +0800 [bugfix](wgcore) map at only get reference and it will core in multithread f --- be/src/runtime/task_group/task_group_manager.cpp | 18 ++++++++++-------- 1 file changed, 10 insertions(+), 8 deletions(-) diff --git a/be/src/runtime/task_group/task_group_manager.cpp b/be/src/runtime/task_group/task_group_manager.cpp index 819e63c855d..a336cccd3d2 100644 --- a/be/src/runtime/task_group/task_group_manager.cpp +++ b/be/src/runtime/task_group/task_group_manager.cpp @@ -72,33 +72,35 @@ TaskGroupPtr TaskGroupManager::get_task_group_by_id(uint64_t tg_id) { void TaskGroupManager::delete_task_group_by_ids(std::set<uint64_t> used_wg_id) { int64_t begin_time = MonotonicMillis(); // 1 get delete group without running queries - std::set<uint64_t> deleted_tg_ids; + std::vector<TaskGroupPtr> deleted_task_groups; { std::lock_guard<std::shared_mutex> write_lock(_group_mutex); for (auto iter = _task_groups.begin(); iter != _task_groups.end(); iter++) { uint64_t tg_id = iter->first; - auto* task_group_ptr = iter->second.get(); + auto task_group_ptr = iter->second; if (used_wg_id.find(tg_id) == used_wg_id.end()) { task_group_ptr->shutdown(); // only when no query running in task group, its resource can be released in BE if (task_group_ptr->query_num() == 0) { LOG(INFO) << "There is no query in wg " << tg_id << ", delete it."; - deleted_tg_ids.insert(tg_id); + deleted_task_groups.push_back(task_group_ptr); } } } } // 2 stop active thread - for (uint64_t tg_id : deleted_tg_ids) { - _task_groups.at(tg_id)->try_stop_schedulers(); + for (auto& tg : deleted_task_groups) { + // There is not lock here, but the tg may be released by another + // thread, so that we should use shared ptr here, not use tg_id + tg->try_stop_schedulers(); } // 3 release resource in memory { std::lock_guard<std::shared_mutex> write_lock(_group_mutex); - for (uint64_t tg_id : deleted_tg_ids) { - _task_groups.erase(tg_id); + for (auto& tg : deleted_task_groups) { + _task_groups.erase(tg->id()); } } @@ -129,7 +131,7 @@ void TaskGroupManager::delete_task_group_by_ids(std::set<uint64_t> used_wg_id) { } int64_t time_cost_ms = MonotonicMillis() - begin_time; LOG(INFO) << "finish clear unused task group, time cost: " << time_cost_ms - << "ms, deleted group size:" << deleted_tg_ids.size(); + << "ms, deleted group size:" << deleted_task_groups.size(); } void TaskGroupManager::stop() { --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org