wangbo commented on code in PR #30279:
URL: https://github.com/apache/doris/pull/30279#discussion_r1463138715


##########
be/src/runtime/task_group/task_group_manager.cpp:
##########
@@ -180,68 +180,72 @@ Status 
TaskGroupManager::upsert_cg_task_scheduler(taskgroup::TaskGroupInfo* tg_i
 }
 
 void TaskGroupManager::delete_task_group_by_ids(std::set<uint64_t> used_wg_id) 
{
-    // stop task sche may cost some time, so it should not be locked
-    std::set<doris::pipeline::TaskScheduler*> task_sche_to_del;
-    std::set<vectorized::SimplifiedScanScheduler*> scan_task_sche_to_del;
-    std::set<ThreadPool*> non_pip_thread_pool_to_del;
+    int64_t begin_time = MonotonicMillis();
+    // 1 get delete group without running queries
     std::set<uint64_t> deleted_tg_ids;
     {
-        std::shared_lock<std::shared_mutex> read_lock(_task_scheduler_lock);
-        for (auto iter = _tg_sche_map.begin(); iter != _tg_sche_map.end(); 
iter++) {
+        std::lock_guard<std::shared_mutex> write_lock(_group_mutex);
+        for (auto iter = _task_groups.begin(); iter != _task_groups.end(); 
iter++) {
             uint64_t tg_id = iter->first;
+            auto* task_group_ptr = iter->second.get();
             if (used_wg_id.find(tg_id) == used_wg_id.end()) {
-                task_sche_to_del.insert(_tg_sche_map[tg_id].get());
-                deleted_tg_ids.insert(tg_id);
+                task_group_ptr->shutdown();
+                // only when no query running in task group, its resource can 
be released in BE
+                if (task_group_ptr->query_num() == 0) {
+                    deleted_tg_ids.insert(tg_id);
+                }
             }
         }
+    }
 
-        for (auto iter = _tg_scan_sche_map.begin(); iter != 
_tg_scan_sche_map.end(); iter++) {
-            uint64_t tg_id = iter->first;
-            if (used_wg_id.find(tg_id) == used_wg_id.end()) {
-                scan_task_sche_to_del.insert(_tg_scan_sche_map[tg_id].get());
+    // 2 stop active thread
+    std::vector<doris::pipeline::TaskScheduler*> task_sched_to_stop;
+    std::vector<vectorized::SimplifiedScanScheduler*> scan_task_sched_to_stop;
+    std::vector<ThreadPool*> non_pip_thread_pool_to_stop;
+    {
+        std::shared_lock<std::shared_mutex> read_lock(_task_scheduler_lock);
+        for (uint64_t tg_id : deleted_tg_ids) {
+            if (_tg_sche_map.find(tg_id) != _tg_sche_map.end()) {
+                task_sched_to_stop.emplace_back(_tg_sche_map.at(tg_id).get());
             }
-        }
-        for (auto iter = _non_pipe_thread_pool_map.begin(); iter != 
_non_pipe_thread_pool_map.end();
-             iter++) {
-            uint64_t tg_id = iter->first;
-            if (used_wg_id.find(tg_id) == used_wg_id.end()) {
-                
non_pip_thread_pool_to_del.insert(_non_pipe_thread_pool_map[tg_id].get());
+            if (_tg_scan_sche_map.find(tg_id) != _tg_scan_sche_map.end()) {
+                
scan_task_sched_to_stop.emplace_back(_tg_scan_sche_map.at(tg_id).get());
+            }
+            if (_non_pipe_thread_pool_map.find(tg_id) != 
_non_pipe_thread_pool_map.end()) {
+                
non_pip_thread_pool_to_stop.emplace_back(_non_pipe_thread_pool_map.at(tg_id).get());
             }
         }
     }
-    // 1 stop all threads
-    for (auto* ptr1 : task_sche_to_del) {
+    for (auto* ptr1 : task_sched_to_stop) {
         ptr1->stop();
     }
-    for (auto* ptr2 : scan_task_sche_to_del) {
+    for (auto* ptr2 : scan_task_sched_to_stop) {
         ptr2->stop();
     }
-    for (auto& ptr3 : non_pip_thread_pool_to_del) {
+    for (auto& ptr3 : non_pip_thread_pool_to_stop) {
         ptr3->shutdown();
+        ptr3->wait();

Review Comment:
   这个是threadpool的标准用法,先标记poolshutdown,然后等没有活跃线程了,就都结束了。
   wait就是等线程结束的过程。因为非pipeline目前没有封装scheduler,所以是手动这么处理的。
   pipeline的scheduler调用stop的里边,也是做的同样的操作



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to