This is an automated email from the ASF dual-hosted git repository. kxiao pushed a commit to branch branch-2.0 in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.0 by this push: new d83c928123a [enhancement](metrics) enhance visibility of flush thread pool (#26544) (#26819) d83c928123a is described below commit d83c928123a4fadf2a86d690b88a754dbc57bbe9 Author: Siyang Tang <82279870+tangsiyang2...@users.noreply.github.com> AuthorDate: Sun Nov 12 12:15:43 2023 +0800 [enhancement](metrics) enhance visibility of flush thread pool (#26544) (#26819) --- be/src/olap/memtable_flush_executor.cpp | 39 ++++++++++++++++++------ be/src/olap/memtable_flush_executor.h | 9 ++++-- be/src/util/doris_metrics.h | 10 +++++++ be/src/vec/exec/scan/scanner_scheduler.cpp | 48 ++++++++++++++++++++++++++---- be/src/vec/exec/scan/scanner_scheduler.h | 5 +++- 5 files changed, 92 insertions(+), 19 deletions(-) diff --git a/be/src/olap/memtable_flush_executor.cpp b/be/src/olap/memtable_flush_executor.cpp index 57c2efb5294..13952697a16 100644 --- a/be/src/olap/memtable_flush_executor.cpp +++ b/be/src/olap/memtable_flush_executor.cpp @@ -18,29 +18,37 @@ #include "olap/memtable_flush_executor.h" #include <gen_cpp/olap_file.pb.h> -#include <stddef.h> #include <algorithm> +#include <cstddef> #include <ostream> #include "common/config.h" #include "common/logging.h" #include "olap/memtable.h" -#include "util/stopwatch.hpp" -#include "util/time.h" +#include "olap/rowset/rowset_writer.h" +#include "util/doris_metrics.h" +#include "util/metrics.h" namespace doris { using namespace ErrorCode; +DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(flush_thread_pool_queue_size, MetricUnit::NOUNIT); +DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(flush_thread_pool_thread_num, MetricUnit::NOUNIT); + +bvar::Adder<int64_t> g_flush_task_num("memtable_flush_task_num"); + class MemtableFlushTask final : public Runnable { public: MemtableFlushTask(FlushToken* flush_token, std::unique_ptr<MemTable> memtable, int64_t submit_task_time) : _flush_token(flush_token), _memtable(std::move(memtable)), - _submit_task_time(submit_task_time) {} + _submit_task_time(submit_task_time) { + g_flush_task_num << 1; + } - ~MemtableFlushTask() override = default; + ~MemtableFlushTask() override { g_flush_task_num << -1; } void run() override { _flush_token->_flush_memtable(_memtable.get(), _submit_task_time); @@ -144,10 +152,11 @@ void MemTableFlushExecutor::init(const std::vector<DataDir*>& data_dirs) { min_threads = std::max(1, config::high_priority_flush_thread_num_per_store); max_threads = data_dir_num * min_threads; - ThreadPoolBuilder("MemTableHighPriorityFlushThreadPool") - .set_min_threads(min_threads) - .set_max_threads(max_threads) - .build(&_high_prio_flush_pool); + static_cast<void>(ThreadPoolBuilder("MemTableHighPriorityFlushThreadPool") + .set_min_threads(min_threads) + .set_max_threads(max_threads) + .build(&_high_prio_flush_pool)); + _register_metrics(); } // NOTE: we use SERIAL mode here to ensure all mem-tables from one tablet are flushed in order. @@ -178,4 +187,16 @@ Status MemTableFlushExecutor::create_flush_token(std::unique_ptr<FlushToken>* fl return Status::OK(); } +void MemTableFlushExecutor::_register_metrics() { + REGISTER_HOOK_METRIC(flush_thread_pool_queue_size, + [this]() { return _flush_pool->get_queue_size(); }); + REGISTER_HOOK_METRIC(flush_thread_pool_thread_num, + [this]() { return _flush_pool->num_threads(); }) +} + +void MemTableFlushExecutor::_deregister_metrics() { + DEREGISTER_HOOK_METRIC(flush_thread_pool_queue_size); + DEREGISTER_HOOK_METRIC(flush_thread_pool_thread_num); +} + } // namespace doris diff --git a/be/src/olap/memtable_flush_executor.h b/be/src/olap/memtable_flush_executor.h index 181b63de729..4c8a654c08c 100644 --- a/be/src/olap/memtable_flush_executor.h +++ b/be/src/olap/memtable_flush_executor.h @@ -17,9 +17,8 @@ #pragma once -#include <stdint.h> - #include <atomic> +#include <cstdint> #include <iosfwd> #include <memory> #include <utility> @@ -97,8 +96,9 @@ private: // ... class MemTableFlushExecutor { public: - MemTableFlushExecutor() {} + MemTableFlushExecutor() = default; ~MemTableFlushExecutor() { + _deregister_metrics(); _flush_pool->shutdown(); _high_prio_flush_pool->shutdown(); } @@ -111,6 +111,9 @@ public: bool should_serial, bool is_high_priority); private: + void _register_metrics(); + static void _deregister_metrics(); + std::unique_ptr<ThreadPool> _flush_pool; std::unique_ptr<ThreadPool> _high_prio_flush_pool; }; diff --git a/be/src/util/doris_metrics.h b/be/src/util/doris_metrics.h index 646a4449c0a..023cf75f5b4 100644 --- a/be/src/util/doris_metrics.h +++ b/be/src/util/doris_metrics.h @@ -227,6 +227,16 @@ public: UIntGauge* heavy_work_max_threads; UIntGauge* light_work_max_threads; + UIntGauge* flush_thread_pool_queue_size; + UIntGauge* flush_thread_pool_thread_num; + + UIntGauge* local_scan_thread_pool_queue_size; + UIntGauge* local_scan_thread_pool_thread_num; + UIntGauge* remote_scan_thread_pool_queue_size; + UIntGauge* remote_scan_thread_pool_thread_num; + UIntGauge* limited_scan_thread_pool_queue_size; + UIntGauge* limited_scan_thread_pool_thread_num; + static DorisMetrics* instance() { static DorisMetrics instance; return &instance; diff --git a/be/src/vec/exec/scan/scanner_scheduler.cpp b/be/src/vec/exec/scan/scanner_scheduler.cpp index 504c46c4016..f63b04692fd 100644 --- a/be/src/vec/exec/scan/scanner_scheduler.cpp +++ b/be/src/vec/exec/scan/scanner_scheduler.cpp @@ -17,11 +17,11 @@ #include "scanner_scheduler.h" -#include <stdint.h> - #include <algorithm> +#include <cstdint> #include <functional> #include <list> +#include <memory> #include <ostream> #include <string> #include <typeinfo> @@ -40,6 +40,7 @@ #include "util/blocking_queue.hpp" #include "util/cpu_info.h" #include "util/defer_op.h" +#include "util/doris_metrics.h" #include "util/runtime_profile.h" #include "util/thread.h" #include "util/threadpool.h" @@ -53,6 +54,13 @@ namespace doris::vectorized { +DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(local_scan_thread_pool_queue_size, MetricUnit::NOUNIT); +DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(local_scan_thread_pool_thread_num, MetricUnit::NOUNIT); +DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(remote_scan_thread_pool_queue_size, MetricUnit::NOUNIT); +DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(remote_scan_thread_pool_thread_num, MetricUnit::NOUNIT); +DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(limited_scan_thread_pool_queue_size, MetricUnit::NOUNIT); +DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(limited_scan_thread_pool_thread_num, MetricUnit::NOUNIT); + ScannerScheduler::ScannerScheduler() = default; ScannerScheduler::~ScannerScheduler() { @@ -66,6 +74,8 @@ ScannerScheduler::~ScannerScheduler() { _is_closed = true; + _deregister_metrics(); + _scheduler_pool->shutdown(); _local_scan_thread_pool->shutdown(); _remote_scan_thread_pool->shutdown(); @@ -94,9 +104,9 @@ Status ScannerScheduler::init(ExecEnv* env) { } // 2. local scan thread pool - _local_scan_thread_pool.reset( - new PriorityThreadPool(config::doris_scanner_thread_pool_thread_num, - config::doris_scanner_thread_pool_queue_size, "local_scan")); + _local_scan_thread_pool = std::make_unique<PriorityThreadPool>( + config::doris_scanner_thread_pool_thread_num, + config::doris_scanner_thread_pool_queue_size, "local_scan"); // 3. remote scan thread pool _remote_thread_pool_max_size = config::doris_max_remote_scanner_thread_pool_thread_num != -1 @@ -115,6 +125,8 @@ Status ScannerScheduler::init(ExecEnv* env) { .set_max_queue_size(config::doris_scanner_thread_pool_queue_size) .build(&_limited_scan_thread_pool); + _register_metrics(); + _is_init = true; return Status::OK(); } @@ -152,7 +164,7 @@ void ScannerScheduler::_schedule_thread(int queue_id) { } [[maybe_unused]] static void* run_scanner_bthread(void* arg) { - auto f = reinterpret_cast<std::function<void()>*>(arg); + auto* f = reinterpret_cast<std::function<void()>*>(arg); (*f)(); delete f; return nullptr; @@ -402,4 +414,28 @@ void ScannerScheduler::_scanner_scan(ScannerScheduler* scheduler, ScannerContext ctx->push_back_scanner_and_reschedule(scanner); } +void ScannerScheduler::_register_metrics() { + REGISTER_HOOK_METRIC(local_scan_thread_pool_queue_size, + [this]() { return _local_scan_thread_pool->get_queue_size(); }); + REGISTER_HOOK_METRIC(local_scan_thread_pool_thread_num, + [this]() { return _local_scan_thread_pool->get_active_threads(); }); + REGISTER_HOOK_METRIC(remote_scan_thread_pool_queue_size, + [this]() { return _remote_scan_thread_pool->get_queue_size(); }); + REGISTER_HOOK_METRIC(remote_scan_thread_pool_thread_num, + [this]() { return _remote_scan_thread_pool->num_threads(); }); + REGISTER_HOOK_METRIC(limited_scan_thread_pool_queue_size, + [this]() { return _limited_scan_thread_pool->get_queue_size(); }); + REGISTER_HOOK_METRIC(limited_scan_thread_pool_thread_num, + [this]() { return _limited_scan_thread_pool->num_threads(); }); +} + +void ScannerScheduler::_deregister_metrics() { + DEREGISTER_HOOK_METRIC(local_scan_thread_pool_queue_size); + DEREGISTER_HOOK_METRIC(local_scan_thread_pool_thread_num); + DEREGISTER_HOOK_METRIC(remote_scan_thread_pool_queue_size); + DEREGISTER_HOOK_METRIC(remote_scan_thread_pool_thread_num); + DEREGISTER_HOOK_METRIC(limited_scan_thread_pool_queue_size); + DEREGISTER_HOOK_METRIC(limited_scan_thread_pool_thread_num); +} + } // namespace doris::vectorized diff --git a/be/src/vec/exec/scan/scanner_scheduler.h b/be/src/vec/exec/scan/scanner_scheduler.h index 81dcf5f8b76..df81fcf8b47 100644 --- a/be/src/vec/exec/scan/scanner_scheduler.h +++ b/be/src/vec/exec/scan/scanner_scheduler.h @@ -77,7 +77,10 @@ private: // execution thread function void _scanner_scan(ScannerScheduler* scheduler, ScannerContext* ctx, VScannerSPtr scanner); -private: + void _register_metrics(); + + static void _deregister_metrics(); + // Scheduling queue number. // TODO: make it configurable. static const int QUEUE_NUM = 4; --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org