This is an automated email from the ASF dual-hosted git repository. panxiaolei pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push: new 36c8b59607a [feature](cpu cores) get the cores when running within a cgroup. (#32370) 36c8b59607a is described below commit 36c8b59607a0cb6d194bfec6c667fdd473d7638c Author: Mryange <59914473+mrya...@users.noreply.github.com> AuthorDate: Wed Mar 20 18:27:27 2024 +0800 [feature](cpu cores) get the cores when running within a cgroup. (#32370) get the cores when running within a cgroup --- be/src/runtime/exec_env_init.cpp | 1 + be/src/util/cpu_info.cpp | 83 +++++++++++++++++++++++++++++++++------- 2 files changed, 70 insertions(+), 14 deletions(-) diff --git a/be/src/runtime/exec_env_init.cpp b/be/src/runtime/exec_env_init.cpp index 9a4c18abdd3..2c46e4ab244 100644 --- a/be/src/runtime/exec_env_init.cpp +++ b/be/src/runtime/exec_env_init.cpp @@ -310,6 +310,7 @@ Status ExecEnv::init_pipeline_task_scheduler() { executors_size = CpuInfo::num_cores(); } + LOG_INFO("pipeline executors_size set ").tag("size", executors_size); // TODO pipeline workload group combie two blocked schedulers. 
auto t_queue = std::make_shared<pipeline::MultiCoreTaskQueue>(executors_size); _without_group_block_scheduler = diff --git a/be/src/util/cpu_info.cpp b/be/src/util/cpu_info.cpp index 60bdf1cd179..6ead3a61ead 100644 --- a/be/src/util/cpu_info.cpp +++ b/be/src/util/cpu_info.cpp @@ -108,7 +108,58 @@ static struct { {"ssse3", CpuInfo::SSSE3}, {"sse4_1", CpuInfo::SSE4_1}, {"sse4_2", CpuInfo::SSE4_2}, {"popcnt", CpuInfo::POPCNT}, {"avx", CpuInfo::AVX}, {"avx2", CpuInfo::AVX2}, }; -static const long num_flags = sizeof(flag_mappings) / sizeof(flag_mappings[0]); + +int cgroup_bandwidth_quota(int physical_cores) { + namespace fs = std::filesystem; + fs::path cpu_max = "/sys/fs/cgroup/cpu.max"; + fs::path cfs_quota = "/sys/fs/cgroup/cpu/cpu.cfs_quota_us"; + fs::path cfs_period = "/sys/fs/cgroup/cpu/cpu.cfs_period_us"; + + int64_t quota, period; + char byte_buffer[1000]; + int64_t read_bytes; + + if (fs::exists(cpu_max)) { + // cgroup v2 + // https://www.kernel.org/doc/html/latest/admin-guide/cgroup-v2.html + std::ifstream file(cpu_max); + file.read(byte_buffer, 999); + read_bytes = file.gcount(); + byte_buffer[read_bytes] = '\0'; + if (sscanf(byte_buffer, "%" SCNd64 " %" SCNd64 "", &quota, &period) != 2) { + return physical_cores; + } + } else if (fs::exists(cfs_quota) && fs::exists(cfs_period)) { + // cgroup v1 + // https://www.kernel.org/doc/html/latest/scheduler/sched-bwc.html#management + + // Read the quota, this indicates how many microseconds the CPU can be utilized by this cgroup per period + std::ifstream quota_file(cfs_quota); + quota_file.read(byte_buffer, 999); + read_bytes = quota_file.gcount(); + byte_buffer[read_bytes] = '\0'; + if (sscanf(byte_buffer, "%" SCNd64 "", &quota) != 1) { + return physical_cores; + } + + // Read the time period, a cgroup can utilize the CPU up to quota microseconds every period + std::ifstream period_file(cfs_period); + period_file.read(byte_buffer, 999); + read_bytes = period_file.gcount(); + byte_buffer[read_bytes] = '\0'; + if 
(sscanf(byte_buffer, "%" SCNd64 "", &period) != 1) { + return physical_cores; + } + } else { + // No cgroup quota + return physical_cores; + } + if (quota > 0 && period > 0) { + return int64_t(ceil(double(quota) / double(period))); + } else { + return physical_cores; + } +} // Helper function to parse for hardware flags. // values contains a list of space-separated flags. check to see if the flags we @@ -116,9 +167,9 @@ static const long num_flags = sizeof(flag_mappings) / sizeof(flag_mappings[0]); // Returns a bitmap of flags. int64_t ParseCPUFlags(const string& values) { int64_t flags = 0; - for (int i = 0; i < num_flags; ++i) { - if (contains(values, flag_mappings[i].name)) { - flags |= flag_mappings[i].flag; + for (auto& flag_mapping : flag_mappings) { + if (contains(values, flag_mapping.name)) { + flags |= flag_mapping.flag; } } return flags; @@ -131,8 +182,9 @@ void CpuInfo::init() { string value; float max_mhz = 0; - int num_cores = 0; + int physical_num_cores = 0; + // maybe use std::thread::hardware_concurrency()? // Read from /proc/cpuinfo std::ifstream cpuinfo("/proc/cpuinfo"); while (cpuinfo) { @@ -143,23 +195,24 @@ void CpuInfo::init() { value = line.substr(colon + 1, string::npos); trim(name); trim(value); - if (name.compare("flags") == 0) { + if (name == "flags") { hardware_flags_ |= ParseCPUFlags(value); - } else if (name.compare("cpu MHz") == 0) { + } else if (name == "cpu MHz") { // Every core will report a different speed. We'll take the max, assuming // that when impala is running, the core will not be in a lower power state. 
// TODO: is there a more robust way to do this, such as // Window's QueryPerformanceFrequency() float mhz = atof(value.c_str()); max_mhz = max(mhz, max_mhz); - } else if (name.compare("processor") == 0) { - ++num_cores; - } else if (name.compare("model name") == 0) { + } else if (name == "processor") { + ++physical_num_cores; + } else if (name == "model name") { model_name_ = value; } } } + int num_cores = cgroup_bandwidth_quota(physical_num_cores); if (max_mhz != 0) { cycles_per_ms_ = max_mhz * 1000; } else { @@ -172,7 +225,9 @@ void CpuInfo::init() { } else { num_cores_ = 1; } - if (config::num_cores > 0) num_cores_ = config::num_cores; + if (config::num_cores > 0) { + num_cores_ = config::num_cores; + } #ifdef __APPLE__ size_t len = sizeof(max_num_cores_); @@ -394,9 +449,9 @@ std::string CpuInfo::debug_string() { << " " << L2 << std::endl << " " << L3 << std::endl << " Hardware Supports:" << std::endl; - for (int i = 0; i < num_flags; ++i) { - if (is_supported(flag_mappings[i].flag)) { - stream << " " << flag_mappings[i].name << std::endl; + for (auto& flag_mapping : flag_mappings) { + if (is_supported(flag_mapping.flag)) { + stream << " " << flag_mapping.name << std::endl; } } stream << " Numa Nodes: " << max_num_numa_nodes_ << std::endl; --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org