This is an automated email from the ASF dual-hosted git repository.

wangbo pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-2.1 by this push:
     new b272247a578 [pick]log thread num (#37258)
b272247a578 is described below

commit b272247a578a49a6f48292c2596baf35da034984
Author: wangbo <wan...@apache.org>
AuthorDate: Thu Jul 4 15:27:52 2024 +0800

    [pick]log thread num (#37258)
    
    ## Proposed changes
    
    pick #37159
---
 be/src/agent/cgroup_cpu_ctl.cpp                  |  7 +++--
 be/src/agent/cgroup_cpu_ctl.h                    |  4 +--
 be/src/agent/workload_group_listener.cpp         | 14 ++++-----
 be/src/common/config.cpp                         |  1 -
 be/src/common/config.h                           |  1 -
 be/src/pipeline/task_scheduler.h                 |  2 ++
 be/src/runtime/fragment_mgr.cpp                  |  4 +--
 be/src/runtime/workload_group/workload_group.cpp | 40 +++++++++++++++++++-----
 be/src/runtime/workload_group/workload_group.h   |  6 ++--
 be/src/util/threadpool.h                         |  7 +++++
 be/src/vec/exec/scan/scanner_scheduler.h         |  2 ++
 11 files changed, 61 insertions(+), 27 deletions(-)

diff --git a/be/src/agent/cgroup_cpu_ctl.cpp b/be/src/agent/cgroup_cpu_ctl.cpp
index 3fe0778ecba..b141676545e 100644
--- a/be/src/agent/cgroup_cpu_ctl.cpp
+++ b/be/src/agent/cgroup_cpu_ctl.cpp
@@ -130,7 +130,7 @@ Status CgroupV1CpuCtl::init() {
         return Status::InternalError<false>("invalid cgroup path, not find cpu quota file");
     }
 
-    if (_tg_id == -1) {
+    if (_wg_id == -1) {
         // means current cgroup cpu ctl is just used to clear dir,
         // it does not contains workload group.
         // todo(wb) rethinking whether need to refactor cgroup_cpu_ctl
@@ -140,7 +140,7 @@ Status CgroupV1CpuCtl::init() {
     }
 
     // workload group path
-    _cgroup_v1_cpu_tg_path = _cgroup_v1_cpu_query_path + "/" + std::to_string(_tg_id);
+    _cgroup_v1_cpu_tg_path = _cgroup_v1_cpu_query_path + "/" + std::to_string(_wg_id);
     if (access(_cgroup_v1_cpu_tg_path.c_str(), F_OK) != 0) {
         int ret = mkdir(_cgroup_v1_cpu_tg_path.c_str(), S_IRWXU);
         if (ret != 0) {
@@ -186,7 +186,8 @@ Status CgroupV1CpuCtl::add_thread_to_cgroup() {
     return Status::OK();
 #else
     int tid = static_cast<int>(syscall(SYS_gettid));
-    std::string msg = "add thread " + std::to_string(tid) + " to group";
+    std::string msg =
+            "add thread " + std::to_string(tid) + " to group" + " " + std::to_string(_wg_id);
     std::lock_guard<std::shared_mutex> w_lock(_lock_mutex);
     return CgroupCpuCtl::write_cg_sys_file(_cgroup_v1_cpu_tg_task_file, tid, msg, true);
 #endif
diff --git a/be/src/agent/cgroup_cpu_ctl.h b/be/src/agent/cgroup_cpu_ctl.h
index 1289f26307b..b5f8d2d5d80 100644
--- a/be/src/agent/cgroup_cpu_ctl.h
+++ b/be/src/agent/cgroup_cpu_ctl.h
@@ -35,7 +35,7 @@ class CgroupCpuCtl {
 public:
     virtual ~CgroupCpuCtl() = default;
     CgroupCpuCtl() = default;
-    CgroupCpuCtl(uint64_t tg_id) { _tg_id = tg_id; }
+    CgroupCpuCtl(uint64_t wg_id) { _wg_id = wg_id; }
 
     virtual Status init();
 
@@ -63,7 +63,7 @@ protected:
     int _cpu_hard_limit = 0;
     std::shared_mutex _lock_mutex;
     bool _init_succ = false;
-    uint64_t _tg_id = -1; // workload group id
+    uint64_t _wg_id = -1; // workload group id
     uint64_t _cpu_shares = 0;
 };
 
diff --git a/be/src/agent/workload_group_listener.cpp b/be/src/agent/workload_group_listener.cpp
index 822e3c692f7..aca3ea597ce 100644
--- a/be/src/agent/workload_group_listener.cpp
+++ b/be/src/agent/workload_group_listener.cpp
@@ -43,13 +43,13 @@ void WorkloadGroupListener::handle_topic_info(const std::vector<TopicInfo>& topi
             current_wg_ids.insert(workload_group_info.id);
         }
         if (!ret.ok()) {
-            LOG(INFO) << "[topic_publish_wg]parse topic info failed, tg_id="
+            LOG(INFO) << "[topic_publish_wg]parse topic info failed, wg_id="
                       << workload_group_info.id << ", reason:" << ret.to_string();
             continue;
         }
 
         // 2 update workload group
-        auto tg =
+        auto wg =
                 _exec_env->workload_group_mgr()->get_or_create_workload_group(workload_group_info);
 
         // 3 set cpu soft hard limit switch
@@ -57,17 +57,15 @@ void WorkloadGroupListener::handle_topic_info(const std::vector<TopicInfo>& topi
                 workload_group_info.enable_cpu_hard_limit);
 
         // 4 create and update task scheduler
-        tg->upsert_task_scheduler(&workload_group_info, _exec_env);
+        wg->upsert_task_scheduler(&workload_group_info, _exec_env);
 
-        LOG(INFO) << "[topic_publish_wg]update workload group finish, tg info="
-                  << tg->debug_string() << ", enable_cpu_hard_limit="
+        LOG(INFO) << "[topic_publish_wg]update workload group finish, wg info="
+                  << wg->debug_string() << ", enable_cpu_hard_limit="
                   << (_exec_env->workload_group_mgr()->enable_cpu_hard_limit() ? "true" : "false")
                   << ", cgroup cpu_shares=" << workload_group_info.cgroup_cpu_shares
                   << ", cgroup cpu_hard_limit=" << workload_group_info.cgroup_cpu_hard_limit
-                  << ", enable_cgroup_cpu_soft_limit="
-                  << (config::enable_cgroup_cpu_soft_limit ? "true" : "false")
                   << ", cgroup home path=" << config::doris_cgroup_cpu_path
-                  << ", list size=" << list_size;
+                  << ", list size=" << list_size << ", thread info=" << wg->thread_debug_info();
     }
 
     // NOTE(wb) when is_set_workload_group_info=false, it means FE send a empty workload group list
diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp
index f276487d152..cbae3ee896b 100644
--- a/be/src/common/config.cpp
+++ b/be/src/common/config.cpp
@@ -1145,7 +1145,6 @@ DEFINE_Bool(enable_flush_file_cache_async, "true");
 
 // cgroup
 DEFINE_mString(doris_cgroup_cpu_path, "");
-DEFINE_mBool(enable_cgroup_cpu_soft_limit, "true");
 
 DEFINE_mBool(enable_workload_group_memory_gc, "true");
 
diff --git a/be/src/common/config.h b/be/src/common/config.h
index 891a8333148..26e7fe00c79 100644
--- a/be/src/common/config.h
+++ b/be/src/common/config.h
@@ -1219,7 +1219,6 @@ DECLARE_mBool(exit_on_exception);
 
 // cgroup
 DECLARE_mString(doris_cgroup_cpu_path);
-DECLARE_mBool(enable_cgroup_cpu_soft_limit);
 
 DECLARE_mBool(enable_workload_group_memory_gc);
 
diff --git a/be/src/pipeline/task_scheduler.h b/be/src/pipeline/task_scheduler.h
index 5bbf85fad45..c33103bfd30 100644
--- a/be/src/pipeline/task_scheduler.h
+++ b/be/src/pipeline/task_scheduler.h
@@ -93,6 +93,8 @@ public:
 
     TaskQueue* task_queue() const { return _task_queue.get(); }
 
+    std::vector<int> thread_debug_info() { return _fix_thread_pool->debug_info(); }
+
 private:
     std::unique_ptr<ThreadPool> _fix_thread_pool;
     std::shared_ptr<TaskQueue> _task_queue;
diff --git a/be/src/runtime/fragment_mgr.cpp b/be/src/runtime/fragment_mgr.cpp
index bd5308aeba1..dba128003bc 100644
--- a/be/src/runtime/fragment_mgr.cpp
+++ b/be/src/runtime/fragment_mgr.cpp
@@ -692,9 +692,7 @@ Status FragmentMgr::_get_query_ctx(const Params& params, TUniqueId query_id, boo
 
                 LOG(INFO) << "Query/load id: " << print_id(query_ctx->query_id())
                           << ", use workload group: " << workload_group_ptr->debug_string()
-                          << ", is pipeline: " << ((int)is_pipeline)
-                          << ", enable cgroup soft limit: "
-                          << ((int)config::enable_cgroup_cpu_soft_limit);
+                          << ", is pipeline: " << ((int)is_pipeline);
             } else {
                 LOG(INFO) << "Query/load id: " << print_id(query_ctx->query_id())
                           << " carried group info but can not find group in be";
diff --git a/be/src/runtime/workload_group/workload_group.cpp b/be/src/runtime/workload_group/workload_group.cpp
index 372eecafd07..5a571528019 100644
--- a/be/src/runtime/workload_group/workload_group.cpp
+++ b/be/src/runtime/workload_group/workload_group.cpp
@@ -365,9 +365,9 @@ void WorkloadGroup::upsert_task_scheduler(WorkloadGroupInfo* tg_info, ExecEnv* e
         Status ret = cgroup_cpu_ctl->init();
         if (ret.ok()) {
             _cgroup_cpu_ctl = std::move(cgroup_cpu_ctl);
-            LOG(INFO) << "[upsert wg thread pool] cgroup init success";
+            LOG(INFO) << "[upsert wg thread pool] cgroup init success, wg_id=" << tg_id;
         } else {
-            LOG(INFO) << "[upsert wg thread pool] cgroup init failed, gid= " << tg_id
+            LOG(INFO) << "[upsert wg thread pool] cgroup init failed, wg_id= " << tg_id
                       << ", reason=" << ret.to_string();
         }
     }
@@ -467,11 +467,9 @@ void WorkloadGroup::upsert_task_scheduler(WorkloadGroupInfo* tg_info, ExecEnv* e
                           << cpu_hard_limit << ", gid=" << tg_id;
             }
         } else {
-            if (config::enable_cgroup_cpu_soft_limit) {
-                _cgroup_cpu_ctl->update_cpu_soft_limit(cpu_shares);
-                _cgroup_cpu_ctl->update_cpu_hard_limit(
-                        CPU_HARD_LIMIT_DEFAULT_VALUE); // disable cpu hard limit
-            }
+            _cgroup_cpu_ctl->update_cpu_soft_limit(cpu_shares);
+            _cgroup_cpu_ctl->update_cpu_hard_limit(
+                    CPU_HARD_LIMIT_DEFAULT_VALUE); // disable cpu hard limit
         }
         _cgroup_cpu_ctl->get_cgroup_cpu_info(&(tg_info->cgroup_cpu_shares),
                                              &(tg_info->cgroup_cpu_hard_limit));
@@ -489,6 +487,34 @@ void WorkloadGroup::get_query_scheduler(doris::pipeline::TaskScheduler** exec_sc
     *memtable_flush_pool = _memtable_flush_pool.get();
 }
 
+std::string WorkloadGroup::thread_debug_info() {
+    std::string str = "";
+    if (_task_sched != nullptr) {
+        std::vector<int> exec_t_info = _task_sched->thread_debug_info();
+        str = fmt::format("[exec num:{}, real_num:{}, min_num:{}, max_num:{}],", exec_t_info[0],
+                          exec_t_info[1], exec_t_info[2], exec_t_info[3]);
+    }
+
+    if (_scan_task_sched != nullptr) {
+        std::vector<int> exec_t_info = _scan_task_sched->thread_debug_info();
+        str += fmt::format("[l_scan num:{}, real_num:{}, min_num:{}, max_num:{}],", exec_t_info[0],
+                           exec_t_info[1], exec_t_info[2], exec_t_info[3]);
+    }
+
+    if (_remote_scan_task_sched != nullptr) {
+        std::vector<int> exec_t_info = _remote_scan_task_sched->thread_debug_info();
+        str += fmt::format("[r_scan num:{}, real_num:{}, min_num:{}, max_num:{}],", exec_t_info[0],
+                           exec_t_info[1], exec_t_info[2], exec_t_info[3]);
+    }
+
+    if (_memtable_flush_pool != nullptr) {
+        std::vector<int> exec_t_info = _memtable_flush_pool->debug_info();
+        str += fmt::format("[mem_tab_flush num:{}, real_num:{}, min_num:{}, max_num:{}]",
+                           exec_t_info[0], exec_t_info[1], exec_t_info[2], exec_t_info[3]);
+    }
+    return str;
+}
+
 void WorkloadGroup::try_stop_schedulers() {
     std::shared_lock<std::shared_mutex> rlock(_task_sched_lock);
     if (_task_sched) {
diff --git a/be/src/runtime/workload_group/workload_group.h b/be/src/runtime/workload_group/workload_group.h
index e41c1793233..b57e5736eb2 100644
--- a/be/src/runtime/workload_group/workload_group.h
+++ b/be/src/runtime/workload_group/workload_group.h
@@ -162,6 +162,8 @@ public:
         return _query_ctxs;
     }
 
+    std::string thread_debug_info();
+
 private:
     mutable std::shared_mutex _mutex; // lock _name, _version, _cpu_share, _memory_limit
     const uint64_t _id;
@@ -186,11 +188,11 @@ private:
     std::unordered_map<TUniqueId, std::weak_ptr<QueryContext>> _query_ctxs;
 
     std::shared_mutex _task_sched_lock;
-    std::unique_ptr<CgroupCpuCtl> _cgroup_cpu_ctl = nullptr;
+    std::unique_ptr<CgroupCpuCtl> _cgroup_cpu_ctl {nullptr};
     std::unique_ptr<doris::pipeline::TaskScheduler> _task_sched {nullptr};
     std::unique_ptr<vectorized::SimplifiedScanScheduler> _scan_task_sched {nullptr};
     std::unique_ptr<vectorized::SimplifiedScanScheduler> _remote_scan_task_sched {nullptr};
-    std::unique_ptr<ThreadPool> _memtable_flush_pool = nullptr;
+    std::unique_ptr<ThreadPool> _memtable_flush_pool {nullptr};
 };
 
 using WorkloadGroupPtr = std::shared_ptr<WorkloadGroup>;
diff --git a/be/src/util/threadpool.h b/be/src/util/threadpool.h
index 526836cb09e..5ce27e2f27b 100644
--- a/be/src/util/threadpool.h
+++ b/be/src/util/threadpool.h
@@ -256,6 +256,13 @@ public:
         return _total_queued_tasks;
     }
 
+    std::vector<int> debug_info() {
+        std::lock_guard<std::mutex> l(_lock);
+        std::vector<int> arr = {_num_threads, static_cast<int>(_threads.size()), _min_threads,
+                                _max_threads};
+        return arr;
+    }
+
 private:
     friend class ThreadPoolBuilder;
     friend class ThreadPoolToken;
diff --git a/be/src/vec/exec/scan/scanner_scheduler.h b/be/src/vec/exec/scan/scanner_scheduler.h
index f194afe4bb0..238afc15bf6 100644
--- a/be/src/vec/exec/scan/scanner_scheduler.h
+++ b/be/src/vec/exec/scan/scanner_scheduler.h
@@ -193,6 +193,8 @@ public:
         }
     }
 
+    std::vector<int> thread_debug_info() { return _scan_thread_pool->debug_info(); }
+
 private:
     std::unique_ptr<ThreadPool> _scan_thread_pool;
     std::atomic<bool> _is_stop;


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to