This is an automated email from the ASF dual-hosted git repository. yiguolei pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push: new 0b7becd4b71 [fix](executor)Fix memtracker not set to task group #27699 0b7becd4b71 is described below commit 0b7becd4b713ff7ad014f6923d372490c3352902 Author: wangbo <wan...@apache.org> AuthorDate: Thu Nov 30 22:35:51 2023 +0800 [fix](executor)Fix memtracker not set to task group #27699 --- be/src/agent/cgroup_cpu_ctl.cpp | 5 ++-- be/src/agent/cgroup_cpu_ctl.h | 4 +-- be/src/agent/workload_group_listener.cpp | 10 ++++--- be/src/pipeline/pipeline_fragment_context.cpp | 2 +- be/src/runtime/fragment_mgr.cpp | 36 ++++++++++++++---------- be/src/runtime/query_context.h | 2 ++ be/src/runtime/task_group/task_group.h | 2 +- be/src/runtime/task_group/task_group_manager.cpp | 6 ++-- be/src/runtime/task_group/task_group_manager.h | 8 ++++-- 9 files changed, 45 insertions(+), 30 deletions(-) diff --git a/be/src/agent/cgroup_cpu_ctl.cpp b/be/src/agent/cgroup_cpu_ctl.cpp index ac09725cb23..6fa234e6ee2 100644 --- a/be/src/agent/cgroup_cpu_ctl.cpp +++ b/be/src/agent/cgroup_cpu_ctl.cpp @@ -41,7 +41,7 @@ Status CgroupCpuCtl::init() { return Status::OK(); } -void CgroupCpuCtl::get_cgroup_cpu_info(uint64_t* cpu_shares, uint64_t* cpu_hard_limit) { +void CgroupCpuCtl::get_cgroup_cpu_info(uint64_t* cpu_shares, int* cpu_hard_limit) { std::lock_guard<std::shared_mutex> w_lock(_lock_mutex); *cpu_shares = this->_cpu_shares; *cpu_hard_limit = this->_cpu_hard_limit; @@ -137,7 +137,8 @@ Status CgroupV1CpuCtl::modify_cg_cpu_soft_limit_no_lock(int cpu_shares) { } Status CgroupV1CpuCtl::modify_cg_cpu_hard_limit_no_lock(int cpu_hard_limit) { - int val = _cpu_cfs_period_us * _cpu_core_num * cpu_hard_limit / 100; + int val = cpu_hard_limit > 0 ? (_cpu_cfs_period_us * _cpu_core_num * cpu_hard_limit / 100) + : CPU_HARD_LIMIT_DEFAULT_VALUE; std::string msg = "modify cpu quota value to " + std::to_string(val); return CgroupCpuCtl::write_cg_sys_file(_cgroup_v1_cpu_tg_quota_file, val, msg, false); } diff --git a/be/src/agent/cgroup_cpu_ctl.h b/be/src/agent/cgroup_cpu_ctl.h index 4d78ca82ab7..2a7cdc5719b 100644 --- a/be/src/agent/cgroup_cpu_ctl.h +++ b/be/src/agent/cgroup_cpu_ctl.h @@ -48,7 +48,7 @@ public: void update_cpu_soft_limit(int cpu_shares); // for log - void get_cgroup_cpu_info(uint64_t* cpu_shares, uint64_t* cpu_hard_limit); + void get_cgroup_cpu_info(uint64_t* cpu_shares, int* cpu_hard_limit); protected: Status write_cg_sys_file(std::string file_path, int value, std::string msg, bool is_append); @@ -60,7 +60,7 @@ protected: std::string _doris_cgroup_cpu_path; uint64_t _cpu_core_num = CpuInfo::num_cores(); uint64_t _cpu_cfs_period_us = 100000; - uint64_t _cpu_hard_limit = 0; + int _cpu_hard_limit = 0; std::shared_mutex _lock_mutex; bool _init_succ = false; uint64_t _tg_id; // workload group id diff --git a/be/src/agent/workload_group_listener.cpp b/be/src/agent/workload_group_listener.cpp index bc1d3294f6c..6d7dfb9a3a0 100644 --- a/be/src/agent/workload_group_listener.cpp +++ b/be/src/agent/workload_group_listener.cpp @@ -50,8 +50,8 @@ void WorkloadGroupListener::handle_topic_info(const std::vector<TopicInfo>& topi task_group_info.enable_cpu_hard_limit); // 4 create and update task scheduler - Status ret2 = - _exec_env->task_group_manager()->upsert_task_scheduler(&task_group_info, _exec_env); + Status ret2 = _exec_env->task_group_manager()->upsert_cg_task_scheduler(&task_group_info, + _exec_env); if (!ret2.ok()) { LOG(WARNING) << "upsert task sche failed, tg_id=" << task_group_info.id << ", reason=" << ret2.to_string(); @@ -59,9 +59,11 @@ void WorkloadGroupListener::handle_topic_info(const std::vector<TopicInfo>& topi LOG(INFO) << "update task group success, tg info=" << tg->debug_string() << ", enable_cpu_hard_limit=" - << _exec_env->task_group_manager()->enable_cpu_hard_limit() + << (_exec_env->task_group_manager()->enable_cpu_hard_limit() ? "true" : "false") << ", cgroup cpu_shares=" << task_group_info.cgroup_cpu_shares - << ", cgroup cpu_hard_limit=" << task_group_info.cgroup_cpu_hard_limit; + << ", cgroup cpu_hard_limit=" << task_group_info.cgroup_cpu_hard_limit + << ", enable_cgroup_cpu_soft_limit=" + << (config::enable_cgroup_cpu_soft_limit ? "true" : "false"); } _exec_env->task_group_manager()->delete_task_group_by_ids(current_wg_ids); diff --git a/be/src/pipeline/pipeline_fragment_context.cpp b/be/src/pipeline/pipeline_fragment_context.cpp index 44c9645fcc4..ce3d943af70 100644 --- a/be/src/pipeline/pipeline_fragment_context.cpp +++ b/be/src/pipeline/pipeline_fragment_context.cpp @@ -696,7 +696,7 @@ Status PipelineFragmentContext::submit() { auto* scheduler = _exec_env->pipeline_task_scheduler(); if (_query_ctx->get_task_scheduler()) { scheduler = _query_ctx->get_task_scheduler(); - } else if (_task_group_entity) { + } else if (_task_group_entity && _query_ctx->use_task_group_for_cpu_limit.load()) { scheduler = _exec_env->pipeline_task_group_scheduler(); } for (auto& task : _tasks) { diff --git a/be/src/runtime/fragment_mgr.cpp b/be/src/runtime/fragment_mgr.cpp index 247be409d65..d4f11d25e2f 100644 --- a/be/src/runtime/fragment_mgr.cpp +++ b/be/src/runtime/fragment_mgr.cpp @@ -669,25 +669,31 @@ Status FragmentMgr::_get_query_ctx(const Params& params, TUniqueId query_id, boo uint64_t tg_id = params.workload_groups[0].id; auto* tg_mgr = _exec_env->task_group_manager(); if (auto task_group_ptr = tg_mgr->get_task_group_by_id(tg_id)) { - std::stringstream ss; - ss << "Query/load id: " << print_id(query_ctx->query_id()); - ss << " use task group " << task_group_ptr->debug_string(); - if (tg_mgr->enable_cpu_soft_limit() && !config::enable_cgroup_cpu_soft_limit) { - query_ctx->set_task_group(task_group_ptr); - ss << ", cpu soft limit based doris sche"; - } else { - bool ret = tg_mgr->set_task_sche_for_query_ctx(tg_id, query_ctx.get()); - if (tg_mgr->enable_cpu_hard_limit()) { - ss << ", cpu hard limit based cgroup"; + task_group_ptr->add_mem_tracker_limiter(query_ctx->query_mem_tracker); + // set task group to queryctx for memory tracker can be removed, see QueryContext's destructor + query_ctx->set_task_group(task_group_ptr); + stringstream ss; + ss << "Query/load id: " << print_id(query_ctx->query_id()) + << ", use task group:" << task_group_ptr->debug_string() + << ", enable cpu hard limit:" + << (tg_mgr->enable_cpu_hard_limit() ? "true" : "false"); + bool ret = false; + if (tg_mgr->enable_cgroup()) { + ret = tg_mgr->set_cg_task_sche_for_query_ctx(tg_id, query_ctx.get()); + if (ret) { + ss << ", use cgroup for cpu limit."; } else { - ss << ", cpu soft limit based cgroup"; - } - if (!ret) { - ss << ", but cgroup init failed, scan or exec fallback to no group"; + ss << ", not found cgroup sche, no limit for cpu."; } + } else { + ss << ", use doris sche for cpu limit."; + query_ctx->use_task_group_for_cpu_limit.store(true); } LOG(INFO) << ss.str(); - } // else, query run with no group + } else { + VLOG_DEBUG << "Query/load id: " << print_id(query_ctx->query_id()) + << " no task group found, does not use task group."; + } } else { VLOG_DEBUG << "Query/load id: " << print_id(query_ctx->query_id()) << " does not use task group."; diff --git a/be/src/runtime/query_context.h b/be/src/runtime/query_context.h index 5810b06cfc6..0e3a04f8998 100644 --- a/be/src/runtime/query_context.h +++ b/be/src/runtime/query_context.h @@ -228,6 +228,8 @@ public: // only for file scan node std::map<int, TFileScanRangeParams> file_scan_range_params_map; + std::atomic<bool> use_task_group_for_cpu_limit = false; + private: TUniqueId _query_id; ExecEnv* _exec_env = nullptr; diff --git a/be/src/runtime/task_group/task_group.h b/be/src/runtime/task_group/task_group.h index 95a329757a2..f1c8523664e 100644 --- a/be/src/runtime/task_group/task_group.h +++ b/be/src/runtime/task_group/task_group.h @@ -173,7 +173,7 @@ struct TaskGroupInfo { bool enable_cpu_hard_limit; // log cgroup cpu info uint64_t cgroup_cpu_shares = 0; - uint64_t cgroup_cpu_hard_limit = 0; + int cgroup_cpu_hard_limit = 0; static Status parse_topic_info(const TWorkloadGroupInfo& topic_info, taskgroup::TaskGroupInfo* task_group_info); diff --git a/be/src/runtime/task_group/task_group_manager.cpp b/be/src/runtime/task_group/task_group_manager.cpp index f37ed7ff711..da6294045f8 100644 --- a/be/src/runtime/task_group/task_group_manager.cpp +++ b/be/src/runtime/task_group/task_group_manager.cpp @@ -67,7 +67,7 @@ TaskGroupPtr TaskGroupManager::get_task_group_by_id(uint64_t tg_id) { return nullptr; } -bool TaskGroupManager::set_task_sche_for_query_ctx(uint64_t tg_id, QueryContext* query_ctx_ptr) { +bool TaskGroupManager::set_cg_task_sche_for_query_ctx(uint64_t tg_id, QueryContext* query_ctx_ptr) { std::lock_guard<std::mutex> lock(_task_scheduler_lock); if (_tg_sche_map.find(tg_id) != _tg_sche_map.end()) { query_ctx_ptr->set_task_scheduler(_tg_sche_map.at(tg_id).get()); @@ -83,8 +83,8 @@ bool TaskGroupManager::set_task_sche_for_query_ctx(uint64_t tg_id, QueryContext* return true; } -Status TaskGroupManager::upsert_task_scheduler(taskgroup::TaskGroupInfo* tg_info, - ExecEnv* exec_env) { +Status TaskGroupManager::upsert_cg_task_scheduler(taskgroup::TaskGroupInfo* tg_info, + ExecEnv* exec_env) { uint64_t tg_id = tg_info->id; std::string tg_name = tg_info->name; int cpu_hard_limit = tg_info->cpu_hard_limit; diff --git a/be/src/runtime/task_group/task_group_manager.h b/be/src/runtime/task_group/task_group_manager.h index 552cddfe9dc..91156237f40 100644 --- a/be/src/runtime/task_group/task_group_manager.h +++ b/be/src/runtime/task_group/task_group_manager.h @@ -51,7 +51,7 @@ public: void get_resource_groups(const std::function<bool(const TaskGroupPtr& ptr)>& pred, std::vector<TaskGroupPtr>* task_groups); - Status upsert_task_scheduler(taskgroup::TaskGroupInfo* tg_info, ExecEnv* exec_env); + Status upsert_cg_task_scheduler(taskgroup::TaskGroupInfo* tg_info, ExecEnv* exec_env); void delete_task_group_by_ids(std::set<uint64_t> id_set); @@ -65,7 +65,11 @@ public: bool enable_cpu_hard_limit() { return _enable_cpu_hard_limit.load(); } - bool set_task_sche_for_query_ctx(uint64_t tg_id, QueryContext* query_ctx_ptr); + bool set_cg_task_sche_for_query_ctx(uint64_t tg_id, QueryContext* query_ctx_ptr); + + // currently cgroup both support cpu soft limit and cpu hard limit + // doris task group only support cpu soft limit + bool enable_cgroup() { return enable_cpu_hard_limit() || config::enable_cgroup_cpu_soft_limit; } private: std::shared_mutex _group_mutex; --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org