This is an automated email from the ASF dual-hosted git repository.

dataroaring pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 654da806dad [feat](memtable) support adaptive memtable flush thread 
pool adjustment (#60617)
654da806dad is described below

commit 654da806dadb52fa16f06cf4ef4c4b0f544913ba
Author: hui lai <[email protected]>
AuthorDate: Wed Apr 1 15:38:39 2026 +0800

    [feat](memtable) support adaptive memtable flush thread pool adjustment 
(#60617)
    
    ### What problem does this PR solve?
    
    Issue Number: close https://github.com/apache/doris/issues/60616
    
    ### Release note
    
    None
    
    ### Check List (For Author)
    
    - Test <!-- At least one of them must be included. -->
        - [ ] Regression test
        - [ ] Unit Test
        - [ ] Manual test (add detailed scripts or steps below)
        - [ ] No need to test or manual test. Explain why:
    - [ ] This is a refactor/code format and no logic has been changed.
            - [ ] Previous test can cover this change.
            - [ ] No code files have been changed.
            - [ ] Other reason <!-- Add your reason?  -->
    
    - Behavior changed:
        - [ ] No.
        - [ ] Yes. <!-- Explain the behavior change -->
    
    - Does this need documentation?
        - [ ] No.
    - [ ] Yes. <!-- Add document PR link here. eg:
    https://github.com/apache/doris-website/pull/1214 -->
    
    ### Check List (For Reviewer who merge this PR)
    
    - [ ] Confirm the release note
    - [ ] Confirm test cases
    - [ ] Confirm document
    - [ ] Add branch pick label <!-- Add branch pick label that this PR
    should merge into -->
---
 be/src/cloud/cloud_storage_engine.cpp              |   5 +
 be/src/common/config.cpp                           |   6 +
 be/src/common/config.h                             |   5 +
 be/src/load/memtable/memtable_flush_executor.cpp   |  49 ++--
 be/src/load/memtable/memtable_flush_executor.h     |   9 +
 be/src/load/memtable/memtable_memory_limiter.h     |   2 +
 be/src/runtime/exec_env_init.cpp                   |   2 +-
 be/src/runtime/workload_group/workload_group.cpp   |  36 ++-
 be/src/runtime/workload_group/workload_group.h     |   2 +
 be/src/storage/adaptive_thread_pool_controller.cpp | 311 +++++++++++++++++++++
 be/src/storage/adaptive_thread_pool_controller.h   | 156 +++++++++++
 be/src/storage/olap_server.cpp                     |   3 +
 be/src/storage/storage_engine.cpp                  |  23 ++
 be/src/storage/storage_engine.h                    |   6 +
 .../load/memtable/memtable_flush_executor_test.cpp |  18 +-
 .../adaptive_thread_pool_controller_test.cpp       | 283 +++++++++++++++++++
 16 files changed, 885 insertions(+), 31 deletions(-)

diff --git a/be/src/cloud/cloud_storage_engine.cpp 
b/be/src/cloud/cloud_storage_engine.cpp
index 927d5bef343..b1882a3c1bd 100644
--- a/be/src/cloud/cloud_storage_engine.cpp
+++ b/be/src/cloud/cloud_storage_engine.cpp
@@ -44,6 +44,7 @@
 #include "cloud/cloud_warm_up_manager.h"
 #include "cloud/config.h"
 #include "common/config.h"
+#include "common/metrics/doris_metrics.h"
 #include "common/signal_handler.h"
 #include "common/status.h"
 #include "core/assert_cast.h"
@@ -56,6 +57,7 @@
 #include "io/hdfs_util.h"
 #include "io/io_common.h"
 #include "load/memtable/memtable_flush_executor.h"
+#include "runtime/exec_env.h"
 #include "runtime/memory/cache_manager.h"
 #include "storage/compaction/cumulative_compaction_policy.h"
 #include "storage/compaction/cumulative_compaction_time_series_policy.h"
@@ -276,6 +278,7 @@ void CloudStorageEngine::stop() {
     if (_cumu_compaction_thread_pool) {
         _cumu_compaction_thread_pool->shutdown();
     }
+    _adaptive_thread_controller.stop();
     LOG(INFO) << "Cloud storage engine is stopped.";
 
     if (_calc_tablet_delete_bitmap_task_thread_pool) {
@@ -389,6 +392,8 @@ Status 
CloudStorageEngine::start_bg_threads(std::shared_ptr<WorkloadGroup> wg_sp
             &_bg_threads.emplace_back()));
     LOG(INFO) << "check tablet delete bitmap score thread started";
 
+    _start_adaptive_thread_controller();
+
     return Status::OK();
 }
 
diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp
index 5227059c066..8fb76ab66ad 100644
--- a/be/src/common/config.cpp
+++ b/be/src/common/config.cpp
@@ -827,6 +827,12 @@ DEFINE_mInt32(high_priority_flush_thread_num_per_store, 
"6");
 //                         max_flush_thread_num_per_cpu * num_cpu)
 DEFINE_mInt32(max_flush_thread_num_per_cpu, "4");
 
+// minimum flush threads per cpu when adaptive flush is enabled (default 0.5)
+DEFINE_mDouble(min_flush_thread_num_per_cpu, "0.5");
+
+// Whether to enable adaptive flush thread adjustment
+DEFINE_mBool(enable_adaptive_flush_threads, "true");
+
 // config for tablet meta checkpoint
 DEFINE_mInt32(tablet_meta_checkpoint_min_new_rowsets_num, "10");
 DEFINE_mInt32(tablet_meta_checkpoint_min_interval_secs, "600");
diff --git a/be/src/common/config.h b/be/src/common/config.h
index 4a7c7a3634d..45b1d6b4069 100644
--- a/be/src/common/config.h
+++ b/be/src/common/config.h
@@ -880,6 +880,11 @@ DECLARE_mInt32(high_priority_flush_thread_num_per_store);
 // number of threads = min(flush_thread_num_per_store * num_store,
 //                         max_flush_thread_num_per_cpu * num_cpu)
 DECLARE_mInt32(max_flush_thread_num_per_cpu);
+// minimum flush threads per cpu when adaptive flush is enabled (default 0.5)
+DECLARE_mDouble(min_flush_thread_num_per_cpu);
+
+// Whether to enable adaptive flush thread adjustment
+DECLARE_mBool(enable_adaptive_flush_threads);
 
 // config for tablet meta checkpoint
 DECLARE_mInt32(tablet_meta_checkpoint_min_new_rowsets_num);
diff --git a/be/src/load/memtable/memtable_flush_executor.cpp 
b/be/src/load/memtable/memtable_flush_executor.cpp
index b36d602c903..ca79b1a8345 100644
--- a/be/src/load/memtable/memtable_flush_executor.cpp
+++ b/be/src/load/memtable/memtable_flush_executor.cpp
@@ -27,6 +27,7 @@
 #include "common/logging.h"
 #include "common/metrics/doris_metrics.h"
 #include "common/metrics/metrics.h"
+#include "common/metrics/system_metrics.h"
 #include "common/signal_handler.h"
 #include "load/memtable/memtable.h"
 #include "runtime/thread_context.h"
@@ -295,45 +296,53 @@ void 
FlushToken::_flush_memtable(std::shared_ptr<MemTable> memtable_ptr, int32_t
     _stats.flush_disk_size_bytes += flush_size;
 }
 
+std::pair<int, int> MemTableFlushExecutor::calc_flush_thread_count(int 
num_cpus, int num_disk,
+                                                                   int 
thread_num_per_store) {
+    if (config::enable_adaptive_flush_threads && num_cpus > 0) {
+        int min = std::max(1, (int)(num_cpus * 
config::min_flush_thread_num_per_cpu));
+        int max = std::max(min, num_cpus * 
config::max_flush_thread_num_per_cpu);
+        return {min, max};
+    }
+    int min = std::max(1, thread_num_per_store);
+    int max = num_cpus == 0
+                      ? num_disk * min
+                      : std::min(num_disk * min, num_cpus * 
config::max_flush_thread_num_per_cpu);
+    return {min, max};
+}
+
 void MemTableFlushExecutor::init(int num_disk) {
     _num_disk = std::max(1, num_disk);
     int num_cpus = std::thread::hardware_concurrency();
-    int min_threads = std::max(1, config::flush_thread_num_per_store);
-    int max_threads = num_cpus == 0 ? _num_disk * min_threads
-                                    : std::min(_num_disk * min_threads,
-                                               num_cpus * 
config::max_flush_thread_num_per_cpu);
+
+    auto [min_threads, max_threads] =
+            calc_flush_thread_count(num_cpus, _num_disk, 
config::flush_thread_num_per_store);
     static_cast<void>(ThreadPoolBuilder("MemTableFlushThreadPool")
                               .set_min_threads(min_threads)
                               .set_max_threads(max_threads)
                               .build(&_flush_pool));
 
-    min_threads = std::max(1, 
config::high_priority_flush_thread_num_per_store);
-    max_threads = num_cpus == 0 ? _num_disk * min_threads
-                                : std::min(_num_disk * min_threads,
-                                           num_cpus * 
config::max_flush_thread_num_per_cpu);
+    auto [hi_min, hi_max] = calc_flush_thread_count(
+            num_cpus, _num_disk, 
config::high_priority_flush_thread_num_per_store);
     static_cast<void>(ThreadPoolBuilder("MemTableHighPriorityFlushThreadPool")
-                              .set_min_threads(min_threads)
-                              .set_max_threads(max_threads)
+                              .set_min_threads(hi_min)
+                              .set_max_threads(hi_max)
                               .build(&_high_prio_flush_pool));
 }
 
 void MemTableFlushExecutor::update_memtable_flush_threads() {
     int num_cpus = std::thread::hardware_concurrency();
-    int min_threads = std::max(1, config::flush_thread_num_per_store);
-    int max_threads = num_cpus == 0 ? _num_disk * min_threads
-                                    : std::min(_num_disk * min_threads,
-                                               num_cpus * 
config::max_flush_thread_num_per_cpu);
+
+    auto [min_threads, max_threads] =
+            calc_flush_thread_count(num_cpus, _num_disk, 
config::flush_thread_num_per_store);
     // Update max_threads first to avoid constraint violation when increasing 
min_threads
     static_cast<void>(_flush_pool->set_max_threads(max_threads));
     static_cast<void>(_flush_pool->set_min_threads(min_threads));
 
-    min_threads = std::max(1, 
config::high_priority_flush_thread_num_per_store);
-    max_threads = num_cpus == 0 ? _num_disk * min_threads
-                                : std::min(_num_disk * min_threads,
-                                           num_cpus * 
config::max_flush_thread_num_per_cpu);
+    auto [hi_min, hi_max] = calc_flush_thread_count(
+            num_cpus, _num_disk, 
config::high_priority_flush_thread_num_per_store);
     // Update max_threads first to avoid constraint violation when increasing 
min_threads
-    static_cast<void>(_high_prio_flush_pool->set_max_threads(max_threads));
-    static_cast<void>(_high_prio_flush_pool->set_min_threads(min_threads));
+    static_cast<void>(_high_prio_flush_pool->set_max_threads(hi_max));
+    static_cast<void>(_high_prio_flush_pool->set_min_threads(hi_min));
 }
 
 // NOTE: we use SERIAL mode here to ensure all mem-tables from one tablet are 
flushed in order.
diff --git a/be/src/load/memtable/memtable_flush_executor.h 
b/be/src/load/memtable/memtable_flush_executor.h
index 216aae14f17..ae08bec68c4 100644
--- a/be/src/load/memtable/memtable_flush_executor.h
+++ b/be/src/load/memtable/memtable_flush_executor.h
@@ -33,7 +33,9 @@ namespace doris {
 
 class DataDir;
 class MemTable;
+class MemTableMemoryLimiter;
 class RowsetWriter;
+class SystemMetrics;
 class WorkloadGroup;
 
 // the statistic of a certain flush handler.
@@ -163,8 +165,15 @@ public:
 
     ThreadPool* flush_pool() { return _flush_pool.get(); }
 
+    ThreadPool* high_prio_flush_pool() { return _high_prio_flush_pool.get(); }
+
     void update_memtable_flush_threads();
 
+    // Returns {min_threads, max_threads} for a flush thread pool.
+    // thread_num_per_store is used as the baseline when adaptive mode is off.
+    static std::pair<int, int> calc_flush_thread_count(int num_cpus, int 
num_disk,
+                                                       int 
thread_num_per_store);
+
 private:
     std::unique_ptr<ThreadPool> _flush_pool;
     std::unique_ptr<ThreadPool> _high_prio_flush_pool;
diff --git a/be/src/load/memtable/memtable_memory_limiter.h 
b/be/src/load/memtable/memtable_memory_limiter.h
index 34dcb2b06b4..6fbcc3cf8c8 100644
--- a/be/src/load/memtable/memtable_memory_limiter.h
+++ b/be/src/load/memtable/memtable_memory_limiter.h
@@ -54,6 +54,8 @@ public:
 
     int64_t mem_usage() const { return _mem_usage; }
 
+    bool soft_limit_reached() { return _soft_limit_reached(); }
+
 private:
     static inline int64_t _sys_avail_mem_less_than_warning_water_mark();
     static inline int64_t _process_used_mem_more_than_soft_mem_limit();
diff --git a/be/src/runtime/exec_env_init.cpp b/be/src/runtime/exec_env_init.cpp
index 38fec145eef..76fa4dedab0 100644
--- a/be/src/runtime/exec_env_init.cpp
+++ b/be/src/runtime/exec_env_init.cpp
@@ -861,7 +861,7 @@ void ExecEnv::destroy() {
         static_cast<CloudClusterInfo*>(_cluster_info)->stop_bg_worker();
     }
 
-    // StorageEngine must be destoried before _cache_manager destory
+    // StorageEngine must be destoried before _cache_manager destory.
     SAFE_STOP(_storage_engine);
     _storage_engine.reset();
 
diff --git a/be/src/runtime/workload_group/workload_group.cpp 
b/be/src/runtime/workload_group/workload_group.cpp
index ee945832093..888f85a7a44 100644
--- a/be/src/runtime/workload_group/workload_group.cpp
+++ b/be/src/runtime/workload_group/workload_group.cpp
@@ -35,6 +35,8 @@
 #include "information_schema/schema_scanner_helper.h"
 #include "io/cache/block_file_cache_factory.h"
 #include "io/fs/local_file_reader.h"
+#include "load/memtable/memtable_flush_executor.h"
+#include "load/memtable/memtable_memory_limiter.h"
 #include "runtime/exec_env.h"
 #include "runtime/memory/global_memory_arbitrator.h"
 #include "runtime/memory/mem_tracker_limiter.h"
@@ -42,6 +44,7 @@
 #include "runtime/runtime_profile.h"
 #include "runtime/workload_group/workload_group_metrics.h"
 #include "runtime/workload_management/io_throttle.h"
+#include "storage/adaptive_thread_pool_controller.h"
 #include "storage/storage_engine.h"
 #include "util/mem_info.h"
 #include "util/parse_util.h"
@@ -434,11 +437,9 @@ WorkloadGroupInfo WorkloadGroupInfo::parse_topic_info(
     num_cpus = std::thread::hardware_concurrency();
 #endif
     num_disk = std::max(1, num_disk);
-    int min_flush_thread_num = std::max(1, config::flush_thread_num_per_store);
-    int max_flush_thread_num = num_cpus == 0
-                                       ? num_disk * min_flush_thread_num
-                                       : std::min(num_disk * 
min_flush_thread_num,
-                                                  num_cpus * 
config::max_flush_thread_num_per_cpu);
+    auto [min_flush_thread_num, max_flush_thread_num] =
+            MemTableFlushExecutor::calc_flush_thread_count(num_cpus, num_disk,
+                                                           
config::flush_thread_num_per_store);
 
     // 12 memory low watermark
     int memory_low_watermark = MEMORY_LOW_WATERMARK_DEFAULT_VALUE;
@@ -614,6 +615,17 @@ Status 
WorkloadGroup::upsert_thread_pool_no_lock(WorkloadGroupInfo* wg_info,
             LOG(INFO) << "[upsert wg thread pool] create " + pool_name + " 
succ, gid=" << wg_id
                       << ", max thread num=" << max_flush_thread_num
                       << ", min thread num=" << min_flush_thread_num;
+            // Register the new pool with adaptive thread controller
+            if (config::enable_adaptive_flush_threads) {
+                auto* controller =
+                        
ExecEnv::GetInstance()->storage_engine().adaptive_thread_controller();
+                auto* flush_pool = _memtable_flush_pool.get();
+                controller->add("flush_wg_" + std::to_string(_id), 
{flush_pool},
+                                
AdaptiveThreadPoolController::make_flush_adjust_func(controller,
+                                                                               
      flush_pool),
+                                config::max_flush_thread_num_per_cpu,
+                                config::min_flush_thread_num_per_cpu);
+            }
         } else {
             upsert_ret = ret;
             LOG(INFO) << "[upsert wg thread pool] create " + pool_name + " 
failed, gid=" << wg_id;
@@ -743,6 +755,14 @@ void WorkloadGroup::try_stop_schedulers() {
         _remote_scan_task_sched->stop();
     }
     if (_memtable_flush_pool) {
+        // Unregister from adaptive controller before destroying the pool to 
avoid UAF:
+        // the adjustment loop holds raw ThreadPool* pointers and must not 
access them
+        // after the pool is gone.
+        if (config::enable_adaptive_flush_threads) {
+            auto* controller =
+                    
ExecEnv::GetInstance()->storage_engine().adaptive_thread_controller();
+            controller->cancel("flush_wg_" + std::to_string(_id));
+        }
         _memtable_flush_pool->shutdown();
         _memtable_flush_pool->wait();
     }
@@ -764,10 +784,8 @@ void WorkloadGroup::update_memtable_flush_threads() {
     num_cpus = std::thread::hardware_concurrency();
 #endif
     num_disk = std::max(1, num_disk);
-    int min_threads = std::max(1, config::flush_thread_num_per_store);
-    int max_threads = num_cpus == 0 ? num_disk * min_threads
-                                    : std::min(num_disk * min_threads,
-                                               num_cpus * 
config::max_flush_thread_num_per_cpu);
+    auto [min_threads, max_threads] = 
MemTableFlushExecutor::calc_flush_thread_count(
+            num_cpus, num_disk, config::flush_thread_num_per_store);
 
     // Update max_threads first to avoid constraint violation when increasing 
min_threads
     static_cast<void>(_memtable_flush_pool->set_max_threads(max_threads));
diff --git a/be/src/runtime/workload_group/workload_group.h 
b/be/src/runtime/workload_group/workload_group.h
index 3d888580a17..e18b1be0098 100644
--- a/be/src/runtime/workload_group/workload_group.h
+++ b/be/src/runtime/workload_group/workload_group.h
@@ -36,7 +36,9 @@
 namespace doris {
 
 class MemTrackerLimiter;
+class MemTableMemoryLimiter;
 class RuntimeProfile;
+class SystemMetrics;
 class ThreadPool;
 class ExecEnv;
 class CgroupCpuCtl;
diff --git a/be/src/storage/adaptive_thread_pool_controller.cpp 
b/be/src/storage/adaptive_thread_pool_controller.cpp
new file mode 100644
index 00000000000..8b8de54654d
--- /dev/null
+++ b/be/src/storage/adaptive_thread_pool_controller.cpp
@@ -0,0 +1,311 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "storage/adaptive_thread_pool_controller.h"
+
+#include <butil/time.h>
+
+#include <algorithm>
+#include <thread>
+
+#include "cloud/config.h"
+#include "common/config.h"
+#include "common/logging.h"
+#include "common/metrics/system_metrics.h"
+#include "common/status.h"
+#include "util/threadpool.h"
+#include "util/time.h"
+
+namespace doris {
+
+int AdaptiveThreadPoolController::PoolGroup::get_max_threads() const {
+    int num_cpus = std::thread::hardware_concurrency();
+    if (num_cpus <= 0) num_cpus = 1;
+    return static_cast<int>(num_cpus * max_threads_per_cpu);
+}
+
+int AdaptiveThreadPoolController::PoolGroup::get_min_threads() const {
+    int num_cpus = std::thread::hardware_concurrency();
+    if (num_cpus <= 0) num_cpus = 1;
+    return std::max(1, static_cast<int>(num_cpus * min_threads_per_cpu));
+}
+
+// Static callback registered with bthread_timer_add.
+// Runs in brpc TimerThread. Must be fast and non-blocking.
+void AdaptiveThreadPoolController::_on_timer(void* raw) {
+    auto* arg = static_cast<TimerArg*>(raw);
+
+    // Hold mu for the entire callback (fire + re-registration).
+    // cancel() acquires mu after bthread_timer_del, so this provides
+    // cancel-with-wait semantics without a dedicated thread.
+    std::lock_guard<std::mutex> lk(arg->mu);
+
+    if (arg->stopped.load(std::memory_order_acquire)) {
+        // cancel() set stopped before we took the lock.
+        // cancel() owns arg and will delete it after taking mu.
+        return;
+    }
+
+    arg->ctrl->_fire_group(arg->name);
+
+    if (arg->stopped.load(std::memory_order_acquire)) {
+        return; // cancel() will clean up
+    }
+
+    // Re-register the next one-shot timer.
+    bthread_timer_t tid;
+    if (bthread_timer_add(&tid, 
butil::milliseconds_from_now(arg->interval_ms), _on_timer, arg) ==
+        0) {
+        arg->timer_id.store(tid, std::memory_order_release);
+    } else {
+        LOG(WARNING) << "Adaptive: failed to re-register timer for group '" << 
arg->name << "'";
+    }
+}
+
+void AdaptiveThreadPoolController::init(SystemMetrics* system_metrics,
+                                        ThreadPool* s3_file_upload_pool) {
+    _system_metrics = system_metrics;
+    _s3_file_upload_pool = s3_file_upload_pool;
+}
+
+void AdaptiveThreadPoolController::stop() {
+    std::vector<std::string> names;
+    {
+        std::lock_guard<std::mutex> lk(_mutex);
+        for (const auto& [name, _] : _pool_groups) {
+            names.push_back(name);
+        }
+    }
+    for (const auto& name : names) {
+        cancel(name);
+    }
+}
+
+void AdaptiveThreadPoolController::add(std::string name, 
std::vector<ThreadPool*> pools,
+                                       AdjustFunc adjust_func, double 
max_threads_per_cpu,
+                                       double min_threads_per_cpu, int64_t 
interval_ms) {
+    PoolGroup group;
+    group.name = name;
+    group.pools = std::move(pools);
+    group.adjust_func = std::move(adjust_func);
+    group.max_threads_per_cpu = max_threads_per_cpu;
+    group.min_threads_per_cpu = min_threads_per_cpu;
+    group.current_threads = group.get_max_threads();
+
+    int log_max = group.get_max_threads();
+    int log_min = group.get_min_threads();
+
+    auto* arg = new TimerArg();
+    arg->ctrl = this;
+    arg->name = name;
+    arg->interval_ms = interval_ms;
+
+    bthread_timer_t tid;
+    if (bthread_timer_add(&tid, butil::milliseconds_from_now(interval_ms), 
_on_timer, arg) == 0) {
+        arg->timer_id.store(tid, std::memory_order_release);
+    } else {
+        LOG(WARNING) << "Adaptive: failed to register timer for pool group '" 
<< name << "'";
+    }
+    group.timer_arg = arg;
+
+    {
+        std::lock_guard<std::mutex> lk(_mutex);
+        _pool_groups[name] = std::move(group);
+    }
+
+    LOG(INFO) << "Adaptive: added pool group '" << name << "'"
+              << ", max_threads=" << log_max << ", min_threads=" << log_min
+              << ", interval_ms=" << interval_ms;
+}
+
+void AdaptiveThreadPoolController::cancel(const std::string& name) {
+    TimerArg* arg = nullptr;
+    {
+        std::lock_guard<std::mutex> lk(_mutex);
+        auto it = _pool_groups.find(name);
+        if (it != _pool_groups.end()) {
+            arg = it->second.timer_arg;
+            _pool_groups.erase(it);
+        }
+    }
+
+    if (arg == nullptr) {
+        return;
+    }
+
+    // Signal the callback to stop re-registering.
+    arg->stopped.store(true, std::memory_order_release);
+
+    // Try to cancel a pending (not yet fired) timer. Read timer_id after
+    // setting stopped so any re-registration in a concurrent callback has
+    // already stored the latest id by now (it holds mu, which we haven't
+    // taken yet).
+    bthread_timer_t tid = arg->timer_id.load(std::memory_order_acquire);
+    bthread_timer_del(tid); // returns non-zero if already fired; that's fine
+
+    // Wait for any in-flight callback to finish. The callback holds mu while
+    // running _fire_group and re-registering, so acquiring mu here ensures
+    // we don't free arg while the callback is still executing.
+    { std::lock_guard<std::mutex> lk(arg->mu); }
+
+    delete arg;
+    LOG(INFO) << "Adaptive: cancelled pool group '" << name << "'";
+}
+
+// Called from _on_timer. No lock held on entry.
+void AdaptiveThreadPoolController::_fire_group(const std::string& name) {
+    if (!config::enable_adaptive_flush_threads) {
+        return;
+    }
+    // Phase 1: snapshot parameters under the lock.
+    AdjustFunc fn;
+    int current, min_t, max_t;
+    {
+        std::lock_guard<std::mutex> lk(_mutex);
+        auto it = _pool_groups.find(name);
+        if (it == _pool_groups.end()) return;
+        const PoolGroup& g = it->second;
+        fn = g.adjust_func;
+        current = g.current_threads;
+        min_t = g.get_min_threads();
+        max_t = g.get_max_threads();
+    }
+
+    // Phase 2: compute target — no lock held (adjust_func may call is_io_busy 
etc.).
+    std::string reason;
+    int target = fn(current, min_t, max_t, reason);
+
+    // Phase 3: apply under lock; recheck in case cancel() raced with us.
+    std::lock_guard<std::mutex> lk(_mutex);
+    auto it = _pool_groups.find(name);
+    if (it == _pool_groups.end()) return;
+    _apply_thread_count(it->second, target, reason);
+}
+
+// Fire all groups once regardless of schedule. For testing.
+void AdaptiveThreadPoolController::adjust_once() {
+    std::vector<std::string> names;
+    {
+        std::lock_guard<std::mutex> lk(_mutex);
+        for (const auto& [name, _] : _pool_groups) {
+            names.push_back(name);
+        }
+    }
+    for (const auto& name : names) {
+        _fire_group(name);
+    }
+}
+
+void AdaptiveThreadPoolController::_apply_thread_count(PoolGroup& group, int 
target_threads,
+                                                       const std::string& 
reason) {
+    int max_threads = group.get_max_threads();
+    int min_threads = group.get_min_threads();
+    target_threads = std::max(min_threads, std::min(max_threads, 
target_threads));
+    if (target_threads == group.current_threads) return;
+
+    LOG(INFO) << "Adaptive[" << group.name << "]: adjusting threads from " << 
group.current_threads
+              << " to " << target_threads << " (min=" << min_threads << ", 
max=" << max_threads
+              << ")" << (reason.empty() ? "" : " reason=[" + reason + "]");
+
+    bool all_success = true;
+    for (auto* pool : group.pools) {
+        if (pool == nullptr) continue;
+        // Always sync min_threads to guard against races with 
update_memtable_flush_threads().
+        // Order matters: when increasing, set max first so max >= min is 
always satisfied;
+        // when decreasing, set min first so the new max is never below min.
+        Status st;
+        if (target_threads >= group.current_threads) {
+            st = pool->set_max_threads(target_threads);
+            if (st.ok()) static_cast<void>(pool->set_min_threads(min_threads));
+        } else {
+            st = pool->set_min_threads(min_threads);
+            if (st.ok()) st = pool->set_max_threads(target_threads);
+        }
+        if (!st.ok()) {
+            all_success = false;
+            LOG(WARNING) << "Adaptive[" << group.name << "]: failed to set 
threads: " << st;
+        }
+    }
+    if (all_success) {
+        group.current_threads = target_threads;
+    }
+}
+
+int AdaptiveThreadPoolController::get_current_threads(const std::string& name) 
const {
+    std::lock_guard<std::mutex> lk(_mutex);
+    auto it = _pool_groups.find(name);
+    return it != _pool_groups.end() ? it->second.current_threads : 0;
+}
+
+bool AdaptiveThreadPoolController::is_io_busy() {
+    if (config::is_cloud_mode()) {
+        if (_s3_file_upload_pool == nullptr) return false;
+        int queue_size = _s3_file_upload_pool->get_queue_size();
+        return queue_size > kS3QueueBusyThreshold;
+    }
+
+    if (_system_metrics == nullptr) return false;
+
+    int64_t current_time_sec = MonotonicSeconds();
+    int64_t interval_sec = current_time_sec - _last_check_time_sec;
+    if (interval_sec <= 0) {
+        return _last_io_busy;
+    }
+
+    int64_t max_io_util = _system_metrics->get_max_io_util(_last_disk_io_time, 
interval_sec);
+    _system_metrics->get_disks_io_time(&_last_disk_io_time);
+    _last_check_time_sec = current_time_sec;
+
+    _last_io_busy = max_io_util > kIOBusyThresholdPercent;
+    return _last_io_busy;
+}
+
+bool AdaptiveThreadPoolController::is_cpu_busy() {
+    if (_system_metrics == nullptr) return false;
+
+    double load_avg = _system_metrics->get_load_average_1_min();
+    int num_cpus = std::thread::hardware_concurrency();
+    if (num_cpus <= 0) return false;
+
+    double cpu_usage_percent = (load_avg / num_cpus) * 100.0;
+    return cpu_usage_percent > kCPUBusyThresholdPercent;
+}
+
+AdaptiveThreadPoolController::AdjustFunc 
AdaptiveThreadPoolController::make_flush_adjust_func(
+        AdaptiveThreadPoolController* controller, ThreadPool* flush_pool) {
+    return [controller, flush_pool](int current, int min_t, int max_t, 
std::string& reason) {
+        int target = current;
+        int queue_size = flush_pool->get_queue_size();
+        if (queue_size > kQueueThreshold) {
+            target = std::min(max_t, target + 1);
+            reason += "queue_size=" + std::to_string(queue_size) + ">" +
+                      std::to_string(kQueueThreshold) + " -> target=" + 
std::to_string(target) +
+                      "; ";
+        }
+        if (controller->is_io_busy()) {
+            target = std::max(min_t, target - 2);
+            reason += "io_busy -> target=" + std::to_string(target) + "; ";
+        }
+        if (controller->is_cpu_busy()) {
+            target = std::max(min_t, target - 2);
+            reason += "cpu_busy -> target=" + std::to_string(target) + "; ";
+        }
+        return target;
+    };
+}
+
+} // namespace doris
diff --git a/be/src/storage/adaptive_thread_pool_controller.h 
b/be/src/storage/adaptive_thread_pool_controller.h
new file mode 100644
index 00000000000..4a6f0096f62
--- /dev/null
+++ b/be/src/storage/adaptive_thread_pool_controller.h
@@ -0,0 +1,156 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <bthread/unstable.h>
+
+#include <atomic>
+#include <cstdint>
+#include <functional>
+#include <map>
+#include <mutex>
+#include <string>
+#include <vector>
+
+namespace doris {
+
+class ThreadPool;
+class SystemMetrics;
+class AdaptiveThreadPoolController;
+
+// Each pool group's timer state. Heap-allocated; shared between the controller
+// and the brpc TimerThread callback.
+struct TimerArg {
+    AdaptiveThreadPoolController* ctrl; // never null
+    std::string name;
+    int64_t interval_ms;
+
+    // Set by cancel() before calling bthread_timer_del. The callback checks
+    // this flag after acquiring `mu` and skips re-registration when true.
+    std::atomic<bool> stopped {false};
+
+    // Tracks the most recently registered timer id. Updated under `mu` by the
+    // callback after each re-registration; read by cancel() to call
+    // bthread_timer_del on the latest pending timer.
+    std::atomic<bthread_timer_t> timer_id {0};
+
+    // Held for the entire duration of the callback (fire + re-registration).
+    // cancel() acquires it after bthread_timer_del to wait for any in-flight
+    // invocation to complete before freeing `this`.
+    std::mutex mu;
+};
+
// AdaptiveThreadPoolController dynamically adjusts thread pool sizes based on
// system load (IO utilisation, CPU load average, flush queue depth).
//
// Each registered pool group runs as a one-shot bthread_timer_add chain: the
// callback fires, adjusts the pool, then re-registers the next one-shot timer.
// All groups share the single brpc TimerThread, keeping the overhead minimal.
//
// NOTE(review): `_mutex` presumably guards `_pool_groups` (it is mutable and
// get_current_threads() is const) — confirm against the .cpp.
//
// Usage:
//   AdaptiveThreadPoolController ctrl;
//   ctrl.init(system_metrics, s3_pool);
//   ctrl.add("flush", {pool1, pool2},
//       AdaptiveThreadPoolController::make_flush_adjust_func(&ctrl, pool1),
//       max_per_cpu, min_per_cpu);
//   // ... later ...
//   ctrl.cancel("flush");   // or ctrl.stop()
class AdaptiveThreadPoolController {
public:
    // Decides the new thread count for a pool group. Receives the group's
    // current count and its [min_threads, max_threads] bounds, and may append
    // a human-readable explanation to `reason`.
    using AdjustFunc =
            std::function<int(int current, int min_threads, int max_threads, std::string& reason)>;

    // Default period (ms) between two adjustments of the same pool group.
    static constexpr int kDefaultIntervalMs = 10000;

    // Pending-task depth above which the flush adjust function grows the pool.
    static constexpr int kQueueThreshold = 10;
    // Disk IO utilisation (%) above which is_io_busy() reports busy.
    static constexpr int kIOBusyThresholdPercent = 90;
    // Normalised load-average (%) above which is_cpu_busy() reports busy.
    static constexpr int kCPUBusyThresholdPercent = 90;
    // S3 upload queue depth considered busy.
    // NOTE(review): not referenced by the code visible here — confirm its user.
    static constexpr int kS3QueueBusyThreshold = 100;

    AdaptiveThreadPoolController() = default;
    // Destructor cancels all timer chains via stop().
    ~AdaptiveThreadPoolController() { stop(); }

    // Initialize with system-level dependencies. Pointers are borrowed, not
    // owned; either may be null (busy checks then report "not busy").
    void init(SystemMetrics* system_metrics, ThreadPool* s3_file_upload_pool);

    // Cancel all registered pool groups. Must be called before the pools are destroyed.
    // Also invoked by the destructor.
    void stop();

    // Register a pool group and start a recurring bthread_timer_add chain.
    // Thread bounds are derived per CPU core from max/min_threads_per_cpu.
    void add(std::string name, std::vector<ThreadPool*> pools, AdjustFunc adjust_func,
             double max_threads_per_cpu, double min_threads_per_cpu,
             int64_t interval_ms = kDefaultIntervalMs);

    // Cancel the timer chain and remove the pool group. Blocks until any
    // in-flight callback finishes, then returns. Safe to call before pool teardown.
    void cancel(const std::string& name);

    // Fire all registered groups once, ignoring the schedule. For testing.
    void adjust_once();

    // Get current thread count for a named group (0 when the group is
    // unknown). For testing/debugging.
    int get_current_threads(const std::string& name) const;

    // System-state helpers; safe to call from inside an AdjustFunc.
    // is_io_busy() tracks per-disk IO-time deltas and returns the cached
    // result when the measurement interval is too short.
    bool is_io_busy();
    bool is_cpu_busy();

    // Factory: standard flush-pool adjust function (grow on deep queue,
    // shrink under IO/CPU pressure).
    static AdjustFunc make_flush_adjust_func(AdaptiveThreadPoolController* controller,
                                             ThreadPool* flush_pool);

    // Callback registered with bthread_timer_add. Public only for the C linkage
    // requirement; do not call directly.
    static void _on_timer(void* arg);

private:
    // Per-name registration state for one group of pools adjusted together.
    struct PoolGroup {
        std::string name;
        std::vector<ThreadPool*> pools; // borrowed, not owned
        AdjustFunc adjust_func;
        double max_threads_per_cpu = 4.0;
        double min_threads_per_cpu = 0.5;
        int current_threads = 0;       // last applied thread count
        TimerArg* timer_arg = nullptr; // owned; freed by cancel()

        // Bounds derived from the per-CPU ratios above.
        int get_max_threads() const;
        int get_min_threads() const;
    };

    // Run one group's adjustment. Called from _on_timer (no lock on entry).
    void _fire_group(const std::string& name);

    // Apply `target_threads` to every pool in the group; `reason` carries the
    // human-readable explanation accumulated by the adjust function.
    void _apply_thread_count(PoolGroup& group, int target_threads, const std::string& reason);

private:
    SystemMetrics* _system_metrics = nullptr;   // borrowed, not owned
    ThreadPool* _s3_file_upload_pool = nullptr; // borrowed, not owned

    mutable std::mutex _mutex;
    std::map<std::string, PoolGroup> _pool_groups;

    // Last successfully computed IO-busy result. Returned as-is when the
    // measurement interval is too short to produce a valid new delta.
    bool _last_io_busy = false;

    // For disk IO util calculation (used by is_io_busy).
    std::map<std::string, int64_t> _last_disk_io_time;
    int64_t _last_check_time_sec = 0;
};
+
+} // namespace doris
diff --git a/be/src/storage/olap_server.cpp b/be/src/storage/olap_server.cpp
index 498cc8e6c31..ffe9d97bd06 100644
--- a/be/src/storage/olap_server.cpp
+++ b/be/src/storage/olap_server.cpp
@@ -56,6 +56,7 @@
 #include "cpp/sync_point.h"
 #include "io/fs/file_writer.h" // IWYU pragma: keep
 #include "io/fs/path.h"
+#include "load/memtable/memtable_flush_executor.h"
 #include "runtime/memory/cache_manager.h"
 #include "runtime/memory/global_memory_arbitrator.h"
 #include "storage/compaction/cold_data_compaction.h"
@@ -357,6 +358,8 @@ Status 
StorageEngine::start_bg_threads(std::shared_ptr<WorkloadGroup> wg_sptr) {
             &_check_delete_bitmap_score_thread));
     LOG(INFO) << "check tablet delete bitmap score thread started";
 
+    _start_adaptive_thread_controller();
+
     LOG(INFO) << "all storage engine's background threads are started.";
     return Status::OK();
 }
diff --git a/be/src/storage/storage_engine.cpp 
b/be/src/storage/storage_engine.cpp
index cb49a5ac149..aeda65e21ca 100644
--- a/be/src/storage/storage_engine.cpp
+++ b/be/src/storage/storage_engine.cpp
@@ -59,6 +59,7 @@
 #include "io/fs/local_file_system.h"
 #include "load/memtable/memtable_flush_executor.h"
 #include "load/stream_load/stream_load_recorder.h"
+#include "runtime/exec_env.h"
 #include "storage/binlog.h"
 #include "storage/cache/schema_cache.h"
 #include "storage/compaction/single_replica_compaction.h"
@@ -140,6 +141,27 @@ int64_t 
BaseStorageEngine::memory_limitation_bytes_per_thread_for_schema_change(
                     
config::memory_limitation_per_thread_for_schema_change_bytes);
 }
 
+void BaseStorageEngine::_start_adaptive_thread_controller() {
+    if (!config::enable_adaptive_flush_threads) {
+        return;
+    }
+
+    auto* system_metrics = DorisMetrics::instance()->system_metrics();
+    auto* s3_upload_pool = 
ExecEnv::GetInstance()->s3_file_upload_thread_pool();
+
+    _adaptive_thread_controller.init(system_metrics, s3_upload_pool);
+
+    if (_memtable_flush_executor) {
+        auto* flush_pool = _memtable_flush_executor->flush_pool();
+        auto* high_prio_pool = 
_memtable_flush_executor->high_prio_flush_pool();
+        _adaptive_thread_controller.add("flush", {flush_pool, high_prio_pool},
+                                        
AdaptiveThreadPoolController::make_flush_adjust_func(
+                                                &_adaptive_thread_controller, 
flush_pool),
+                                        config::max_flush_thread_num_per_cpu,
+                                        config::min_flush_thread_num_per_cpu);
+    }
+}
+
 Status BaseStorageEngine::init_stream_load_recorder(const std::string& 
stream_load_record_path) {
     LOG(INFO) << "stream load record path: " << stream_load_record_path;
     // init stream load record rocksdb
@@ -769,6 +791,7 @@ void StorageEngine::stop() {
         _cooldown_thread_pool->shutdown();
     }
 
+    _adaptive_thread_controller.stop();
     _memtable_flush_executor.reset(nullptr);
     _calc_delete_bitmap_executor.reset(nullptr);
     _calc_delete_bitmap_executor_for_load.reset();
diff --git a/be/src/storage/storage_engine.h b/be/src/storage/storage_engine.h
index 8b50d1c4d9b..b37aeaf475d 100644
--- a/be/src/storage/storage_engine.h
+++ b/be/src/storage/storage_engine.h
@@ -41,6 +41,7 @@
 #include "common/config.h"
 #include "common/status.h"
 #include "runtime/heartbeat_flags.h"
+#include "storage/adaptive_thread_pool_controller.h"
 #include "storage/compaction/compaction_permit_limiter.h"
 #include "storage/delete/calc_delete_bitmap_executor.h"
 #include "storage/olap_common.h"
@@ -140,6 +141,9 @@ public:
     RowsetId next_rowset_id();
 
     MemTableFlushExecutor* memtable_flush_executor() { return 
_memtable_flush_executor.get(); }
    // Accessor for the engine-owned adaptive thread pool controller.
    // Never returns null; the controller's lifetime matches the engine's.
    AdaptiveThreadPoolController* adaptive_thread_controller() {
        return &_adaptive_thread_controller;
    }
     CalcDeleteBitmapExecutor* calc_delete_bitmap_executor() {
         return _calc_delete_bitmap_executor.get();
     }
@@ -163,6 +167,7 @@ public:
     }
 
 protected:
+    void _start_adaptive_thread_controller();
     void _evict_querying_rowset();
     void _evict_quring_rowset_thread_callback();
     bool _should_delay_large_task();
@@ -176,6 +181,7 @@ protected:
 
     std::unique_ptr<RowsetIdGenerator> _rowset_id_generator;
     std::unique_ptr<MemTableFlushExecutor> _memtable_flush_executor;
+    AdaptiveThreadPoolController _adaptive_thread_controller;
     std::unique_ptr<CalcDeleteBitmapExecutor> _calc_delete_bitmap_executor;
     std::unique_ptr<CalcDeleteBitmapExecutor> 
_calc_delete_bitmap_executor_for_load;
     CountDownLatch _stop_background_threads_latch;
diff --git a/be/test/load/memtable/memtable_flush_executor_test.cpp 
b/be/test/load/memtable/memtable_flush_executor_test.cpp
index 864b0d7be62..8b5d2990c06 100644
--- a/be/test/load/memtable/memtable_flush_executor_test.cpp
+++ b/be/test/load/memtable/memtable_flush_executor_test.cpp
@@ -82,6 +82,7 @@ TEST(MemTableFlushExecutorTest, TestDynamicThreadPoolUpdate) {
     int32_t original_high_priority_flush_thread_num =
             config::high_priority_flush_thread_num_per_store;
     int32_t original_max_flush_thread_num = 
config::max_flush_thread_num_per_cpu;
+    bool original_adaptive = config::enable_adaptive_flush_threads;
 
     // Test 1: Get initial thread pool sizes
     int initial_max_threads = flush_executor->flush_pool()->max_threads();
@@ -89,6 +90,9 @@ TEST(MemTableFlushExecutorTest, TestDynamicThreadPoolUpdate) {
     EXPECT_GT(initial_max_threads, 0);
     EXPECT_GT(initial_min_threads, 0);
 
+    // Disable adaptive mode so flush_thread_num_per_store takes effect
+    config::enable_adaptive_flush_threads = false;
+
     // Test 2: Update flush_thread_num_per_store and verify thread pool updates
     config::flush_thread_num_per_store = 10;
     flush_executor->update_memtable_flush_threads();
@@ -126,6 +130,7 @@ TEST(MemTableFlushExecutorTest, 
TestDynamicThreadPoolUpdate) {
     config::flush_thread_num_per_store = original_flush_thread_num;
     config::high_priority_flush_thread_num_per_store = 
original_high_priority_flush_thread_num;
     config::max_flush_thread_num_per_cpu = original_max_flush_thread_num;
+    config::enable_adaptive_flush_threads = original_adaptive;
     flush_executor->update_memtable_flush_threads();
 
     // Cleanup
@@ -141,6 +146,11 @@ TEST(MemTableFlushExecutorTest, TestConfigUpdateTrigger) {
 
     // Store original config values
     int32_t original_flush_thread_num = config::flush_thread_num_per_store;
+    bool original_adaptive = config::enable_adaptive_flush_threads;
+
+    // Disable adaptive mode so flush_thread_num_per_store takes effect
+    config::enable_adaptive_flush_threads = false;
+    flush_executor->update_memtable_flush_threads();
 
     // Get initial thread pool size
     int initial_min_threads = flush_executor->flush_pool()->min_threads();
@@ -154,8 +164,9 @@ TEST(MemTableFlushExecutorTest, TestConfigUpdateTrigger) {
     EXPECT_EQ(updated_min_threads, 15);
     EXPECT_NE(updated_min_threads, initial_min_threads);
 
-    // Restore original config value
+    // Restore original config values
     config::flush_thread_num_per_store = original_flush_thread_num;
+    config::enable_adaptive_flush_threads = original_adaptive;
     flush_executor->update_memtable_flush_threads();
 
     // Cleanup
@@ -172,6 +183,10 @@ TEST(MemTableFlushExecutorTest, 
TestThreadPoolMinMaxRelationship) {
     // Store original config values
     int32_t original_flush_thread_num = config::flush_thread_num_per_store;
     int32_t original_max_flush_thread_num = 
config::max_flush_thread_num_per_cpu;
+    bool original_adaptive = config::enable_adaptive_flush_threads;
+
+    // Disable adaptive mode so flush_thread_num_per_store takes effect
+    config::enable_adaptive_flush_threads = false;
 
     // Test: Ensure min_threads <= max_threads always
     config::flush_thread_num_per_store = 20;
@@ -185,6 +200,7 @@ TEST(MemTableFlushExecutorTest, 
TestThreadPoolMinMaxRelationship) {
     // Restore original config values
     config::flush_thread_num_per_store = original_flush_thread_num;
     config::max_flush_thread_num_per_cpu = original_max_flush_thread_num;
+    config::enable_adaptive_flush_threads = original_adaptive;
     flush_executor->update_memtable_flush_threads();
 
     // Cleanup
diff --git a/be/test/storage/adaptive_thread_pool_controller_test.cpp 
b/be/test/storage/adaptive_thread_pool_controller_test.cpp
new file mode 100644
index 00000000000..b2d6464fcef
--- /dev/null
+++ b/be/test/storage/adaptive_thread_pool_controller_test.cpp
@@ -0,0 +1,283 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "storage/adaptive_thread_pool_controller.h"
+
+#include <gtest/gtest.h>
+
+#include <thread>
+
+#include "common/config.h"
+#include "util/threadpool.h"
+
+namespace doris {
+
// Fixture: saves the adaptive-flush config flag and provides two small
// thread pools for registering pool groups.
class AdaptiveThreadPoolControllerTest : public testing::Test {
protected:
    // Snapshot the config flag (restored in TearDown) and build the pools.
    void SetUp() override {
        _original_enable_adaptive = config::enable_adaptive_flush_threads;

        ASSERT_TRUE(ThreadPoolBuilder("TestPool")
                            .set_min_threads(2)
                            .set_max_threads(64)
                            .build(&_pool)
                            .ok());

        ASSERT_TRUE(ThreadPoolBuilder("TestPool2")
                            .set_min_threads(2)
                            .set_max_threads(64)
                            .build(&_pool2)
                            .ok());
    }

    // Restore the config flag and shut the pools down.
    void TearDown() override {
        config::enable_adaptive_flush_threads = _original_enable_adaptive;
        if (_pool) _pool->shutdown();
        if (_pool2) _pool2->shutdown();
    }

    bool _original_enable_adaptive; // saved config::enable_adaptive_flush_threads
    std::unique_ptr<ThreadPool> _pool;
    std::unique_ptr<ThreadPool> _pool2;
};
+
// Test basic add and get_current_threads
TEST_F(AdaptiveThreadPoolControllerTest, TestAddPoolGroup) {
    AdaptiveThreadPoolController controller;
    controller.init(nullptr, nullptr);

    // Identity adjust function: never changes the thread count.
    controller.add(
            "test", {_pool.get()},
            [](int current, int min_t, int max_t, std::string&) { return current; }, 4, 0.5);

    int num_cpus = std::thread::hardware_concurrency();
    if (num_cpus <= 0) num_cpus = 1;

    // A freshly added group starts at its per-CPU maximum (num_cpus * 4 here).
    EXPECT_EQ(controller.get_current_threads("test"), num_cpus * 4);
    // Unknown group names report 0.
    EXPECT_EQ(controller.get_current_threads("nonexistent"), 0);
}
+
// Test adding multiple pool groups with different adjust logic
TEST_F(AdaptiveThreadPoolControllerTest, TestMultiplePoolGroups) {
    AdaptiveThreadPoolController controller;
    controller.init(nullptr, nullptr);

    controller.add(
            "group_a", {_pool.get()},
            [](int current, int min_t, int max_t, std::string&) { return current; }, 4, 0.5);
    controller.add(
            "group_b", {_pool2.get()},
            [](int current, int min_t, int max_t, std::string&) { return current; }, 2, 0.5);

    int num_cpus = std::thread::hardware_concurrency();
    if (num_cpus <= 0) num_cpus = 1;

    // Each group is seeded to its own per-CPU maximum, independently of the other.
    EXPECT_EQ(controller.get_current_threads("group_a"), num_cpus * 4);
    EXPECT_EQ(controller.get_current_threads("group_b"), num_cpus * 2);
}
+
// Test that when adaptive is disabled, adjust_once is a no-op (config guard in _fire_group)
TEST_F(AdaptiveThreadPoolControllerTest, TestAdaptiveDisabled) {
    // Fixture TearDown restores this flag.
    config::enable_adaptive_flush_threads = false;

    AdaptiveThreadPoolController controller;
    controller.init(nullptr, nullptr);
    controller.add(
            "test", {_pool.get()},
            [](int current, int min_t, int max_t, std::string&) { return 1; }, 4, 0.5);

    int initial = controller.get_current_threads("test");
    // bthread_timer callbacks check config::enable_adaptive_flush_threads before firing;
    // adjust_once bypasses that check, so we only verify the group was registered.
    EXPECT_GT(initial, 0);
}
+
// Test adjust_once calls the custom adjust function
TEST_F(AdaptiveThreadPoolControllerTest, TestCustomAdjustFunc) {
    config::enable_adaptive_flush_threads = true;

    AdaptiveThreadPoolController controller;
    controller.init(nullptr, nullptr);

    int num_cpus = std::thread::hardware_concurrency();
    if (num_cpus <= 0) num_cpus = 1;
    // Expected floor: num_cpus * min_threads_per_cpu, never below one thread.
    int expected_min = std::max(1, static_cast<int>(num_cpus * 0.5));

    // Adjust function always returns min
    controller.add(
            "test", {_pool.get()},
            [](int current, int min_t, int max_t, std::string&) { return min_t; }, 4, 0.5);

    controller.adjust_once();

    EXPECT_EQ(controller.get_current_threads("test"), expected_min);
}
+
// Test that result is clamped to [min, max]
TEST_F(AdaptiveThreadPoolControllerTest, TestClampToMinMax) {
    config::enable_adaptive_flush_threads = true;

    AdaptiveThreadPoolController controller;
    controller.init(nullptr, nullptr);

    int num_cpus = std::thread::hardware_concurrency();
    if (num_cpus <= 0) num_cpus = 1;
    // Bounds derived from the per-CPU ratios passed to add() below.
    int expected_max = num_cpus * 2;
    int expected_min = std::max(1, static_cast<int>(num_cpus * 0.5));

    // Adjust function tries to return way beyond max
    controller.add(
            "test", {_pool.get()},
            [](int current, int min_t, int max_t, std::string&) { return 99999; }, 2, 0.5);
    controller.adjust_once();
    EXPECT_EQ(controller.get_current_threads("test"), expected_max);

    // Adjust function tries to return below min
    controller.add(
            "test2", {_pool2.get()},
            [](int current, int min_t, int max_t, std::string&) { return -1; }, 2, 0.5);
    controller.adjust_once();
    EXPECT_EQ(controller.get_current_threads("test2"), expected_min);
}
+
// Test adjust_once with no change keeps current threads
TEST_F(AdaptiveThreadPoolControllerTest, TestAdjustOnceNoChange) {
    config::enable_adaptive_flush_threads = true;

    AdaptiveThreadPoolController controller;
    controller.init(nullptr, nullptr);
    // Identity adjust function: target always equals current.
    controller.add(
            "test", {_pool.get()},
            [](int current, int min_t, int max_t, std::string&) { return current; }, 4, 0.5);

    int initial = controller.get_current_threads("test");
    controller.adjust_once();
    EXPECT_EQ(controller.get_current_threads("test"), initial);
}
+
// Test stop lifecycle
TEST_F(AdaptiveThreadPoolControllerTest, TestStartStopLifecycle) {
    config::enable_adaptive_flush_threads = true;

    AdaptiveThreadPoolController controller;
    controller.init(nullptr, nullptr);
    controller.add(
            "test", {_pool.get()},
            [](int current, int min_t, int max_t, std::string&) { return current; }, 4, 0.5);

    // Give the timer chain a chance to run before stopping.
    std::this_thread::sleep_for(std::chrono::milliseconds(100));
    controller.stop();

    // Multiple stops should be safe
    controller.stop();
}
+
// Test destructor stops the controller (cancels all timer events)
TEST_F(AdaptiveThreadPoolControllerTest, TestDestructorStops) {
    config::enable_adaptive_flush_threads = true;

    {
        AdaptiveThreadPoolController controller;
        controller.init(nullptr, nullptr);
        controller.add(
                "test", {_pool.get()},
                [](int current, int min_t, int max_t, std::string&) { return current; }, 4, 0.5);
        // Let at least one timer callback get scheduled before destruction.
        std::this_thread::sleep_for(std::chrono::milliseconds(100));
    }
    // Destructor called stop(), which cancelled all timer events. No UAF.
    SUCCEED();
}
+
// Test constants are reasonable; guards against accidental tuning changes.
TEST_F(AdaptiveThreadPoolControllerTest, TestConstants) {
    EXPECT_EQ(AdaptiveThreadPoolController::kDefaultIntervalMs, 10000);
    EXPECT_EQ(AdaptiveThreadPoolController::kQueueThreshold, 10);
    EXPECT_EQ(AdaptiveThreadPoolController::kIOBusyThresholdPercent, 90);
    EXPECT_EQ(AdaptiveThreadPoolController::kCPUBusyThresholdPercent, 90);
    EXPECT_EQ(AdaptiveThreadPoolController::kS3QueueBusyThreshold, 100);
}
+
// Test add after controller is already running
TEST_F(AdaptiveThreadPoolControllerTest, TestAddAfterStart) {
    config::enable_adaptive_flush_threads = true;

    AdaptiveThreadPoolController controller;
    controller.init(nullptr, nullptr);
    controller.add(
            "group_a", {_pool.get()},
            [](int current, int min_t, int max_t, std::string&) { return current; }, 4, 0.5);

    // Add a second group while the controller is already running
    controller.add(
            "group_b", {_pool2.get()},
            [](int current, int min_t, int max_t, std::string&) { return current; }, 2, 0.5);

    int num_cpus = std::thread::hardware_concurrency();
    if (num_cpus <= 0) num_cpus = 1;
    // The late-added group is seeded to its per-CPU maximum like any other.
    EXPECT_EQ(controller.get_current_threads("group_b"), num_cpus * 2);

    controller.stop();
}
+
// Test is_io_busy and is_cpu_busy with null system_metrics
TEST_F(AdaptiveThreadPoolControllerTest, TestIoBusyCpuBusyWithNullMetrics) {
    AdaptiveThreadPoolController controller;
    controller.init(nullptr, nullptr);

    // Without a metrics source, busy checks must fail closed (report not busy).
    EXPECT_FALSE(controller.is_io_busy());
    EXPECT_FALSE(controller.is_cpu_busy());
}
+
// Test adjust function that uses controller's is_io_busy/is_cpu_busy
TEST_F(AdaptiveThreadPoolControllerTest, TestAdjustFuncWithControllerMethods) {
    config::enable_adaptive_flush_threads = true;

    AdaptiveThreadPoolController controller;
    controller.init(nullptr, nullptr);

    // Capture a raw pointer so the lambda can query the controller's state.
    auto* ctrl = &controller;
    controller.add(
            "test", {_pool.get()},
            [ctrl](int current, int min_t, int max_t, std::string&) {
                int target = current;
                if (ctrl->is_io_busy()) target = std::max(min_t, target - 1);
                if (ctrl->is_cpu_busy()) target = std::max(min_t, target - 1);
                return target;
            },
            4, 0.5);

    int initial = controller.get_current_threads("test");
    controller.adjust_once();
    // With null metrics, io_busy and cpu_busy return false, so no change
    EXPECT_EQ(controller.get_current_threads("test"), initial);
}
+
// Test cancel removes the pool group
TEST_F(AdaptiveThreadPoolControllerTest, TestCancel) {
    AdaptiveThreadPoolController controller;
    controller.init(nullptr, nullptr);

    controller.add(
            "test", {_pool.get()},
            [](int current, int min_t, int max_t, std::string&) { return current; }, 4, 0.5);
    EXPECT_NE(controller.get_current_threads("test"), 0);

    // After cancel() the group is gone, so the lookup reports 0.
    controller.cancel("test");
    EXPECT_EQ(controller.get_current_threads("test"), 0);
}
+
+} // namespace doris


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to