This is an automated email from the ASF dual-hosted git repository. wangbo pushed a commit to branch branch-2.1 in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.1 by this push: new 4472648c077 [branch-2.1] pick workload group usage metrics (#46177) 4472648c077 is described below commit 4472648c07735a642d6df3bfffd00c85368d632d Author: wangbo <wan...@selectdb.com> AuthorDate: Tue Dec 31 10:09:48 2024 +0800 [branch-2.1] pick workload group usage metrics (#46177) pick #45284 #44870 --- be/src/common/config.cpp | 4 +- be/src/common/config.h | 4 +- be/src/common/daemon.cpp | 15 ++ be/src/common/daemon.h | 1 + be/src/pipeline/pipeline_task.cpp | 2 +- be/src/runtime/query_context.h | 4 +- be/src/runtime/thread_context.h | 72 +++++---- be/src/runtime/workload_group/workload_group.cpp | 36 +++-- be/src/runtime/workload_group/workload_group.h | 27 ++-- .../workload_group/workload_group_manager.cpp | 20 ++- .../workload_group/workload_group_manager.h | 2 + .../workload_group/workload_group_metrics.cpp | 127 ++++++++++++++++ .../workload_group/workload_group_metrics.h | 89 +++++++++++ be/src/runtime/workload_management/io_throttle.cpp | 11 -- be/src/runtime/workload_management/io_throttle.h | 10 +- be/src/util/cgroup_util.cpp | 168 +++++++++++++++++++++ be/src/util/cgroup_util.h | 22 +++ be/src/util/cpu_info.cpp | 55 +------ be/src/util/system_metrics.cpp | 35 +++++ be/src/util/system_metrics.h | 4 + be/src/vec/exec/scan/vscanner.cpp | 2 +- be/src/vec/sink/writer/async_result_writer.cpp | 2 +- be/test/util/cgroup_util_test.cpp | 90 +++++++++++ be/test/util/test_data/cgroup_cpu_data/cpuset1 | 1 + be/test/util/test_data/cgroup_cpu_data/cpuset2 | 1 + be/test/util/test_data/cgroup_cpu_data/cpuset3 | 1 + .../test_data/cgroup_cpu_data/test11/child/cpu.max | 1 + .../util/test_data/cgroup_cpu_data/test11/cpu.max | 1 + .../test_data/cgroup_cpu_data/test12/child/cpu.max | 1 + .../util/test_data/cgroup_cpu_data/test12/cpu.max | 1 + .../test_data/cgroup_cpu_data/test13/child/cpu.max | 1 + .../util/test_data/cgroup_cpu_data/test13/cpu.max | 1 + 
.../test_data/cgroup_cpu_data/test14/child/cpu.max | 1 + .../util/test_data/cgroup_cpu_data/test14/cpu.max | 1 + .../test21/child/cpuset.cpus.effective | 1 + .../cgroup_cpu_data/test21/cpuset.cpus.effective | 1 + .../test22/child/cpuset.cpus.effective | 0 .../cgroup_cpu_data/test22/cpuset.cpus.effective | 1 + .../cgroup_cpu_data/test31/child/cpu.cfs_period_us | 1 + .../cgroup_cpu_data/test31/child/cpu.cfs_quota_us | 1 + .../cgroup_cpu_data/test31/cpu.cfs_period_us | 1 + .../cgroup_cpu_data/test31/cpu.cfs_quota_us | 1 + .../cgroup_cpu_data/test32/child/cpu.cfs_period_us | 1 + .../cgroup_cpu_data/test32/child/cpu.cfs_quota_us | 1 + .../cgroup_cpu_data/test32/cpu.cfs_period_us | 1 + .../cgroup_cpu_data/test32/cpu.cfs_quota_us | 1 + .../cgroup_cpu_data/test33/child/cpu.cfs_period_us | 1 + .../cgroup_cpu_data/test33/child/cpu.cfs_quota_us | 1 + .../cgroup_cpu_data/test33/cpu.cfs_period_us | 1 + .../cgroup_cpu_data/test33/cpu.cfs_quota_us | 1 + .../test_data/cgroup_cpu_data/test41/cpuset.cpus | 1 + 51 files changed, 682 insertions(+), 147 deletions(-) diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp index 783d1d65922..483e7753ec2 100644 --- a/be/src/common/config.cpp +++ b/be/src/common/config.cpp @@ -1221,7 +1221,9 @@ DEFINE_Bool(exit_on_exception, "false"); DEFINE_Bool(enable_flush_file_cache_async, "true"); // cgroup -DEFINE_mString(doris_cgroup_cpu_path, ""); +DEFINE_String(doris_cgroup_cpu_path, ""); + +DEFINE_Int32(workload_group_metrics_interval_ms, "5000"); DEFINE_mBool(enable_workload_group_memory_gc, "true"); diff --git a/be/src/common/config.h b/be/src/common/config.h index 303aecd2a23..3579ce54fce 100644 --- a/be/src/common/config.h +++ b/be/src/common/config.h @@ -1288,7 +1288,9 @@ DECLARE_mInt32(tablet_schema_cache_capacity); DECLARE_mBool(exit_on_exception); // cgroup -DECLARE_mString(doris_cgroup_cpu_path); +DECLARE_String(doris_cgroup_cpu_path); + +DECLARE_Int32(workload_group_metrics_interval_ms); 
DECLARE_mBool(enable_workload_group_memory_gc); diff --git a/be/src/common/daemon.cpp b/be/src/common/daemon.cpp index fe842c1654f..11050c233c7 100644 --- a/be/src/common/daemon.cpp +++ b/be/src/common/daemon.cpp @@ -346,6 +346,8 @@ void Daemon::calculate_metrics_thread() { // update lst map DorisMetrics::instance()->system_metrics()->get_network_traffic( &lst_net_send_bytes, &lst_net_receive_bytes); + + DorisMetrics::instance()->system_metrics()->update_be_avail_cpu_num(); } DorisMetrics::instance()->all_rowsets_num->set_value( @@ -407,6 +409,13 @@ void Daemon::wg_weighted_memory_ratio_refresh_thread() { } } +void Daemon::calculate_workload_group_metrics_thread() { + while (!_stop_background_threads_latch.wait_for( + std::chrono::milliseconds(config::workload_group_metrics_interval_ms))) { + ExecEnv::GetInstance()->workload_group_mgr()->refresh_workload_group_metrics(); + } +} + void Daemon::start() { Status st; st = Thread::create( @@ -451,6 +460,12 @@ void Daemon::start() { [this]() { this->wg_weighted_memory_ratio_refresh_thread(); }, &_threads.emplace_back()); CHECK(st.ok()) << st; + + st = Thread::create( + "Daemon", "workload_group_metrics", + [this]() { this->calculate_workload_group_metrics_thread(); }, + &_threads.emplace_back()); + CHECK(st.ok()) << st; } void Daemon::stop() { diff --git a/be/src/common/daemon.h b/be/src/common/daemon.h index 8b956451125..9674b139f00 100644 --- a/be/src/common/daemon.h +++ b/be/src/common/daemon.h @@ -46,6 +46,7 @@ private: void cache_prune_stale_thread(); void report_runtime_query_statistics_thread(); void wg_weighted_memory_ratio_refresh_thread(); + void calculate_workload_group_metrics_thread(); CountDownLatch _stop_background_threads_latch; std::vector<scoped_refptr<Thread>> _threads; diff --git a/be/src/pipeline/pipeline_task.cpp b/be/src/pipeline/pipeline_task.cpp index eb2073e8f8c..242837da6b9 100644 --- a/be/src/pipeline/pipeline_task.cpp +++ b/be/src/pipeline/pipeline_task.cpp @@ -226,7 +226,7 @@ Status 
PipelineTask::execute(bool* eos) { if (cpu_qs) { cpu_qs->add_cpu_nanos(delta_cpu_time); } - query_context()->update_wg_cpu_adder(delta_cpu_time); + query_context()->update_cpu_time(delta_cpu_time); }}; // The status must be runnable *eos = false; diff --git a/be/src/runtime/query_context.h b/be/src/runtime/query_context.h index 1e1f1bf57cf..dcff789043a 100644 --- a/be/src/runtime/query_context.h +++ b/be/src/runtime/query_context.h @@ -300,9 +300,9 @@ public: // only for file scan node std::map<int, TFileScanRangeParams> file_scan_range_params_map; - void update_wg_cpu_adder(int64_t delta_cpu_time) { + void update_cpu_time(int64_t delta_cpu_time) { if (_workload_group != nullptr) { - _workload_group->update_cpu_adder(delta_cpu_time); + _workload_group->update_cpu_time(delta_cpu_time); } } diff --git a/be/src/runtime/thread_context.h b/be/src/runtime/thread_context.h index f3a7476a962..3fbb194c87f 100644 --- a/be/src/runtime/thread_context.h +++ b/be/src/runtime/thread_context.h @@ -111,38 +111,40 @@ __VA_ARGS__; \ } while (0) -#define LIMIT_LOCAL_SCAN_IO(data_dir, bytes_read) \ - std::shared_ptr<IOThrottle> iot = nullptr; \ - auto* t_ctx = doris::thread_context(true); \ - if (t_ctx) { \ - iot = t_ctx->get_local_scan_io_throttle(data_dir); \ - } \ - if (iot) { \ - iot->acquire(-1); \ - } \ - Defer defer { \ - [&]() { \ - if (iot) { \ - iot->update_next_io_time(*bytes_read); \ - t_ctx->update_total_local_scan_io_adder(*bytes_read); \ - } \ - } \ +#define LIMIT_LOCAL_SCAN_IO(data_dir, bytes_read) \ + std::shared_ptr<IOThrottle> iot = nullptr; \ + auto* t_ctx = doris::thread_context(true); \ + if (t_ctx) { \ + iot = t_ctx->get_local_scan_io_throttle(data_dir); \ + } \ + if (iot) { \ + iot->acquire(-1); \ + } \ + Defer defer { \ + [&]() { \ + if (iot) { \ + iot->update_next_io_time(*bytes_read); \ + t_ctx->update_local_scan_io(data_dir, *bytes_read); \ + } \ + } \ } -#define LIMIT_REMOTE_SCAN_IO(bytes_read) \ - std::shared_ptr<IOThrottle> iot = nullptr; \ - if (auto* 
t_ctx = doris::thread_context(true)) { \ - iot = t_ctx->get_remote_scan_io_throttle(); \ - } \ - if (iot) { \ - iot->acquire(-1); \ - } \ - Defer defer { \ - [&]() { \ - if (iot) { \ - iot->update_next_io_time(*bytes_read); \ - } \ - } \ +#define LIMIT_REMOTE_SCAN_IO(bytes_read) \ + std::shared_ptr<IOThrottle> iot = nullptr; \ + auto* t_ctx = doris::thread_context(true); \ + if (t_ctx) { \ + iot = t_ctx->get_remote_scan_io_throttle(); \ + } \ + if (iot) { \ + iot->acquire(-1); \ + } \ + Defer defer { \ + [&]() { \ + if (iot) { \ + iot->update_next_io_time(*bytes_read); \ + t_ctx->update_remote_scan_io(*bytes_read); \ + } \ + } \ } namespace doris { @@ -274,9 +276,15 @@ public: return nullptr; } - void update_total_local_scan_io_adder(size_t bytes_read) { + void update_local_scan_io(std::string path, size_t bytes_read) { + if (std::shared_ptr<WorkloadGroup> wg_ptr = _wg_wptr.lock()) { + wg_ptr->update_local_scan_io(path, bytes_read); + } + } + + void update_remote_scan_io(size_t bytes_read) { if (std::shared_ptr<WorkloadGroup> wg_ptr = _wg_wptr.lock()) { - wg_ptr->update_total_local_scan_io_adder(bytes_read); + wg_ptr->update_remote_scan_io(bytes_read); } } diff --git a/be/src/runtime/workload_group/workload_group.cpp b/be/src/runtime/workload_group/workload_group.cpp index 39437048326..5f36298dcd4 100644 --- a/be/src/runtime/workload_group/workload_group.cpp +++ b/be/src/runtime/workload_group/workload_group.cpp @@ -35,6 +35,7 @@ #include "runtime/exec_env.h" #include "runtime/memory/global_memory_arbitrator.h" #include "runtime/memory/mem_tracker_limiter.h" +#include "runtime/workload_group/workload_group_metrics.h" #include "runtime/workload_management/io_throttle.h" #include "util/mem_info.h" #include "util/parse_util.h" @@ -71,18 +72,11 @@ WorkloadGroup::WorkloadGroup(const WorkloadGroupInfo& tg_info, bool need_create_ _need_create_query_thread_pool(need_create_query_thread_pool) { std::vector<DataDirInfo>& data_dir_list = 
io::BeConfDataDirReader::be_config_data_dir_list; for (const auto& data_dir : data_dir_list) { - _scan_io_throttle_map[data_dir.path] = - std::make_shared<IOThrottle>(_name, data_dir.bvar_name + "_read_bytes"); - } - _remote_scan_io_throttle = std::make_shared<IOThrottle>(_name, "remote_read_bytes"); - _mem_used_status = std::make_unique<bvar::Status<int64_t>>(_name, "memory_used", 0); - _cpu_usage_adder = std::make_unique<bvar::Adder<uint64_t>>(_name, "cpu_usage_adder"); - _cpu_usage_per_second = std::make_unique<bvar::PerSecond<bvar::Adder<uint64_t>>>( - _name, "cpu_usage", _cpu_usage_adder.get(), 10); - _total_local_scan_io_adder = - std::make_unique<bvar::Adder<size_t>>(_name, "total_local_read_bytes"); - _total_local_scan_io_per_second = std::make_unique<bvar::PerSecond<bvar::Adder<size_t>>>( - _name, "total_local_read_bytes_per_second", _total_local_scan_io_adder.get(), 1); + _scan_io_throttle_map[data_dir.path] = std::make_shared<IOThrottle>(data_dir.bvar_name); + } + _remote_scan_io_throttle = std::make_shared<IOThrottle>(); + + _wg_metrics = std::make_shared<WorkloadGroupMetrics>(this); } std::string WorkloadGroup::debug_string() const { @@ -162,7 +156,7 @@ int64_t WorkloadGroup::make_memory_tracker_snapshots( } } refresh_memory(used_memory); - _mem_used_status->set_value(used_memory); + _wg_metrics->update_memory_used_bytes(used_memory); return used_memory; } @@ -638,16 +632,20 @@ std::shared_ptr<IOThrottle> WorkloadGroup::get_remote_scan_io_throttle() { return _remote_scan_io_throttle; } -void WorkloadGroup::update_cpu_adder(int64_t delta_cpu_time) { - (*_cpu_usage_adder) << (uint64_t)delta_cpu_time; +void WorkloadGroup::update_cpu_time(int64_t delta_cpu_time) { + _wg_metrics->update_cpu_time_nanos(delta_cpu_time); +} + +void WorkloadGroup::update_local_scan_io(std::string path, size_t scan_bytes) { + _wg_metrics->update_local_scan_io_bytes(path, (uint64_t)scan_bytes); } -void WorkloadGroup::update_total_local_scan_io_adder(size_t scan_bytes) { - 
(*_total_local_scan_io_adder) << scan_bytes; +void WorkloadGroup::update_remote_scan_io(size_t scan_bytes) { + _wg_metrics->update_remote_scan_io_bytes((uint64_t)scan_bytes); } -int64_t WorkloadGroup::get_remote_scan_bytes_per_second() { - return _remote_scan_io_throttle->get_bvar_io_per_second(); +int64_t WorkloadGroup::get_mem_used() { + return _total_mem_used; } void WorkloadGroup::try_stop_schedulers() { diff --git a/be/src/runtime/workload_group/workload_group.h b/be/src/runtime/workload_group/workload_group.h index 9b27e72c2f9..7d7ceabb442 100644 --- a/be/src/runtime/workload_group/workload_group.h +++ b/be/src/runtime/workload_group/workload_group.h @@ -17,7 +17,6 @@ #pragma once -#include <bvar/bvar.h> #include <gen_cpp/BackendService_types.h> #include <stddef.h> #include <stdint.h> @@ -55,6 +54,8 @@ class TaskScheduler; class WorkloadGroup; struct WorkloadGroupInfo; struct TrackerLimiterGroup; +class WorkloadGroupMetrics; + class WorkloadGroup : public std::enable_shared_from_this<WorkloadGroup> { public: explicit WorkloadGroup(const WorkloadGroupInfo& tg_info); @@ -201,16 +202,13 @@ public: void upsert_scan_io_throttle(WorkloadGroupInfo* tg_info); - void update_cpu_adder(int64_t delta_cpu_time); + void update_cpu_time(int64_t delta_cpu_time); - void update_total_local_scan_io_adder(size_t scan_bytes); + void update_local_scan_io(std::string path, size_t scan_bytes); - int64_t get_mem_used() { return _mem_used_status->get_value(); } - uint64_t get_cpu_usage() { return _cpu_usage_per_second->get_value(); } - int64_t get_local_scan_bytes_per_second() { - return _total_local_scan_io_per_second->get_value(); - } - int64_t get_remote_scan_bytes_per_second(); + void update_remote_scan_io(size_t scan_bytes); + + int64_t get_mem_used(); void create_cgroup_cpu_ctl(); @@ -222,6 +220,10 @@ public: return _memtable_flush_pool.get(); } + std::shared_ptr<WorkloadGroupMetrics> get_metrics() { return _wg_metrics; } + + friend class WorkloadGroupMetrics; + private: void 
create_cgroup_cpu_ctl_no_lock(); void upsert_cgroup_cpu_ctl_no_lock(WorkloadGroupInfo* wg_info); @@ -274,12 +276,7 @@ private: // for some background workload, it doesn't need to create query thread pool const bool _need_create_query_thread_pool; - // bvar metric - std::unique_ptr<bvar::Status<int64_t>> _mem_used_status; - std::unique_ptr<bvar::Adder<uint64_t>> _cpu_usage_adder; - std::unique_ptr<bvar::PerSecond<bvar::Adder<uint64_t>>> _cpu_usage_per_second; - std::unique_ptr<bvar::Adder<size_t>> _total_local_scan_io_adder; - std::unique_ptr<bvar::PerSecond<bvar::Adder<size_t>>> _total_local_scan_io_per_second; + std::shared_ptr<WorkloadGroupMetrics> _wg_metrics {nullptr}; }; using WorkloadGroupPtr = std::shared_ptr<WorkloadGroup>; diff --git a/be/src/runtime/workload_group/workload_group_manager.cpp b/be/src/runtime/workload_group/workload_group_manager.cpp index bb9757c94c3..1261f414f92 100644 --- a/be/src/runtime/workload_group/workload_group_manager.cpp +++ b/be/src/runtime/workload_group/workload_group_manager.cpp @@ -25,6 +25,7 @@ #include "pipeline/task_scheduler.h" #include "runtime/memory/mem_tracker_limiter.h" #include "runtime/workload_group/workload_group.h" +#include "runtime/workload_group/workload_group_metrics.h" #include "util/mem_info.h" #include "util/threadpool.h" #include "util/time.h" @@ -279,16 +280,25 @@ void WorkloadGroupMgr::get_wg_resource_usage(vectorized::Block* block) { for (const auto& [id, wg] : _workload_groups) { SchemaScannerHelper::insert_int64_value(0, be_id, block); SchemaScannerHelper::insert_int64_value(1, wg->id(), block); - SchemaScannerHelper::insert_int64_value(2, wg->get_mem_used(), block); + SchemaScannerHelper::insert_int64_value(2, wg->get_metrics()->get_memory_used(), block); - double cpu_usage_p = - (double)wg->get_cpu_usage() / (double)total_cpu_time_ns_per_second * 100; + double cpu_usage_p = (double)wg->get_metrics()->get_cpu_time_nanos_per_second() / + (double)total_cpu_time_ns_per_second * 100; cpu_usage_p = 
std::round(cpu_usage_p * 100.0) / 100.0; SchemaScannerHelper::insert_double_value(3, cpu_usage_p, block); - SchemaScannerHelper::insert_int64_value(4, wg->get_local_scan_bytes_per_second(), block); - SchemaScannerHelper::insert_int64_value(5, wg->get_remote_scan_bytes_per_second(), block); + SchemaScannerHelper::insert_int64_value( + 4, wg->get_metrics()->get_local_scan_bytes_per_second(), block); + SchemaScannerHelper::insert_int64_value( + 5, wg->get_metrics()->get_remote_scan_bytes_per_second(), block); + } +} + +void WorkloadGroupMgr::refresh_workload_group_metrics() { + std::shared_lock<std::shared_mutex> r_lock(_group_mutex); + for (const auto& [id, wg] : _workload_groups) { + wg->get_metrics()->refresh_metrics(); } } diff --git a/be/src/runtime/workload_group/workload_group_manager.h b/be/src/runtime/workload_group/workload_group_manager.h index 52624f05fdf..0d281031972 100644 --- a/be/src/runtime/workload_group/workload_group_manager.h +++ b/be/src/runtime/workload_group/workload_group_manager.h @@ -74,6 +74,8 @@ public: return _workload_groups[INTERNAL_WORKLOAD_GROUP_ID]; } + void refresh_workload_group_metrics(); + private: std::shared_mutex _group_mutex; std::unordered_map<uint64_t, WorkloadGroupPtr> _workload_groups; diff --git a/be/src/runtime/workload_group/workload_group_metrics.cpp b/be/src/runtime/workload_group/workload_group_metrics.cpp new file mode 100644 index 00000000000..af54e10779e --- /dev/null +++ b/be/src/runtime/workload_group/workload_group_metrics.cpp @@ -0,0 +1,127 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "runtime/workload_group/workload_group_metrics.h" + +#include "common/config.h" +#include "runtime/workload_group/workload_group.h" +#include "runtime/workload_management/io_throttle.h" +#include "util/doris_metrics.h" +#include "util/metrics.h" + +namespace doris { + +WorkloadGroupMetrics::~WorkloadGroupMetrics() { + DorisMetrics::instance()->metric_registry()->deregister_entity(_entity); +} + +WorkloadGroupMetrics::WorkloadGroupMetrics(WorkloadGroup* wg) { + _entity = DorisMetrics::instance()->metric_registry()->register_entity( + "workload_group." + wg->name(), {{"name", wg->name()}}); + + _cpu_time_metric = std::make_unique<doris::MetricPrototype>( + doris::MetricType::COUNTER, doris::MetricUnit::SECONDS, "workload_group_cpu_time_sec"); + _cpu_time_counter = + (IntAtomicCounter*)(_entity->register_metric<IntAtomicCounter>(_cpu_time_metric.get())); + + _mem_used_bytes_metric = std::make_unique<doris::MetricPrototype>( + doris::MetricType::COUNTER, doris::MetricUnit::BYTES, "workload_group_mem_used_bytes"); + _mem_used_bytes_counter = (IntAtomicCounter*)(_entity->register_metric<IntAtomicCounter>( + _mem_used_bytes_metric.get())); + + _local_scan_bytes_metric = std::make_unique<doris::MetricPrototype>( + doris::MetricType::COUNTER, doris::MetricUnit::BYTES, + "workload_group_local_scan_bytes"); + _local_scan_bytes_counter = (IntAtomicCounter*)(_entity->register_metric<IntAtomicCounter>( + _local_scan_bytes_metric.get())); + + _remote_scan_bytes_metric = std::make_unique<doris::MetricPrototype>( + doris::MetricType::COUNTER, 
doris::MetricUnit::BYTES, + "workload_group_remote_scan_bytes"); + _remote_scan_bytes_counter = (IntAtomicCounter*)(_entity->register_metric<IntAtomicCounter>( + _remote_scan_bytes_metric.get())); + + for (const auto& [key, io_throttle] : wg->_scan_io_throttle_map) { + std::unique_ptr<doris::MetricPrototype> metric = std::make_unique<doris::MetricPrototype>( + doris::MetricType::COUNTER, doris::MetricUnit::BYTES, + "workload_group_local_scan_bytes_" + io_throttle->metric_name()); + _local_scan_bytes_counter_map[key] = + (IntAtomicCounter*)(_entity->register_metric<IntAtomicCounter>(metric.get())); + _local_scan_bytes_metric_map[key] = std::move(metric); + } +} + +void WorkloadGroupMetrics::update_cpu_time_nanos(uint64_t delta_cpu_time) { + _cpu_time_nanos += delta_cpu_time; +} + +void WorkloadGroupMetrics::update_memory_used_bytes(int64_t memory_used) { + _memory_used = memory_used; +} + +void WorkloadGroupMetrics::update_local_scan_io_bytes(std::string path, uint64_t delta_io_bytes) { + _local_scan_bytes_counter->increment(delta_io_bytes); + _local_scan_bytes_counter_map[path]->increment((int64_t)delta_io_bytes); +} + +void WorkloadGroupMetrics::update_remote_scan_io_bytes(uint64_t delta_io_bytes) { + _remote_scan_bytes_counter->increment(delta_io_bytes); +} + +void WorkloadGroupMetrics::refresh_metrics() { + int interval_second = config::workload_group_metrics_interval_ms / 1000; + + // cpu + uint64_t _current_cpu_time_nanos = _cpu_time_nanos.load(); + uint64_t _cpu_time_sec = _current_cpu_time_nanos / (1000L * 1000L * 1000L); + _cpu_time_counter->set_value(_cpu_time_sec); + _per_sec_cpu_time_nanos = (_current_cpu_time_nanos - _last_cpu_time_nanos) / interval_second; + _last_cpu_time_nanos = _current_cpu_time_nanos; + + // memory + _mem_used_bytes_counter->set_value(_memory_used); + + // local scan + int64_t current_local_scan_bytes = _local_scan_bytes_counter->value(); + _per_sec_local_scan_bytes = + (current_local_scan_bytes - _last_local_scan_bytes) / 
interval_second; + _last_local_scan_bytes = current_local_scan_bytes; + + // remote scan + int64_t current_remote_scan_bytes = _remote_scan_bytes_counter->value(); + _per_sec_remote_scan_bytes = + (current_remote_scan_bytes - _last_remote_scan_bytes) / interval_second; + _last_remote_scan_bytes = current_remote_scan_bytes; +} + +uint64_t WorkloadGroupMetrics::get_cpu_time_nanos_per_second() { + return _per_sec_cpu_time_nanos.load(); +} + +int64_t WorkloadGroupMetrics::get_local_scan_bytes_per_second() { + return _per_sec_local_scan_bytes.load(); +} + +int64_t WorkloadGroupMetrics::get_remote_scan_bytes_per_second() { + return _last_remote_scan_bytes.load(); +} + +int64_t WorkloadGroupMetrics::get_memory_used() { + return _mem_used_bytes_counter->value(); +} + +} // namespace doris \ No newline at end of file diff --git a/be/src/runtime/workload_group/workload_group_metrics.h b/be/src/runtime/workload_group/workload_group_metrics.h new file mode 100644 index 00000000000..e68715df249 --- /dev/null +++ b/be/src/runtime/workload_group/workload_group_metrics.h @@ -0,0 +1,89 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +#pragma once + +#include <atomic> +#include <map> +#include <memory> +#include <string> + +namespace doris { + +class WorkloadGroup; + +template <typename T> +class AtomicCounter; +using IntAtomicCounter = AtomicCounter<int64_t>; +class MetricEntity; +struct MetricPrototype; + +class WorkloadGroupMetrics { +public: + WorkloadGroupMetrics(WorkloadGroup* wg); + + ~WorkloadGroupMetrics(); + + void update_cpu_time_nanos(uint64_t delta_cpu_time); + + void update_memory_used_bytes(int64_t memory_used); + + void update_local_scan_io_bytes(std::string path, uint64_t delta_io_bytes); + + void update_remote_scan_io_bytes(uint64_t delta_io_bytes); + + void refresh_metrics(); + + uint64_t get_cpu_time_nanos_per_second(); + + int64_t get_local_scan_bytes_per_second(); + + int64_t get_remote_scan_bytes_per_second(); + + int64_t get_memory_used(); + +private: + std::unique_ptr<doris::MetricPrototype> _cpu_time_metric {nullptr}; + std::unique_ptr<doris::MetricPrototype> _mem_used_bytes_metric {nullptr}; + std::unique_ptr<doris::MetricPrototype> _local_scan_bytes_metric {nullptr}; + std::unique_ptr<doris::MetricPrototype> _remote_scan_bytes_metric {nullptr}; + // NOTE: _local_scan_bytes_metric is sum of all disk's IO + // _local_disk_io_metric is every disk's IO + std::map<std::string, std::unique_ptr<doris::MetricPrototype>> _local_scan_bytes_metric_map; + + IntAtomicCounter* _cpu_time_counter {nullptr}; // used for metric + IntAtomicCounter* _mem_used_bytes_counter {nullptr}; // used for metric + IntAtomicCounter* _local_scan_bytes_counter {nullptr}; // used for metric + IntAtomicCounter* _remote_scan_bytes_counter {nullptr}; // used for metric + std::map<std::string, IntAtomicCounter*> _local_scan_bytes_counter_map; // used for metric + + std::atomic<uint64_t> _cpu_time_nanos {0}; + std::atomic<uint64_t> _last_cpu_time_nanos {0}; + std::atomic<uint64_t> _per_sec_cpu_time_nanos {0}; // used for system table + + std::atomic<uint64_t> _per_sec_local_scan_bytes {0}; + 
std::atomic<uint64_t> _last_local_scan_bytes {0}; // used for system table + + std::atomic<uint64_t> _per_sec_remote_scan_bytes {0}; + std::atomic<uint64_t> _last_remote_scan_bytes {0}; // used for system table + + std::atomic<uint64_t> _memory_used {0}; + + std::shared_ptr<MetricEntity> _entity {nullptr}; +}; + +} // namespace doris \ No newline at end of file diff --git a/be/src/runtime/workload_management/io_throttle.cpp b/be/src/runtime/workload_management/io_throttle.cpp index dacfa29012f..118fc518072 100644 --- a/be/src/runtime/workload_management/io_throttle.cpp +++ b/be/src/runtime/workload_management/io_throttle.cpp @@ -22,12 +22,6 @@ namespace doris { -IOThrottle::IOThrottle(std::string prefix, std::string name) { - _io_adder = std::make_unique<bvar::Adder<size_t>>(prefix, name); - _io_adder_per_second = std::make_unique<bvar::PerSecond<bvar::Adder<size_t>>>( - prefix, name + "_per_second", _io_adder.get(), 1); -} - bool IOThrottle::acquire(int64_t block_timeout_ms) { if (_io_bytes_per_second_limit < 0) { return true; @@ -57,11 +51,6 @@ bool IOThrottle::try_acquire() { } void IOThrottle::update_next_io_time(int64_t io_bytes) { - Defer defer {[&]() { - if (io_bytes > 0) { - (*_io_adder) << io_bytes; - } - }}; if (_io_bytes_per_second_limit <= 0 || io_bytes <= 0) { return; } diff --git a/be/src/runtime/workload_management/io_throttle.h b/be/src/runtime/workload_management/io_throttle.h index 4212527020e..f688922fcd2 100644 --- a/be/src/runtime/workload_management/io_throttle.h +++ b/be/src/runtime/workload_management/io_throttle.h @@ -28,7 +28,9 @@ namespace doris { class IOThrottle { public: - IOThrottle(std::string prefix, std::string name); + IOThrottle() = default; + + IOThrottle(std::string metric_name) : _metric_name(metric_name) {} ~IOThrottle() = default; @@ -41,7 +43,7 @@ public: void set_io_bytes_per_second(int64_t read_bytes_per_second); - size_t get_bvar_io_per_second() { return _io_adder_per_second->get_value(); } + std::string metric_name() { 
return _metric_name; } private: std::mutex _mutex; @@ -49,8 +51,6 @@ private: int64_t _next_io_time_micros {0}; std::atomic<int64_t> _io_bytes_per_second_limit {-1}; - // bvar monitor - std::unique_ptr<bvar::Adder<size_t>> _io_adder; - std::unique_ptr<bvar::PerSecond<bvar::Adder<size_t>>> _io_adder_per_second; + std::string _metric_name; }; }; // namespace doris \ No newline at end of file diff --git a/be/src/util/cgroup_util.cpp b/be/src/util/cgroup_util.cpp index 8f64fe699c6..dc04d5e3ff7 100644 --- a/be/src/util/cgroup_util.cpp +++ b/be/src/util/cgroup_util.cpp @@ -18,6 +18,7 @@ #include "util/cgroup_util.h" #include <algorithm> +#include <boost/algorithm/string.hpp> #include <fstream> #include <utility> #include <vector> @@ -218,6 +219,10 @@ std::optional<std::string> CGroupUtil::get_cgroupsv2_path(const std::string& sub Status CGroupUtil::read_int_line_from_cgroup_file(const std::filesystem::path& file_path, int64_t* val) { std::ifstream file_stream(file_path, std::ios::in); + if (!file_stream.is_open()) { + return Status::CgroupError("Error open {}", file_path.string()); + } + string line; getline(file_stream, line); if (file_stream.fail() || file_stream.bad()) { @@ -264,4 +269,167 @@ void CGroupUtil::read_int_metric_from_cgroup_file( } } +Status CGroupUtil::read_string_line_from_cgroup_file(const std::filesystem::path& file_path, + std::string* line_ptr) { + std::ifstream file_stream(file_path, std::ios::in); + if (!file_stream.is_open()) { + return Status::CgroupError("Error open {}", file_path.string()); + } + string line; + getline(file_stream, line); + if (file_stream.fail() || file_stream.bad()) { + return Status::CgroupError("Error reading {}: {}", file_path.string(), get_str_err_msg()); + } + *line_ptr = line; + return Status::OK(); +} + +Status CGroupUtil::parse_cpuset_line(std::string cpuset_line, int* cpu_count_ptr) { + if (cpuset_line.empty()) { + return Status::CgroupError("cpuset line is empty"); + } + std::vector<string> ranges; + 
boost::split(ranges, cpuset_line, boost::is_any_of(",")); + int cpu_count = 0; + + for (const std::string& range : ranges) { + std::vector<std::string> cpu_values; + boost::split(cpu_values, range, boost::is_any_of("-")); + + if (cpu_values.size() == 2) { + int start = std::stoi(cpu_values[0]); + int end = std::stoi(cpu_values[1]); + cpu_count += (end - start) + 1; + } else { + cpu_count++; + } + } + *cpu_count_ptr = cpu_count; + return Status::OK(); +} + +int CGroupUtil::get_cgroup_limited_cpu_number(int physical_cores) { + if (physical_cores <= 0) { + return physical_cores; + } + int ret = physical_cores; +#if defined(OS_LINUX) + // For cgroup v2 + // Child cgroup's cpu.max may bigger than parent group's cpu.max, + // so it should look up from current cgroup to top group. + // For cpuset, child cgroup's cpuset.cpus could not bigger thant parent's cpuset.cpus. + if (CGroupUtil::cgroupsv2_enable()) { + std::string cgroupv2_process_path = CGroupUtil::cgroupv2_of_process(); + if (cgroupv2_process_path.empty()) { + return ret; + } + std::filesystem::path current_cgroup_path = (default_cgroups_mount / cgroupv2_process_path); + ret = get_cgroup_v2_cpu_quota_number(current_cgroup_path, default_cgroups_mount, ret); + + current_cgroup_path = (default_cgroups_mount / cgroupv2_process_path); + ret = get_cgroup_v2_cpuset_number(current_cgroup_path, default_cgroups_mount, ret); + } else if (CGroupUtil::cgroupsv1_enable()) { + // cpu quota, should find first not empty config from current path to top. + // because if a process attach to current cgroup, its cpu quota may not be set. 
+ std::string cpu_quota_path = ""; + Status cpu_quota_ret = CGroupUtil::find_abs_cgroupv1_path("cpu", &cpu_quota_path); + if (cpu_quota_ret.ok() && !cpu_quota_path.empty()) { + std::filesystem::path current_cgroup_path = cpu_quota_path; + ret = get_cgroup_v1_cpu_quota_number(current_cgroup_path, default_cgroups_mount, ret); + } + + //cpuset + // just lookup current process cgroup path is enough + // because if a process attach to current cgroup, its cpuset.cpus must be set. + std::string cpuset_path = ""; + Status cpuset_ret = CGroupUtil::find_abs_cgroupv1_path("cpuset", &cpuset_path); + if (cpuset_ret.ok() && !cpuset_path.empty()) { + std::filesystem::path current_path = cpuset_path; + ret = get_cgroup_v1_cpuset_number(current_path, ret); + } + } +#endif + return ret; +} + +int CGroupUtil::get_cgroup_v2_cpu_quota_number(std::filesystem::path& current_path, + const std::filesystem::path& default_cg_mout_path, + int cpu_num) { + int ret = cpu_num; + while (current_path != default_cg_mout_path.parent_path()) { + std::ifstream cpu_max_file(current_path / "cpu.max"); + if (cpu_max_file.is_open()) { + std::string cpu_limit_str; + double cpu_period; + cpu_max_file >> cpu_limit_str >> cpu_period; + if (cpu_limit_str != "max" && cpu_period != 0) { + double cpu_limit = std::stod(cpu_limit_str); + ret = std::min(static_cast<int>(std::ceil(cpu_limit / cpu_period)), ret); + } + } + current_path = current_path.parent_path(); + } + return ret; +} + +int CGroupUtil::get_cgroup_v2_cpuset_number(std::filesystem::path& current_path, + const std::filesystem::path& default_cg_mout_path, + int cpu_num) { + int ret = cpu_num; + while (current_path != default_cg_mout_path.parent_path()) { + std::ifstream cpuset_cpus_file(current_path / "cpuset.cpus.effective"); + current_path = current_path.parent_path(); + if (cpuset_cpus_file.is_open()) { + std::string cpuset_line; + cpuset_cpus_file >> cpuset_line; + if (cpuset_line.empty()) { + continue; + } + int cpus_count = 0; + 
static_cast<void>(CGroupUtil::parse_cpuset_line(cpuset_line, &cpus_count)); + ret = std::min(cpus_count, ret); + break; + } + } + return ret; +} + +int CGroupUtil::get_cgroup_v1_cpu_quota_number(std::filesystem::path& current_path, + const std::filesystem::path& default_cg_mout_path, + int cpu_num) { + int ret = cpu_num; + while (current_path != default_cg_mout_path.parent_path()) { + std::ifstream cpu_quota_file(current_path / "cpu.cfs_quota_us"); + std::ifstream cpu_period_file(current_path / "cpu.cfs_period_us"); + if (cpu_quota_file.is_open() && cpu_period_file.is_open()) { + double cpu_quota_value; + double cpu_period_value; + cpu_quota_file >> cpu_quota_value; + cpu_period_file >> cpu_period_value; + if (cpu_quota_value > 0 && cpu_period_value > 0) { + ret = std::min(ret, + static_cast<int>(std::ceil(cpu_quota_value / cpu_period_value))); + break; + } + } + current_path = current_path.parent_path(); + } + return ret; +} + +int CGroupUtil::get_cgroup_v1_cpuset_number(std::filesystem::path& current_path, int cpu_num) { + int ret = cpu_num; + std::string cpuset_line = ""; + Status cpuset_ret = CGroupUtil::read_string_line_from_cgroup_file( + (current_path / "cpuset.cpus"), &cpuset_line); + if (cpuset_ret.ok() && !cpuset_line.empty()) { + int cpuset_count = 0; + static_cast<void>(CGroupUtil::parse_cpuset_line(cpuset_line, &cpuset_count)); + if (cpuset_count > 0) { + ret = std::min(ret, cpuset_count); + } + } + return ret; +} + } // namespace doris diff --git a/be/src/util/cgroup_util.h b/be/src/util/cgroup_util.h index bc1417453f4..54fc9494599 100644 --- a/be/src/util/cgroup_util.h +++ b/be/src/util/cgroup_util.h @@ -104,5 +104,27 @@ public: static void read_int_metric_from_cgroup_file( const std::filesystem::path& file_path, std::unordered_map<std::string, int64_t>& metrics_map); + + static Status read_string_line_from_cgroup_file(const std::filesystem::path& file_path, + std::string* line_ptr); + + // cpuset_line: 0-4,6,8-10 + static Status 
parse_cpuset_line(std::string cpuset_line, int* cpu_count_ptr); + + static int get_cgroup_limited_cpu_number(int physical_cores); + + static int get_cgroup_v2_cpu_quota_number(std::filesystem::path& current_path, + const std::filesystem::path& default_cg_mout_path, + int cpu_num); + + static int get_cgroup_v2_cpuset_number(std::filesystem::path& current_path, + const std::filesystem::path& default_cg_mout_path, + int cpu_num); + + static int get_cgroup_v1_cpu_quota_number(std::filesystem::path& current_path, + const std::filesystem::path& default_cg_mout_path, + int cpu_num); + + static int get_cgroup_v1_cpuset_number(std::filesystem::path& current_path, int cpu_num); }; } // namespace doris diff --git a/be/src/util/cpu_info.cpp b/be/src/util/cpu_info.cpp index 116dacb8da7..b49985cdc06 100644 --- a/be/src/util/cpu_info.cpp +++ b/be/src/util/cpu_info.cpp @@ -59,6 +59,7 @@ #include "gflags/gflags.h" #include "gutil/stringprintf.h" #include "gutil/strings/substitute.h" +#include "util/cgroup_util.h" #include "util/pretty_printer.h" using boost::algorithm::contains; @@ -109,58 +110,6 @@ static struct { {"popcnt", CpuInfo::POPCNT}, {"avx", CpuInfo::AVX}, {"avx2", CpuInfo::AVX2}, }; -int cgroup_bandwidth_quota(int physical_cores) { - namespace fs = std::filesystem; - fs::path cpu_max = "/sys/fs/cgroup/cpu.max"; - fs::path cfs_quota = "/sys/fs/cgroup/cpu/cpu.cfs_quota_us"; - fs::path cfs_period = "/sys/fs/cgroup/cpu/cpu.cfs_period_us"; - - int64_t quota, period; - char byte_buffer[1000]; - int64_t read_bytes; - - if (fs::exists(cpu_max)) { - // cgroup v2 - // https://www.kernel.org/doc/html/latest/admin-guide/cgroup-v2.html - std::ifstream file(cpu_max); - file.read(byte_buffer, 999); - read_bytes = file.gcount(); - byte_buffer[read_bytes] = '\0'; - if (sscanf(byte_buffer, "%" SCNd64 " %" SCNd64 "", &quota, &period) != 2) { - return physical_cores; - } - } else if (fs::exists(cfs_quota) && fs::exists(cfs_period)) { - // cgroup v1 - // 
https://www.kernel.org/doc/html/latest/scheduler/sched-bwc.html#management - - // Read the quota, this indicates how many microseconds the CPU can be utilized by this cgroup per period - std::ifstream quota_file(cfs_quota); - quota_file.read(byte_buffer, 999); - read_bytes = quota_file.gcount(); - byte_buffer[read_bytes] = '\0'; - if (sscanf(byte_buffer, "%" SCNd64 "", &quota) != 1) { - return physical_cores; - } - - // Read the time period, a cgroup can utilize the CPU up to quota microseconds every period - std::ifstream period_file(cfs_period); - period_file.read(byte_buffer, 999); - read_bytes = period_file.gcount(); - byte_buffer[read_bytes] = '\0'; - if (sscanf(byte_buffer, "%" SCNd64 "", &period) != 1) { - return physical_cores; - } - } else { - // No cgroup quota - return physical_cores; - } - if (quota > 0 && period > 0) { - return int64_t(ceil(double(quota) / double(period))); - } else { - return physical_cores; - } -} - // Helper function to parse for hardware flags. // values contains a list of space-separated flags. check to see if the flags we // care about are present. 
@@ -212,7 +161,7 @@ void CpuInfo::init() { } } - int num_cores = cgroup_bandwidth_quota(physical_num_cores); + int num_cores = CGroupUtil::get_cgroup_limited_cpu_number(physical_num_cores); if (max_mhz != 0) { cycles_per_ms_ = int64_t(max_mhz) * 1000; } else { diff --git a/be/src/util/system_metrics.cpp b/be/src/util/system_metrics.cpp index 8203be6d6a7..cf50e5be344 100644 --- a/be/src/util/system_metrics.cpp +++ b/be/src/util/system_metrics.cpp @@ -33,11 +33,27 @@ #include "gutil/strings/split.h" // for string split #include "gutil/strtoint.h" // for atoi64 +#include "runtime/workload_group/workload_group_metrics.h" +#include "util/cgroup_util.h" #include "util/mem_info.h" #include "util/perf_counters.h" namespace doris { +DEFINE_COUNTER_METRIC_PROTOTYPE_2ARG(avail_cpu_num, MetricUnit::NOUNIT); + +DEFINE_COUNTER_METRIC_PROTOTYPE_2ARG(host_cpu_num, MetricUnit::NOUNIT); +struct CpuNumberMetrics { + CpuNumberMetrics(MetricEntity* ent) : entity(ent) { + INT_COUNTER_METRIC_REGISTER(entity, host_cpu_num); + INT_COUNTER_METRIC_REGISTER(entity, avail_cpu_num); + } + + IntAtomicCounter* host_cpu_num {nullptr}; + IntAtomicCounter* avail_cpu_num {nullptr}; + MetricEntity* entity = nullptr; +}; + #define DEFINE_CPU_COUNTER_METRIC(metric) \ DEFINE_COUNTER_METRIC_PROTOTYPE_5ARG(cpu_##metric, MetricUnit::PERCENT, "", cpu, \ Labels({{"mode", #metric}})); @@ -386,11 +402,22 @@ void SystemMetrics::update() { void SystemMetrics::_install_cpu_metrics() { get_cpu_name(); + + int cpu_num = 0; for (auto cpu_name : _cpu_names) { + // NOTE: cpu_name comes from /proc/stat which named 'cpu' is not a real cpu name, it should be skipped. 
+ if (cpu_name != "cpu") { + cpu_num++; + } auto cpu_entity = _registry->register_entity(cpu_name, {{"device", cpu_name}}); CpuMetrics* metrics = new CpuMetrics(cpu_entity.get()); _cpu_metrics.emplace(cpu_name, metrics); } + + auto cpu_num_entity = _registry->register_entity("doris_be_host_cpu_num"); + _cpu_num_metrics = std::make_unique<CpuNumberMetrics>(cpu_num_entity.get()); + + _cpu_num_metrics->host_cpu_num->set_value(cpu_num); } #ifdef BE_TEST @@ -983,6 +1010,14 @@ void SystemMetrics::_update_proc_metrics() { fclose(fp); } +void SystemMetrics::update_be_avail_cpu_num() { + int64_t physical_cpu_num = _cpu_num_metrics->host_cpu_num->value(); + if (physical_cpu_num > 0) { + physical_cpu_num = CGroupUtil::get_cgroup_limited_cpu_number(physical_cpu_num); + _cpu_num_metrics->avail_cpu_num->set_value(physical_cpu_num); + } +} + void SystemMetrics::get_metrics_from_proc_vmstat() { #ifdef BE_TEST FILE* fp = fopen(k_ut_vmstat_path, "r"); diff --git a/be/src/util/system_metrics.h b/be/src/util/system_metrics.h index c72ba369301..2c5446b81f4 100644 --- a/be/src/util/system_metrics.h +++ b/be/src/util/system_metrics.h @@ -31,6 +31,7 @@ namespace doris { struct CpuMetrics; +struct CpuNumberMetrics; struct MemoryMetrics; struct DiskMetrics; struct NetworkMetrics; @@ -65,6 +66,8 @@ public: void update_max_network_receive_bytes_rate(int64_t max_receive_bytes_rate); void update_allocator_metrics(); + void update_be_avail_cpu_num(); + private: void _install_cpu_metrics(); // On Intel(R) Xeon(R) CPU E5-2450 0 @ 2.10GHz; @@ -99,6 +102,7 @@ private: static const char* _s_hook_name; std::map<std::string, CpuMetrics*> _cpu_metrics; + std::unique_ptr<CpuNumberMetrics> _cpu_num_metrics; std::unique_ptr<MemoryMetrics> _memory_metrics; std::map<std::string, DiskMetrics*> _disk_metrics; std::map<std::string, NetworkMetrics*> _network_metrics; diff --git a/be/src/vec/exec/scan/vscanner.cpp b/be/src/vec/exec/scan/vscanner.cpp index 58511e890d6..09b5d6dd1a6 100644 --- 
a/be/src/vec/exec/scan/vscanner.cpp +++ b/be/src/vec/exec/scan/vscanner.cpp @@ -291,7 +291,7 @@ void VScanner::update_scan_cpu_timer() { _scan_cpu_timer += cpu_time; _query_statistics->add_cpu_nanos(cpu_time); if (_state && _state->get_query_ctx()) { - _state->get_query_ctx()->update_wg_cpu_adder(cpu_time); + _state->get_query_ctx()->update_cpu_time(cpu_time); } } diff --git a/be/src/vec/sink/writer/async_result_writer.cpp b/be/src/vec/sink/writer/async_result_writer.cpp index 2d49a0e8978..aa85a83756f 100644 --- a/be/src/vec/sink/writer/async_result_writer.cpp +++ b/be/src/vec/sink/writer/async_result_writer.cpp @@ -131,7 +131,7 @@ void AsyncResultWriter::process_block(RuntimeState* state, RuntimeProfile* profi cpu_time_stop_watch.start(); Defer defer {[&]() { if (state && state->get_query_ctx()) { - state->get_query_ctx()->update_wg_cpu_adder(cpu_time_stop_watch.elapsed_time()); + state->get_query_ctx()->update_cpu_time(cpu_time_stop_watch.elapsed_time()); } }}; if (!_eos && _data_queue.empty() && _writer_status.ok()) { diff --git a/be/test/util/cgroup_util_test.cpp b/be/test/util/cgroup_util_test.cpp index 92102120327..4cc5c601b28 100644 --- a/be/test/util/cgroup_util_test.cpp +++ b/be/test/util/cgroup_util_test.cpp @@ -87,4 +87,94 @@ TEST_F(CGroupUtilTest, memlimit) { } } +TEST_F(CGroupUtilTest, readcpu) { + std::string dir_path = GetCurrentRunningDir(); + + std::string cpuset_1_path(dir_path + "/util/test_data/cgroup_cpu_data/cpuset1"); + std::string cpuset_1_str = ""; + Status ret1 = CGroupUtil::read_string_line_from_cgroup_file(cpuset_1_path, &cpuset_1_str); + EXPECT_TRUE(ret1.ok()); + int cpu_count1 = 0; + static_cast<void>(CGroupUtil::parse_cpuset_line(cpuset_1_str, &cpu_count1)); + EXPECT_TRUE(cpu_count1 == 3); + + std::string cpuset_2_path(dir_path + "/util/test_data/cgroup_cpu_data/cpuset2"); + std::string cpuset_2_str = ""; + Status ret2 = CGroupUtil::read_string_line_from_cgroup_file(cpuset_2_path, &cpuset_2_str); + EXPECT_TRUE(ret2.ok()); + int 
cpu_count2 = 0; + static_cast<void>(CGroupUtil::parse_cpuset_line(cpuset_2_str, &cpu_count2)); + EXPECT_TRUE(cpu_count2 == 11); + + std::string cpuset_3_path(dir_path + "/util/test_data/cgroup_cpu_data/cpuset3"); + std::string cpuset_3_str = ""; + Status ret3 = CGroupUtil::read_string_line_from_cgroup_file(cpuset_3_path, &cpuset_3_str); + EXPECT_TRUE(ret3.ok()); + int cpu_count3 = 0; + static_cast<void>(CGroupUtil::parse_cpuset_line(cpuset_3_str, &cpu_count3)); + EXPECT_TRUE(cpu_count3 == 10); + + int ret = CGroupUtil::get_cgroup_limited_cpu_number(16); + EXPECT_TRUE(ret > 0); + + // 1 read cgroup v2 quota + // 1.1 read default value + std::filesystem::path path11 = dir_path + "/util/test_data/cgroup_cpu_data/test11/child"; + std::filesystem::path default_path_11 = dir_path + "/util/test_data/cgroup_cpu_data/test11"; + int ret11 = CGroupUtil::get_cgroup_v2_cpu_quota_number(path11, default_path_11, 96); + EXPECT_TRUE(ret11 == 96); + + // 1.2 read from child to parent + std::filesystem::path path12 = dir_path + "/util/test_data/cgroup_cpu_data/test12/child"; + std::filesystem::path default_path_12 = dir_path + "/util/test_data/cgroup_cpu_data/test12"; + int ret12 = CGroupUtil::get_cgroup_v2_cpu_quota_number(path12, default_path_12, 96); + EXPECT_TRUE(ret12 == 2); + + // 1.3 read parent + std::filesystem::path path13 = dir_path + "/util/test_data/cgroup_cpu_data/test13/child"; + std::filesystem::path default_path_13 = dir_path + "/util/test_data/cgroup_cpu_data/test13"; + int ret13 = CGroupUtil::get_cgroup_v2_cpu_quota_number(path13, default_path_13, 96); + EXPECT_TRUE(ret13 == 2); + + // 1.4 read child + std::filesystem::path path14 = dir_path + "/util/test_data/cgroup_cpu_data/test14/child"; + std::filesystem::path default_path_14 = dir_path + "/util/test_data/cgroup_cpu_data/test14"; + int ret14 = CGroupUtil::get_cgroup_v2_cpu_quota_number(path14, default_path_14, 96); + EXPECT_TRUE(ret14 == 3); + + // 2 read cgroup v2 cpuset + // 2.1 read child + 
std::filesystem::path path21 = dir_path + "/util/test_data/cgroup_cpu_data/test21/child"; + std::filesystem::path default_path_21 = dir_path + "/util/test_data/cgroup_cpu_data/test21"; + int ret21 = CGroupUtil::get_cgroup_v2_cpuset_number(path21, default_path_21, 96); + EXPECT_TRUE(ret21 == 2); + // 2.2 read parent + std::filesystem::path path22 = dir_path + "/util/test_data/cgroup_cpu_data/test22/child"; + std::filesystem::path default_path_22 = dir_path + "/util/test_data/cgroup_cpu_data/test22"; + int ret22 = CGroupUtil::get_cgroup_v2_cpuset_number(path22, default_path_22, 96); + EXPECT_TRUE(ret22 == 7); + + // 3 read cgroup v1 quota + // 3.1 read child + std::filesystem::path path31 = dir_path + "/util/test_data/cgroup_cpu_data/test31/child"; + std::filesystem::path default_path_31 = dir_path + "/util/test_data/cgroup_cpu_data/test31"; + int ret31 = CGroupUtil::get_cgroup_v1_cpu_quota_number(path31, default_path_31, 96); + EXPECT_TRUE(ret31 == 5); + // 3.2 read parent + std::filesystem::path path32 = dir_path + "/util/test_data/cgroup_cpu_data/test32/child"; + std::filesystem::path default_path_32 = dir_path + "/util/test_data/cgroup_cpu_data/test32"; + int ret32 = CGroupUtil::get_cgroup_v1_cpu_quota_number(path32, default_path_32, 96); + EXPECT_TRUE(ret32 == 6); + // 3.3 read default + std::filesystem::path path33 = dir_path + "/util/test_data/cgroup_cpu_data/test33/child"; + std::filesystem::path default_path_33 = dir_path + "/util/test_data/cgroup_cpu_data/test33"; + int ret33 = CGroupUtil::get_cgroup_v1_cpu_quota_number(path33, default_path_33, 96); + EXPECT_TRUE(ret33 == 96); + + // 4 read cgroup v1 cpuset + std::filesystem::path path41 = dir_path + "/util/test_data/cgroup_cpu_data/test41"; + int ret41 = CGroupUtil::get_cgroup_v1_cpuset_number(path41, 96); + EXPECT_TRUE(ret41 == 3); +} + } // namespace doris diff --git a/be/test/util/test_data/cgroup_cpu_data/cpuset1 b/be/test/util/test_data/cgroup_cpu_data/cpuset1 new file mode 100644 index 
00000000000..f2fdae292d6 --- /dev/null +++ b/be/test/util/test_data/cgroup_cpu_data/cpuset1 @@ -0,0 +1 @@ +1,4,6 \ No newline at end of file diff --git a/be/test/util/test_data/cgroup_cpu_data/cpuset2 b/be/test/util/test_data/cgroup_cpu_data/cpuset2 new file mode 100644 index 00000000000..9528de88c0a --- /dev/null +++ b/be/test/util/test_data/cgroup_cpu_data/cpuset2 @@ -0,0 +1 @@ +1-5,7-10,20-21 \ No newline at end of file diff --git a/be/test/util/test_data/cgroup_cpu_data/cpuset3 b/be/test/util/test_data/cgroup_cpu_data/cpuset3 new file mode 100644 index 00000000000..02c93d74ecd --- /dev/null +++ b/be/test/util/test_data/cgroup_cpu_data/cpuset3 @@ -0,0 +1 @@ +4,11-15,20,31-33 \ No newline at end of file diff --git a/be/test/util/test_data/cgroup_cpu_data/test11/child/cpu.max b/be/test/util/test_data/cgroup_cpu_data/test11/child/cpu.max new file mode 100644 index 00000000000..b22d43b0084 --- /dev/null +++ b/be/test/util/test_data/cgroup_cpu_data/test11/child/cpu.max @@ -0,0 +1 @@ +max 100000 0 \ No newline at end of file diff --git a/be/test/util/test_data/cgroup_cpu_data/test11/cpu.max b/be/test/util/test_data/cgroup_cpu_data/test11/cpu.max new file mode 100644 index 00000000000..b22d43b0084 --- /dev/null +++ b/be/test/util/test_data/cgroup_cpu_data/test11/cpu.max @@ -0,0 +1 @@ +max 100000 0 \ No newline at end of file diff --git a/be/test/util/test_data/cgroup_cpu_data/test12/child/cpu.max b/be/test/util/test_data/cgroup_cpu_data/test12/child/cpu.max new file mode 100644 index 00000000000..434d117ae1c --- /dev/null +++ b/be/test/util/test_data/cgroup_cpu_data/test12/child/cpu.max @@ -0,0 +1 @@ +300000 100000 \ No newline at end of file diff --git a/be/test/util/test_data/cgroup_cpu_data/test12/cpu.max b/be/test/util/test_data/cgroup_cpu_data/test12/cpu.max new file mode 100644 index 00000000000..20b625d5126 --- /dev/null +++ b/be/test/util/test_data/cgroup_cpu_data/test12/cpu.max @@ -0,0 +1 @@ +200000 100000 \ No newline at end of file diff --git 
a/be/test/util/test_data/cgroup_cpu_data/test13/child/cpu.max b/be/test/util/test_data/cgroup_cpu_data/test13/child/cpu.max new file mode 100644 index 00000000000..b22d43b0084 --- /dev/null +++ b/be/test/util/test_data/cgroup_cpu_data/test13/child/cpu.max @@ -0,0 +1 @@ +max 100000 0 \ No newline at end of file diff --git a/be/test/util/test_data/cgroup_cpu_data/test13/cpu.max b/be/test/util/test_data/cgroup_cpu_data/test13/cpu.max new file mode 100644 index 00000000000..20b625d5126 --- /dev/null +++ b/be/test/util/test_data/cgroup_cpu_data/test13/cpu.max @@ -0,0 +1 @@ +200000 100000 \ No newline at end of file diff --git a/be/test/util/test_data/cgroup_cpu_data/test14/child/cpu.max b/be/test/util/test_data/cgroup_cpu_data/test14/child/cpu.max new file mode 100644 index 00000000000..434d117ae1c --- /dev/null +++ b/be/test/util/test_data/cgroup_cpu_data/test14/child/cpu.max @@ -0,0 +1 @@ +300000 100000 \ No newline at end of file diff --git a/be/test/util/test_data/cgroup_cpu_data/test14/cpu.max b/be/test/util/test_data/cgroup_cpu_data/test14/cpu.max new file mode 100644 index 00000000000..4199eb01a27 --- /dev/null +++ b/be/test/util/test_data/cgroup_cpu_data/test14/cpu.max @@ -0,0 +1 @@ +400000 100000 \ No newline at end of file diff --git a/be/test/util/test_data/cgroup_cpu_data/test21/child/cpuset.cpus.effective b/be/test/util/test_data/cgroup_cpu_data/test21/child/cpuset.cpus.effective new file mode 100644 index 00000000000..fb00853f18d --- /dev/null +++ b/be/test/util/test_data/cgroup_cpu_data/test21/child/cpuset.cpus.effective @@ -0,0 +1 @@ +0-1 \ No newline at end of file diff --git a/be/test/util/test_data/cgroup_cpu_data/test21/cpuset.cpus.effective b/be/test/util/test_data/cgroup_cpu_data/test21/cpuset.cpus.effective new file mode 100644 index 00000000000..745f3eb7203 --- /dev/null +++ b/be/test/util/test_data/cgroup_cpu_data/test21/cpuset.cpus.effective @@ -0,0 +1 @@ +0-6 \ No newline at end of file diff --git 
a/be/test/util/test_data/cgroup_cpu_data/test22/child/cpuset.cpus.effective b/be/test/util/test_data/cgroup_cpu_data/test22/child/cpuset.cpus.effective new file mode 100644 index 00000000000..e69de29bb2d diff --git a/be/test/util/test_data/cgroup_cpu_data/test22/cpuset.cpus.effective b/be/test/util/test_data/cgroup_cpu_data/test22/cpuset.cpus.effective new file mode 100644 index 00000000000..745f3eb7203 --- /dev/null +++ b/be/test/util/test_data/cgroup_cpu_data/test22/cpuset.cpus.effective @@ -0,0 +1 @@ +0-6 \ No newline at end of file diff --git a/be/test/util/test_data/cgroup_cpu_data/test31/child/cpu.cfs_period_us b/be/test/util/test_data/cgroup_cpu_data/test31/child/cpu.cfs_period_us new file mode 100644 index 00000000000..483fb82b6dd --- /dev/null +++ b/be/test/util/test_data/cgroup_cpu_data/test31/child/cpu.cfs_period_us @@ -0,0 +1 @@ +100000 \ No newline at end of file diff --git a/be/test/util/test_data/cgroup_cpu_data/test31/child/cpu.cfs_quota_us b/be/test/util/test_data/cgroup_cpu_data/test31/child/cpu.cfs_quota_us new file mode 100644 index 00000000000..516a58aff39 --- /dev/null +++ b/be/test/util/test_data/cgroup_cpu_data/test31/child/cpu.cfs_quota_us @@ -0,0 +1 @@ +500000 \ No newline at end of file diff --git a/be/test/util/test_data/cgroup_cpu_data/test31/cpu.cfs_period_us b/be/test/util/test_data/cgroup_cpu_data/test31/cpu.cfs_period_us new file mode 100644 index 00000000000..483fb82b6dd --- /dev/null +++ b/be/test/util/test_data/cgroup_cpu_data/test31/cpu.cfs_period_us @@ -0,0 +1 @@ +100000 \ No newline at end of file diff --git a/be/test/util/test_data/cgroup_cpu_data/test31/cpu.cfs_quota_us b/be/test/util/test_data/cgroup_cpu_data/test31/cpu.cfs_quota_us new file mode 100644 index 00000000000..212f56fce5d --- /dev/null +++ b/be/test/util/test_data/cgroup_cpu_data/test31/cpu.cfs_quota_us @@ -0,0 +1 @@ +600000 \ No newline at end of file diff --git a/be/test/util/test_data/cgroup_cpu_data/test32/child/cpu.cfs_period_us 
b/be/test/util/test_data/cgroup_cpu_data/test32/child/cpu.cfs_period_us new file mode 100644 index 00000000000..483fb82b6dd --- /dev/null +++ b/be/test/util/test_data/cgroup_cpu_data/test32/child/cpu.cfs_period_us @@ -0,0 +1 @@ +100000 \ No newline at end of file diff --git a/be/test/util/test_data/cgroup_cpu_data/test32/child/cpu.cfs_quota_us b/be/test/util/test_data/cgroup_cpu_data/test32/child/cpu.cfs_quota_us new file mode 100644 index 00000000000..d7d17fcbef9 --- /dev/null +++ b/be/test/util/test_data/cgroup_cpu_data/test32/child/cpu.cfs_quota_us @@ -0,0 +1 @@ +-1 \ No newline at end of file diff --git a/be/test/util/test_data/cgroup_cpu_data/test32/cpu.cfs_period_us b/be/test/util/test_data/cgroup_cpu_data/test32/cpu.cfs_period_us new file mode 100644 index 00000000000..483fb82b6dd --- /dev/null +++ b/be/test/util/test_data/cgroup_cpu_data/test32/cpu.cfs_period_us @@ -0,0 +1 @@ +100000 \ No newline at end of file diff --git a/be/test/util/test_data/cgroup_cpu_data/test32/cpu.cfs_quota_us b/be/test/util/test_data/cgroup_cpu_data/test32/cpu.cfs_quota_us new file mode 100644 index 00000000000..212f56fce5d --- /dev/null +++ b/be/test/util/test_data/cgroup_cpu_data/test32/cpu.cfs_quota_us @@ -0,0 +1 @@ +600000 \ No newline at end of file diff --git a/be/test/util/test_data/cgroup_cpu_data/test33/child/cpu.cfs_period_us b/be/test/util/test_data/cgroup_cpu_data/test33/child/cpu.cfs_period_us new file mode 100644 index 00000000000..483fb82b6dd --- /dev/null +++ b/be/test/util/test_data/cgroup_cpu_data/test33/child/cpu.cfs_period_us @@ -0,0 +1 @@ +100000 \ No newline at end of file diff --git a/be/test/util/test_data/cgroup_cpu_data/test33/child/cpu.cfs_quota_us b/be/test/util/test_data/cgroup_cpu_data/test33/child/cpu.cfs_quota_us new file mode 100644 index 00000000000..d7d17fcbef9 --- /dev/null +++ b/be/test/util/test_data/cgroup_cpu_data/test33/child/cpu.cfs_quota_us @@ -0,0 +1 @@ +-1 \ No newline at end of file diff --git 
a/be/test/util/test_data/cgroup_cpu_data/test33/cpu.cfs_period_us b/be/test/util/test_data/cgroup_cpu_data/test33/cpu.cfs_period_us new file mode 100644 index 00000000000..483fb82b6dd --- /dev/null +++ b/be/test/util/test_data/cgroup_cpu_data/test33/cpu.cfs_period_us @@ -0,0 +1 @@ +100000 \ No newline at end of file diff --git a/be/test/util/test_data/cgroup_cpu_data/test33/cpu.cfs_quota_us b/be/test/util/test_data/cgroup_cpu_data/test33/cpu.cfs_quota_us new file mode 100644 index 00000000000..d7d17fcbef9 --- /dev/null +++ b/be/test/util/test_data/cgroup_cpu_data/test33/cpu.cfs_quota_us @@ -0,0 +1 @@ +-1 \ No newline at end of file diff --git a/be/test/util/test_data/cgroup_cpu_data/test41/cpuset.cpus b/be/test/util/test_data/cgroup_cpu_data/test41/cpuset.cpus new file mode 100644 index 00000000000..b62d3ca93d6 --- /dev/null +++ b/be/test/util/test_data/cgroup_cpu_data/test41/cpuset.cpus @@ -0,0 +1 @@ +1-3 \ No newline at end of file --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org