This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 3595f214058 [improvement](executor)clear unused cgroup path (#27798)
3595f214058 is described below

commit 3595f2140589f0d92da6be007d2dd2d6fcb2285d
Author: wangbo <wan...@apache.org>
AuthorDate: Tue Dec 5 14:18:23 2023 +0800

    [improvement](executor)clear unused cgroup path (#27798)
    
    * clear unused cgroup path
    
    * use C++ api
    
    * add gcc header
---
 be/src/agent/cgroup_cpu_ctl.cpp                  | 76 ++++++++++++++++++-
 be/src/agent/cgroup_cpu_ctl.h                    |  8 +-
 be/src/agent/workload_group_listener.cpp         |  2 +-
 be/src/runtime/task_group/task_group_manager.cpp | 94 ++++++++++++++++--------
 be/src/runtime/task_group/task_group_manager.h   |  6 +-
 5 files changed, 150 insertions(+), 36 deletions(-)

diff --git a/be/src/agent/cgroup_cpu_ctl.cpp b/be/src/agent/cgroup_cpu_ctl.cpp
index 6fa234e6ee2..a494681d082 100644
--- a/be/src/agent/cgroup_cpu_ctl.cpp
+++ b/be/src/agent/cgroup_cpu_ctl.cpp
@@ -18,6 +18,9 @@
 #include "agent/cgroup_cpu_ctl.h"
 
 #include <fmt/format.h>
+#include <sys/stat.h>
+
+#include <filesystem>
 
 namespace doris {
 
@@ -100,11 +103,34 @@ Status CgroupV1CpuCtl::init() {
         int ret = mkdir(_cgroup_v1_cpu_query_path.c_str(), S_IRWXU);
         if (ret != 0) {
             LOG(ERROR) << "cgroup v1 mkdir query failed, path=" << 
_cgroup_v1_cpu_query_path;
-            return Status::InternalError<false>("cgroup v1 mkdir query failed, 
path=",
+            return Status::InternalError<false>("cgroup v1 mkdir query failed, 
path={}",
                                                 _cgroup_v1_cpu_query_path);
         }
     }
 
+    // check whether current user specified path is a valid cgroup path
+    std::string query_path_tasks = _cgroup_v1_cpu_query_path + "/tasks";
+    std::string query_path_cpu_shares = _cgroup_v1_cpu_query_path + 
"/cpu.shares";
+    std::string query_path_quota = _cgroup_v1_cpu_query_path + 
"/cpu.cfs_quota_us";
+    if (access(query_path_tasks.c_str(), F_OK) != 0) {
+        return Status::InternalError<false>("invalid cgroup path, not find 
task file");
+    }
+    if (access(query_path_cpu_shares.c_str(), F_OK) != 0) {
+        return Status::InternalError<false>("invalid cgroup path, not find cpu 
share file");
+    }
+    if (access(query_path_quota.c_str(), F_OK) != 0) {
+        return Status::InternalError<false>("invalid cgroup path, not find cpu 
quota file");
+    }
+
+    if (_tg_id == -1) {
+        // means current cgroup cpu ctl is just used to clear dir,
+        // it does not contain a task group.
+        // todo(wb) rethinking whether need to refactor cgroup_cpu_ctl
+        _init_succ = true;
+        LOG(INFO) << "init cgroup cpu query path succ, path=" << 
_cgroup_v1_cpu_query_path;
+        return Status::OK();
+    }
+
     // workload group path
     _cgroup_v1_cpu_tg_path = _cgroup_v1_cpu_query_path + "/" + 
std::to_string(_tg_id);
     if (access(_cgroup_v1_cpu_tg_path.c_str(), F_OK) != 0) {
@@ -157,4 +183,52 @@ Status CgroupV1CpuCtl::add_thread_to_cgroup() {
     return CgroupCpuCtl::write_cg_sys_file(_cgroup_v1_cpu_tg_task_file, tid, 
msg, true);
 #endif
 }
+
+Status CgroupV1CpuCtl::delete_unused_cgroup_path(std::set<uint64_t>& 
used_wg_ids) {
+    if (!_init_succ) {
+        return Status::InternalError<false>(
+                "cgroup cpu ctl init failed, delete can not be executed");
+    }
+    // 1 get unused wg id
+    std::set<std::string> unused_wg_ids;
+    for (const auto& entry : 
std::filesystem::directory_iterator(_cgroup_v1_cpu_query_path)) {
+        const std::string dir_name = entry.path().string();
+        struct stat st;
+        // == 0 means exists
+        if (stat(dir_name.c_str(), &st) == 0 && (st.st_mode & S_IFDIR)) {
+            int pos = dir_name.rfind("/");
+            std::string wg_dir_name = dir_name.substr(pos + 1, 
dir_name.length());
+            if (wg_dir_name.empty()) {
+                return Status::InternalError<false>("find an empty workload 
group path, path={}",
+                                                    dir_name);
+            }
+            if (std::all_of(wg_dir_name.begin(), wg_dir_name.end(), 
::isdigit)) {
+                uint64_t id_in_path = std::stoll(wg_dir_name);
+                if (used_wg_ids.find(id_in_path) == used_wg_ids.end()) {
+                    unused_wg_ids.insert(wg_dir_name);
+                }
+            }
+        }
+    }
+
+    // 2 delete unused cgroup path
+    int failed_count = 0;
+    std::string query_path = _cgroup_v1_cpu_query_path.back() != '/'
+                                     ? _cgroup_v1_cpu_query_path + "/"
+                                     : _cgroup_v1_cpu_query_path;
+    for (const std::string& unused_wg_id : unused_wg_ids) {
+        std::string wg_path = query_path + unused_wg_id;
+        int ret = rmdir(wg_path.c_str());
+        if (ret < 0) {
+            LOG(WARNING) << "rmdir failed, path=" << wg_path;
+            failed_count++;
+        }
+    }
+    if (failed_count != 0) {
+        return Status::InternalError<false>("error happens when delete unused 
path, count={}",
+                                            failed_count);
+    }
+    return Status::OK();
+}
+
 } // namespace doris
diff --git a/be/src/agent/cgroup_cpu_ctl.h b/be/src/agent/cgroup_cpu_ctl.h
index 2a7cdc5719b..94514c8e2e0 100644
--- a/be/src/agent/cgroup_cpu_ctl.h
+++ b/be/src/agent/cgroup_cpu_ctl.h
@@ -37,6 +37,7 @@ const static uint64_t CPU_SOFT_LIMIT_DEFAULT_VALUE = 1024;
 class CgroupCpuCtl {
 public:
     virtual ~CgroupCpuCtl() = default;
+    CgroupCpuCtl() = default;
     CgroupCpuCtl(uint64_t tg_id) { _tg_id = tg_id; }
 
     virtual Status init();
@@ -50,6 +51,8 @@ public:
     // for log
     void get_cgroup_cpu_info(uint64_t* cpu_shares, int* cpu_hard_limit);
 
+    virtual Status delete_unused_cgroup_path(std::set<uint64_t>& used_wg_ids) 
= 0;
+
 protected:
     Status write_cg_sys_file(std::string file_path, int value, std::string 
msg, bool is_append);
 
@@ -63,7 +66,7 @@ protected:
     int _cpu_hard_limit = 0;
     std::shared_mutex _lock_mutex;
     bool _init_succ = false;
-    uint64_t _tg_id; // workload group id
+    uint64_t _tg_id = -1; // workload group id
     uint64_t _cpu_shares = 0;
 };
 
@@ -96,11 +99,14 @@ protected:
 class CgroupV1CpuCtl : public CgroupCpuCtl {
 public:
     CgroupV1CpuCtl(uint64_t tg_id) : CgroupCpuCtl(tg_id) {}
+    CgroupV1CpuCtl() = default;
     Status init() override;
     Status modify_cg_cpu_hard_limit_no_lock(int cpu_hard_limit) override;
     Status modify_cg_cpu_soft_limit_no_lock(int cpu_shares) override;
     Status add_thread_to_cgroup() override;
 
+    Status delete_unused_cgroup_path(std::set<uint64_t>& used_wg_ids) override;
+
 private:
     std::string _cgroup_v1_cpu_query_path;
     std::string _cgroup_v1_cpu_tg_path; // workload group path
diff --git a/be/src/agent/workload_group_listener.cpp 
b/be/src/agent/workload_group_listener.cpp
index 6d7dfb9a3a0..f2770e8e7c4 100644
--- a/be/src/agent/workload_group_listener.cpp
+++ b/be/src/agent/workload_group_listener.cpp
@@ -57,7 +57,7 @@ void WorkloadGroupListener::handle_topic_info(const 
std::vector<TopicInfo>& topi
                          << ", reason=" << ret2.to_string();
         }
 
-        LOG(INFO) << "update task group success, tg info=" << 
tg->debug_string()
+        LOG(INFO) << "update task group finish, tg info=" << tg->debug_string()
                   << ", enable_cpu_hard_limit="
                   << (_exec_env->task_group_manager()->enable_cpu_hard_limit() 
? "true" : "false")
                   << ", cgroup cpu_shares=" << 
task_group_info.cgroup_cpu_shares
diff --git a/be/src/runtime/task_group/task_group_manager.cpp 
b/be/src/runtime/task_group/task_group_manager.cpp
index da6294045f8..98043c6395a 100644
--- a/be/src/runtime/task_group/task_group_manager.cpp
+++ b/be/src/runtime/task_group/task_group_manager.cpp
@@ -68,7 +68,7 @@ TaskGroupPtr TaskGroupManager::get_task_group_by_id(uint64_t 
tg_id) {
 }
 
 bool TaskGroupManager::set_cg_task_sche_for_query_ctx(uint64_t tg_id, 
QueryContext* query_ctx_ptr) {
-    std::lock_guard<std::mutex> lock(_task_scheduler_lock);
+    std::lock_guard<std::shared_mutex> write_lock(_task_scheduler_lock);
     if (_tg_sche_map.find(tg_id) != _tg_sche_map.end()) {
         query_ctx_ptr->set_task_scheduler(_tg_sche_map.at(tg_id).get());
     } else {
@@ -91,7 +91,7 @@ Status 
TaskGroupManager::upsert_cg_task_scheduler(taskgroup::TaskGroupInfo* tg_i
     uint64_t cpu_shares = tg_info->cpu_share;
     bool enable_cpu_hard_limit = tg_info->enable_cpu_hard_limit;
 
-    std::lock_guard<std::mutex> lock(_task_scheduler_lock);
+    std::lock_guard<std::shared_mutex> write_lock(_task_scheduler_lock);
     // step 1: init cgroup cpu controller
     CgroupCpuCtl* cg_cu_ctl_ptr = nullptr;
     if (_cgroup_ctl_map.find(tg_id) == _cgroup_ctl_map.end()) {
@@ -101,7 +101,8 @@ Status 
TaskGroupManager::upsert_cg_task_scheduler(taskgroup::TaskGroupInfo* tg_i
             cg_cu_ctl_ptr = cgroup_cpu_ctl.get();
             _cgroup_ctl_map.emplace(tg_id, std::move(cgroup_cpu_ctl));
         } else {
-            return Status::InternalError<false>("cgroup init failed, gid={}", 
tg_id);
+            return Status::InternalError<false>("cgroup init failed, gid={}, 
reason={}", tg_id,
+                                                ret.to_string());
         }
     }
 
@@ -157,54 +158,83 @@ Status 
TaskGroupManager::upsert_cg_task_scheduler(taskgroup::TaskGroupInfo* tg_i
     return Status::OK();
 }
 
-void TaskGroupManager::delete_task_group_by_ids(std::set<uint64_t> id_set) {
+void TaskGroupManager::delete_task_group_by_ids(std::set<uint64_t> used_wg_id) 
{
+    // stopping the task scheduler may cost some time, so it should not be done while locked
+    std::set<doris::pipeline::TaskScheduler*> task_sche_to_del;
+    std::set<vectorized::SimplifiedScanScheduler*> scan_task_sche_to_del;
+    std::set<uint64_t> deleted_tg_ids;
     {
-        std::lock_guard<std::shared_mutex> w_lock(_group_mutex);
-        for (auto iter = _task_groups.begin(); iter != _task_groups.end();) {
+        std::shared_lock<std::shared_mutex> read_lock(_task_scheduler_lock);
+        for (auto iter = _tg_sche_map.begin(); iter != _tg_sche_map.end(); 
iter++) {
             uint64_t tg_id = iter->first;
-            if (id_set.find(tg_id) == id_set.end()) {
-                iter = _task_groups.erase(iter);
-            } else {
-                iter++;
+            if (used_wg_id.find(tg_id) == used_wg_id.end()) {
+                task_sche_to_del.insert(_tg_sche_map[tg_id].get());
+                deleted_tg_ids.insert(tg_id);
             }
         }
-    }
 
-    // stop task sche may cost some time, so it should not be locked
-    // task scheduler is stoped in task scheduler's destructor
-    std::set<std::unique_ptr<doris::pipeline::TaskScheduler>> task_sche_to_del;
-    std::set<std::unique_ptr<vectorized::SimplifiedScanScheduler>> 
scan_task_sche_to_del;
-    {
-        std::lock_guard<std::mutex> lock(_task_scheduler_lock);
-        for (auto iter = _tg_sche_map.begin(); iter != _tg_sche_map.end();) {
+        for (auto iter = _tg_scan_sche_map.begin(); iter != 
_tg_scan_sche_map.end(); iter++) {
             uint64_t tg_id = iter->first;
-            if (id_set.find(tg_id) == id_set.end()) {
-                task_sche_to_del.insert(std::move(_tg_sche_map[tg_id]));
-                iter = _tg_sche_map.erase(iter);
-            } else {
-                iter++;
+            if (used_wg_id.find(tg_id) == used_wg_id.end()) {
+                scan_task_sche_to_del.insert(_tg_scan_sche_map[tg_id].get());
             }
         }
+    }
+    // 1 stop all threads
+    for (auto* ptr1 : task_sche_to_del) {
+        ptr1->stop();
+    }
+    for (auto* ptr2 : scan_task_sche_to_del) {
+        ptr2->stop();
+    }
+    // 2 release resource in memory
+    {
+        std::lock_guard<std::shared_mutex> write_lock(_task_scheduler_lock);
+        for (uint64_t tg_id : deleted_tg_ids) {
+            _tg_sche_map.erase(tg_id);
+            _tg_scan_sche_map.erase(tg_id);
+            _cgroup_ctl_map.erase(tg_id);
+        }
+    }
 
-        for (auto iter = _tg_scan_sche_map.begin(); iter != 
_tg_scan_sche_map.end();) {
+    {
+        std::lock_guard<std::shared_mutex> write_lock(_group_mutex);
+        for (auto iter = _task_groups.begin(); iter != _task_groups.end();) {
             uint64_t tg_id = iter->first;
-            if (id_set.find(tg_id) == id_set.end()) {
-                
scan_task_sche_to_del.insert(std::move(_tg_scan_sche_map[tg_id]));
-                iter = _tg_scan_sche_map.erase(iter);
+            if (used_wg_id.find(tg_id) == used_wg_id.end()) {
+                iter = _task_groups.erase(iter);
             } else {
                 iter++;
             }
         }
+    }
 
-        for (auto iter = _cgroup_ctl_map.begin(); iter != 
_cgroup_ctl_map.end();) {
-            uint64_t tg_id = iter->first;
-            if (id_set.find(tg_id) == id_set.end()) {
-                iter = _cgroup_ctl_map.erase(iter);
+    // 3 clear cgroup dir
+    // NOTE(wb) currently we use rmdir to delete cgroup path,
+    // this action may fail until the task file is cleared, which means all 
threads are stopped.
+    // So the first attempt to rmdir a cgroup path may fail.
+    // Using cgdelete has no such issue.
+    {
+        std::lock_guard<std::shared_mutex> write_lock(_init_cg_ctl_lock);
+        if (!_cg_cpu_ctl) {
+            _cg_cpu_ctl = std::make_unique<CgroupV1CpuCtl>();
+        }
+        if (!_is_init_succ) {
+            Status ret = _cg_cpu_ctl->init();
+            if (ret.ok()) {
+                _is_init_succ = true;
             } else {
-                iter++;
+                LOG(INFO) << "init task group mgr cpu ctl failed, " << 
ret.to_string();
+            }
+        }
+        if (_is_init_succ) {
+            Status ret = _cg_cpu_ctl->delete_unused_cgroup_path(used_wg_id);
+            if (!ret.ok()) {
+                LOG(WARNING) << ret.to_string();
             }
         }
     }
+    LOG(INFO) << "finish clear unused cgroup path";
 }
 
 void TaskGroupManager::stop() {
diff --git a/be/src/runtime/task_group/task_group_manager.h 
b/be/src/runtime/task_group/task_group_manager.h
index 91156237f40..08968b6fe99 100644
--- a/be/src/runtime/task_group/task_group_manager.h
+++ b/be/src/runtime/task_group/task_group_manager.h
@@ -77,10 +77,14 @@ private:
 
     // map for workload group id and task scheduler pool
     // used for cpu hard limit
-    std::mutex _task_scheduler_lock;
+    std::shared_mutex _task_scheduler_lock;
     std::map<uint64_t, std::unique_ptr<doris::pipeline::TaskScheduler>> 
_tg_sche_map;
     std::map<uint64_t, std::unique_ptr<vectorized::SimplifiedScanScheduler>> 
_tg_scan_sche_map;
     std::map<uint64_t, std::unique_ptr<CgroupCpuCtl>> _cgroup_ctl_map;
+
+    std::shared_mutex _init_cg_ctl_lock;
+    std::unique_ptr<CgroupCpuCtl> _cg_cpu_ctl;
+    bool _is_init_succ = false;
 };
 
 } // namespace taskgroup


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to