This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 0b7becd4b71  [fix](executor)Fix memtracker not set to task group #27699
0b7becd4b71 is described below

commit 0b7becd4b713ff7ad014f6923d372490c3352902
Author: wangbo <wan...@apache.org>
AuthorDate: Thu Nov 30 22:35:51 2023 +0800

     [fix](executor)Fix memtracker not set to task group #27699
---
 be/src/agent/cgroup_cpu_ctl.cpp                  |  5 ++--
 be/src/agent/cgroup_cpu_ctl.h                    |  4 +--
 be/src/agent/workload_group_listener.cpp         | 10 ++++---
 be/src/pipeline/pipeline_fragment_context.cpp    |  2 +-
 be/src/runtime/fragment_mgr.cpp                  | 36 ++++++++++++++----------
 be/src/runtime/query_context.h                   |  2 ++
 be/src/runtime/task_group/task_group.h           |  2 +-
 be/src/runtime/task_group/task_group_manager.cpp |  6 ++--
 be/src/runtime/task_group/task_group_manager.h   |  8 ++++--
 9 files changed, 45 insertions(+), 30 deletions(-)

diff --git a/be/src/agent/cgroup_cpu_ctl.cpp b/be/src/agent/cgroup_cpu_ctl.cpp
index ac09725cb23..6fa234e6ee2 100644
--- a/be/src/agent/cgroup_cpu_ctl.cpp
+++ b/be/src/agent/cgroup_cpu_ctl.cpp
@@ -41,7 +41,7 @@ Status CgroupCpuCtl::init() {
     return Status::OK();
 }
 
