This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 673f4628eb6 [feat](metrics) Unify metrics of thread pool (#43144)
673f4628eb6 is described below

commit 673f4628eb645558c717fc2ea620e469ab96ecbb
Author: zhiqiang <hezhiqi...@selectdb.com>
AuthorDate: Wed Jan 1 16:06:59 2025 +0800

    [feat](metrics) Unify metrics of thread pool (#43144)
    
    ### What problem does this PR solve?
    
    Add metrics for all thread pools, more specifically for all ThreadPool
    objects.
    Every thread pool will expose the following metrics:
    1. thread_pool_active_threads
    2. thread_pool_queue_size
    3. thread_pool_max_queue_size
    4. thread_pool_max_threads
    5. task_execution_time_ns_avg_in_last_1000_times
    6. task_wait_worker_ns_avg_in_last_1000_times
    
    A new class `IntervalHistogramStat` is introduced to keep a fixed-size
    sliding window of recent values and compute interval statistics over it.
    
    Metrics are updated via a registered `hook` method whenever they are
    requested by Prometheus.
    
    - [ ] Confirm the release note
    - [ ] Confirm test cases
    - [ ] Confirm document
    - [ ] Add branch pick label
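    
    As a minimal sketch of the effect (not part of this patch; the pool name
    "ExampleHedgedPool", the thread/queue limits, and the submitted task are
    invented for illustration), any pool built through ThreadPoolBuilder now
    registers its own metric entity and feeds the metrics listed above:
    
        #include <memory>
        
        #include "util/threadpool.h"
        
        using namespace doris;
        
        void thread_pool_metrics_sketch() {
            std::unique_ptr<ThreadPool> pool;
            // Building the pool registers a "thread_pool_ExampleHedgedPool" metric
            // entity; its "update" hook refreshes the gauges on every Prometheus scrape.
            static_cast<void>(ThreadPoolBuilder("ExampleHedgedPool")
                                      .set_min_threads(1)
                                      .set_max_threads(4)
                                      .set_max_queue_size(256)
                                      .build(&pool));
            // Every submitted task feeds the wait/execution time window statistics.
            static_cast<void>(pool->submit_func([] { /* do some work */ }));
            pool->shutdown(); // shutdown() deregisters the pool's metric entity
        }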
---
 be/src/olap/memtable_flush_executor.cpp            | 16 -----
 be/src/olap/memtable_flush_executor.h              |  4 --
 be/src/runtime/exec_env.h                          |  3 +
 be/src/runtime/exec_env_init.cpp                   | 42 +-----------
 be/src/runtime/fragment_mgr.cpp                    | 10 +--
 be/src/runtime/workload_group/workload_group.cpp   |  4 +-
 be/src/util/doris_metrics.cpp                      |  6 --
 be/src/util/doris_metrics.h                        | 22 ------
 be/src/util/interval_histogram.cpp                 | 80 ++++++++++++++++++++++
 be/src/util/interval_histogram.h                   | 46 +++++++++++++
 be/src/util/metrics.cpp                            |  5 +-
 be/src/util/threadpool.cpp                         | 69 +++++++++++++++++--
 be/src/util/threadpool.h                           | 30 +++++++-
 be/src/vec/exec/scan/scanner_scheduler.cpp         | 54 +--------------
 be/src/vec/exec/scan/scanner_scheduler.h           | 19 +++--
 be/test/io/fs/buffered_reader_test.cpp             | 12 ++--
 be/test/io/fs/remote_file_system_test.cpp          |  9 +--
 be/test/io/fs/s3_file_writer_test.cpp              |  1 +
 be/test/olap/rowset/beta_rowset_test.cpp           |  2 +
 .../rowset/unique_rowset_id_generator_test.cpp     |  1 +
 be/test/testutil/run_all_tests.cpp                 | 12 ++++
 be/test/util/countdown_latch_test.cpp              |  1 +
 be/test/util/interval_histogram_test.cpp           | 78 +++++++++++++++++++++
 .../agg_linear_histogram_test.cpp                  |  4 +-
 .../workload_manager_p0/test_curd_wlg.groovy       | 56 +++++++++++++++
 25 files changed, 402 insertions(+), 184 deletions(-)

diff --git a/be/src/olap/memtable_flush_executor.cpp b/be/src/olap/memtable_flush_executor.cpp
index 5cdb45281b9..0181cc1d64d 100644
--- a/be/src/olap/memtable_flush_executor.cpp
+++ b/be/src/olap/memtable_flush_executor.cpp
@@ -37,9 +37,6 @@
 namespace doris {
 using namespace ErrorCode;
 
-DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(flush_thread_pool_queue_size, 
MetricUnit::NOUNIT);
-DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(flush_thread_pool_thread_num, 
MetricUnit::NOUNIT);
-
 bvar::Adder<int64_t> g_flush_task_num("memtable_flush_task_num");
 
 class MemtableFlushTask final : public Runnable {
@@ -239,7 +236,6 @@ void MemTableFlushExecutor::init(int num_disk) {
                               .set_min_threads(min_threads)
                               .set_max_threads(max_threads)
                               .build(&_high_prio_flush_pool));
-    _register_metrics();
 }
 
 // NOTE: we use SERIAL mode here to ensure all mem-tables from one tablet are 
flushed in order.
@@ -263,16 +259,4 @@ Status MemTableFlushExecutor::create_flush_token(std::shared_ptr<FlushToken>& fl
     }
 }
 
-void MemTableFlushExecutor::_register_metrics() {
-    REGISTER_HOOK_METRIC(flush_thread_pool_queue_size,
-                         [this]() { return _flush_pool->get_queue_size(); });
-    REGISTER_HOOK_METRIC(flush_thread_pool_thread_num,
-                         [this]() { return _flush_pool->num_threads(); })
-}
-
-void MemTableFlushExecutor::_deregister_metrics() {
-    DEREGISTER_HOOK_METRIC(flush_thread_pool_queue_size);
-    DEREGISTER_HOOK_METRIC(flush_thread_pool_thread_num);
-}
-
 } // namespace doris
diff --git a/be/src/olap/memtable_flush_executor.h b/be/src/olap/memtable_flush_executor.h
index 27e8e8a9b0e..753f1106646 100644
--- a/be/src/olap/memtable_flush_executor.h
+++ b/be/src/olap/memtable_flush_executor.h
@@ -127,7 +127,6 @@ class MemTableFlushExecutor {
 public:
     MemTableFlushExecutor() = default;
     ~MemTableFlushExecutor() {
-        _deregister_metrics();
         _flush_pool->shutdown();
         _high_prio_flush_pool->shutdown();
     }
@@ -141,9 +140,6 @@ public:
                               std::shared_ptr<WorkloadGroup> wg_sptr);
 
 private:
-    void _register_metrics();
-    static void _deregister_metrics();
-
     std::unique_ptr<ThreadPool> _flush_pool;
     std::unique_ptr<ThreadPool> _high_prio_flush_pool;
 };
diff --git a/be/src/runtime/exec_env.h b/be/src/runtime/exec_env.h
index 0c9a4158ebc..12d625da3bf 100644
--- a/be/src/runtime/exec_env.h
+++ b/be/src/runtime/exec_env.h
@@ -300,6 +300,9 @@ public:
         _s_tracking_memory.store(tracking_memory, std::memory_order_release);
     }
     void set_orc_memory_pool(orc::MemoryPool* pool) { _orc_memory_pool = pool; 
}
+    void set_non_block_close_thread_pool(std::unique_ptr<ThreadPool>&& pool) {
+        _non_block_close_thread_pool = std::move(pool);
+    }
 #endif
     LoadStreamMapPool* load_stream_map_pool() { return 
_load_stream_map_pool.get(); }
 
diff --git a/be/src/runtime/exec_env_init.cpp b/be/src/runtime/exec_env_init.cpp
index df66315ff05..f0d8253254f 100644
--- a/be/src/runtime/exec_env_init.cpp
+++ b/be/src/runtime/exec_env_init.cpp
@@ -118,31 +118,10 @@
 #include "runtime/memory/tcmalloc_hook.h"
 #endif
 
-// Used for unit test
-namespace {
-std::once_flag flag;
-std::unique_ptr<doris::ThreadPool> non_block_close_thread_pool;
-void init_threadpool_for_test() {
-    static_cast<void>(doris::ThreadPoolBuilder("NonBlockCloseThreadPool")
-                              .set_min_threads(12)
-                              .set_max_threads(48)
-                              .build(&non_block_close_thread_pool));
-}
-
-[[maybe_unused]] doris::ThreadPool* get_non_block_close_thread_pool() {
-    std::call_once(flag, init_threadpool_for_test);
-    return non_block_close_thread_pool.get();
-}
-} // namespace
-
 namespace doris {
 class PBackendService_Stub;
 class PFunctionService_Stub;
 
-DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(scanner_thread_pool_queue_size, 
MetricUnit::NOUNIT);
-DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(send_batch_thread_pool_thread_num, 
MetricUnit::NOUNIT);
-DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(send_batch_thread_pool_queue_size, 
MetricUnit::NOUNIT);
-
 static void init_doris_metrics(const std::vector<StorePath>& store_paths) {
     bool init_system_metrics = config::enable_system_metrics;
     std::set<std::string> disk_devices;
@@ -178,11 +157,7 @@ static pair<size_t, size_t> get_num_threads(size_t min_num, size_t max_num) {
 }
 
 ThreadPool* ExecEnv::non_block_close_thread_pool() {
-#ifdef BE_TEST
-    return get_non_block_close_thread_pool();
-#else
     return _non_block_close_thread_pool.get();
-#endif
 }
 
 ExecEnv::ExecEnv() = default;
@@ -342,7 +317,6 @@ Status ExecEnv::_init(const std::vector<StorePath>& store_paths,
     RETURN_IF_ERROR(_load_channel_mgr->init(MemInfo::mem_limit()));
     RETURN_IF_ERROR(_wal_manager->init());
     _heartbeat_flags = new HeartbeatFlags();
-    _register_metrics();
 
     _tablet_schema_cache =
             
TabletSchemaCache::create_global_schema_cache(config::tablet_schema_cache_capacity);
@@ -672,19 +646,6 @@ Status ExecEnv::_check_deploy_mode() {
     return Status::OK();
 }
 
-void ExecEnv::_register_metrics() {
-    REGISTER_HOOK_METRIC(send_batch_thread_pool_thread_num,
-                         [this]() { return 
_send_batch_thread_pool->num_threads(); });
-
-    REGISTER_HOOK_METRIC(send_batch_thread_pool_queue_size,
-                         [this]() { return 
_send_batch_thread_pool->get_queue_size(); });
-}
-
-void ExecEnv::_deregister_metrics() {
-    DEREGISTER_HOOK_METRIC(scanner_thread_pool_queue_size);
-    DEREGISTER_HOOK_METRIC(send_batch_thread_pool_thread_num);
-    DEREGISTER_HOOK_METRIC(send_batch_thread_pool_queue_size);
-}
 #ifdef BE_TEST
 void ExecEnv::set_new_load_stream_mgr(std::unique_ptr<NewLoadStreamMgr>&& 
new_load_stream_mgr) {
     this->_new_load_stream_mgr = std::move(new_load_stream_mgr);
@@ -740,6 +701,7 @@ void ExecEnv::destroy() {
     SAFE_STOP(_fragment_mgr);
     SAFE_STOP(_runtime_filter_timer_queue);
     // NewLoadStreamMgr should be destoried before storage_engine & after 
fragment_mgr stopped.
+    _load_stream_mgr.reset();
     _new_load_stream_mgr.reset();
     _stream_load_executor.reset();
     _memtable_memory_limiter.reset();
@@ -762,8 +724,8 @@ void ExecEnv::destroy() {
     SAFE_SHUTDOWN(_non_block_close_thread_pool);
     SAFE_SHUTDOWN(_s3_file_system_thread_pool);
     SAFE_SHUTDOWN(_send_batch_thread_pool);
+    SAFE_SHUTDOWN(_send_table_stats_thread_pool);
 
-    _deregister_metrics();
     SAFE_DELETE(_load_channel_mgr);
 
     SAFE_DELETE(_spill_stream_mgr);
diff --git a/be/src/runtime/fragment_mgr.cpp b/be/src/runtime/fragment_mgr.cpp
index 19e8f76366c..60b7856d6aa 100644
--- a/be/src/runtime/fragment_mgr.cpp
+++ b/be/src/runtime/fragment_mgr.cpp
@@ -97,8 +97,7 @@ namespace doris {
 
 DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(fragment_instance_count, 
MetricUnit::NOUNIT);
 DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(timeout_canceled_fragment_count, 
MetricUnit::NOUNIT);
-DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(fragment_thread_pool_queue_size, 
MetricUnit::NOUNIT);
-DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(fragment_thread_pool_num_active_threads, 
MetricUnit::NOUNIT);
+
 bvar::LatencyRecorder g_fragmentmgr_prepare_latency("doris_FragmentMgr", 
"prepare");
 
 bvar::Adder<uint64_t> g_fragment_executing_count("fragment_executing_count");
@@ -243,11 +242,6 @@ FragmentMgr::FragmentMgr(ExecEnv* exec_env)
                 
.set_max_threads(config::fragment_mgr_asynic_work_pool_thread_num_max)
                 
.set_max_queue_size(config::fragment_mgr_asynic_work_pool_queue_size)
                 .build(&_thread_pool);
-
-    REGISTER_HOOK_METRIC(fragment_thread_pool_queue_size,
-                         [this]() { return _thread_pool->get_queue_size(); });
-    REGISTER_HOOK_METRIC(fragment_thread_pool_num_active_threads,
-                         [this]() { return _thread_pool->num_active_threads(); 
});
     CHECK(s.ok()) << s.to_string();
 }
 
@@ -255,8 +249,6 @@ FragmentMgr::~FragmentMgr() = default;
 
 void FragmentMgr::stop() {
     DEREGISTER_HOOK_METRIC(fragment_instance_count);
-    DEREGISTER_HOOK_METRIC(fragment_thread_pool_queue_size);
-    DEREGISTER_HOOK_METRIC(fragment_thread_pool_num_active_threads);
     _stop_background_threads_latch.count_down();
     if (_cancel_thread) {
         _cancel_thread->join();
diff --git a/be/src/runtime/workload_group/workload_group.cpp b/be/src/runtime/workload_group/workload_group.cpp
index 6b9388af30a..3ceeed2de19 100644
--- a/be/src/runtime/workload_group/workload_group.cpp
+++ b/be/src/runtime/workload_group/workload_group.cpp
@@ -487,7 +487,7 @@ void WorkloadGroup::upsert_thread_pool_no_lock(WorkloadGroupInfo* wg_info,
     if (_scan_task_sched == nullptr) {
         std::unique_ptr<vectorized::SimplifiedScanScheduler> scan_scheduler =
                 std::make_unique<vectorized::SimplifiedScanScheduler>("Scan_" 
+ wg_name,
-                                                                      
cg_cpu_ctl_ptr);
+                                                                      
cg_cpu_ctl_ptr, wg_name);
         Status ret = 
scan_scheduler->start(config::doris_scanner_thread_pool_thread_num,
                                            
config::doris_scanner_thread_pool_thread_num,
                                            
config::doris_scanner_thread_pool_queue_size);
@@ -507,7 +507,7 @@ void WorkloadGroup::upsert_thread_pool_no_lock(WorkloadGroupInfo* wg_info,
                 
vectorized::ScannerScheduler::get_remote_scan_thread_queue_size();
         std::unique_ptr<vectorized::SimplifiedScanScheduler> 
remote_scan_scheduler =
                 std::make_unique<vectorized::SimplifiedScanScheduler>("RScan_" 
+ wg_name,
-                                                                      
cg_cpu_ctl_ptr);
+                                                                      
cg_cpu_ctl_ptr, wg_name);
         Status ret = remote_scan_scheduler->start(remote_max_thread_num,
                                                   
config::doris_scanner_min_thread_pool_thread_num,
                                                   
remote_scan_thread_queue_size);
diff --git a/be/src/util/doris_metrics.cpp b/be/src/util/doris_metrics.cpp
index e77ee1c36b6..39f246d98d3 100644
--- a/be/src/util/doris_metrics.cpp
+++ b/be/src/util/doris_metrics.cpp
@@ -191,9 +191,6 @@ DEFINE_COUNTER_METRIC_PROTOTYPE_2ARG(query_ctx_cnt, MetricUnit::NOUNIT);
 DEFINE_COUNTER_METRIC_PROTOTYPE_2ARG(scanner_ctx_cnt, MetricUnit::NOUNIT);
 DEFINE_COUNTER_METRIC_PROTOTYPE_2ARG(scanner_cnt, MetricUnit::NOUNIT);
 DEFINE_COUNTER_METRIC_PROTOTYPE_2ARG(scanner_task_cnt, MetricUnit::NOUNIT);
-DEFINE_COUNTER_METRIC_PROTOTYPE_2ARG(scanner_task_queued, MetricUnit::NOUNIT);
-DEFINE_COUNTER_METRIC_PROTOTYPE_2ARG(scanner_task_running, MetricUnit::NOUNIT);
-DEFINE_COUNTER_METRIC_PROTOTYPE_2ARG(scanner_task_submit_failed, 
MetricUnit::NOUNIT);
 
 const std::string DorisMetrics::_s_registry_name = "doris_be";
 const std::string DorisMetrics::_s_hook_name = "doris_metrics";
@@ -319,9 +316,6 @@ DorisMetrics::DorisMetrics() : _metric_registry(_s_registry_name) {
     INT_COUNTER_METRIC_REGISTER(_server_metric_entity, scanner_ctx_cnt);
     INT_COUNTER_METRIC_REGISTER(_server_metric_entity, scanner_cnt);
     INT_COUNTER_METRIC_REGISTER(_server_metric_entity, scanner_task_cnt);
-    INT_COUNTER_METRIC_REGISTER(_server_metric_entity, scanner_task_queued);
-    INT_COUNTER_METRIC_REGISTER(_server_metric_entity, scanner_task_running);
-    INT_COUNTER_METRIC_REGISTER(_server_metric_entity, 
scanner_task_submit_failed);
 }
 
 void DorisMetrics::initialize(bool init_system_metrics, const 
std::set<std::string>& disk_devices,
diff --git a/be/src/util/doris_metrics.h b/be/src/util/doris_metrics.h
index d089758c21c..26bad02fdd2 100644
--- a/be/src/util/doris_metrics.h
+++ b/be/src/util/doris_metrics.h
@@ -197,13 +197,6 @@ public:
     UIntGauge* query_cache_sql_total_count = nullptr;
     UIntGauge* query_cache_partition_total_count = nullptr;
 
-    UIntGauge* scanner_thread_pool_queue_size = nullptr;
-    UIntGauge* add_batch_task_queue_size = nullptr;
-    UIntGauge* send_batch_thread_pool_thread_num = nullptr;
-    UIntGauge* send_batch_thread_pool_queue_size = nullptr;
-    UIntGauge* fragment_thread_pool_queue_size = nullptr;
-    UIntGauge* fragment_thread_pool_num_active_threads = nullptr;
-
     // Upload metrics
     UIntGauge* upload_total_byte = nullptr;
     IntCounter* upload_rowset_count = nullptr;
@@ -224,18 +217,6 @@ public:
     UIntGauge* arrow_flight_work_pool_max_queue_size = nullptr;
     UIntGauge* arrow_flight_work_max_threads = nullptr;
 
-    UIntGauge* flush_thread_pool_queue_size = nullptr;
-    UIntGauge* flush_thread_pool_thread_num = nullptr;
-
-    UIntGauge* local_scan_thread_pool_queue_size = nullptr;
-    UIntGauge* local_scan_thread_pool_thread_num = nullptr;
-    UIntGauge* remote_scan_thread_pool_queue_size = nullptr;
-    UIntGauge* remote_scan_thread_pool_thread_num = nullptr;
-    UIntGauge* limited_scan_thread_pool_queue_size = nullptr;
-    UIntGauge* limited_scan_thread_pool_thread_num = nullptr;
-    UIntGauge* group_local_scan_thread_pool_queue_size = nullptr;
-    UIntGauge* group_local_scan_thread_pool_thread_num = nullptr;
-
     IntCounter* num_io_bytes_read_total = nullptr;
     IntCounter* num_io_bytes_read_from_cache = nullptr;
     IntCounter* num_io_bytes_read_from_remote = nullptr;
@@ -244,9 +225,6 @@ public:
     IntCounter* scanner_ctx_cnt = nullptr;
     IntCounter* scanner_cnt = nullptr;
     IntCounter* scanner_task_cnt = nullptr;
-    IntCounter* scanner_task_queued = nullptr;
-    IntCounter* scanner_task_submit_failed = nullptr;
-    IntCounter* scanner_task_running = nullptr;
 
     static DorisMetrics* instance() {
         static DorisMetrics instance;
diff --git a/be/src/util/interval_histogram.cpp b/be/src/util/interval_histogram.cpp
new file mode 100644
index 00000000000..ec894fc69fa
--- /dev/null
+++ b/be/src/util/interval_histogram.cpp
@@ -0,0 +1,80 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "util/interval_histogram.h"
+
+#include <algorithm>
+#include <mutex>
+#include <numeric>
+#include <vector>
+
+#include "gutil/integral_types.h"
+
+namespace doris {
+
+template <typename T>
+IntervalHistogramStat<T>::IntervalHistogramStat(size_t N) : window(N) {}
+
+template <typename T>
+void IntervalHistogramStat<T>::add(T value) {
+    std::unique_lock<std::shared_mutex> lock(mutex);
+    if (window.full()) {
+        window.pop_front();
+    }
+    window.push_back(value);
+}
+
+template <typename T>
+T IntervalHistogramStat<T>::mean() {
+    std::shared_lock<std::shared_mutex> lock(mutex);
+    if (window.empty()) {
+        return T();
+    }
+    T sum = std::accumulate(window.begin(), window.end(), T());
+    return sum / window.size();
+}
+
+template <typename T>
+T IntervalHistogramStat<T>::median() {
+    std::shared_lock<std::shared_mutex> lock(mutex);
+    if (window.empty()) {
+        return T();
+    }
+
+    std::vector<T> sorted(window.begin(), window.end());
+    std::sort(sorted.begin(), sorted.end());
+
+    size_t mid = sorted.size() / 2;
+    return sorted.size() % 2 == 0 ? (sorted[mid - 1] + sorted[mid]) / 2 : 
sorted[mid];
+}
+
+template <typename T>
+T IntervalHistogramStat<T>::max() {
+    std::shared_lock<std::shared_mutex> lock(mutex);
+    return *std::max_element(window.begin(), window.end());
+}
+
+template <typename T>
+T IntervalHistogramStat<T>::min() {
+    std::shared_lock<std::shared_mutex> lock(mutex);
+    return *std::min_element(window.begin(), window.end());
+}
+
+template class doris::IntervalHistogramStat<int64>;
+template class doris::IntervalHistogramStat<int32>;
+
+} // namespace doris
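
A quick usage sketch of the class added above (the window size of 3 and the
sample values are arbitrary, chosen only for illustration):

    #include "util/interval_histogram.h"

    void interval_histogram_sketch() {
        doris::IntervalHistogramStat<int64_t> stat(3); // keep the last 3 samples
        stat.add(10);
        stat.add(20);
        stat.add(30);
        stat.add(40);                     // evicts 10; the window is now {20, 30, 40}
        int64_t avg = stat.mean();        // 30
        int64_t mid = stat.median();      // 30
        int64_t maximum = stat.max();     // 40
        int64_t minimum = stat.min();     // 20
    }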
diff --git a/be/src/util/interval_histogram.h b/be/src/util/interval_histogram.h
new file mode 100644
index 00000000000..2d5d9e6c6d2
--- /dev/null
+++ b/be/src/util/interval_histogram.h
@@ -0,0 +1,46 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <boost/circular_buffer.hpp>
+#include <shared_mutex>
+
+namespace doris {
+
+// A thread-safe interval histogram stat class.
+// IntervalHistogramStat will keep a FIXED-SIZE window of values and provide
+// statistics like mean, median, max, min.
+
+template <typename T>
+class IntervalHistogramStat {
+public:
+    explicit IntervalHistogramStat(size_t N);
+
+    void add(T value);
+
+    T mean();
+    T median();
+    T max();
+    T min();
+
+private:
+    boost::circular_buffer<T> window;
+    mutable std::shared_mutex mutex;
+};
+
+} // namespace doris
diff --git a/be/src/util/metrics.cpp b/be/src/util/metrics.cpp
index 23dbb628a0d..1a3aa51cd2b 100644
--- a/be/src/util/metrics.cpp
+++ b/be/src/util/metrics.cpp
@@ -169,6 +169,7 @@ std::string HistogramMetric::to_string() const {
 std::string HistogramMetric::to_prometheus(const std::string& display_name,
                                            const Labels& entity_labels,
                                            const Labels& metric_labels) const {
+    // TODO: Use std::string concatenation for better performance.
     std::stringstream ss;
     for (const auto& percentile : _s_output_percentiles) {
         auto quantile_lable = Labels({{"quantile", percentile.first}});
@@ -322,12 +323,12 @@ void MetricRegistry::trigger_all_hooks(bool force) const {
 std::string MetricRegistry::to_prometheus(bool with_tablet_metrics) const {
     // Reorder by MetricPrototype
     EntityMetricsByType entity_metrics_by_types;
-    std::lock_guard<std::mutex> l(_lock);
+    std::lock_guard<std::mutex> l1(_lock);
     for (const auto& entity : _entities) {
         if (entity.first->_type == MetricEntityType::kTablet && 
!with_tablet_metrics) {
             continue;
         }
-        std::lock_guard<std::mutex> l(entity.first->_lock);
+        std::lock_guard<std::mutex> l2(entity.first->_lock);
         entity.first->trigger_hook_unlocked(false);
         for (const auto& metric : entity.first->_metrics) {
             std::pair<MetricEntity*, Metric*> new_elem =
diff --git a/be/src/util/threadpool.cpp b/be/src/util/threadpool.cpp
index e9af13f556e..5548ad7f400 100644
--- a/be/src/util/threadpool.cpp
+++ b/be/src/util/threadpool.cpp
@@ -33,10 +33,23 @@
 #include "gutil/port.h"
 #include "gutil/strings/substitute.h"
 #include "util/debug/sanitizer_scopes.h"
+#include "util/doris_metrics.h"
+#include "util/metrics.h"
 #include "util/scoped_cleanup.h"
+#include "util/stopwatch.hpp"
 #include "util/thread.h"
 
 namespace doris {
+// The names of these variables will be used as metric names in Prometheus.
+DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(thread_pool_active_threads, 
MetricUnit::NOUNIT);
+DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(thread_pool_queue_size, MetricUnit::NOUNIT);
+DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(thread_pool_max_queue_size, 
MetricUnit::NOUNIT);
+DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(thread_pool_max_threads, 
MetricUnit::NOUNIT);
+DEFINE_COUNTER_METRIC_PROTOTYPE_2ARG(thread_pool_submit_failed, 
MetricUnit::NOUNIT);
+DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(task_execution_time_ns_avg_in_last_1000_times,
+                                   MetricUnit::NANOSECONDS);
+DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(task_wait_worker_ns_avg_in_last_1000_times,
+                                   MetricUnit::NANOSECONDS);
 using namespace ErrorCode;
 
 using std::string;
@@ -52,8 +65,9 @@ private:
     std::function<void()> _func;
 };
 
-ThreadPoolBuilder::ThreadPoolBuilder(string name)
+ThreadPoolBuilder::ThreadPoolBuilder(string name, string workload_group)
         : _name(std::move(name)),
+          _workload_group(std::move(workload_group)),
           _min_threads(0),
           _max_threads(std::thread::hardware_concurrency()),
           _max_queue_size(std::numeric_limits<int>::max()),
@@ -238,6 +252,7 @@ bool ThreadPoolToken::need_dispatch() {
 
 ThreadPool::ThreadPool(const ThreadPoolBuilder& builder)
         : _name(builder._name),
+          _workload_group(builder._workload_group),
           _min_threads(builder._min_threads),
           _max_threads(builder._max_threads),
           _max_queue_size(builder._max_queue_size),
@@ -248,7 +263,8 @@ ThreadPool::ThreadPool(const ThreadPoolBuilder& builder)
           _active_threads(0),
           _total_queued_tasks(0),
           _cgroup_cpu_ctl(builder._cgroup_cpu_ctl),
-          _tokenless(new_token(ExecutionMode::CONCURRENT)) {}
+          _tokenless(new_token(ExecutionMode::CONCURRENT)),
+          _id(UniqueId::gen_uid()) {}
 
 ThreadPool::~ThreadPool() {
     // There should only be one live token: the one used in tokenless 
submission.
@@ -270,10 +286,48 @@ Status ThreadPool::init() {
             return status;
         }
     }
+    // The _id of the thread pool makes sure that thread pools created with the
+    // same name still get different _metric_entity instances. Otherwise we would
+    // run into problems when deregistering the entity and registering the hook.
+    _metric_entity = 
DorisMetrics::instance()->metric_registry()->register_entity(
+            fmt::format("thread_pool_{}", _name), {{"thread_pool_name", _name},
+                                                   {"workload_group", 
_workload_group},
+                                                   {"id", _id.to_string()}});
+
+    INT_GAUGE_METRIC_REGISTER(_metric_entity, thread_pool_active_threads);
+    INT_GAUGE_METRIC_REGISTER(_metric_entity, thread_pool_max_threads);
+    INT_GAUGE_METRIC_REGISTER(_metric_entity, thread_pool_queue_size);
+    INT_GAUGE_METRIC_REGISTER(_metric_entity, thread_pool_max_queue_size);
+    INT_GAUGE_METRIC_REGISTER(_metric_entity, 
task_execution_time_ns_avg_in_last_1000_times);
+    INT_GAUGE_METRIC_REGISTER(_metric_entity, 
task_wait_worker_ns_avg_in_last_1000_times);
+    INT_COUNTER_METRIC_REGISTER(_metric_entity, thread_pool_submit_failed);
+
+    _metric_entity->register_hook("update", [this]() {
+        {
+            std::lock_guard<std::mutex> l(_lock);
+            if (!_pool_status.ok()) {
+                return;
+            }
+        }
+
+        thread_pool_active_threads->set_value(num_active_threads());
+        thread_pool_queue_size->set_value(get_queue_size());
+        thread_pool_max_queue_size->set_value(get_max_queue_size());
+        thread_pool_max_threads->set_value(max_threads());
+        task_execution_time_ns_avg_in_last_1000_times->set_value(
+                _task_execution_time_ns_statistic.mean());
+        task_wait_worker_ns_avg_in_last_1000_times->set_value(
+                _task_wait_worker_time_ns_statistic.mean());
+    });
     return Status::OK();
 }
 
 void ThreadPool::shutdown() {
+    // Why is it safe to access DorisMetrics here?
+    // DorisMetrics is a singleton, so it is destroyed only after doris_main exits.
+    // ExecEnv::destroy() guarantees that the shutdown/destruction of ThreadPool
+    // happens before doris_main exits.
+    DorisMetrics::instance()->metric_registry()->deregister_entity(_metric_entity);
     debug::ScopedTSANIgnoreReadsAndWrites ignore_tsan;
     std::unique_lock<std::mutex> l(_lock);
     check_not_pool_thread_unlocked();
@@ -357,8 +411,6 @@ Status ThreadPool::submit_func(std::function<void()> f) {
 
 Status ThreadPool::do_submit(std::shared_ptr<Runnable> r, ThreadPoolToken* 
token) {
     DCHECK(token);
-    std::chrono::time_point<std::chrono::system_clock> submit_time =
-            std::chrono::system_clock::now();
 
     std::unique_lock<std::mutex> l(_lock);
     if (PREDICT_FALSE(!_pool_status.ok())) {
@@ -373,6 +425,7 @@ Status ThreadPool::do_submit(std::shared_ptr<Runnable> r, ThreadPoolToken* token
     int64_t capacity_remaining = static_cast<int64_t>(_max_threads) - 
_active_threads +
                                  static_cast<int64_t>(_max_queue_size) - 
_total_queued_tasks;
     if (capacity_remaining < 1) {
+        thread_pool_submit_failed->increment(1);
         return Status::Error<SERVICE_UNAVAILABLE>(
                 "Thread pool {} is at capacity ({}/{} tasks running, {}/{} 
tasks queued)", _name,
                 _num_threads + _num_threads_pending_start, _max_threads, 
_total_queued_tasks,
@@ -408,7 +461,7 @@ Status ThreadPool::do_submit(std::shared_ptr<Runnable> r, ThreadPoolToken* token
 
     Task task;
     task.runnable = std::move(r);
-    task.submit_time = submit_time;
+    task.submit_time_wather.start();
 
     // Add the task to the token's queue.
     ThreadPoolToken::State state = token->state();
@@ -528,12 +581,15 @@ void ThreadPool::dispatch_thread() {
             continue;
         }
 
+        MonotonicStopWatch task_execution_time_watch;
+        task_execution_time_watch.start();
         // Get the next token and task to execute.
         ThreadPoolToken* token = _queue.front();
         _queue.pop_front();
         DCHECK_EQ(ThreadPoolToken::State::RUNNING, token->state());
         DCHECK(!token->_entries.empty());
         Task task = std::move(token->_entries.front());
+        
_task_wait_worker_time_ns_statistic.add(task.submit_time_wather.elapsed_time());
         token->_entries.pop_front();
         token->_active_threads++;
         --_total_queued_tasks;
@@ -543,7 +599,6 @@ void ThreadPool::dispatch_thread() {
 
         // Execute the task
         task.runnable->run();
-
         // Destruct the task while we do not hold the lock.
         //
         // The task's destructor may be expensive if it has a lot of bound
@@ -552,7 +607,7 @@ void ThreadPool::dispatch_thread() {
         // with this threadpool, and produce a deadlock.
         task.runnable.reset();
         l.lock();
-
+        
_task_execution_time_ns_statistic.add(task_execution_time_watch.elapsed_time());
         // Possible states:
         // 1. The token was shut down while we ran its task. Transition to 
QUIESCED.
         // 2. The token has no more queued tasks. Transition back to IDLE.
diff --git a/be/src/util/threadpool.h b/be/src/util/threadpool.h
index f822c307aa6..8c89f570a0a 100644
--- a/be/src/util/threadpool.h
+++ b/be/src/util/threadpool.h
@@ -20,6 +20,8 @@
 
 #pragma once
 
+#include <gen_cpp/Types_types.h>
+
 #include <boost/intrusive/detail/algo_type.hpp>
 #include <boost/intrusive/list.hpp>
 #include <boost/intrusive/list_hook.hpp>
@@ -38,6 +40,9 @@
 
 #include "agent/cgroup_cpu_ctl.h"
 #include "common/status.h"
+#include "util/interval_histogram.h"
+#include "util/metrics.h"
+#include "util/uid_util.h"
 #include "util/work_thread_pool.hpp"
 
 namespace doris {
@@ -99,7 +104,7 @@ public:
 //
 class ThreadPoolBuilder {
 public:
-    explicit ThreadPoolBuilder(std::string name);
+    explicit ThreadPoolBuilder(std::string name, std::string workload_group = 
"");
 
     // Note: We violate the style guide by returning mutable references here
     // in order to provide traditional Builder pattern conveniences.
@@ -132,6 +137,7 @@ public:
 private:
     friend class ThreadPool;
     const std::string _name;
+    const std::string _workload_group;
     int _min_threads;
     int _max_threads;
     int _max_queue_size;
@@ -255,6 +261,11 @@ public:
         return _total_queued_tasks;
     }
 
+    int get_max_queue_size() const {
+        std::lock_guard<std::mutex> l(_lock);
+        return _max_queue_size;
+    }
+
     std::vector<int> debug_info() const {
         std::lock_guard<std::mutex> l(_lock);
         std::vector<int> arr = {_num_threads, 
static_cast<int>(_threads.size()), _min_threads,
@@ -280,7 +291,7 @@ private:
         std::shared_ptr<Runnable> runnable;
 
         // Time at which the entry was submitted to the pool.
-        std::chrono::time_point<std::chrono::system_clock> submit_time;
+        MonotonicStopWatch submit_time_wather;
     };
 
     // Creates a new thread pool using a builder.
@@ -308,6 +319,7 @@ private:
     void release_token(ThreadPoolToken* t);
 
     const std::string _name;
+    const std::string _workload_group;
     int _min_threads;
     int _max_threads;
     const int _max_queue_size;
@@ -392,6 +404,20 @@ private:
 
     // ExecutionMode::CONCURRENT token used by the pool for tokenless 
submission.
     std::unique_ptr<ThreadPoolToken> _tokenless;
+    const UniqueId _id;
+
+    std::shared_ptr<MetricEntity> _metric_entity;
+    IntGauge* thread_pool_active_threads = nullptr;
+    IntGauge* thread_pool_queue_size = nullptr;
+    IntGauge* thread_pool_max_queue_size = nullptr;
+    IntGauge* thread_pool_max_threads = nullptr;
+    IntGauge* task_execution_time_ns_avg_in_last_1000_times = nullptr;
+    IntGauge* task_wait_worker_ns_avg_in_last_1000_times = nullptr;
+
+    IntervalHistogramStat<int64_t> _task_execution_time_ns_statistic {1000};
+    IntervalHistogramStat<int64_t> _task_wait_worker_time_ns_statistic {1000};
+
+    IntCounter* thread_pool_submit_failed = nullptr;
 };
 
 // Entry point for token-based task submission and blocking for a particular
diff --git a/be/src/vec/exec/scan/scanner_scheduler.cpp b/be/src/vec/exec/scan/scanner_scheduler.cpp
index f419f58037a..1b14d172790 100644
--- a/be/src/vec/exec/scan/scanner_scheduler.cpp
+++ b/be/src/vec/exec/scan/scanner_scheduler.cpp
@@ -56,24 +56,9 @@
 
 namespace doris::vectorized {
 
-DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(local_scan_thread_pool_queue_size, 
MetricUnit::NOUNIT);
-DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(local_scan_thread_pool_thread_num, 
MetricUnit::NOUNIT);
-DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(remote_scan_thread_pool_queue_size, 
MetricUnit::NOUNIT);
-DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(remote_scan_thread_pool_thread_num, 
MetricUnit::NOUNIT);
-DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(limited_scan_thread_pool_queue_size, 
MetricUnit::NOUNIT);
-DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(limited_scan_thread_pool_thread_num, 
MetricUnit::NOUNIT);
-DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(group_local_scan_thread_pool_queue_size, 
MetricUnit::NOUNIT);
-DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(group_local_scan_thread_pool_thread_num, 
MetricUnit::NOUNIT);
-
 ScannerScheduler::ScannerScheduler() = default;
 
-ScannerScheduler::~ScannerScheduler() {
-    if (!_is_init) {
-        return;
-    }
-
-    _deregister_metrics();
-}
+ScannerScheduler::~ScannerScheduler() = default;
 
 void ScannerScheduler::stop() {
     if (!_is_init) {
@@ -116,7 +101,6 @@ Status ScannerScheduler::init(ExecEnv* env) {
                             
.set_max_threads(config::doris_scanner_thread_pool_thread_num)
                             
.set_max_queue_size(config::doris_scanner_thread_pool_queue_size)
                             .build(&_limited_scan_thread_pool));
-    _register_metrics();
     _is_init = true;
     return Status::OK();
 }
@@ -141,11 +125,6 @@ Status ScannerScheduler::submit(std::shared_ptr<ScannerContext> ctx,
 
         scanner_delegate->_scanner->start_wait_worker_timer();
         auto s = ctx->thread_token->submit_func([scanner_ref = scan_task, 
ctx]() {
-            DorisMetrics::instance()->scanner_task_queued->increment(-1);
-            DorisMetrics::instance()->scanner_task_running->increment(1);
-            Defer metrics_defer(
-                    [&] { 
DorisMetrics::instance()->scanner_task_running->increment(-1); });
-
             auto status = [&] {
                 RETURN_IF_CATCH_EXCEPTION(_scanner_scan(ctx, scanner_ref));
                 return Status::OK();
@@ -171,11 +150,6 @@ Status ScannerScheduler::submit(std::shared_ptr<ScannerContext> ctx,
         auto sumbit_task = [&]() {
             SimplifiedScanScheduler* scan_sched = ctx->get_scan_scheduler();
             auto work_func = [scanner_ref = scan_task, ctx]() {
-                DorisMetrics::instance()->scanner_task_queued->increment(-1);
-                DorisMetrics::instance()->scanner_task_running->increment(1);
-                Defer metrics_defer(
-                        [&] { 
DorisMetrics::instance()->scanner_task_running->increment(-1); });
-
                 auto status = [&] {
                     RETURN_IF_CATCH_EXCEPTION(_scanner_scan(ctx, scanner_ref));
                     return Status::OK();
@@ -348,32 +322,6 @@ void ScannerScheduler::_scanner_scan(std::shared_ptr<ScannerContext> ctx,
     ctx->push_back_scan_task(scan_task);
 }
 
-void ScannerScheduler::_register_metrics() {
-    REGISTER_HOOK_METRIC(local_scan_thread_pool_queue_size,
-                         [this]() { return 
_local_scan_thread_pool->get_queue_size(); });
-    REGISTER_HOOK_METRIC(local_scan_thread_pool_thread_num,
-                         [this]() { return 
_local_scan_thread_pool->get_active_threads(); });
-    REGISTER_HOOK_METRIC(remote_scan_thread_pool_queue_size,
-                         [this]() { return 
_remote_scan_thread_pool->get_queue_size(); });
-    REGISTER_HOOK_METRIC(remote_scan_thread_pool_thread_num,
-                         [this]() { return 
_remote_scan_thread_pool->get_active_threads(); });
-    REGISTER_HOOK_METRIC(limited_scan_thread_pool_queue_size,
-                         [this]() { return 
_limited_scan_thread_pool->get_queue_size(); });
-    REGISTER_HOOK_METRIC(limited_scan_thread_pool_thread_num,
-                         [this]() { return 
_limited_scan_thread_pool->num_threads(); });
-}
-
-void ScannerScheduler::_deregister_metrics() {
-    DEREGISTER_HOOK_METRIC(local_scan_thread_pool_queue_size);
-    DEREGISTER_HOOK_METRIC(local_scan_thread_pool_thread_num);
-    DEREGISTER_HOOK_METRIC(remote_scan_thread_pool_queue_size);
-    DEREGISTER_HOOK_METRIC(remote_scan_thread_pool_thread_num);
-    DEREGISTER_HOOK_METRIC(limited_scan_thread_pool_queue_size);
-    DEREGISTER_HOOK_METRIC(limited_scan_thread_pool_thread_num);
-    DEREGISTER_HOOK_METRIC(group_local_scan_thread_pool_queue_size);
-    DEREGISTER_HOOK_METRIC(group_local_scan_thread_pool_thread_num);
-}
-
 int ScannerScheduler::get_remote_scan_thread_num() {
     int remote_max_thread_num = 
config::doris_max_remote_scanner_thread_pool_thread_num != -1
                                         ? 
config::doris_max_remote_scanner_thread_pool_thread_num
diff --git a/be/src/vec/exec/scan/scanner_scheduler.h b/be/src/vec/exec/scan/scanner_scheduler.h
index 7731b3ba8f9..e94659c79d1 100644
--- a/be/src/vec/exec/scan/scanner_scheduler.h
+++ b/be/src/vec/exec/scan/scanner_scheduler.h
@@ -114,8 +114,12 @@ struct SimplifiedScanTask {
 
 class SimplifiedScanScheduler {
 public:
-    SimplifiedScanScheduler(std::string sched_name, 
std::shared_ptr<CgroupCpuCtl> cgroup_cpu_ctl)
-            : _is_stop(false), _cgroup_cpu_ctl(cgroup_cpu_ctl), 
_sched_name(sched_name) {}
+    SimplifiedScanScheduler(std::string sched_name, 
std::shared_ptr<CgroupCpuCtl> cgroup_cpu_ctl,
+                            std::string workload_group = "system")
+            : _is_stop(false),
+              _cgroup_cpu_ctl(cgroup_cpu_ctl),
+              _sched_name(sched_name),
+              _workload_group(workload_group) {}
 
     ~SimplifiedScanScheduler() {
         stop();
@@ -129,7 +133,7 @@ public:
     }
 
     Status start(int max_thread_num, int min_thread_num, int queue_size) {
-        RETURN_IF_ERROR(ThreadPoolBuilder(_sched_name)
+        RETURN_IF_ERROR(ThreadPoolBuilder(_sched_name, _workload_group)
                                 .set_min_threads(min_thread_num)
                                 .set_max_threads(max_thread_num)
                                 .set_max_queue_size(queue_size)
@@ -140,13 +144,7 @@ public:
 
     Status submit_scan_task(SimplifiedScanTask scan_task) {
         if (!_is_stop) {
-            DorisMetrics::instance()->scanner_task_queued->increment(1);
-            auto st = _scan_thread_pool->submit_func([scan_task] { 
scan_task.scan_func(); });
-            if (!st.ok()) {
-                DorisMetrics::instance()->scanner_task_queued->increment(-1);
-                
DorisMetrics::instance()->scanner_task_submit_failed->increment(1);
-            }
-            return st;
+            return _scan_thread_pool->submit_func([scan_task] { 
scan_task.scan_func(); });
         } else {
             return Status::InternalError<false>("scanner pool {} is 
shutdown.", _sched_name);
         }
@@ -216,6 +214,7 @@ private:
     std::atomic<bool> _is_stop;
     std::weak_ptr<CgroupCpuCtl> _cgroup_cpu_ctl;
     std::string _sched_name;
+    std::string _workload_group;
 };
 
 } // namespace doris::vectorized
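
To illustrate how the new workload_group parameter above flows into the pool
metrics, a hedged sketch (the group name "example_wg", the thread and queue
numbers, and the missing cgroup control are all invented for the example): the
scheduler forwards the group name to ThreadPoolBuilder, so the resulting pool's
metric entity carries both thread_pool_name and workload_group labels.

    #include <memory>

    #include "vec/exec/scan/scanner_scheduler.h"

    void scan_scheduler_metrics_sketch() {
        using doris::vectorized::SimplifiedScanScheduler;
        // No cgroup CPU control in this sketch, hence the nullptr.
        auto sched = std::make_unique<SimplifiedScanScheduler>(
                "Scan_example_wg", /*cgroup_cpu_ctl=*/nullptr, /*workload_group=*/"example_wg");
        // start() builds the pool via ThreadPoolBuilder(_sched_name, _workload_group),
        // so its metrics are exported with workload_group="example_wg".
        static_cast<void>(sched->start(/*max_thread_num=*/8, /*min_thread_num=*/2,
                                       /*queue_size=*/128));
        // The destructor stops the scheduler and shuts the pool down,
        // deregistering its metric entity.
    }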
diff --git a/be/test/io/fs/buffered_reader_test.cpp b/be/test/io/fs/buffered_reader_test.cpp
index 658c98ba514..1038f056aba 100644
--- a/be/test/io/fs/buffered_reader_test.cpp
+++ b/be/test/io/fs/buffered_reader_test.cpp
@@ -36,7 +36,10 @@ namespace doris {
 using io::FileReader;
 class BufferedReaderTest : public testing::Test {
 public:
-    BufferedReaderTest() {
+    BufferedReaderTest() = default;
+
+protected:
+    void SetUp() override {
         std::unique_ptr<ThreadPool> _pool;
         static_cast<void>(ThreadPoolBuilder("BufferedReaderPrefetchThreadPool")
                                   .set_min_threads(5)
@@ -44,10 +47,9 @@ public:
                                   .build(&_pool));
         ExecEnv::GetInstance()->_buffered_reader_prefetch_thread_pool = 
std::move(_pool);
     }
-
-protected:
-    virtual void SetUp() {}
-    virtual void TearDown() {}
+    void TearDown() override {
+        ExecEnv::GetInstance()->_buffered_reader_prefetch_thread_pool.reset();
+    }
 };
 
 class SyncLocalFileReader : public io::FileReader {
diff --git a/be/test/io/fs/remote_file_system_test.cpp b/be/test/io/fs/remote_file_system_test.cpp
index c5d80d1b65d..309a31eb97f 100644
--- a/be/test/io/fs/remote_file_system_test.cpp
+++ b/be/test/io/fs/remote_file_system_test.cpp
@@ -410,10 +410,10 @@ TEST_F(RemoteFileSystemTest, TestHdfsFileSystem) {
 
 TEST_F(RemoteFileSystemTest, TestS3FileSystem) {
     std::unique_ptr<ThreadPool> _pool;
-    ThreadPoolBuilder("S3FileUploadThreadPool")
-            .set_min_threads(5)
-            .set_max_threads(10)
-            .build(&_pool);
+    std::ignore = ThreadPoolBuilder("S3FileUploadThreadPool")
+                          .set_min_threads(5)
+                          .set_max_threads(10)
+                          .build(&_pool);
     ExecEnv::GetInstance()->_s3_file_upload_thread_pool = std::move(_pool);
     S3Conf s3_conf;
     S3URI s3_uri(s3_location);
@@ -563,6 +563,7 @@ TEST_F(RemoteFileSystemTest, TestS3FileSystem) {
     std::string download_content;
     CHECK_STATUS_OK(fs->direct_download(direct_remote_file, 
&download_content));
     ASSERT_EQ("abc", download_content);
+    ExecEnv::GetInstance()->_s3_file_upload_thread_pool.reset();
 }
 
 } // namespace doris
diff --git a/be/test/io/fs/s3_file_writer_test.cpp b/be/test/io/fs/s3_file_writer_test.cpp
index 7021346a704..6469559d0d8 100644
--- a/be/test/io/fs/s3_file_writer_test.cpp
+++ b/be/test/io/fs/s3_file_writer_test.cpp
@@ -306,6 +306,7 @@ public:
                           sp->clear_call_back(mockcallback.point_name);
                       });
         sp->disable_processing();
+        ExecEnv::GetInstance()->_s3_file_upload_thread_pool.reset();
     }
 };
 
diff --git a/be/test/olap/rowset/beta_rowset_test.cpp b/be/test/olap/rowset/beta_rowset_test.cpp
index 0c3001758f0..2e13436b3d3 100644
--- a/be/test/olap/rowset/beta_rowset_test.cpp
+++ b/be/test/olap/rowset/beta_rowset_test.cpp
@@ -236,6 +236,8 @@ TEST_F(BetaRowsetTest, ReadTest) {
                             .region = "region",
                             .ak = "ak",
                             .sk = "sk",
+                            .token = "",
+                            .bucket = "",
                     }};
     std::string resource_id = "10000";
     auto res = io::S3FileSystem::create(std::move(s3_conf), 
io::FileSystem::TMP_FS_ID);
diff --git a/be/test/olap/rowset/unique_rowset_id_generator_test.cpp b/be/test/olap/rowset/unique_rowset_id_generator_test.cpp
index aeb6d7bc8ae..7ef95e938e1 100644
--- a/be/test/olap/rowset/unique_rowset_id_generator_test.cpp
+++ b/be/test/olap/rowset/unique_rowset_id_generator_test.cpp
@@ -123,6 +123,7 @@ TEST_F(UniqueRowsetIdGeneratorTest, GenerateIdBenchmark) {
     hi <<= 56;
     RowsetId last_id = id_generator.next_id();
     EXPECT_EQ(last_id.hi, hi + kNumThreads * kIdPerThread + 1);
+    pool.reset();
 }
 
 } // namespace doris
diff --git a/be/test/testutil/run_all_tests.cpp b/be/test/testutil/run_all_tests.cpp
index 1208141a8fa..44cccb7fec1 100644
--- a/be/test/testutil/run_all_tests.cpp
+++ b/be/test/testutil/run_all_tests.cpp
@@ -50,6 +50,16 @@
 int main(int argc, char** argv) {
     doris::ThreadLocalHandle::create_thread_local_if_not_exits();
     doris::ExecEnv::GetInstance()->init_mem_tracker();
+    // Used for unit test
+    std::unique_ptr<doris::ThreadPool> non_block_close_thread_pool;
+
+    std::ignore = doris::ThreadPoolBuilder("NonBlockCloseThreadPool")
+                          .set_min_threads(12)
+                          .set_max_threads(48)
+                          .build(&non_block_close_thread_pool);
+    doris::ExecEnv::GetInstance()->set_non_block_close_thread_pool(
+            std::move(non_block_close_thread_pool));
+
     doris::thread_context()->thread_mem_tracker_mgr->init();
     std::shared_ptr<doris::MemTrackerLimiter> test_tracker =
             
doris::MemTrackerLimiter::create_shared(doris::MemTrackerLimiter::Type::GLOBAL,
@@ -96,5 +106,7 @@ int main(int argc, char** argv) {
     doris::ExecEnv::GetInstance()->set_tracking_memory(false);
 
     int res = RUN_ALL_TESTS();
+
+    doris::ExecEnv::GetInstance()->set_non_block_close_thread_pool(nullptr);
     return res;
 }
diff --git a/be/test/util/countdown_latch_test.cpp b/be/test/util/countdown_latch_test.cpp
index ff3c21c6e13..86999ca929d 100644
--- a/be/test/util/countdown_latch_test.cpp
+++ b/be/test/util/countdown_latch_test.cpp
@@ -59,6 +59,7 @@ TEST(TestCountDownLatch, TestLatch) {
     EXPECT_TRUE(pool->submit_func(std::bind(decrement_latch, &latch, 
1000)).ok());
     latch.wait();
     EXPECT_EQ(0, latch.count());
+    pool.reset();
 }
 
 // Test that resetting to zero while there are waiters lets the waiters
diff --git a/be/test/util/interval_histogram_test.cpp b/be/test/util/interval_histogram_test.cpp
new file mode 100644
index 00000000000..aa63c360165
--- /dev/null
+++ b/be/test/util/interval_histogram_test.cpp
@@ -0,0 +1,78 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "util/interval_histogram.h"
+
+#include <gtest/gtest.h>
+
+#include <thread>
+
+namespace doris {
+
+TEST(IntervalHistogramStat, SerialTest) {
+    IntervalHistogramStat<int> stat(5);
+
+    stat.add(10);
+    stat.add(20);
+    stat.add(30);
+    stat.add(40);
+    stat.add(50);
+
+    EXPECT_EQ(stat.mean(), 30);
+    EXPECT_EQ(stat.median(), 30);
+    EXPECT_EQ(stat.max(), 50);
+    EXPECT_EQ(stat.min(), 10);
+
+    // Make window move forward
+    stat.add(60);
+    stat.add(70);
+
+    // window now contains [30, 40, 50, 60, 70]
+    EXPECT_EQ(stat.mean(), 50);
+    EXPECT_EQ(stat.median(), 50);
+    EXPECT_EQ(stat.max(), 70);
+    EXPECT_EQ(stat.min(), 30);
+}
+
+TEST(IntervalHistogramStatTest, ParallelTest) {
+    constexpr int thread_count = 10;
+    constexpr int values_per_thread = 10;
+    IntervalHistogramStat<int> stat(thread_count * values_per_thread);
+
+    auto add_values = [&stat](int start_value, int count) {
+        for (int i = 0; i < count; ++i) {
+            stat.add(start_value + i);
+        }
+    };
+
+    std::vector<std::thread> threads;
+    for (int i = 0; i < thread_count; ++i) {
+        threads.emplace_back(add_values, i * values_per_thread, 
values_per_thread);
+    }
+
+    for (auto& thread : threads) {
+        thread.join();
+    }
+
+    int total_values = thread_count * values_per_thread;
+    EXPECT_EQ(stat.mean(), (total_values - 1) / 2);
+    EXPECT_EQ(stat.max(), total_values - 1);
+    EXPECT_EQ(stat.min(), 0);
+    EXPECT_EQ(stat.median(), (total_values - 1) / 2);
+}
+
+} // namespace doris
diff --git a/be/test/vec/aggregate_functions/agg_linear_histogram_test.cpp b/be/test/vec/aggregate_functions/agg_linear_histogram_test.cpp
index 3dbf34a4dcb..fe94afe5118 100644
--- a/be/test/vec/aggregate_functions/agg_linear_histogram_test.cpp
+++ b/be/test/vec/aggregate_functions/agg_linear_histogram_test.cpp
@@ -204,8 +204,8 @@ public:
                          << "(" << data_types[0]->get_name() << ")";
 
         AggregateFunctionSimpleFactory factory = 
AggregateFunctionSimpleFactory::instance();
-        auto agg_function =
-                factory.get("linear_histogram", data_types, false, -1, 
{.enable_decimal256 = true});
+        auto agg_function = factory.get("linear_histogram", data_types, false, 
-1,
+                                        {.enable_decimal256 = true, 
.column_names = {""}});
         EXPECT_NE(agg_function, nullptr);
 
         std::unique_ptr<char[]> memory(new char[agg_function->size_of_data()]);
diff --git a/regression-test/suites/workload_manager_p0/test_curd_wlg.groovy b/regression-test/suites/workload_manager_p0/test_curd_wlg.groovy
index be54bec5e2e..b1bbae2e3ce 100644
--- a/regression-test/suites/workload_manager_p0/test_curd_wlg.groovy
+++ b/regression-test/suites/workload_manager_p0/test_curd_wlg.groovy
@@ -15,6 +15,16 @@
 // specific language governing permissions and limitations
 // under the License.
 
+def getMetrics = { ip, port ->
+        def dst = 'http://' + ip + ':' + port
+        def conn = new URL(dst + "/metrics").openConnection()
+        conn.setRequestMethod("GET")
+        def encoding = 
Base64.getEncoder().encodeToString((context.config.feHttpUser + ":" + 
+                (context.config.feHttpPassword == null ? "" : 
context.config.feHttpPassword)).getBytes("UTF-8"))
+        conn.setRequestProperty("Authorization", "Basic ${encoding}")
+        return conn.getInputStream().getText()
+    }
+
 suite("test_crud_wlg") {
     def table_name = "wlg_test_table"
     def table_name2 = "wlg_test_table2"
@@ -787,4 +797,50 @@ suite("test_crud_wlg") {
 
     sql "drop workload group if exists default_val_wg"
 
+    for (int i = 0; i < 20; i++) {
+        // 1. SHOW BACKENDS get be ip and http port
+        Map<String, String> backendId_to_backendIP = new HashMap<>();
+        Map<String, String> backendId_to_backendHttpPort = new HashMap<>();
+        getBackendIpHttpPort(backendId_to_backendIP, 
backendId_to_backendHttpPort);
+        // Print above maps in logger.
+        logger.info("backendId_to_backendIP: " + backendId_to_backendIP);
+        logger.info("backendId_to_backendHttpPort: " + 
backendId_to_backendHttpPort);
+
+        // 2. CREATE WORKLOAD GROUP
+        sql "drop workload group if exists test_wg_metrics;"
+        sql "create workload group if not exists test_wg_metrics " +
+                "properties ( " +
+                "    'cpu_share'='10', " +
+                "    'memory_limit'='10%', " +
+                "    'enable_memory_overcommit'='true' " +
+                ");"
+        sql "set workload_group=test_wg_metrics;"
+        wg = sql("select 
name,cpu_share,memory_limit,enable_memory_overcommit,max_concurrency,max_queue_size,queue_timeout,cpu_hard_limit,scan_thread_num,tag,read_bytes_per_second,remote_read_bytes_per_second
 from information_schema.workload_groups where name = 'test_wg_metrics' order 
by name;");
+        logger.info("wg: " + wg);
+
+        // 3. EXECUTE A QUERY SO THAT THE WORKLOAD GROUP IS USED
+        sql "select count(*) from numbers(\"number\"=\"100\");"
+        
+        // curl backend http port to get metrics
+        // get first backendId
+        backendId = backendId_to_backendIP.keySet().iterator().next();
+        backendIP = backendId_to_backendIP.get(backendId);
+        backendHttpPort = backendId_to_backendHttpPort.get(backendId);
+        logger.info("backendId: " + backendId + ", backendIP: " + backendIP + 
", backendHttpPort: " + backendHttpPort);
+
+        // Create a for loop to get metrics 5 times
+        for (int j = 0; j < 5; j++) {
+            String metrics = getMetrics(backendIP, backendHttpPort);
+            String filteredMetrics = metrics.split('\n').findAll { line ->
+                line.startsWith('doris_be_thread_pool') && 
line.contains('workload_group="test_wg_metrics"') && 
line.contains('thread_pool_name="Scan_test_wg_metrics"')
+            }.join('\n')
+            // Filter metrics with name test_wg_metrics
+            logger.info("filteredMetrics: " + filteredMetrics);
+            List<String> lines = filteredMetrics.split('\n').findAll { 
it.trim() }
+            assert lines.size() == 5
+        }
+
+        sql "drop workload group if exists test_wg_metrics;"
+    }
+    sql "drop workload group if exists test_wg_metrics;"
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org
