This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push: new 673f4628eb6 [feat](metrics) Unify metrics of thread pool (#43144) 673f4628eb6 is described below commit 673f4628eb645558c717fc2ea620e469ab96ecbb Author: zhiqiang <hezhiqi...@selectdb.com> AuthorDate: Wed Jan 1 16:06:59 2025 +0800 [feat](metrics) Unify metrics of thread pool (#43144) ### What problem does this PR solve? Add metrics for all thread pool, more specifically, for all ThreadPool objects. All thread pool will have following metrics: 1. thread_pool_active_threads 2. thread_pool_queue_size 3. thread_pool_max_queue_size 4. thread_pool_max_threads 5. task_execution_time_ns_avg_in_last_1000_times 6. task_wait_worker_ns_avg_in_last_1000_times A new class `IntervalHistogramStat` is created for interval histogram calculation. Metrics is updated by `hook` method when they are needed by prometheus. - [ ] Confirm the release note - [ ] Confirm test cases - [ ] Confirm document - [ ] Add branch pick label <!-- Add branch pick label that this PR should merge into --> --- be/src/olap/memtable_flush_executor.cpp | 16 ----- be/src/olap/memtable_flush_executor.h | 4 -- be/src/runtime/exec_env.h | 3 + be/src/runtime/exec_env_init.cpp | 42 +----------- be/src/runtime/fragment_mgr.cpp | 10 +-- be/src/runtime/workload_group/workload_group.cpp | 4 +- be/src/util/doris_metrics.cpp | 6 -- be/src/util/doris_metrics.h | 22 ------ be/src/util/interval_histogram.cpp | 80 ++++++++++++++++++++++ be/src/util/interval_histogram.h | 46 +++++++++++++ be/src/util/metrics.cpp | 5 +- be/src/util/threadpool.cpp | 69 +++++++++++++++++-- be/src/util/threadpool.h | 30 +++++++- be/src/vec/exec/scan/scanner_scheduler.cpp | 54 +-------------- be/src/vec/exec/scan/scanner_scheduler.h | 19 +++-- be/test/io/fs/buffered_reader_test.cpp | 12 ++-- be/test/io/fs/remote_file_system_test.cpp | 9 +-- be/test/io/fs/s3_file_writer_test.cpp | 1 + be/test/olap/rowset/beta_rowset_test.cpp | 2 + .../rowset/unique_rowset_id_generator_test.cpp | 1 + be/test/testutil/run_all_tests.cpp | 12 ++++ be/test/util/countdown_latch_test.cpp | 1 + be/test/util/interval_histogram_test.cpp | 78 +++++++++++++++++++++ .../agg_linear_histogram_test.cpp | 4 +- .../workload_manager_p0/test_curd_wlg.groovy | 56 +++++++++++++++ 25 files changed, 402 insertions(+), 184 deletions(-) diff --git a/be/src/olap/memtable_flush_executor.cpp b/be/src/olap/memtable_flush_executor.cpp index 5cdb45281b9..0181cc1d64d 100644 --- a/be/src/olap/memtable_flush_executor.cpp +++ b/be/src/olap/memtable_flush_executor.cpp @@ -37,9 +37,6 @@ namespace doris { using namespace ErrorCode; -DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(flush_thread_pool_queue_size, MetricUnit::NOUNIT); -DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(flush_thread_pool_thread_num, MetricUnit::NOUNIT); - bvar::Adder<int64_t> g_flush_task_num("memtable_flush_task_num"); class MemtableFlushTask final : public Runnable { @@ -239,7 +236,6 @@ void MemTableFlushExecutor::init(int num_disk) { .set_min_threads(min_threads) .set_max_threads(max_threads) .build(&_high_prio_flush_pool)); - _register_metrics(); } // NOTE: we use SERIAL mode here to ensure all mem-tables from one tablet are flushed in order. 
@@ -263,16 +259,4 @@ Status MemTableFlushExecutor::create_flush_token(std::shared_ptr<FlushToken>& fl } } -void MemTableFlushExecutor::_register_metrics() { - REGISTER_HOOK_METRIC(flush_thread_pool_queue_size, - [this]() { return _flush_pool->get_queue_size(); }); - REGISTER_HOOK_METRIC(flush_thread_pool_thread_num, - [this]() { return _flush_pool->num_threads(); }) -} - -void MemTableFlushExecutor::_deregister_metrics() { - DEREGISTER_HOOK_METRIC(flush_thread_pool_queue_size); - DEREGISTER_HOOK_METRIC(flush_thread_pool_thread_num); -} - } // namespace doris diff --git a/be/src/olap/memtable_flush_executor.h b/be/src/olap/memtable_flush_executor.h index 27e8e8a9b0e..753f1106646 100644 --- a/be/src/olap/memtable_flush_executor.h +++ b/be/src/olap/memtable_flush_executor.h @@ -127,7 +127,6 @@ class MemTableFlushExecutor { public: MemTableFlushExecutor() = default; ~MemTableFlushExecutor() { - _deregister_metrics(); _flush_pool->shutdown(); _high_prio_flush_pool->shutdown(); } @@ -141,9 +140,6 @@ public: std::shared_ptr<WorkloadGroup> wg_sptr); private: - void _register_metrics(); - static void _deregister_metrics(); - std::unique_ptr<ThreadPool> _flush_pool; std::unique_ptr<ThreadPool> _high_prio_flush_pool; }; diff --git a/be/src/runtime/exec_env.h b/be/src/runtime/exec_env.h index 0c9a4158ebc..12d625da3bf 100644 --- a/be/src/runtime/exec_env.h +++ b/be/src/runtime/exec_env.h @@ -300,6 +300,9 @@ public: _s_tracking_memory.store(tracking_memory, std::memory_order_release); } void set_orc_memory_pool(orc::MemoryPool* pool) { _orc_memory_pool = pool; } + void set_non_block_close_thread_pool(std::unique_ptr<ThreadPool>&& pool) { + _non_block_close_thread_pool = std::move(pool); + } #endif LoadStreamMapPool* load_stream_map_pool() { return _load_stream_map_pool.get(); } diff --git a/be/src/runtime/exec_env_init.cpp b/be/src/runtime/exec_env_init.cpp index df66315ff05..f0d8253254f 100644 --- a/be/src/runtime/exec_env_init.cpp +++ b/be/src/runtime/exec_env_init.cpp @@ -118,31 +118,10 @@ #include "runtime/memory/tcmalloc_hook.h" #endif -// Used for unit test -namespace { -std::once_flag flag; -std::unique_ptr<doris::ThreadPool> non_block_close_thread_pool; -void init_threadpool_for_test() { - static_cast<void>(doris::ThreadPoolBuilder("NonBlockCloseThreadPool") - .set_min_threads(12) - .set_max_threads(48) - .build(&non_block_close_thread_pool)); -} - -[[maybe_unused]] doris::ThreadPool* get_non_block_close_thread_pool() { - std::call_once(flag, init_threadpool_for_test); - return non_block_close_thread_pool.get(); -} -} // namespace - namespace doris { class PBackendService_Stub; class PFunctionService_Stub; -DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(scanner_thread_pool_queue_size, MetricUnit::NOUNIT); -DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(send_batch_thread_pool_thread_num, MetricUnit::NOUNIT); -DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(send_batch_thread_pool_queue_size, MetricUnit::NOUNIT); - static void init_doris_metrics(const std::vector<StorePath>& store_paths) { bool init_system_metrics = config::enable_system_metrics; std::set<std::string> disk_devices; @@ -178,11 +157,7 @@ static pair<size_t, size_t> get_num_threads(size_t min_num, size_t max_num) { } ThreadPool* ExecEnv::non_block_close_thread_pool() { -#ifdef BE_TEST - return get_non_block_close_thread_pool(); -#else return _non_block_close_thread_pool.get(); -#endif } ExecEnv::ExecEnv() = default; @@ -342,7 +317,6 @@ Status ExecEnv::_init(const std::vector<StorePath>& store_paths, RETURN_IF_ERROR(_load_channel_mgr->init(MemInfo::mem_limit())); 
RETURN_IF_ERROR(_wal_manager->init()); _heartbeat_flags = new HeartbeatFlags(); - _register_metrics(); _tablet_schema_cache = TabletSchemaCache::create_global_schema_cache(config::tablet_schema_cache_capacity); @@ -672,19 +646,6 @@ Status ExecEnv::_check_deploy_mode() { return Status::OK(); } -void ExecEnv::_register_metrics() { - REGISTER_HOOK_METRIC(send_batch_thread_pool_thread_num, - [this]() { return _send_batch_thread_pool->num_threads(); }); - - REGISTER_HOOK_METRIC(send_batch_thread_pool_queue_size, - [this]() { return _send_batch_thread_pool->get_queue_size(); }); -} - -void ExecEnv::_deregister_metrics() { - DEREGISTER_HOOK_METRIC(scanner_thread_pool_queue_size); - DEREGISTER_HOOK_METRIC(send_batch_thread_pool_thread_num); - DEREGISTER_HOOK_METRIC(send_batch_thread_pool_queue_size); -} #ifdef BE_TEST void ExecEnv::set_new_load_stream_mgr(std::unique_ptr<NewLoadStreamMgr>&& new_load_stream_mgr) { this->_new_load_stream_mgr = std::move(new_load_stream_mgr); @@ -740,6 +701,7 @@ void ExecEnv::destroy() { SAFE_STOP(_fragment_mgr); SAFE_STOP(_runtime_filter_timer_queue); // NewLoadStreamMgr should be destoried before storage_engine & after fragment_mgr stopped. + _load_stream_mgr.reset(); _new_load_stream_mgr.reset(); _stream_load_executor.reset(); _memtable_memory_limiter.reset(); @@ -762,8 +724,8 @@ void ExecEnv::destroy() { SAFE_SHUTDOWN(_non_block_close_thread_pool); SAFE_SHUTDOWN(_s3_file_system_thread_pool); SAFE_SHUTDOWN(_send_batch_thread_pool); + SAFE_SHUTDOWN(_send_table_stats_thread_pool); - _deregister_metrics(); SAFE_DELETE(_load_channel_mgr); SAFE_DELETE(_spill_stream_mgr); diff --git a/be/src/runtime/fragment_mgr.cpp b/be/src/runtime/fragment_mgr.cpp index 19e8f76366c..60b7856d6aa 100644 --- a/be/src/runtime/fragment_mgr.cpp +++ b/be/src/runtime/fragment_mgr.cpp @@ -97,8 +97,7 @@ namespace doris { DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(fragment_instance_count, MetricUnit::NOUNIT); DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(timeout_canceled_fragment_count, MetricUnit::NOUNIT); -DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(fragment_thread_pool_queue_size, MetricUnit::NOUNIT); -DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(fragment_thread_pool_num_active_threads, MetricUnit::NOUNIT); + bvar::LatencyRecorder g_fragmentmgr_prepare_latency("doris_FragmentMgr", "prepare"); bvar::Adder<uint64_t> g_fragment_executing_count("fragment_executing_count"); @@ -243,11 +242,6 @@ FragmentMgr::FragmentMgr(ExecEnv* exec_env) .set_max_threads(config::fragment_mgr_asynic_work_pool_thread_num_max) .set_max_queue_size(config::fragment_mgr_asynic_work_pool_queue_size) .build(&_thread_pool); - - REGISTER_HOOK_METRIC(fragment_thread_pool_queue_size, - [this]() { return _thread_pool->get_queue_size(); }); - REGISTER_HOOK_METRIC(fragment_thread_pool_num_active_threads, - [this]() { return _thread_pool->num_active_threads(); }); CHECK(s.ok()) << s.to_string(); } @@ -255,8 +249,6 @@ FragmentMgr::~FragmentMgr() = default; void FragmentMgr::stop() { DEREGISTER_HOOK_METRIC(fragment_instance_count); - DEREGISTER_HOOK_METRIC(fragment_thread_pool_queue_size); - DEREGISTER_HOOK_METRIC(fragment_thread_pool_num_active_threads); _stop_background_threads_latch.count_down(); if (_cancel_thread) { _cancel_thread->join(); diff --git a/be/src/runtime/workload_group/workload_group.cpp b/be/src/runtime/workload_group/workload_group.cpp index 6b9388af30a..3ceeed2de19 100644 --- a/be/src/runtime/workload_group/workload_group.cpp +++ b/be/src/runtime/workload_group/workload_group.cpp @@ -487,7 +487,7 @@ void 
WorkloadGroup::upsert_thread_pool_no_lock(WorkloadGroupInfo* wg_info, if (_scan_task_sched == nullptr) { std::unique_ptr<vectorized::SimplifiedScanScheduler> scan_scheduler = std::make_unique<vectorized::SimplifiedScanScheduler>("Scan_" + wg_name, - cg_cpu_ctl_ptr); + cg_cpu_ctl_ptr, wg_name); Status ret = scan_scheduler->start(config::doris_scanner_thread_pool_thread_num, config::doris_scanner_thread_pool_thread_num, config::doris_scanner_thread_pool_queue_size); @@ -507,7 +507,7 @@ void WorkloadGroup::upsert_thread_pool_no_lock(WorkloadGroupInfo* wg_info, vectorized::ScannerScheduler::get_remote_scan_thread_queue_size(); std::unique_ptr<vectorized::SimplifiedScanScheduler> remote_scan_scheduler = std::make_unique<vectorized::SimplifiedScanScheduler>("RScan_" + wg_name, - cg_cpu_ctl_ptr); + cg_cpu_ctl_ptr, wg_name); Status ret = remote_scan_scheduler->start(remote_max_thread_num, config::doris_scanner_min_thread_pool_thread_num, remote_scan_thread_queue_size); diff --git a/be/src/util/doris_metrics.cpp b/be/src/util/doris_metrics.cpp index e77ee1c36b6..39f246d98d3 100644 --- a/be/src/util/doris_metrics.cpp +++ b/be/src/util/doris_metrics.cpp @@ -191,9 +191,6 @@ DEFINE_COUNTER_METRIC_PROTOTYPE_2ARG(query_ctx_cnt, MetricUnit::NOUNIT); DEFINE_COUNTER_METRIC_PROTOTYPE_2ARG(scanner_ctx_cnt, MetricUnit::NOUNIT); DEFINE_COUNTER_METRIC_PROTOTYPE_2ARG(scanner_cnt, MetricUnit::NOUNIT); DEFINE_COUNTER_METRIC_PROTOTYPE_2ARG(scanner_task_cnt, MetricUnit::NOUNIT); -DEFINE_COUNTER_METRIC_PROTOTYPE_2ARG(scanner_task_queued, MetricUnit::NOUNIT); -DEFINE_COUNTER_METRIC_PROTOTYPE_2ARG(scanner_task_running, MetricUnit::NOUNIT); -DEFINE_COUNTER_METRIC_PROTOTYPE_2ARG(scanner_task_submit_failed, MetricUnit::NOUNIT); const std::string DorisMetrics::_s_registry_name = "doris_be"; const std::string DorisMetrics::_s_hook_name = "doris_metrics"; @@ -319,9 +316,6 @@ DorisMetrics::DorisMetrics() : _metric_registry(_s_registry_name) { INT_COUNTER_METRIC_REGISTER(_server_metric_entity, scanner_ctx_cnt); INT_COUNTER_METRIC_REGISTER(_server_metric_entity, scanner_cnt); INT_COUNTER_METRIC_REGISTER(_server_metric_entity, scanner_task_cnt); - INT_COUNTER_METRIC_REGISTER(_server_metric_entity, scanner_task_queued); - INT_COUNTER_METRIC_REGISTER(_server_metric_entity, scanner_task_running); - INT_COUNTER_METRIC_REGISTER(_server_metric_entity, scanner_task_submit_failed); } void DorisMetrics::initialize(bool init_system_metrics, const std::set<std::string>& disk_devices, diff --git a/be/src/util/doris_metrics.h b/be/src/util/doris_metrics.h index d089758c21c..26bad02fdd2 100644 --- a/be/src/util/doris_metrics.h +++ b/be/src/util/doris_metrics.h @@ -197,13 +197,6 @@ public: UIntGauge* query_cache_sql_total_count = nullptr; UIntGauge* query_cache_partition_total_count = nullptr; - UIntGauge* scanner_thread_pool_queue_size = nullptr; - UIntGauge* add_batch_task_queue_size = nullptr; - UIntGauge* send_batch_thread_pool_thread_num = nullptr; - UIntGauge* send_batch_thread_pool_queue_size = nullptr; - UIntGauge* fragment_thread_pool_queue_size = nullptr; - UIntGauge* fragment_thread_pool_num_active_threads = nullptr; - // Upload metrics UIntGauge* upload_total_byte = nullptr; IntCounter* upload_rowset_count = nullptr; @@ -224,18 +217,6 @@ public: UIntGauge* arrow_flight_work_pool_max_queue_size = nullptr; UIntGauge* arrow_flight_work_max_threads = nullptr; - UIntGauge* flush_thread_pool_queue_size = nullptr; - UIntGauge* flush_thread_pool_thread_num = nullptr; - - UIntGauge* local_scan_thread_pool_queue_size = nullptr; - UIntGauge* 
local_scan_thread_pool_thread_num = nullptr; - UIntGauge* remote_scan_thread_pool_queue_size = nullptr; - UIntGauge* remote_scan_thread_pool_thread_num = nullptr; - UIntGauge* limited_scan_thread_pool_queue_size = nullptr; - UIntGauge* limited_scan_thread_pool_thread_num = nullptr; - UIntGauge* group_local_scan_thread_pool_queue_size = nullptr; - UIntGauge* group_local_scan_thread_pool_thread_num = nullptr; - IntCounter* num_io_bytes_read_total = nullptr; IntCounter* num_io_bytes_read_from_cache = nullptr; IntCounter* num_io_bytes_read_from_remote = nullptr; @@ -244,9 +225,6 @@ public: IntCounter* scanner_ctx_cnt = nullptr; IntCounter* scanner_cnt = nullptr; IntCounter* scanner_task_cnt = nullptr; - IntCounter* scanner_task_queued = nullptr; - IntCounter* scanner_task_submit_failed = nullptr; - IntCounter* scanner_task_running = nullptr; static DorisMetrics* instance() { static DorisMetrics instance; diff --git a/be/src/util/interval_histogram.cpp b/be/src/util/interval_histogram.cpp new file mode 100644 index 00000000000..ec894fc69fa --- /dev/null +++ b/be/src/util/interval_histogram.cpp @@ -0,0 +1,80 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "util/interval_histogram.h" + +#include <algorithm> +#include <mutex> +#include <numeric> +#include <vector> + +#include "gutil/integral_types.h" + +namespace doris { + +template <typename T> +IntervalHistogramStat<T>::IntervalHistogramStat(size_t N) : window(N) {} + +template <typename T> +void IntervalHistogramStat<T>::add(T value) { + std::unique_lock<std::shared_mutex> lock(mutex); + if (window.full()) { + window.pop_front(); + } + window.push_back(value); +} + +template <typename T> +T IntervalHistogramStat<T>::mean() { + std::shared_lock<std::shared_mutex> lock(mutex); + if (window.empty()) { + return T(); + } + T sum = std::accumulate(window.begin(), window.end(), T()); + return sum / window.size(); +} + +template <typename T> +T IntervalHistogramStat<T>::median() { + std::shared_lock<std::shared_mutex> lock(mutex); + if (window.empty()) { + return T(); + } + + std::vector<T> sorted(window.begin(), window.end()); + std::sort(sorted.begin(), sorted.end()); + + size_t mid = sorted.size() / 2; + return sorted.size() % 2 == 0 ? 
(sorted[mid - 1] + sorted[mid]) / 2 : sorted[mid]; +} + +template <typename T> +T IntervalHistogramStat<T>::max() { + std::shared_lock<std::shared_mutex> lock(mutex); + return *std::max_element(window.begin(), window.end()); +} + +template <typename T> +T IntervalHistogramStat<T>::min() { + std::shared_lock<std::shared_mutex> lock(mutex); + return *std::min_element(window.begin(), window.end()); +} + +template class doris::IntervalHistogramStat<int64>; +template class doris::IntervalHistogramStat<int32>; + +} // namespace doris diff --git a/be/src/util/interval_histogram.h b/be/src/util/interval_histogram.h new file mode 100644 index 00000000000..2d5d9e6c6d2 --- /dev/null +++ b/be/src/util/interval_histogram.h @@ -0,0 +1,46 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#pragma once + +#include <boost/circular_buffer.hpp> +#include <shared_mutex> + +namespace doris { + +// A thread-safe interval histogram stat class. +// IntervalHistogramStat will keep a FIXED-SIZE window of values and provide +// statistics like mean, median, max, min. + +template <typename T> +class IntervalHistogramStat { +public: + explicit IntervalHistogramStat(size_t N); + + void add(T value); + + T mean(); + T median(); + T max(); + T min(); + +private: + boost::circular_buffer<T> window; + mutable std::shared_mutex mutex; +}; + +} // namespace doris diff --git a/be/src/util/metrics.cpp b/be/src/util/metrics.cpp index 23dbb628a0d..1a3aa51cd2b 100644 --- a/be/src/util/metrics.cpp +++ b/be/src/util/metrics.cpp @@ -169,6 +169,7 @@ std::string HistogramMetric::to_string() const { std::string HistogramMetric::to_prometheus(const std::string& display_name, const Labels& entity_labels, const Labels& metric_labels) const { + // TODO: Use std::string concate for better performance. 
std::stringstream ss; for (const auto& percentile : _s_output_percentiles) { auto quantile_lable = Labels({{"quantile", percentile.first}}); @@ -322,12 +323,12 @@ void MetricRegistry::trigger_all_hooks(bool force) const { std::string MetricRegistry::to_prometheus(bool with_tablet_metrics) const { // Reorder by MetricPrototype EntityMetricsByType entity_metrics_by_types; - std::lock_guard<std::mutex> l(_lock); + std::lock_guard<std::mutex> l1(_lock); for (const auto& entity : _entities) { if (entity.first->_type == MetricEntityType::kTablet && !with_tablet_metrics) { continue; } - std::lock_guard<std::mutex> l(entity.first->_lock); + std::lock_guard<std::mutex> l2(entity.first->_lock); entity.first->trigger_hook_unlocked(false); for (const auto& metric : entity.first->_metrics) { std::pair<MetricEntity*, Metric*> new_elem = diff --git a/be/src/util/threadpool.cpp b/be/src/util/threadpool.cpp index e9af13f556e..5548ad7f400 100644 --- a/be/src/util/threadpool.cpp +++ b/be/src/util/threadpool.cpp @@ -33,10 +33,23 @@ #include "gutil/port.h" #include "gutil/strings/substitute.h" #include "util/debug/sanitizer_scopes.h" +#include "util/doris_metrics.h" +#include "util/metrics.h" #include "util/scoped_cleanup.h" +#include "util/stopwatch.hpp" #include "util/thread.h" namespace doris { +// The name of these varialbs will be useds as metric name in prometheus. +DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(thread_pool_active_threads, MetricUnit::NOUNIT); +DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(thread_pool_queue_size, MetricUnit::NOUNIT); +DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(thread_pool_max_queue_size, MetricUnit::NOUNIT); +DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(thread_pool_max_threads, MetricUnit::NOUNIT); +DEFINE_COUNTER_METRIC_PROTOTYPE_2ARG(thread_pool_submit_failed, MetricUnit::NOUNIT); +DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(task_execution_time_ns_avg_in_last_1000_times, + MetricUnit::NANOSECONDS); +DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(task_wait_worker_ns_avg_in_last_1000_times, + MetricUnit::NANOSECONDS); using namespace ErrorCode; using std::string; @@ -52,8 +65,9 @@ private: std::function<void()> _func; }; -ThreadPoolBuilder::ThreadPoolBuilder(string name) +ThreadPoolBuilder::ThreadPoolBuilder(string name, string workload_group) : _name(std::move(name)), + _workload_group(std::move(workload_group)), _min_threads(0), _max_threads(std::thread::hardware_concurrency()), _max_queue_size(std::numeric_limits<int>::max()), @@ -238,6 +252,7 @@ bool ThreadPoolToken::need_dispatch() { ThreadPool::ThreadPool(const ThreadPoolBuilder& builder) : _name(builder._name), + _workload_group(builder._workload_group), _min_threads(builder._min_threads), _max_threads(builder._max_threads), _max_queue_size(builder._max_queue_size), @@ -248,7 +263,8 @@ ThreadPool::ThreadPool(const ThreadPoolBuilder& builder) _active_threads(0), _total_queued_tasks(0), _cgroup_cpu_ctl(builder._cgroup_cpu_ctl), - _tokenless(new_token(ExecutionMode::CONCURRENT)) {} + _tokenless(new_token(ExecutionMode::CONCURRENT)), + _id(UniqueId::gen_uid()) {} ThreadPool::~ThreadPool() { // There should only be one live token: the one used in tokenless submission. @@ -270,10 +286,48 @@ Status ThreadPool::init() { return status; } } + // _id of thread pool is used to make sure when we create thread pool with same name, we can + // get different _metric_entity + // If not, we will have problem when we deregister entity and register hook. 
+ _metric_entity = DorisMetrics::instance()->metric_registry()->register_entity( + fmt::format("thread_pool_{}", _name), {{"thread_pool_name", _name}, + {"workload_group", _workload_group}, + {"id", _id.to_string()}}); + + INT_GAUGE_METRIC_REGISTER(_metric_entity, thread_pool_active_threads); + INT_GAUGE_METRIC_REGISTER(_metric_entity, thread_pool_max_threads); + INT_GAUGE_METRIC_REGISTER(_metric_entity, thread_pool_queue_size); + INT_GAUGE_METRIC_REGISTER(_metric_entity, thread_pool_max_queue_size); + INT_GAUGE_METRIC_REGISTER(_metric_entity, task_execution_time_ns_avg_in_last_1000_times); + INT_GAUGE_METRIC_REGISTER(_metric_entity, task_wait_worker_ns_avg_in_last_1000_times); + INT_COUNTER_METRIC_REGISTER(_metric_entity, thread_pool_submit_failed); + + _metric_entity->register_hook("update", [this]() { + { + std::lock_guard<std::mutex> l(_lock); + if (!_pool_status.ok()) { + return; + } + } + + thread_pool_active_threads->set_value(num_active_threads()); + thread_pool_queue_size->set_value(get_queue_size()); + thread_pool_max_queue_size->set_value(get_max_queue_size()); + thread_pool_max_threads->set_value(max_threads()); + task_execution_time_ns_avg_in_last_1000_times->set_value( + _task_execution_time_ns_statistic.mean()); + task_wait_worker_ns_avg_in_last_1000_times->set_value( + _task_wait_worker_time_ns_statistic.mean()); + }); return Status::OK(); } void ThreadPool::shutdown() { + // Why access to doris_metrics is safe here? + // Since DorisMetrics is a singleton, it will be destroyed only after doris_main is exited. + // The shutdown/destroy of ThreadPool is guaranteed to take place before doris_main exits by + // ExecEnv::destroy(). + DorisMetrics::instance()->metric_registry()->deregister_entity(_metric_entity); debug::ScopedTSANIgnoreReadsAndWrites ignore_tsan; std::unique_lock<std::mutex> l(_lock); check_not_pool_thread_unlocked(); @@ -357,8 +411,6 @@ Status ThreadPool::submit_func(std::function<void()> f) { Status ThreadPool::do_submit(std::shared_ptr<Runnable> r, ThreadPoolToken* token) { DCHECK(token); - std::chrono::time_point<std::chrono::system_clock> submit_time = - std::chrono::system_clock::now(); std::unique_lock<std::mutex> l(_lock); if (PREDICT_FALSE(!_pool_status.ok())) { @@ -373,6 +425,7 @@ Status ThreadPool::do_submit(std::shared_ptr<Runnable> r, ThreadPoolToken* token int64_t capacity_remaining = static_cast<int64_t>(_max_threads) - _active_threads + static_cast<int64_t>(_max_queue_size) - _total_queued_tasks; if (capacity_remaining < 1) { + thread_pool_submit_failed->increment(1); return Status::Error<SERVICE_UNAVAILABLE>( "Thread pool {} is at capacity ({}/{} tasks running, {}/{} tasks queued)", _name, _num_threads + _num_threads_pending_start, _max_threads, _total_queued_tasks, @@ -408,7 +461,7 @@ Status ThreadPool::do_submit(std::shared_ptr<Runnable> r, ThreadPoolToken* token Task task; task.runnable = std::move(r); - task.submit_time = submit_time; + task.submit_time_wather.start(); // Add the task to the token's queue. ThreadPoolToken::State state = token->state(); @@ -528,12 +581,15 @@ void ThreadPool::dispatch_thread() { continue; } + MonotonicStopWatch task_execution_time_watch; + task_execution_time_watch.start(); // Get the next token and task to execute. 
ThreadPoolToken* token = _queue.front(); _queue.pop_front(); DCHECK_EQ(ThreadPoolToken::State::RUNNING, token->state()); DCHECK(!token->_entries.empty()); Task task = std::move(token->_entries.front()); + _task_wait_worker_time_ns_statistic.add(task.submit_time_wather.elapsed_time()); token->_entries.pop_front(); token->_active_threads++; --_total_queued_tasks; @@ -543,7 +599,6 @@ void ThreadPool::dispatch_thread() { // Execute the task task.runnable->run(); - // Destruct the task while we do not hold the lock. // // The task's destructor may be expensive if it has a lot of bound @@ -552,7 +607,7 @@ void ThreadPool::dispatch_thread() { // with this threadpool, and produce a deadlock. task.runnable.reset(); l.lock(); - + _task_execution_time_ns_statistic.add(task_execution_time_watch.elapsed_time()); // Possible states: // 1. The token was shut down while we ran its task. Transition to QUIESCED. // 2. The token has no more queued tasks. Transition back to IDLE. diff --git a/be/src/util/threadpool.h b/be/src/util/threadpool.h index f822c307aa6..8c89f570a0a 100644 --- a/be/src/util/threadpool.h +++ b/be/src/util/threadpool.h @@ -20,6 +20,8 @@ #pragma once +#include <gen_cpp/Types_types.h> + #include <boost/intrusive/detail/algo_type.hpp> #include <boost/intrusive/list.hpp> #include <boost/intrusive/list_hook.hpp> @@ -38,6 +40,9 @@ #include "agent/cgroup_cpu_ctl.h" #include "common/status.h" +#include "util/interval_histogram.h" +#include "util/metrics.h" +#include "util/uid_util.h" #include "util/work_thread_pool.hpp" namespace doris { @@ -99,7 +104,7 @@ public: // class ThreadPoolBuilder { public: - explicit ThreadPoolBuilder(std::string name); + explicit ThreadPoolBuilder(std::string name, std::string workload_group = ""); // Note: We violate the style guide by returning mutable references here // in order to provide traditional Builder pattern conveniences. @@ -132,6 +137,7 @@ public: private: friend class ThreadPool; const std::string _name; + const std::string _workload_group; int _min_threads; int _max_threads; int _max_queue_size; @@ -255,6 +261,11 @@ public: return _total_queued_tasks; } + int get_max_queue_size() const { + std::lock_guard<std::mutex> l(_lock); + return _max_queue_size; + } + std::vector<int> debug_info() const { std::lock_guard<std::mutex> l(_lock); std::vector<int> arr = {_num_threads, static_cast<int>(_threads.size()), _min_threads, @@ -280,7 +291,7 @@ private: std::shared_ptr<Runnable> runnable; // Time at which the entry was submitted to the pool. - std::chrono::time_point<std::chrono::system_clock> submit_time; + MonotonicStopWatch submit_time_wather; }; // Creates a new thread pool using a builder. @@ -308,6 +319,7 @@ private: void release_token(ThreadPoolToken* t); const std::string _name; + const std::string _workload_group; int _min_threads; int _max_threads; const int _max_queue_size; @@ -392,6 +404,20 @@ private: // ExecutionMode::CONCURRENT token used by the pool for tokenless submission. 
std::unique_ptr<ThreadPoolToken> _tokenless; + const UniqueId _id; + + std::shared_ptr<MetricEntity> _metric_entity; + IntGauge* thread_pool_active_threads = nullptr; + IntGauge* thread_pool_queue_size = nullptr; + IntGauge* thread_pool_max_queue_size = nullptr; + IntGauge* thread_pool_max_threads = nullptr; + IntGauge* task_execution_time_ns_avg_in_last_1000_times = nullptr; + IntGauge* task_wait_worker_ns_avg_in_last_1000_times = nullptr; + + IntervalHistogramStat<int64_t> _task_execution_time_ns_statistic {1000}; + IntervalHistogramStat<int64_t> _task_wait_worker_time_ns_statistic {1000}; + + IntCounter* thread_pool_submit_failed = nullptr; }; // Entry point for token-based task submission and blocking for a particular diff --git a/be/src/vec/exec/scan/scanner_scheduler.cpp b/be/src/vec/exec/scan/scanner_scheduler.cpp index f419f58037a..1b14d172790 100644 --- a/be/src/vec/exec/scan/scanner_scheduler.cpp +++ b/be/src/vec/exec/scan/scanner_scheduler.cpp @@ -56,24 +56,9 @@ namespace doris::vectorized { -DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(local_scan_thread_pool_queue_size, MetricUnit::NOUNIT); -DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(local_scan_thread_pool_thread_num, MetricUnit::NOUNIT); -DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(remote_scan_thread_pool_queue_size, MetricUnit::NOUNIT); -DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(remote_scan_thread_pool_thread_num, MetricUnit::NOUNIT); -DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(limited_scan_thread_pool_queue_size, MetricUnit::NOUNIT); -DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(limited_scan_thread_pool_thread_num, MetricUnit::NOUNIT); -DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(group_local_scan_thread_pool_queue_size, MetricUnit::NOUNIT); -DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(group_local_scan_thread_pool_thread_num, MetricUnit::NOUNIT); - ScannerScheduler::ScannerScheduler() = default; -ScannerScheduler::~ScannerScheduler() { - if (!_is_init) { - return; - } - - _deregister_metrics(); -} +ScannerScheduler::~ScannerScheduler() = default; void ScannerScheduler::stop() { if (!_is_init) { @@ -116,7 +101,6 @@ Status ScannerScheduler::init(ExecEnv* env) { .set_max_threads(config::doris_scanner_thread_pool_thread_num) .set_max_queue_size(config::doris_scanner_thread_pool_queue_size) .build(&_limited_scan_thread_pool)); - _register_metrics(); _is_init = true; return Status::OK(); } @@ -141,11 +125,6 @@ Status ScannerScheduler::submit(std::shared_ptr<ScannerContext> ctx, scanner_delegate->_scanner->start_wait_worker_timer(); auto s = ctx->thread_token->submit_func([scanner_ref = scan_task, ctx]() { - DorisMetrics::instance()->scanner_task_queued->increment(-1); - DorisMetrics::instance()->scanner_task_running->increment(1); - Defer metrics_defer( - [&] { DorisMetrics::instance()->scanner_task_running->increment(-1); }); - auto status = [&] { RETURN_IF_CATCH_EXCEPTION(_scanner_scan(ctx, scanner_ref)); return Status::OK(); @@ -171,11 +150,6 @@ Status ScannerScheduler::submit(std::shared_ptr<ScannerContext> ctx, auto sumbit_task = [&]() { SimplifiedScanScheduler* scan_sched = ctx->get_scan_scheduler(); auto work_func = [scanner_ref = scan_task, ctx]() { - DorisMetrics::instance()->scanner_task_queued->increment(-1); - DorisMetrics::instance()->scanner_task_running->increment(1); - Defer metrics_defer( - [&] { DorisMetrics::instance()->scanner_task_running->increment(-1); }); - auto status = [&] { RETURN_IF_CATCH_EXCEPTION(_scanner_scan(ctx, scanner_ref)); return Status::OK(); @@ -348,32 +322,6 @@ void ScannerScheduler::_scanner_scan(std::shared_ptr<ScannerContext> ctx, 
ctx->push_back_scan_task(scan_task); } -void ScannerScheduler::_register_metrics() { - REGISTER_HOOK_METRIC(local_scan_thread_pool_queue_size, - [this]() { return _local_scan_thread_pool->get_queue_size(); }); - REGISTER_HOOK_METRIC(local_scan_thread_pool_thread_num, - [this]() { return _local_scan_thread_pool->get_active_threads(); }); - REGISTER_HOOK_METRIC(remote_scan_thread_pool_queue_size, - [this]() { return _remote_scan_thread_pool->get_queue_size(); }); - REGISTER_HOOK_METRIC(remote_scan_thread_pool_thread_num, - [this]() { return _remote_scan_thread_pool->get_active_threads(); }); - REGISTER_HOOK_METRIC(limited_scan_thread_pool_queue_size, - [this]() { return _limited_scan_thread_pool->get_queue_size(); }); - REGISTER_HOOK_METRIC(limited_scan_thread_pool_thread_num, - [this]() { return _limited_scan_thread_pool->num_threads(); }); -} - -void ScannerScheduler::_deregister_metrics() { - DEREGISTER_HOOK_METRIC(local_scan_thread_pool_queue_size); - DEREGISTER_HOOK_METRIC(local_scan_thread_pool_thread_num); - DEREGISTER_HOOK_METRIC(remote_scan_thread_pool_queue_size); - DEREGISTER_HOOK_METRIC(remote_scan_thread_pool_thread_num); - DEREGISTER_HOOK_METRIC(limited_scan_thread_pool_queue_size); - DEREGISTER_HOOK_METRIC(limited_scan_thread_pool_thread_num); - DEREGISTER_HOOK_METRIC(group_local_scan_thread_pool_queue_size); - DEREGISTER_HOOK_METRIC(group_local_scan_thread_pool_thread_num); -} - int ScannerScheduler::get_remote_scan_thread_num() { int remote_max_thread_num = config::doris_max_remote_scanner_thread_pool_thread_num != -1 ? config::doris_max_remote_scanner_thread_pool_thread_num diff --git a/be/src/vec/exec/scan/scanner_scheduler.h b/be/src/vec/exec/scan/scanner_scheduler.h index 7731b3ba8f9..e94659c79d1 100644 --- a/be/src/vec/exec/scan/scanner_scheduler.h +++ b/be/src/vec/exec/scan/scanner_scheduler.h @@ -114,8 +114,12 @@ struct SimplifiedScanTask { class SimplifiedScanScheduler { public: - SimplifiedScanScheduler(std::string sched_name, std::shared_ptr<CgroupCpuCtl> cgroup_cpu_ctl) - : _is_stop(false), _cgroup_cpu_ctl(cgroup_cpu_ctl), _sched_name(sched_name) {} + SimplifiedScanScheduler(std::string sched_name, std::shared_ptr<CgroupCpuCtl> cgroup_cpu_ctl, + std::string workload_group = "system") + : _is_stop(false), + _cgroup_cpu_ctl(cgroup_cpu_ctl), + _sched_name(sched_name), + _workload_group(workload_group) {} ~SimplifiedScanScheduler() { stop(); @@ -129,7 +133,7 @@ public: } Status start(int max_thread_num, int min_thread_num, int queue_size) { - RETURN_IF_ERROR(ThreadPoolBuilder(_sched_name) + RETURN_IF_ERROR(ThreadPoolBuilder(_sched_name, _workload_group) .set_min_threads(min_thread_num) .set_max_threads(max_thread_num) .set_max_queue_size(queue_size) @@ -140,13 +144,7 @@ public: Status submit_scan_task(SimplifiedScanTask scan_task) { if (!_is_stop) { - DorisMetrics::instance()->scanner_task_queued->increment(1); - auto st = _scan_thread_pool->submit_func([scan_task] { scan_task.scan_func(); }); - if (!st.ok()) { - DorisMetrics::instance()->scanner_task_queued->increment(-1); - DorisMetrics::instance()->scanner_task_submit_failed->increment(1); - } - return st; + return _scan_thread_pool->submit_func([scan_task] { scan_task.scan_func(); }); } else { return Status::InternalError<false>("scanner pool {} is shutdown.", _sched_name); } @@ -216,6 +214,7 @@ private: std::atomic<bool> _is_stop; std::weak_ptr<CgroupCpuCtl> _cgroup_cpu_ctl; std::string _sched_name; + std::string _workload_group; }; } // namespace doris::vectorized diff --git 
a/be/test/io/fs/buffered_reader_test.cpp b/be/test/io/fs/buffered_reader_test.cpp index 658c98ba514..1038f056aba 100644 --- a/be/test/io/fs/buffered_reader_test.cpp +++ b/be/test/io/fs/buffered_reader_test.cpp @@ -36,7 +36,10 @@ namespace doris { using io::FileReader; class BufferedReaderTest : public testing::Test { public: - BufferedReaderTest() { + BufferedReaderTest() = default; + +protected: + void SetUp() override { std::unique_ptr<ThreadPool> _pool; static_cast<void>(ThreadPoolBuilder("BufferedReaderPrefetchThreadPool") .set_min_threads(5) @@ -44,10 +47,9 @@ public: .build(&_pool)); ExecEnv::GetInstance()->_buffered_reader_prefetch_thread_pool = std::move(_pool); } - -protected: - virtual void SetUp() {} - virtual void TearDown() {} + void TearDown() override { + ExecEnv::GetInstance()->_buffered_reader_prefetch_thread_pool.reset(); + } }; class SyncLocalFileReader : public io::FileReader { diff --git a/be/test/io/fs/remote_file_system_test.cpp b/be/test/io/fs/remote_file_system_test.cpp index c5d80d1b65d..309a31eb97f 100644 --- a/be/test/io/fs/remote_file_system_test.cpp +++ b/be/test/io/fs/remote_file_system_test.cpp @@ -410,10 +410,10 @@ TEST_F(RemoteFileSystemTest, TestHdfsFileSystem) { TEST_F(RemoteFileSystemTest, TestS3FileSystem) { std::unique_ptr<ThreadPool> _pool; - ThreadPoolBuilder("S3FileUploadThreadPool") - .set_min_threads(5) - .set_max_threads(10) - .build(&_pool); + std::ignore = ThreadPoolBuilder("S3FileUploadThreadPool") + .set_min_threads(5) + .set_max_threads(10) + .build(&_pool); ExecEnv::GetInstance()->_s3_file_upload_thread_pool = std::move(_pool); S3Conf s3_conf; S3URI s3_uri(s3_location); @@ -563,6 +563,7 @@ TEST_F(RemoteFileSystemTest, TestS3FileSystem) { std::string download_content; CHECK_STATUS_OK(fs->direct_download(direct_remote_file, &download_content)); ASSERT_EQ("abc", download_content); + ExecEnv::GetInstance()->_s3_file_upload_thread_pool.reset(); } } // namespace doris diff --git a/be/test/io/fs/s3_file_writer_test.cpp b/be/test/io/fs/s3_file_writer_test.cpp index 7021346a704..6469559d0d8 100644 --- a/be/test/io/fs/s3_file_writer_test.cpp +++ b/be/test/io/fs/s3_file_writer_test.cpp @@ -306,6 +306,7 @@ public: sp->clear_call_back(mockcallback.point_name); }); sp->disable_processing(); + ExecEnv::GetInstance()->_s3_file_upload_thread_pool.reset(); } }; diff --git a/be/test/olap/rowset/beta_rowset_test.cpp b/be/test/olap/rowset/beta_rowset_test.cpp index 0c3001758f0..2e13436b3d3 100644 --- a/be/test/olap/rowset/beta_rowset_test.cpp +++ b/be/test/olap/rowset/beta_rowset_test.cpp @@ -236,6 +236,8 @@ TEST_F(BetaRowsetTest, ReadTest) { .region = "region", .ak = "ak", .sk = "sk", + .token = "", + .bucket = "", }}; std::string resource_id = "10000"; auto res = io::S3FileSystem::create(std::move(s3_conf), io::FileSystem::TMP_FS_ID); diff --git a/be/test/olap/rowset/unique_rowset_id_generator_test.cpp b/be/test/olap/rowset/unique_rowset_id_generator_test.cpp index aeb6d7bc8ae..7ef95e938e1 100644 --- a/be/test/olap/rowset/unique_rowset_id_generator_test.cpp +++ b/be/test/olap/rowset/unique_rowset_id_generator_test.cpp @@ -123,6 +123,7 @@ TEST_F(UniqueRowsetIdGeneratorTest, GenerateIdBenchmark) { hi <<= 56; RowsetId last_id = id_generator.next_id(); EXPECT_EQ(last_id.hi, hi + kNumThreads * kIdPerThread + 1); + pool.reset(); } } // namespace doris diff --git a/be/test/testutil/run_all_tests.cpp b/be/test/testutil/run_all_tests.cpp index 1208141a8fa..44cccb7fec1 100644 --- a/be/test/testutil/run_all_tests.cpp +++ b/be/test/testutil/run_all_tests.cpp @@ -50,6 
+50,16 @@ int main(int argc, char** argv) { doris::ThreadLocalHandle::create_thread_local_if_not_exits(); doris::ExecEnv::GetInstance()->init_mem_tracker(); + // Used for unit test + std::unique_ptr<doris::ThreadPool> non_block_close_thread_pool; + + std::ignore = doris::ThreadPoolBuilder("NonBlockCloseThreadPool") + .set_min_threads(12) + .set_max_threads(48) + .build(&non_block_close_thread_pool); + doris::ExecEnv::GetInstance()->set_non_block_close_thread_pool( + std::move(non_block_close_thread_pool)); + doris::thread_context()->thread_mem_tracker_mgr->init(); std::shared_ptr<doris::MemTrackerLimiter> test_tracker = doris::MemTrackerLimiter::create_shared(doris::MemTrackerLimiter::Type::GLOBAL, @@ -96,5 +106,7 @@ int main(int argc, char** argv) { doris::ExecEnv::GetInstance()->set_tracking_memory(false); int res = RUN_ALL_TESTS(); + + doris::ExecEnv::GetInstance()->set_non_block_close_thread_pool(nullptr); return res; } diff --git a/be/test/util/countdown_latch_test.cpp b/be/test/util/countdown_latch_test.cpp index ff3c21c6e13..86999ca929d 100644 --- a/be/test/util/countdown_latch_test.cpp +++ b/be/test/util/countdown_latch_test.cpp @@ -59,6 +59,7 @@ TEST(TestCountDownLatch, TestLatch) { EXPECT_TRUE(pool->submit_func(std::bind(decrement_latch, &latch, 1000)).ok()); latch.wait(); EXPECT_EQ(0, latch.count()); + pool.reset(); } // Test that resetting to zero while there are waiters lets the waiters diff --git a/be/test/util/interval_histogram_test.cpp b/be/test/util/interval_histogram_test.cpp new file mode 100644 index 00000000000..aa63c360165 --- /dev/null +++ b/be/test/util/interval_histogram_test.cpp @@ -0,0 +1,78 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +#include "util/interval_histogram.h" + +#include <gtest/gtest.h> + +#include <thread> + +namespace doris { + +TEST(IntervalHistogramStat, SerialTest) { + IntervalHistogramStat<int> stat(5); + + stat.add(10); + stat.add(20); + stat.add(30); + stat.add(40); + stat.add(50); + + EXPECT_EQ(stat.mean(), 30); + EXPECT_EQ(stat.median(), 30); + EXPECT_EQ(stat.max(), 50); + EXPECT_EQ(stat.min(), 10); + + // Make window move forward + stat.add(60); + stat.add(70); + + // window now contains [30, 40, 50, 60, 70] + EXPECT_EQ(stat.mean(), 50); + EXPECT_EQ(stat.median(), 50); + EXPECT_EQ(stat.max(), 70); + EXPECT_EQ(stat.min(), 30); +} + +TEST(IntervalHistogramStatTest, ParallelTest) { + constexpr int thread_count = 10; + constexpr int values_per_thread = 10; + IntervalHistogramStat<int> stat(thread_count * values_per_thread); + + auto add_values = [&stat](int start_value, int count) { + for (int i = 0; i < count; ++i) { + stat.add(start_value + i); + } + }; + + std::vector<std::thread> threads; + for (int i = 0; i < thread_count; ++i) { + threads.emplace_back(add_values, i * values_per_thread, values_per_thread); + } + + for (auto& thread : threads) { + thread.join(); + } + + int total_values = thread_count * values_per_thread; + EXPECT_EQ(stat.mean(), (total_values - 1) / 2); + EXPECT_EQ(stat.max(), total_values - 1); + EXPECT_EQ(stat.min(), 0); + EXPECT_EQ(stat.median(), (total_values - 1) / 2); +} + +} // namespace doris diff --git a/be/test/vec/aggregate_functions/agg_linear_histogram_test.cpp b/be/test/vec/aggregate_functions/agg_linear_histogram_test.cpp index 3dbf34a4dcb..fe94afe5118 100644 --- a/be/test/vec/aggregate_functions/agg_linear_histogram_test.cpp +++ b/be/test/vec/aggregate_functions/agg_linear_histogram_test.cpp @@ -204,8 +204,8 @@ public: << "(" << data_types[0]->get_name() << ")"; AggregateFunctionSimpleFactory factory = AggregateFunctionSimpleFactory::instance(); - auto agg_function = - factory.get("linear_histogram", data_types, false, -1, {.enable_decimal256 = true}); + auto agg_function = factory.get("linear_histogram", data_types, false, -1, + {.enable_decimal256 = true, .column_names = {""}}); EXPECT_NE(agg_function, nullptr); std::unique_ptr<char[]> memory(new char[agg_function->size_of_data()]); diff --git a/regression-test/suites/workload_manager_p0/test_curd_wlg.groovy b/regression-test/suites/workload_manager_p0/test_curd_wlg.groovy index be54bec5e2e..b1bbae2e3ce 100644 --- a/regression-test/suites/workload_manager_p0/test_curd_wlg.groovy +++ b/regression-test/suites/workload_manager_p0/test_curd_wlg.groovy @@ -15,6 +15,16 @@ // specific language governing permissions and limitations // under the License. +def getMetrics = { ip, port -> + def dst = 'http://' + ip + ':' + port + def conn = new URL(dst + "/metrics").openConnection() + conn.setRequestMethod("GET") + def encoding = Base64.getEncoder().encodeToString((context.config.feHttpUser + ":" + + (context.config.feHttpPassword == null ? "" : context.config.feHttpPassword)).getBytes("UTF-8")) + conn.setRequestProperty("Authorization", "Basic ${encoding}") + return conn.getInputStream().getText() + } + suite("test_crud_wlg") { def table_name = "wlg_test_table" def table_name2 = "wlg_test_table2" @@ -787,4 +797,50 @@ suite("test_crud_wlg") { sql "drop workload group if exists default_val_wg" + for (int i = 0; i < 20; i++) { + // 1. 
SHOW BACKENDS get be ip and http port + Map<String, String> backendId_to_backendIP = new HashMap<>(); + Map<String, String> backendId_to_backendHttpPort = new HashMap<>(); + getBackendIpHttpPort(backendId_to_backendIP, backendId_to_backendHttpPort); + // Print above maps in logger. + logger.info("backendId_to_backendIP: " + backendId_to_backendIP); + logger.info("backendId_to_backendHttpPort: " + backendId_to_backendHttpPort); + + // 2. CREATE WORKLOAD GROUP + sql "drop workload group if exists test_wg_metrics;" + sql "create workload group if not exists test_wg_metrics " + + "properties ( " + + " 'cpu_share'='10', " + + " 'memory_limit'='10%', " + + " 'enable_memory_overcommit'='true' " + + ");" + sql "set workload_group=test_wg_metrics;" + wg = sql("select name,cpu_share,memory_limit,enable_memory_overcommit,max_concurrency,max_queue_size,queue_timeout,cpu_hard_limit,scan_thread_num,tag,read_bytes_per_second,remote_read_bytes_per_second from information_schema.workload_groups where name = 'test_wg_metrics' order by name;"); + logger.info("wg: " + wg); + + // 3. EXECUTE A QUERY SO THAT THE WORKLOAD GROUP IS USED + sql "select count(*) from numbers(\"number\"=\"100\");" + + // curl backend http port to get metrics + // get first backendId + backendId = backendId_to_backendIP.keySet().iterator().next(); + backendIP = backendId_to_backendIP.get(backendId); + backendHttpPort = backendId_to_backendHttpPort.get(backendId); + logger.info("backendId: " + backendId + ", backendIP: " + backendIP + ", backendHttpPort: " + backendHttpPort); + + // Create a for loop to get metrics 5 times + for (int j = 0; j < 5; j++) { + String metrics = getMetrics(backendIP, backendHttpPort); + String filteredMetrics = metrics.split('\n').findAll { line -> + line.startsWith('doris_be_thread_pool') && line.contains('workload_group="test_wg_metrics"') && line.contains('thread_pool_name="Scan_test_wg_metrics"') + }.join('\n') + // Filter metrics with name test_wg_metrics + logger.info("filteredMetrics: " + filteredMetrics); + List<String> lines = filteredMetrics.split('\n').findAll { it.trim() } + assert lines.size() == 5 + } + + sql "drop workload group if exists test_wg_metrics;" + } + sql "drop workload group if exists test_wg_metrics;" } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org
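A minimal usage sketch of the `IntervalHistogramStat` API added by this commit, for readers who want to exercise it outside the thread pool: it assumes the BE source tree is on the include path, and the timed sleep loop is a hypothetical stand-in for `task.runnable->run()`. The printed mean is the same value the pool's "update" hook exports as `task_execution_time_ns_avg_in_last_1000_times`.

```cpp
// Sketch only: feed a bounded window of task durations and read the aggregate
// statistics, mirroring ThreadPool's _task_execution_time_ns_statistic {1000}.
#include <chrono>
#include <cstdint>
#include <iostream>
#include <thread>

#include "util/interval_histogram.h"

int main() {
    // Keep only the last 1000 samples, like the thread pool statistics do.
    doris::IntervalHistogramStat<int64_t> exec_time_ns(1000);

    for (int i = 0; i < 10; ++i) {
        auto start = std::chrono::steady_clock::now();
        // Hypothetical workload standing in for task.runnable->run().
        std::this_thread::sleep_for(std::chrono::milliseconds(1));
        auto elapsed_ns = std::chrono::duration_cast<std::chrono::nanoseconds>(
                                  std::chrono::steady_clock::now() - start)
                                  .count();
        exec_time_ns.add(elapsed_ns);
    }

    std::cout << "mean=" << exec_time_ns.mean() << "ns"
              << " median=" << exec_time_ns.median() << "ns"
              << " min=" << exec_time_ns.min() << "ns"
              << " max=" << exec_time_ns.max() << "ns" << std::endl;
    return 0;
}
```

Since `add()` takes an exclusive lock and the read methods take shared locks, the per-pool metrics hook can read the averages from the metrics thread while worker threads keep recording samples.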