-void CgroupCpuCtl::get_cgroup_cpu_info(uint64_t* cpu_shares, uint64_t* 
cpu_hard_limit) {
+void CgroupCpuCtl::get_cgroup_cpu_info(uint64_t* cpu_shares, int* 
cpu_hard_limit) {
     std::lock_guard<std::shared_mutex> w_lock(_lock_mutex);
     *cpu_shares = this->_cpu_shares;
     *cpu_hard_limit = this->_cpu_hard_limit;
@@ -137,7 +137,8 @@ Status CgroupV1CpuCtl::modify_cg_cpu_soft_limit_no_lock(int 
cpu_shares) {
 }
 
 Status CgroupV1CpuCtl::modify_cg_cpu_hard_limit_no_lock(int cpu_hard_limit) {
-    int val = _cpu_cfs_period_us * _cpu_core_num * cpu_hard_limit / 100;
+    int val = cpu_hard_limit > 0 ? (_cpu_cfs_period_us * _cpu_core_num * 
cpu_hard_limit / 100)
+                                 : CPU_HARD_LIMIT_DEFAULT_VALUE;
     std::string msg = "modify cpu quota value to " + std::to_string(val);
     return CgroupCpuCtl::write_cg_sys_file(_cgroup_v1_cpu_tg_quota_file, val, 
msg, false);
 }
diff --git a/be/src/agent/cgroup_cpu_ctl.h b/be/src/agent/cgroup_cpu_ctl.h
index 4d78ca82ab7..2a7cdc5719b 100644
--- a/be/src/agent/cgroup_cpu_ctl.h
+++ b/be/src/agent/cgroup_cpu_ctl.h
@@ -48,7 +48,7 @@ public:
     void update_cpu_soft_limit(int cpu_shares);
 
     // for log
-    void get_cgroup_cpu_info(uint64_t* cpu_shares, uint64_t* cpu_hard_limit);
+    void get_cgroup_cpu_info(uint64_t* cpu_shares, int* cpu_hard_limit);
 
 protected:
     Status write_cg_sys_file(std::string file_path, int value, std::string 
msg, bool is_append);
@@ -60,7 +60,7 @@ protected:
     std::string _doris_cgroup_cpu_path;
     uint64_t _cpu_core_num = CpuInfo::num_cores();
     uint64_t _cpu_cfs_period_us = 100000;
-    uint64_t _cpu_hard_limit = 0;
+    int _cpu_hard_limit = 0;
     std::shared_mutex _lock_mutex;
     bool _init_succ = false;
     uint64_t _tg_id; // workload group id
diff --git a/be/src/agent/workload_group_listener.cpp 
b/be/src/agent/workload_group_listener.cpp
index bc1d3294f6c..6d7dfb9a3a0 100644
--- a/be/src/agent/workload_group_listener.cpp
+++ b/be/src/agent/workload_group_listener.cpp
@@ -50,8 +50,8 @@ void WorkloadGroupListener::handle_topic_info(const 
std::vector<TopicInfo>& topi
                 task_group_info.enable_cpu_hard_limit);
 
         // 4 create and update task scheduler
-        Status ret2 =
-                
_exec_env->task_group_manager()->upsert_task_scheduler(&task_group_info, 
_exec_env);
+        Status ret2 = 
_exec_env->task_group_manager()->upsert_cg_task_scheduler(&task_group_info,
+                                                                               
 _exec_env);
         if (!ret2.ok()) {
             LOG(WARNING) << "upsert task sche failed, tg_id=" << 
task_group_info.id
                          << ", reason=" << ret2.to_string();
@@ -59,9 +59,11 @@ void WorkloadGroupListener::handle_topic_info(const 
std::vector<TopicInfo>& topi
 
         LOG(INFO) << "update task group success, tg info=" << 
tg->debug_string()
                   << ", enable_cpu_hard_limit="
-                  << _exec_env->task_group_manager()->enable_cpu_hard_limit()
+                  << (_exec_env->task_group_manager()->enable_cpu_hard_limit() 
? "true" : "false")
                   << ", cgroup cpu_shares=" << 
task_group_info.cgroup_cpu_shares
-                  << ", cgroup cpu_hard_limit=" << 
task_group_info.cgroup_cpu_hard_limit;
+                  << ", cgroup cpu_hard_limit=" << 
task_group_info.cgroup_cpu_hard_limit
+                  << ", enable_cgroup_cpu_soft_limit="
+                  << (config::enable_cgroup_cpu_soft_limit ? "true" : "false");
     }
 
     _exec_env->task_group_manager()->delete_task_group_by_ids(current_wg_ids);
diff --git a/be/src/pipeline/pipeline_fragment_context.cpp 
b/be/src/pipeline/pipeline_fragment_context.cpp
index 44c9645fcc4..ce3d943af70 100644
--- a/be/src/pipeline/pipeline_fragment_context.cpp
+++ b/be/src/pipeline/pipeline_fragment_context.cpp
@@ -696,7 +696,7 @@ Status PipelineFragmentContext::submit() {
     auto* scheduler = _exec_env->pipeline_task_scheduler();
     if (_query_ctx->get_task_scheduler()) {
         scheduler = _query_ctx->get_task_scheduler();
-    } else if (_task_group_entity) {
+    } else if (_task_group_entity && 
_query_ctx->use_task_group_for_cpu_limit.load()) {
         scheduler = _exec_env->pipeline_task_group_scheduler();
     }
     for (auto& task : _tasks) {
diff --git a/be/src/runtime/fragment_mgr.cpp b/be/src/runtime/fragment_mgr.cpp
index 247be409d65..d4f11d25e2f 100644
--- a/be/src/runtime/fragment_mgr.cpp
+++ b/be/src/runtime/fragment_mgr.cpp
@@ -669,25 +669,31 @@ Status FragmentMgr::_get_query_ctx(const Params& params, 
TUniqueId query_id, boo
                 uint64_t tg_id = params.workload_groups[0].id;
                 auto* tg_mgr = _exec_env->task_group_manager();
                 if (auto task_group_ptr = tg_mgr->get_task_group_by_id(tg_id)) 
{
-                    std::stringstream ss;
-                    ss << "Query/load id: " << print_id(query_ctx->query_id());
-                    ss << " use task group " << task_group_ptr->debug_string();
-                    if (tg_mgr->enable_cpu_soft_limit() && 
!config::enable_cgroup_cpu_soft_limit) {
-                        query_ctx->set_task_group(task_group_ptr);
-                        ss << ", cpu soft limit based doris sche";
-                    } else {
-                        bool ret = tg_mgr->set_task_sche_for_query_ctx(tg_id, 
query_ctx.get());
-                        if (tg_mgr->enable_cpu_hard_limit()) {
-                            ss << ", cpu hard limit based cgroup";
+                    
task_group_ptr->add_mem_tracker_limiter(query_ctx->query_mem_tracker);
+                    // set task group to queryctx for memory tracker can be 
removed, see QueryContext's destructor
+                    query_ctx->set_task_group(task_group_ptr);
+                    stringstream ss;
+                    ss << "Query/load id: " << print_id(query_ctx->query_id())
+                       << ", use task group:" << task_group_ptr->debug_string()
+                       << ", enable cpu hard limit:"
+                       << (tg_mgr->enable_cpu_hard_limit() ? "true" : "false");
+                    bool ret = false;
+                    if (tg_mgr->enable_cgroup()) {
+                        ret = tg_mgr->set_cg_task_sche_for_query_ctx(tg_id, 
query_ctx.get());
+                        if (ret) {
+                            ss << ", use cgroup for cpu limit.";
                         } else {
-                            ss << ", cpu soft limit based cgroup";
-                        }
-                        if (!ret) {
-                            ss << ", but cgroup init failed, scan or exec 
fallback to no group";
+                            ss << ", not found cgroup sche, no limit for cpu.";
                         }
+                    } else {
+                        ss << ", use doris sche for cpu limit.";
+                        query_ctx->use_task_group_for_cpu_limit.store(true);
                     }
                     LOG(INFO) << ss.str();
-                } // else, query run with no group
+                } else {
+                    VLOG_DEBUG << "Query/load id: " << 
print_id(query_ctx->query_id())
+                               << " no task group found, does not use task 
group.";
+                }
             } else {
                 VLOG_DEBUG << "Query/load id: " << 
print_id(query_ctx->query_id())
                            << " does not use task group.";
diff --git a/be/src/runtime/query_context.h b/be/src/runtime/query_context.h
index 5810b06cfc6..0e3a04f8998 100644
--- a/be/src/runtime/query_context.h
+++ b/be/src/runtime/query_context.h
@@ -228,6 +228,8 @@ public:
     // only for file scan node
     std::map<int, TFileScanRangeParams> file_scan_range_params_map;
 
+    std::atomic<bool> use_task_group_for_cpu_limit = false;
+
 private:
     TUniqueId _query_id;
     ExecEnv* _exec_env = nullptr;
diff --git a/be/src/runtime/task_group/task_group.h 
b/be/src/runtime/task_group/task_group.h
index 95a329757a2..f1c8523664e 100644
--- a/be/src/runtime/task_group/task_group.h
+++ b/be/src/runtime/task_group/task_group.h
@@ -173,7 +173,7 @@ struct TaskGroupInfo {
     bool enable_cpu_hard_limit;
     // log cgroup cpu info
     uint64_t cgroup_cpu_shares = 0;
-    uint64_t cgroup_cpu_hard_limit = 0;
+    int cgroup_cpu_hard_limit = 0;
 
     static Status parse_topic_info(const TWorkloadGroupInfo& topic_info,
                                    taskgroup::TaskGroupInfo* task_group_info);
diff --git a/be/src/runtime/task_group/task_group_manager.cpp 
b/be/src/runtime/task_group/task_group_manager.cpp
index f37ed7ff711..da6294045f8 100644
--- a/be/src/runtime/task_group/task_group_manager.cpp
+++ b/be/src/runtime/task_group/task_group_manager.cpp
@@ -67,7 +67,7 @@ TaskGroupPtr TaskGroupManager::get_task_group_by_id(uint64_t 
tg_id) {
     return nullptr;
 }
 
-bool TaskGroupManager::set_task_sche_for_query_ctx(uint64_t tg_id, 
QueryContext* query_ctx_ptr) {
+bool TaskGroupManager::set_cg_task_sche_for_query_ctx(uint64_t tg_id, 
QueryContext* query_ctx_ptr) {
     std::lock_guard<std::mutex> lock(_task_scheduler_lock);
     if (_tg_sche_map.find(tg_id) != _tg_sche_map.end()) {
         query_ctx_ptr->set_task_scheduler(_tg_sche_map.at(tg_id).get());
@@ -83,8 +83,8 @@ bool TaskGroupManager::set_task_sche_for_query_ctx(uint64_t 
tg_id, QueryContext*
     return true;
 }
 
-Status TaskGroupManager::upsert_task_scheduler(taskgroup::TaskGroupInfo* 
tg_info,
-                                               ExecEnv* exec_env) {
+Status TaskGroupManager::upsert_cg_task_scheduler(taskgroup::TaskGroupInfo* 
tg_info,
+                                                  ExecEnv* exec_env) {
     uint64_t tg_id = tg_info->id;
     std::string tg_name = tg_info->name;
     int cpu_hard_limit = tg_info->cpu_hard_limit;
diff --git a/be/src/runtime/task_group/task_group_manager.h 
b/be/src/runtime/task_group/task_group_manager.h
index 552cddfe9dc..91156237f40 100644
--- a/be/src/runtime/task_group/task_group_manager.h
+++ b/be/src/runtime/task_group/task_group_manager.h
@@ -51,7 +51,7 @@ public:
     void get_resource_groups(const std::function<bool(const TaskGroupPtr& 
ptr)>& pred,
                              std::vector<TaskGroupPtr>* task_groups);
 
-    Status upsert_task_scheduler(taskgroup::TaskGroupInfo* tg_info, ExecEnv* 
exec_env);
+    Status upsert_cg_task_scheduler(taskgroup::TaskGroupInfo* tg_info, 
ExecEnv* exec_env);
 
     void delete_task_group_by_ids(std::set<uint64_t> id_set);
 
@@ -65,7 +65,11 @@ public:
 
     bool enable_cpu_hard_limit() { return _enable_cpu_hard_limit.load(); }
 
-    bool set_task_sche_for_query_ctx(uint64_t tg_id, QueryContext* 
query_ctx_ptr);
+    bool set_cg_task_sche_for_query_ctx(uint64_t tg_id, QueryContext* 
query_ctx_ptr);
+
+    // currently cgroup both support cpu soft limit and cpu hard limit
+    // doris task group only support cpu soft limit
+    bool enable_cgroup() { return enable_cpu_hard_limit() || 
config::enable_cgroup_cpu_soft_limit; }
 
 private:
     std::shared_mutex _group_mutex;


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to