This is an automated email from the ASF dual-hosted git repository.

panxiaolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 36c8b59607a [feature](cpu cores) get the cores when running within a cgroup. (#32370)
36c8b59607a is described below

commit 36c8b59607a0cb6d194bfec6c667fdd473d7638c
Author: Mryange <59914473+mrya...@users.noreply.github.com>
AuthorDate: Wed Mar 20 18:27:27 2024 +0800

    [feature](cpu cores) get the cores when running within a cgroup. (#32370)
    
    get the cores when running within a cgroup
---
 be/src/runtime/exec_env_init.cpp |  1 +
 be/src/util/cpu_info.cpp         | 83 +++++++++++++++++++++++++++++++++-------
 2 files changed, 70 insertions(+), 14 deletions(-)

diff --git a/be/src/runtime/exec_env_init.cpp b/be/src/runtime/exec_env_init.cpp
index 9a4c18abdd3..2c46e4ab244 100644
--- a/be/src/runtime/exec_env_init.cpp
+++ b/be/src/runtime/exec_env_init.cpp
@@ -310,6 +310,7 @@ Status ExecEnv::init_pipeline_task_scheduler() {
         executors_size = CpuInfo::num_cores();
     }
 
+    LOG_INFO("pipeline executors_size set ").tag("size", executors_size);
     // TODO pipeline workload group combie two blocked schedulers.
     auto t_queue = 
std::make_shared<pipeline::MultiCoreTaskQueue>(executors_size);
     _without_group_block_scheduler =
diff --git a/be/src/util/cpu_info.cpp b/be/src/util/cpu_info.cpp
index 60bdf1cd179..6ead3a61ead 100644
--- a/be/src/util/cpu_info.cpp
+++ b/be/src/util/cpu_info.cpp
@@ -108,7 +108,58 @@ static struct {
         {"ssse3", CpuInfo::SSSE3},   {"sse4_1", CpuInfo::SSE4_1}, {"sse4_2", 
CpuInfo::SSE4_2},
         {"popcnt", CpuInfo::POPCNT}, {"avx", CpuInfo::AVX},       {"avx2", 
CpuInfo::AVX2},
 };
-static const long num_flags = sizeof(flag_mappings) / sizeof(flag_mappings[0]);
+
+// Returns the number of CPU cores available to this process once a cgroup CPU
+// bandwidth limit is taken into account.
+//
+// physical_cores is the core count detected on the host. It is returned
+// unchanged when no cgroup limit file exists, when a limit file cannot be
+// parsed (for example the cgroup v2 form "max <period>"), or when the quota or
+// the period is not positive. Otherwise the result is ceil(quota / period),
+// and never more than physical_cores when physical_cores is positive: a
+// bandwidth quota larger than the machine cannot create extra cores.
+int cgroup_bandwidth_quota(int physical_cores) {
+    namespace fs = std::filesystem;
+    const fs::path cpu_max = "/sys/fs/cgroup/cpu.max";
+    const fs::path cfs_quota = "/sys/fs/cgroup/cpu/cpu.cfs_quota_us";
+    const fs::path cfs_period = "/sys/fs/cgroup/cpu/cpu.cfs_period_us";
+
+    int64_t quota = 0;
+    int64_t period = 0;
+    char byte_buffer[1000];
+
+    // Loads at most sizeof(byte_buffer) - 1 bytes of `path` into byte_buffer as a
+    // C string. An unreadable file leaves an empty string, which makes the
+    // sscanf checks below fail and fall back to physical_cores.
+    auto load = [&byte_buffer](const fs::path& path) {
+        std::ifstream file(path);
+        file.read(byte_buffer, sizeof(byte_buffer) - 1);
+        byte_buffer[file.gcount()] = '\0';
+    };
+
+    // Use the non-throwing overload of exists(): a permission or I/O error while
+    // probing /sys/fs/cgroup must not abort CPU detection, it just means "no limit".
+    std::error_code ec;
+    if (fs::exists(cpu_max, ec)) {
+        // cgroup v2
+        // https://www.kernel.org/doc/html/latest/admin-guide/cgroup-v2.html
+        // The file holds "<quota> <period>"; a non-numeric quota fails the parse.
+        load(cpu_max);
+        if (sscanf(byte_buffer, "%" SCNd64 " %" SCNd64 "", &quota, &period) != 2) {
+            return physical_cores;
+        }
+    } else if (fs::exists(cfs_quota, ec) && fs::exists(cfs_period, ec)) {
+        // cgroup v1
+        // https://www.kernel.org/doc/html/latest/scheduler/sched-bwc.html#management
+
+        // Read the quota, this indicates how many microseconds the CPU can be
+        // utilized by this cgroup per period
+        load(cfs_quota);
+        if (sscanf(byte_buffer, "%" SCNd64 "", &quota) != 1) {
+            return physical_cores;
+        }
+
+        // Read the time period, a cgroup can utilize the CPU up to quota
+        // microseconds every period
+        load(cfs_period);
+        if (sscanf(byte_buffer, "%" SCNd64 "", &period) != 1) {
+            return physical_cores;
+        }
+    } else {
+        // No cgroup quota
+        return physical_cores;
+    }
+
+    if (quota <= 0 || period <= 0) {
+        // Non-positive values carry no usable limit.
+        return physical_cores;
+    }
+
+    const int64_t limit = int64_t(ceil(double(quota) / double(period)));
+    if (physical_cores > 0 && limit > physical_cores) {
+        // The quota exceeds what the host actually has; report the real core count.
+        return physical_cores;
+    }
+    return static_cast<int>(limit);
+}
 
 // Helper function to parse for hardware flags.
 // values contains a list of space-separated flags.  check to see if the flags we
@@ -116,9 +167,9 @@ static const long num_flags = sizeof(flag_mappings) / sizeof(flag_mappings[0]);
 // Returns a bitmap of flags.
 int64_t ParseCPUFlags(const string& values) {
     int64_t flags = 0;
-    for (int i = 0; i < num_flags; ++i) {
-        if (contains(values, flag_mappings[i].name)) {
-            flags |= flag_mappings[i].flag;
+    for (auto& flag_mapping : flag_mappings) {
+        if (contains(values, flag_mapping.name)) {
+            flags |= flag_mapping.flag;
         }
     }
     return flags;
@@ -131,8 +182,9 @@ void CpuInfo::init() {
     string value;
 
     float max_mhz = 0;
-    int num_cores = 0;
+    int physical_num_cores = 0;
 
+    // maybe use std::thread::hardware_concurrency()?
     // Read from /proc/cpuinfo
     std::ifstream cpuinfo("/proc/cpuinfo");
     while (cpuinfo) {
@@ -143,23 +195,24 @@ void CpuInfo::init() {
             value = line.substr(colon + 1, string::npos);
             trim(name);
             trim(value);
-            if (name.compare("flags") == 0) {
+            if (name == "flags") {
                 hardware_flags_ |= ParseCPUFlags(value);
-            } else if (name.compare("cpu MHz") == 0) {
+            } else if (name == "cpu MHz") {
                 // Every core will report a different speed.  We'll take the 
max, assuming
                 // that when impala is running, the core will not be in a 
lower power state.
                 // TODO: is there a more robust way to do this, such as
                 // Window's QueryPerformanceFrequency()
                 float mhz = atof(value.c_str());
                 max_mhz = max(mhz, max_mhz);
-            } else if (name.compare("processor") == 0) {
-                ++num_cores;
-            } else if (name.compare("model name") == 0) {
+            } else if (name == "processor") {
+                ++physical_num_cores;
+            } else if (name == "model name") {
                 model_name_ = value;
             }
         }
     }
 
+    int num_cores = cgroup_bandwidth_quota(physical_num_cores);
     if (max_mhz != 0) {
         cycles_per_ms_ = max_mhz * 1000;
     } else {
@@ -172,7 +225,9 @@ void CpuInfo::init() {
     } else {
         num_cores_ = 1;
     }
-    if (config::num_cores > 0) num_cores_ = config::num_cores;
+    if (config::num_cores > 0) {
+        num_cores_ = config::num_cores;
+    }
 
 #ifdef __APPLE__
     size_t len = sizeof(max_num_cores_);
@@ -394,9 +449,9 @@ std::string CpuInfo::debug_string() {
            << "  " << L2 << std::endl
            << "  " << L3 << std::endl
            << "  Hardware Supports:" << std::endl;
-    for (int i = 0; i < num_flags; ++i) {
-        if (is_supported(flag_mappings[i].flag)) {
-            stream << "    " << flag_mappings[i].name << std::endl;
+    for (auto& flag_mapping : flag_mappings) {
+        if (is_supported(flag_mapping.flag)) {
+            stream << "    " << flag_mapping.name << std::endl;
         }
     }
     stream << "  Numa Nodes: " << max_num_numa_nodes_ << std::endl;


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to