This is an automated email from the ASF dual-hosted git repository.

kxiao pushed a commit to branch branch-2.0
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-2.0 by this push:
     new d83c928123a [enhancement](metrics)  enhance visibility of flush thread pool (#26544) (#26819)
d83c928123a is described below

commit d83c928123a4fadf2a86d690b88a754dbc57bbe9
Author: Siyang Tang <82279870+tangsiyang2...@users.noreply.github.com>
AuthorDate: Sun Nov 12 12:15:43 2023 +0800

    [enhancement](metrics)  enhance visibility of flush thread pool (#26544) (#26819)
---
 be/src/olap/memtable_flush_executor.cpp    | 39 ++++++++++++++++++------
 be/src/olap/memtable_flush_executor.h      |  9 ++++--
 be/src/util/doris_metrics.h                | 10 +++++++
 be/src/vec/exec/scan/scanner_scheduler.cpp | 48 ++++++++++++++++++++++++++----
 be/src/vec/exec/scan/scanner_scheduler.h   |  5 +++-
 5 files changed, 92 insertions(+), 19 deletions(-)

diff --git a/be/src/olap/memtable_flush_executor.cpp b/be/src/olap/memtable_flush_executor.cpp
index 57c2efb5294..13952697a16 100644
--- a/be/src/olap/memtable_flush_executor.cpp
+++ b/be/src/olap/memtable_flush_executor.cpp
@@ -18,29 +18,37 @@
 #include "olap/memtable_flush_executor.h"
 
 #include <gen_cpp/olap_file.pb.h>
-#include <stddef.h>
 
 #include <algorithm>
+#include <cstddef>
 #include <ostream>
 
 #include "common/config.h"
 #include "common/logging.h"
 #include "olap/memtable.h"
-#include "util/stopwatch.hpp"
-#include "util/time.h"
+#include "olap/rowset/rowset_writer.h"
+#include "util/doris_metrics.h"
+#include "util/metrics.h"
 
 namespace doris {
 using namespace ErrorCode;
 
+DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(flush_thread_pool_queue_size, MetricUnit::NOUNIT);
+DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(flush_thread_pool_thread_num, MetricUnit::NOUNIT);
+
+bvar::Adder<int64_t> g_flush_task_num("memtable_flush_task_num");
+
 class MemtableFlushTask final : public Runnable {
 public:
     MemtableFlushTask(FlushToken* flush_token, std::unique_ptr<MemTable> 
memtable,
                       int64_t submit_task_time)
             : _flush_token(flush_token),
               _memtable(std::move(memtable)),
-              _submit_task_time(submit_task_time) {}
+              _submit_task_time(submit_task_time) {
+        g_flush_task_num << 1;
+    }
 
-    ~MemtableFlushTask() override = default;
+    ~MemtableFlushTask() override { g_flush_task_num << -1; }
 
     void run() override {
         _flush_token->_flush_memtable(_memtable.get(), _submit_task_time);
@@ -144,10 +152,11 @@ void MemTableFlushExecutor::init(const std::vector<DataDir*>& data_dirs) {
 
     min_threads = std::max(1, 
config::high_priority_flush_thread_num_per_store);
     max_threads = data_dir_num * min_threads;
-    ThreadPoolBuilder("MemTableHighPriorityFlushThreadPool")
-            .set_min_threads(min_threads)
-            .set_max_threads(max_threads)
-            .build(&_high_prio_flush_pool);
+    static_cast<void>(ThreadPoolBuilder("MemTableHighPriorityFlushThreadPool")
+                              .set_min_threads(min_threads)
+                              .set_max_threads(max_threads)
+                              .build(&_high_prio_flush_pool));
+    _register_metrics();
 }
 
 // NOTE: we use SERIAL mode here to ensure all mem-tables from one tablet are flushed in order.
@@ -178,4 +187,16 @@ Status MemTableFlushExecutor::create_flush_token(std::unique_ptr<FlushToken>* fl
     return Status::OK();
 }
 
+void MemTableFlushExecutor::_register_metrics() { // hooks sample _flush_pool only
+    REGISTER_HOOK_METRIC(flush_thread_pool_queue_size,
+                         [this]() { return _flush_pool->get_queue_size(); });
+    REGISTER_HOOK_METRIC(flush_thread_pool_thread_num,
+                         [this]() { return _flush_pool->num_threads(); });
+}
+
+void MemTableFlushExecutor::_deregister_metrics() { // counterpart of _register_metrics()
+    DEREGISTER_HOOK_METRIC(flush_thread_pool_queue_size);
+    DEREGISTER_HOOK_METRIC(flush_thread_pool_thread_num);
+}
+
 } // namespace doris
diff --git a/be/src/olap/memtable_flush_executor.h b/be/src/olap/memtable_flush_executor.h
index 181b63de729..4c8a654c08c 100644
--- a/be/src/olap/memtable_flush_executor.h
+++ b/be/src/olap/memtable_flush_executor.h
@@ -17,9 +17,8 @@
 
 #pragma once
 
-#include <stdint.h>
-
 #include <atomic>
+#include <cstdint>
 #include <iosfwd>
 #include <memory>
 #include <utility>
@@ -97,8 +96,9 @@ private:
 //      ...
 class MemTableFlushExecutor {
 public:
-    MemTableFlushExecutor() {}
+    MemTableFlushExecutor() = default;
     ~MemTableFlushExecutor() {
+        _deregister_metrics();
         _flush_pool->shutdown();
         _high_prio_flush_pool->shutdown();
     }
@@ -111,6 +111,9 @@ public:
                               bool should_serial, bool is_high_priority);
 
 private:
+    void _register_metrics();
+    static void _deregister_metrics();
+
     std::unique_ptr<ThreadPool> _flush_pool;
     std::unique_ptr<ThreadPool> _high_prio_flush_pool;
 };
diff --git a/be/src/util/doris_metrics.h b/be/src/util/doris_metrics.h
index 646a4449c0a..023cf75f5b4 100644
--- a/be/src/util/doris_metrics.h
+++ b/be/src/util/doris_metrics.h
@@ -227,6 +227,16 @@ public:
     UIntGauge* heavy_work_max_threads;
     UIntGauge* light_work_max_threads;
 
+    UIntGauge* flush_thread_pool_queue_size;
+    UIntGauge* flush_thread_pool_thread_num;
+
+    UIntGauge* local_scan_thread_pool_queue_size;
+    UIntGauge* local_scan_thread_pool_thread_num;
+    UIntGauge* remote_scan_thread_pool_queue_size;
+    UIntGauge* remote_scan_thread_pool_thread_num;
+    UIntGauge* limited_scan_thread_pool_queue_size;
+    UIntGauge* limited_scan_thread_pool_thread_num;
+
     static DorisMetrics* instance() {
         static DorisMetrics instance;
         return &instance;
diff --git a/be/src/vec/exec/scan/scanner_scheduler.cpp b/be/src/vec/exec/scan/scanner_scheduler.cpp
index 504c46c4016..f63b04692fd 100644
--- a/be/src/vec/exec/scan/scanner_scheduler.cpp
+++ b/be/src/vec/exec/scan/scanner_scheduler.cpp
@@ -17,11 +17,11 @@
 
 #include "scanner_scheduler.h"
 
-#include <stdint.h>
-
 #include <algorithm>
+#include <cstdint>
 #include <functional>
 #include <list>
+#include <memory>
 #include <ostream>
 #include <string>
 #include <typeinfo>
@@ -40,6 +40,7 @@
 #include "util/blocking_queue.hpp"
 #include "util/cpu_info.h"
 #include "util/defer_op.h"
+#include "util/doris_metrics.h"
 #include "util/runtime_profile.h"
 #include "util/thread.h"
 #include "util/threadpool.h"
@@ -53,6 +54,13 @@
 
 namespace doris::vectorized {
 
+DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(local_scan_thread_pool_queue_size, MetricUnit::NOUNIT);
+DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(local_scan_thread_pool_thread_num, MetricUnit::NOUNIT);
+DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(remote_scan_thread_pool_queue_size, MetricUnit::NOUNIT);
+DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(remote_scan_thread_pool_thread_num, MetricUnit::NOUNIT);
+DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(limited_scan_thread_pool_queue_size, MetricUnit::NOUNIT);
+DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(limited_scan_thread_pool_thread_num, MetricUnit::NOUNIT);
+
 ScannerScheduler::ScannerScheduler() = default;
 
 ScannerScheduler::~ScannerScheduler() {
@@ -66,6 +74,8 @@ ScannerScheduler::~ScannerScheduler() {
 
     _is_closed = true;
 
+    _deregister_metrics();
+
     _scheduler_pool->shutdown();
     _local_scan_thread_pool->shutdown();
     _remote_scan_thread_pool->shutdown();
@@ -94,9 +104,9 @@ Status ScannerScheduler::init(ExecEnv* env) {
     }
 
     // 2. local scan thread pool
-    _local_scan_thread_pool.reset(
-            new 
PriorityThreadPool(config::doris_scanner_thread_pool_thread_num,
-                                   
config::doris_scanner_thread_pool_queue_size, "local_scan"));
+    _local_scan_thread_pool = std::make_unique<PriorityThreadPool>(
+            config::doris_scanner_thread_pool_thread_num,
+            config::doris_scanner_thread_pool_queue_size, "local_scan");
 
     // 3. remote scan thread pool
     _remote_thread_pool_max_size = 
config::doris_max_remote_scanner_thread_pool_thread_num != -1
@@ -115,6 +125,8 @@ Status ScannerScheduler::init(ExecEnv* env) {
             .set_max_queue_size(config::doris_scanner_thread_pool_queue_size)
             .build(&_limited_scan_thread_pool);
 
+    _register_metrics();
+
     _is_init = true;
     return Status::OK();
 }
@@ -152,7 +164,7 @@ void ScannerScheduler::_schedule_thread(int queue_id) {
 }
 
 [[maybe_unused]] static void* run_scanner_bthread(void* arg) {
-    auto f = reinterpret_cast<std::function<void()>*>(arg);
+    auto* f = reinterpret_cast<std::function<void()>*>(arg);
     (*f)();
     delete f;
     return nullptr;
@@ -402,4 +414,28 @@ void ScannerScheduler::_scanner_scan(ScannerScheduler* scheduler, ScannerContext
     ctx->push_back_scanner_and_reschedule(scanner);
 }
 
+void ScannerScheduler::_register_metrics() { // NOTE: local pool gauge uses get_active_threads()
+    REGISTER_HOOK_METRIC(local_scan_thread_pool_queue_size,
+                         [this]() { return _local_scan_thread_pool->get_queue_size(); });
+    REGISTER_HOOK_METRIC(local_scan_thread_pool_thread_num,
+                         [this]() { return _local_scan_thread_pool->get_active_threads(); });
+    REGISTER_HOOK_METRIC(remote_scan_thread_pool_queue_size,
+                         [this]() { return _remote_scan_thread_pool->get_queue_size(); });
+    REGISTER_HOOK_METRIC(remote_scan_thread_pool_thread_num,
+                         [this]() { return _remote_scan_thread_pool->num_threads(); });
+    REGISTER_HOOK_METRIC(limited_scan_thread_pool_queue_size,
+                         [this]() { return _limited_scan_thread_pool->get_queue_size(); });
+    REGISTER_HOOK_METRIC(limited_scan_thread_pool_thread_num,
+                         [this]() { return _limited_scan_thread_pool->num_threads(); });
+}
+
+void ScannerScheduler::_deregister_metrics() { // counterpart of _register_metrics()
+    DEREGISTER_HOOK_METRIC(local_scan_thread_pool_queue_size);
+    DEREGISTER_HOOK_METRIC(local_scan_thread_pool_thread_num);
+    DEREGISTER_HOOK_METRIC(remote_scan_thread_pool_queue_size);
+    DEREGISTER_HOOK_METRIC(remote_scan_thread_pool_thread_num);
+    DEREGISTER_HOOK_METRIC(limited_scan_thread_pool_queue_size);
+    DEREGISTER_HOOK_METRIC(limited_scan_thread_pool_thread_num);
+}
+
 } // namespace doris::vectorized
diff --git a/be/src/vec/exec/scan/scanner_scheduler.h b/be/src/vec/exec/scan/scanner_scheduler.h
index 81dcf5f8b76..df81fcf8b47 100644
--- a/be/src/vec/exec/scan/scanner_scheduler.h
+++ b/be/src/vec/exec/scan/scanner_scheduler.h
@@ -77,7 +77,10 @@ private:
     // execution thread function
     void _scanner_scan(ScannerScheduler* scheduler, ScannerContext* ctx, 
VScannerSPtr scanner);
 
-private:
+    void _register_metrics();
+
+    static void _deregister_metrics();
+
     // Scheduling queue number.
     // TODO: make it configurable.
     static const int QUEUE_NUM = 4;


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to