This is an automated email from the ASF dual-hosted git repository. wangbo pushed a commit to branch branch-2.1 in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.1 by this push: new b272247a578 [pick]log thread num (#37258) b272247a578 is described below commit b272247a578a49a6f48292c2596baf35da034984 Author: wangbo <wan...@apache.org> AuthorDate: Thu Jul 4 15:27:52 2024 +0800 [pick]log thread num (#37258) ## Proposed changes pick #37159 --- be/src/agent/cgroup_cpu_ctl.cpp | 7 +++-- be/src/agent/cgroup_cpu_ctl.h | 4 +-- be/src/agent/workload_group_listener.cpp | 14 ++++----- be/src/common/config.cpp | 1 - be/src/common/config.h | 1 - be/src/pipeline/task_scheduler.h | 2 ++ be/src/runtime/fragment_mgr.cpp | 4 +-- be/src/runtime/workload_group/workload_group.cpp | 40 +++++++++++++++++++----- be/src/runtime/workload_group/workload_group.h | 6 ++-- be/src/util/threadpool.h | 7 +++++ be/src/vec/exec/scan/scanner_scheduler.h | 2 ++ 11 files changed, 61 insertions(+), 27 deletions(-) diff --git a/be/src/agent/cgroup_cpu_ctl.cpp b/be/src/agent/cgroup_cpu_ctl.cpp index 3fe0778ecba..b141676545e 100644 --- a/be/src/agent/cgroup_cpu_ctl.cpp +++ b/be/src/agent/cgroup_cpu_ctl.cpp @@ -130,7 +130,7 @@ Status CgroupV1CpuCtl::init() { return Status::InternalError<false>("invalid cgroup path, not find cpu quota file"); } - if (_tg_id == -1) { + if (_wg_id == -1) { // means current cgroup cpu ctl is just used to clear dir, // it does not contains workload group. // todo(wb) rethinking whether need to refactor cgroup_cpu_ctl @@ -140,7 +140,7 @@ Status CgroupV1CpuCtl::init() { } // workload group path - _cgroup_v1_cpu_tg_path = _cgroup_v1_cpu_query_path + "/" + std::to_string(_tg_id); + _cgroup_v1_cpu_tg_path = _cgroup_v1_cpu_query_path + "/" + std::to_string(_wg_id); if (access(_cgroup_v1_cpu_tg_path.c_str(), F_OK) != 0) { int ret = mkdir(_cgroup_v1_cpu_tg_path.c_str(), S_IRWXU); if (ret != 0) { @@ -186,7 +186,8 @@ Status CgroupV1CpuCtl::add_thread_to_cgroup() { return Status::OK(); #else int tid = static_cast<int>(syscall(SYS_gettid)); - std::string msg = "add thread " + std::to_string(tid) + " to group"; + std::string msg = + "add thread " + std::to_string(tid) + " to group" + " " + std::to_string(_wg_id); std::lock_guard<std::shared_mutex> w_lock(_lock_mutex); return CgroupCpuCtl::write_cg_sys_file(_cgroup_v1_cpu_tg_task_file, tid, msg, true); #endif diff --git a/be/src/agent/cgroup_cpu_ctl.h b/be/src/agent/cgroup_cpu_ctl.h index 1289f26307b..b5f8d2d5d80 100644 --- a/be/src/agent/cgroup_cpu_ctl.h +++ b/be/src/agent/cgroup_cpu_ctl.h @@ -35,7 +35,7 @@ class CgroupCpuCtl { public: virtual ~CgroupCpuCtl() = default; CgroupCpuCtl() = default; - CgroupCpuCtl(uint64_t tg_id) { _tg_id = tg_id; } + CgroupCpuCtl(uint64_t wg_id) { _wg_id = wg_id; } virtual Status init(); @@ -63,7 +63,7 @@ protected: int _cpu_hard_limit = 0; std::shared_mutex _lock_mutex; bool _init_succ = false; - uint64_t _tg_id = -1; // workload group id + uint64_t _wg_id = -1; // workload group id uint64_t _cpu_shares = 0; }; diff --git a/be/src/agent/workload_group_listener.cpp b/be/src/agent/workload_group_listener.cpp index 822e3c692f7..aca3ea597ce 100644 --- a/be/src/agent/workload_group_listener.cpp +++ b/be/src/agent/workload_group_listener.cpp @@ -43,13 +43,13 @@ void WorkloadGroupListener::handle_topic_info(const std::vector<TopicInfo>& topi current_wg_ids.insert(workload_group_info.id); } if (!ret.ok()) { - LOG(INFO) << "[topic_publish_wg]parse topic info failed, tg_id=" + LOG(INFO) << "[topic_publish_wg]parse topic info failed, wg_id=" << workload_group_info.id << ", reason:" << ret.to_string(); continue; } // 2 update workload group - auto tg = + auto wg = _exec_env->workload_group_mgr()->get_or_create_workload_group(workload_group_info); // 3 set cpu soft hard limit switch @@ -57,17 +57,15 @@ void WorkloadGroupListener::handle_topic_info(const std::vector<TopicInfo>& topi workload_group_info.enable_cpu_hard_limit); // 4 create and update task scheduler - tg->upsert_task_scheduler(&workload_group_info, _exec_env); + wg->upsert_task_scheduler(&workload_group_info, _exec_env); - LOG(INFO) << "[topic_publish_wg]update workload group finish, tg info=" - << tg->debug_string() << ", enable_cpu_hard_limit=" + LOG(INFO) << "[topic_publish_wg]update workload group finish, wg info=" + << wg->debug_string() << ", enable_cpu_hard_limit=" << (_exec_env->workload_group_mgr()->enable_cpu_hard_limit() ? "true" : "false") << ", cgroup cpu_shares=" << workload_group_info.cgroup_cpu_shares << ", cgroup cpu_hard_limit=" << workload_group_info.cgroup_cpu_hard_limit - << ", enable_cgroup_cpu_soft_limit=" - << (config::enable_cgroup_cpu_soft_limit ? "true" : "false") << ", cgroup home path=" << config::doris_cgroup_cpu_path - << ", list size=" << list_size; + << ", list size=" << list_size << ", thread info=" << wg->thread_debug_info(); } // NOTE(wb) when is_set_workload_group_info=false, it means FE send a empty workload group list diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp index f276487d152..cbae3ee896b 100644 --- a/be/src/common/config.cpp +++ b/be/src/common/config.cpp @@ -1145,7 +1145,6 @@ DEFINE_Bool(enable_flush_file_cache_async, "true"); // cgroup DEFINE_mString(doris_cgroup_cpu_path, ""); -DEFINE_mBool(enable_cgroup_cpu_soft_limit, "true"); DEFINE_mBool(enable_workload_group_memory_gc, "true"); diff --git a/be/src/common/config.h b/be/src/common/config.h index 891a8333148..26e7fe00c79 100644 --- a/be/src/common/config.h +++ b/be/src/common/config.h @@ -1219,7 +1219,6 @@ DECLARE_mBool(exit_on_exception); // cgroup DECLARE_mString(doris_cgroup_cpu_path); -DECLARE_mBool(enable_cgroup_cpu_soft_limit); DECLARE_mBool(enable_workload_group_memory_gc); diff --git a/be/src/pipeline/task_scheduler.h b/be/src/pipeline/task_scheduler.h index 5bbf85fad45..c33103bfd30 100644 --- a/be/src/pipeline/task_scheduler.h +++ b/be/src/pipeline/task_scheduler.h @@ -93,6 +93,8 @@ public: TaskQueue* task_queue() const { return _task_queue.get(); } + std::vector<int> thread_debug_info() { return _fix_thread_pool->debug_info(); } + private: std::unique_ptr<ThreadPool> _fix_thread_pool; std::shared_ptr<TaskQueue> _task_queue; diff --git a/be/src/runtime/fragment_mgr.cpp b/be/src/runtime/fragment_mgr.cpp index bd5308aeba1..dba128003bc 100644 --- a/be/src/runtime/fragment_mgr.cpp +++ b/be/src/runtime/fragment_mgr.cpp @@ -692,9 +692,7 @@ Status FragmentMgr::_get_query_ctx(const Params& params, TUniqueId query_id, boo LOG(INFO) << "Query/load id: " << print_id(query_ctx->query_id()) << ", use workload group: " << workload_group_ptr->debug_string() - << ", is pipeline: " << ((int)is_pipeline) - << ", enable cgroup soft limit: " - << ((int)config::enable_cgroup_cpu_soft_limit); + << ", is pipeline: " << ((int)is_pipeline); } else { LOG(INFO) << "Query/load id: " << print_id(query_ctx->query_id()) << " carried group info but can not find group in be"; diff --git a/be/src/runtime/workload_group/workload_group.cpp b/be/src/runtime/workload_group/workload_group.cpp index 372eecafd07..5a571528019 100644 --- a/be/src/runtime/workload_group/workload_group.cpp +++ b/be/src/runtime/workload_group/workload_group.cpp @@ -365,9 +365,9 @@ void WorkloadGroup::upsert_task_scheduler(WorkloadGroupInfo* tg_info, ExecEnv* e Status ret = cgroup_cpu_ctl->init(); if (ret.ok()) { _cgroup_cpu_ctl = std::move(cgroup_cpu_ctl); - LOG(INFO) << "[upsert wg thread pool] cgroup init success"; + LOG(INFO) << "[upsert wg thread pool] cgroup init success, wg_id=" << tg_id; } else { - LOG(INFO) << "[upsert wg thread pool] cgroup init failed, gid= " << tg_id + LOG(INFO) << "[upsert wg thread pool] cgroup init failed, wg_id= " << tg_id << ", reason=" << ret.to_string(); } } @@ -467,11 +467,9 @@ void WorkloadGroup::upsert_task_scheduler(WorkloadGroupInfo* tg_info, ExecEnv* e << cpu_hard_limit << ", gid=" << tg_id; } } else { - if (config::enable_cgroup_cpu_soft_limit) { - _cgroup_cpu_ctl->update_cpu_soft_limit(cpu_shares); - _cgroup_cpu_ctl->update_cpu_hard_limit( - CPU_HARD_LIMIT_DEFAULT_VALUE); // disable cpu hard limit - } + _cgroup_cpu_ctl->update_cpu_soft_limit(cpu_shares); + _cgroup_cpu_ctl->update_cpu_hard_limit( + CPU_HARD_LIMIT_DEFAULT_VALUE); // disable cpu hard limit } _cgroup_cpu_ctl->get_cgroup_cpu_info(&(tg_info->cgroup_cpu_shares), &(tg_info->cgroup_cpu_hard_limit)); @@ -489,6 +487,34 @@ void WorkloadGroup::get_query_scheduler(doris::pipeline::TaskScheduler** exec_sc *memtable_flush_pool = _memtable_flush_pool.get(); } +std::string WorkloadGroup::thread_debug_info() { + std::string str = ""; + if (_task_sched != nullptr) { + std::vector<int> exec_t_info = _task_sched->thread_debug_info(); + str = fmt::format("[exec num:{}, real_num:{}, min_num:{}, max_num:{}],", exec_t_info[0], + exec_t_info[1], exec_t_info[2], exec_t_info[3]); + } + + if (_scan_task_sched != nullptr) { + std::vector<int> exec_t_info = _scan_task_sched->thread_debug_info(); + str += fmt::format("[l_scan num:{}, real_num:{}, min_num:{}, max_num{}],", exec_t_info[0], + exec_t_info[1], exec_t_info[2], exec_t_info[3]); + } + + if (_remote_scan_task_sched != nullptr) { + std::vector<int> exec_t_info = _remote_scan_task_sched->thread_debug_info(); + str += fmt::format("[r_scan num:{}, real_num:{}, min_num:{}, max_num:{}],", exec_t_info[0], + exec_t_info[1], exec_t_info[2], exec_t_info[3]); + } + + if (_memtable_flush_pool != nullptr) { + std::vector<int> exec_t_info = _memtable_flush_pool->debug_info(); + str += fmt::format("[mem_tab_flush num:{}, real_num:{}, min_num:{}, max_num:{}]", + exec_t_info[0], exec_t_info[1], exec_t_info[2], exec_t_info[3]); + } + return str; +} + void WorkloadGroup::try_stop_schedulers() { std::shared_lock<std::shared_mutex> rlock(_task_sched_lock); if (_task_sched) { diff --git a/be/src/runtime/workload_group/workload_group.h b/be/src/runtime/workload_group/workload_group.h index e41c1793233..b57e5736eb2 100644 --- a/be/src/runtime/workload_group/workload_group.h +++ b/be/src/runtime/workload_group/workload_group.h @@ -162,6 +162,8 @@ public: return _query_ctxs; } + std::string thread_debug_info(); + private: mutable std::shared_mutex _mutex; // lock _name, _version, _cpu_share, _memory_limit const uint64_t _id; @@ -186,11 +188,11 @@ private: std::unordered_map<TUniqueId, std::weak_ptr<QueryContext>> _query_ctxs; std::shared_mutex _task_sched_lock; - std::unique_ptr<CgroupCpuCtl> _cgroup_cpu_ctl = nullptr; + std::unique_ptr<CgroupCpuCtl> _cgroup_cpu_ctl {nullptr}; std::unique_ptr<doris::pipeline::TaskScheduler> _task_sched {nullptr}; std::unique_ptr<vectorized::SimplifiedScanScheduler> _scan_task_sched {nullptr}; std::unique_ptr<vectorized::SimplifiedScanScheduler> _remote_scan_task_sched {nullptr}; - std::unique_ptr<ThreadPool> _memtable_flush_pool = nullptr; + std::unique_ptr<ThreadPool> _memtable_flush_pool {nullptr}; }; using WorkloadGroupPtr = std::shared_ptr<WorkloadGroup>; diff --git a/be/src/util/threadpool.h b/be/src/util/threadpool.h index 526836cb09e..5ce27e2f27b 100644 --- a/be/src/util/threadpool.h +++ b/be/src/util/threadpool.h @@ -256,6 +256,13 @@ public: return _total_queued_tasks; } + std::vector<int> debug_info() { + std::lock_guard<std::mutex> l(_lock); + std::vector<int> arr = {_num_threads, static_cast<int>(_threads.size()), _min_threads, + _max_threads}; + return arr; + } + private: friend class ThreadPoolBuilder; friend class ThreadPoolToken; diff --git a/be/src/vec/exec/scan/scanner_scheduler.h b/be/src/vec/exec/scan/scanner_scheduler.h index f194afe4bb0..238afc15bf6 100644 --- a/be/src/vec/exec/scan/scanner_scheduler.h +++ b/be/src/vec/exec/scan/scanner_scheduler.h @@ -193,6 +193,8 @@ public: } } + std::vector<int> thread_debug_info() { return _scan_thread_pool->debug_info(); } + private: std::unique_ptr<ThreadPool> _scan_thread_pool; std::atomic<bool> _is_stop; --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org