This is an automated email from the ASF dual-hosted git repository.
dataroaring pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 654da806dad [feat](memtable) support adaptive memtable flush thread
pool adjustment (#60617)
654da806dad is described below
commit 654da806dadb52fa16f06cf4ef4c4b0f544913ba
Author: hui lai <[email protected]>
AuthorDate: Wed Apr 1 15:38:39 2026 +0800
[feat](memtable) support adaptive memtable flush thread pool adjustment
(#60617)
### What problem does this PR solve?
Issue Number: close https://github.com/apache/doris/issues/60616
### Release note
None
### Check List (For Author)
- Test <!-- At least one of them must be included. -->
- [ ] Regression test
- [ ] Unit Test
- [ ] Manual test (add detailed scripts or steps below)
- [ ] No need to test or manual test. Explain why:
- [ ] This is a refactor/code format and no logic has been changed.
- [ ] Previous test can cover this change.
- [ ] No code files have been changed.
- [ ] Other reason <!-- Add your reason? -->
- Behavior changed:
- [ ] No.
- [ ] Yes. <!-- Explain the behavior change -->
- Does this need documentation?
- [ ] No.
- [ ] Yes. <!-- Add document PR link here. eg:
https://github.com/apache/doris-website/pull/1214 -->
### Check List (For Reviewer who merge this PR)
- [ ] Confirm the release note
- [ ] Confirm test cases
- [ ] Confirm document
- [ ] Add branch pick label <!-- Add branch pick label that this PR
should merge into -->
---
be/src/cloud/cloud_storage_engine.cpp | 5 +
be/src/common/config.cpp | 6 +
be/src/common/config.h | 5 +
be/src/load/memtable/memtable_flush_executor.cpp | 49 ++--
be/src/load/memtable/memtable_flush_executor.h | 9 +
be/src/load/memtable/memtable_memory_limiter.h | 2 +
be/src/runtime/exec_env_init.cpp | 2 +-
be/src/runtime/workload_group/workload_group.cpp | 36 ++-
be/src/runtime/workload_group/workload_group.h | 2 +
be/src/storage/adaptive_thread_pool_controller.cpp | 311 +++++++++++++++++++++
be/src/storage/adaptive_thread_pool_controller.h | 156 +++++++++++
be/src/storage/olap_server.cpp | 3 +
be/src/storage/storage_engine.cpp | 23 ++
be/src/storage/storage_engine.h | 6 +
.../load/memtable/memtable_flush_executor_test.cpp | 18 +-
.../adaptive_thread_pool_controller_test.cpp | 283 +++++++++++++++++++
16 files changed, 885 insertions(+), 31 deletions(-)
diff --git a/be/src/cloud/cloud_storage_engine.cpp
b/be/src/cloud/cloud_storage_engine.cpp
index 927d5bef343..b1882a3c1bd 100644
--- a/be/src/cloud/cloud_storage_engine.cpp
+++ b/be/src/cloud/cloud_storage_engine.cpp
@@ -44,6 +44,7 @@
#include "cloud/cloud_warm_up_manager.h"
#include "cloud/config.h"
#include "common/config.h"
+#include "common/metrics/doris_metrics.h"
#include "common/signal_handler.h"
#include "common/status.h"
#include "core/assert_cast.h"
@@ -56,6 +57,7 @@
#include "io/hdfs_util.h"
#include "io/io_common.h"
#include "load/memtable/memtable_flush_executor.h"
+#include "runtime/exec_env.h"
#include "runtime/memory/cache_manager.h"
#include "storage/compaction/cumulative_compaction_policy.h"
#include "storage/compaction/cumulative_compaction_time_series_policy.h"
@@ -276,6 +278,7 @@ void CloudStorageEngine::stop() {
if (_cumu_compaction_thread_pool) {
_cumu_compaction_thread_pool->shutdown();
}
+ _adaptive_thread_controller.stop();
LOG(INFO) << "Cloud storage engine is stopped.";
if (_calc_tablet_delete_bitmap_task_thread_pool) {
@@ -389,6 +392,8 @@ Status
CloudStorageEngine::start_bg_threads(std::shared_ptr<WorkloadGroup> wg_sp
&_bg_threads.emplace_back()));
LOG(INFO) << "check tablet delete bitmap score thread started";
+ _start_adaptive_thread_controller();
+
return Status::OK();
}
diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp
index 5227059c066..8fb76ab66ad 100644
--- a/be/src/common/config.cpp
+++ b/be/src/common/config.cpp
@@ -827,6 +827,12 @@ DEFINE_mInt32(high_priority_flush_thread_num_per_store,
"6");
// max_flush_thread_num_per_cpu * num_cpu)
DEFINE_mInt32(max_flush_thread_num_per_cpu, "4");
+// minimum flush threads per cpu when adaptive flush is enabled (default 0.5)
+DEFINE_mDouble(min_flush_thread_num_per_cpu, "0.5");
+
+// Whether to enable adaptive flush thread adjustment
+DEFINE_mBool(enable_adaptive_flush_threads, "true");
+
// config for tablet meta checkpoint
DEFINE_mInt32(tablet_meta_checkpoint_min_new_rowsets_num, "10");
DEFINE_mInt32(tablet_meta_checkpoint_min_interval_secs, "600");
diff --git a/be/src/common/config.h b/be/src/common/config.h
index 4a7c7a3634d..45b1d6b4069 100644
--- a/be/src/common/config.h
+++ b/be/src/common/config.h
@@ -880,6 +880,11 @@ DECLARE_mInt32(high_priority_flush_thread_num_per_store);
// number of threads = min(flush_thread_num_per_store * num_store,
// max_flush_thread_num_per_cpu * num_cpu)
DECLARE_mInt32(max_flush_thread_num_per_cpu);
+// minimum flush threads per cpu when adaptive flush is enabled (default 0.5)
+DECLARE_mDouble(min_flush_thread_num_per_cpu);
+
+// Whether to enable adaptive flush thread adjustment
+DECLARE_mBool(enable_adaptive_flush_threads);
// config for tablet meta checkpoint
DECLARE_mInt32(tablet_meta_checkpoint_min_new_rowsets_num);
diff --git a/be/src/load/memtable/memtable_flush_executor.cpp
b/be/src/load/memtable/memtable_flush_executor.cpp
index b36d602c903..ca79b1a8345 100644
--- a/be/src/load/memtable/memtable_flush_executor.cpp
+++ b/be/src/load/memtable/memtable_flush_executor.cpp
@@ -27,6 +27,7 @@
#include "common/logging.h"
#include "common/metrics/doris_metrics.h"
#include "common/metrics/metrics.h"
+#include "common/metrics/system_metrics.h"
#include "common/signal_handler.h"
#include "load/memtable/memtable.h"
#include "runtime/thread_context.h"
@@ -295,45 +296,53 @@ void
FlushToken::_flush_memtable(std::shared_ptr<MemTable> memtable_ptr, int32_t
_stats.flush_disk_size_bytes += flush_size;
}
+std::pair<int, int> MemTableFlushExecutor::calc_flush_thread_count(int
num_cpus, int num_disk,
+ int
thread_num_per_store) {
+ if (config::enable_adaptive_flush_threads && num_cpus > 0) {
+ int min = std::max(1, (int)(num_cpus *
config::min_flush_thread_num_per_cpu));
+ int max = std::max(min, num_cpus *
config::max_flush_thread_num_per_cpu);
+ return {min, max};
+ }
+ int min = std::max(1, thread_num_per_store);
+ int max = num_cpus == 0
+ ? num_disk * min
+ : std::min(num_disk * min, num_cpus *
config::max_flush_thread_num_per_cpu);
+ return {min, max};
+}
+
void MemTableFlushExecutor::init(int num_disk) {
_num_disk = std::max(1, num_disk);
int num_cpus = std::thread::hardware_concurrency();
- int min_threads = std::max(1, config::flush_thread_num_per_store);
- int max_threads = num_cpus == 0 ? _num_disk * min_threads
- : std::min(_num_disk * min_threads,
- num_cpus *
config::max_flush_thread_num_per_cpu);
+
+ auto [min_threads, max_threads] =
+ calc_flush_thread_count(num_cpus, _num_disk,
config::flush_thread_num_per_store);
static_cast<void>(ThreadPoolBuilder("MemTableFlushThreadPool")
.set_min_threads(min_threads)
.set_max_threads(max_threads)
.build(&_flush_pool));
- min_threads = std::max(1,
config::high_priority_flush_thread_num_per_store);
- max_threads = num_cpus == 0 ? _num_disk * min_threads
- : std::min(_num_disk * min_threads,
- num_cpus *
config::max_flush_thread_num_per_cpu);
+ auto [hi_min, hi_max] = calc_flush_thread_count(
+ num_cpus, _num_disk,
config::high_priority_flush_thread_num_per_store);
static_cast<void>(ThreadPoolBuilder("MemTableHighPriorityFlushThreadPool")
- .set_min_threads(min_threads)
- .set_max_threads(max_threads)
+ .set_min_threads(hi_min)
+ .set_max_threads(hi_max)
.build(&_high_prio_flush_pool));
}
void MemTableFlushExecutor::update_memtable_flush_threads() {
int num_cpus = std::thread::hardware_concurrency();
- int min_threads = std::max(1, config::flush_thread_num_per_store);
- int max_threads = num_cpus == 0 ? _num_disk * min_threads
- : std::min(_num_disk * min_threads,
- num_cpus *
config::max_flush_thread_num_per_cpu);
+
+ auto [min_threads, max_threads] =
+ calc_flush_thread_count(num_cpus, _num_disk,
config::flush_thread_num_per_store);
// Update max_threads first to avoid constraint violation when increasing
min_threads
static_cast<void>(_flush_pool->set_max_threads(max_threads));
static_cast<void>(_flush_pool->set_min_threads(min_threads));
- min_threads = std::max(1,
config::high_priority_flush_thread_num_per_store);
- max_threads = num_cpus == 0 ? _num_disk * min_threads
- : std::min(_num_disk * min_threads,
- num_cpus *
config::max_flush_thread_num_per_cpu);
+ auto [hi_min, hi_max] = calc_flush_thread_count(
+ num_cpus, _num_disk,
config::high_priority_flush_thread_num_per_store);
// Update max_threads first to avoid constraint violation when increasing
min_threads
- static_cast<void>(_high_prio_flush_pool->set_max_threads(max_threads));
- static_cast<void>(_high_prio_flush_pool->set_min_threads(min_threads));
+ static_cast<void>(_high_prio_flush_pool->set_max_threads(hi_max));
+ static_cast<void>(_high_prio_flush_pool->set_min_threads(hi_min));
}
// NOTE: we use SERIAL mode here to ensure all mem-tables from one tablet are
flushed in order.
diff --git a/be/src/load/memtable/memtable_flush_executor.h
b/be/src/load/memtable/memtable_flush_executor.h
index 216aae14f17..ae08bec68c4 100644
--- a/be/src/load/memtable/memtable_flush_executor.h
+++ b/be/src/load/memtable/memtable_flush_executor.h
@@ -33,7 +33,9 @@ namespace doris {
class DataDir;
class MemTable;
+class MemTableMemoryLimiter;
class RowsetWriter;
+class SystemMetrics;
class WorkloadGroup;
// the statistic of a certain flush handler.
@@ -163,8 +165,15 @@ public:
ThreadPool* flush_pool() { return _flush_pool.get(); }
+ ThreadPool* high_prio_flush_pool() { return _high_prio_flush_pool.get(); }
+
void update_memtable_flush_threads();
+ // Returns {min_threads, max_threads} for a flush thread pool.
+ // thread_num_per_store is used as the baseline when adaptive mode is off.
+ static std::pair<int, int> calc_flush_thread_count(int num_cpus, int
num_disk,
+ int
thread_num_per_store);
+
private:
std::unique_ptr<ThreadPool> _flush_pool;
std::unique_ptr<ThreadPool> _high_prio_flush_pool;
diff --git a/be/src/load/memtable/memtable_memory_limiter.h
b/be/src/load/memtable/memtable_memory_limiter.h
index 34dcb2b06b4..6fbcc3cf8c8 100644
--- a/be/src/load/memtable/memtable_memory_limiter.h
+++ b/be/src/load/memtable/memtable_memory_limiter.h
@@ -54,6 +54,8 @@ public:
int64_t mem_usage() const { return _mem_usage; }
+ bool soft_limit_reached() { return _soft_limit_reached(); }
+
private:
static inline int64_t _sys_avail_mem_less_than_warning_water_mark();
static inline int64_t _process_used_mem_more_than_soft_mem_limit();
diff --git a/be/src/runtime/exec_env_init.cpp b/be/src/runtime/exec_env_init.cpp
index 38fec145eef..76fa4dedab0 100644
--- a/be/src/runtime/exec_env_init.cpp
+++ b/be/src/runtime/exec_env_init.cpp
@@ -861,7 +861,7 @@ void ExecEnv::destroy() {
static_cast<CloudClusterInfo*>(_cluster_info)->stop_bg_worker();
}
- // StorageEngine must be destoried before _cache_manager destory
+ // StorageEngine must be destroyed before _cache_manager is destroyed.
SAFE_STOP(_storage_engine);
_storage_engine.reset();
diff --git a/be/src/runtime/workload_group/workload_group.cpp
b/be/src/runtime/workload_group/workload_group.cpp
index ee945832093..888f85a7a44 100644
--- a/be/src/runtime/workload_group/workload_group.cpp
+++ b/be/src/runtime/workload_group/workload_group.cpp
@@ -35,6 +35,8 @@
#include "information_schema/schema_scanner_helper.h"
#include "io/cache/block_file_cache_factory.h"
#include "io/fs/local_file_reader.h"
+#include "load/memtable/memtable_flush_executor.h"
+#include "load/memtable/memtable_memory_limiter.h"
#include "runtime/exec_env.h"
#include "runtime/memory/global_memory_arbitrator.h"
#include "runtime/memory/mem_tracker_limiter.h"
@@ -42,6 +44,7 @@
#include "runtime/runtime_profile.h"
#include "runtime/workload_group/workload_group_metrics.h"
#include "runtime/workload_management/io_throttle.h"
+#include "storage/adaptive_thread_pool_controller.h"
#include "storage/storage_engine.h"
#include "util/mem_info.h"
#include "util/parse_util.h"
@@ -434,11 +437,9 @@ WorkloadGroupInfo WorkloadGroupInfo::parse_topic_info(
num_cpus = std::thread::hardware_concurrency();
#endif
num_disk = std::max(1, num_disk);
- int min_flush_thread_num = std::max(1, config::flush_thread_num_per_store);
- int max_flush_thread_num = num_cpus == 0
- ? num_disk * min_flush_thread_num
- : std::min(num_disk *
min_flush_thread_num,
- num_cpus *
config::max_flush_thread_num_per_cpu);
+ auto [min_flush_thread_num, max_flush_thread_num] =
+ MemTableFlushExecutor::calc_flush_thread_count(num_cpus, num_disk,
+
config::flush_thread_num_per_store);
// 12 memory low watermark
int memory_low_watermark = MEMORY_LOW_WATERMARK_DEFAULT_VALUE;
@@ -614,6 +615,17 @@ Status
WorkloadGroup::upsert_thread_pool_no_lock(WorkloadGroupInfo* wg_info,
LOG(INFO) << "[upsert wg thread pool] create " + pool_name + "
succ, gid=" << wg_id
<< ", max thread num=" << max_flush_thread_num
<< ", min thread num=" << min_flush_thread_num;
+ // Register the new pool with adaptive thread controller
+ if (config::enable_adaptive_flush_threads) {
+ auto* controller =
+
ExecEnv::GetInstance()->storage_engine().adaptive_thread_controller();
+ auto* flush_pool = _memtable_flush_pool.get();
+ controller->add("flush_wg_" + std::to_string(_id),
{flush_pool},
+
AdaptiveThreadPoolController::make_flush_adjust_func(controller,
+
flush_pool),
+ config::max_flush_thread_num_per_cpu,
+ config::min_flush_thread_num_per_cpu);
+ }
} else {
upsert_ret = ret;
LOG(INFO) << "[upsert wg thread pool] create " + pool_name + "
failed, gid=" << wg_id;
@@ -743,6 +755,14 @@ void WorkloadGroup::try_stop_schedulers() {
_remote_scan_task_sched->stop();
}
if (_memtable_flush_pool) {
+ // Unregister from adaptive controller before destroying the pool to
avoid UAF:
+ // the adjustment loop holds raw ThreadPool* pointers and must not
access them
+ // after the pool is gone.
+ if (config::enable_adaptive_flush_threads) {
+ auto* controller =
+
ExecEnv::GetInstance()->storage_engine().adaptive_thread_controller();
+ controller->cancel("flush_wg_" + std::to_string(_id));
+ }
_memtable_flush_pool->shutdown();
_memtable_flush_pool->wait();
}
@@ -764,10 +784,8 @@ void WorkloadGroup::update_memtable_flush_threads() {
num_cpus = std::thread::hardware_concurrency();
#endif
num_disk = std::max(1, num_disk);
- int min_threads = std::max(1, config::flush_thread_num_per_store);
- int max_threads = num_cpus == 0 ? num_disk * min_threads
- : std::min(num_disk * min_threads,
- num_cpus *
config::max_flush_thread_num_per_cpu);
+ auto [min_threads, max_threads] =
MemTableFlushExecutor::calc_flush_thread_count(
+ num_cpus, num_disk, config::flush_thread_num_per_store);
// Update max_threads first to avoid constraint violation when increasing
min_threads
static_cast<void>(_memtable_flush_pool->set_max_threads(max_threads));
diff --git a/be/src/runtime/workload_group/workload_group.h
b/be/src/runtime/workload_group/workload_group.h
index 3d888580a17..e18b1be0098 100644
--- a/be/src/runtime/workload_group/workload_group.h
+++ b/be/src/runtime/workload_group/workload_group.h
@@ -36,7 +36,9 @@
namespace doris {
class MemTrackerLimiter;
+class MemTableMemoryLimiter;
class RuntimeProfile;
+class SystemMetrics;
class ThreadPool;
class ExecEnv;
class CgroupCpuCtl;
diff --git a/be/src/storage/adaptive_thread_pool_controller.cpp
b/be/src/storage/adaptive_thread_pool_controller.cpp
new file mode 100644
index 00000000000..8b8de54654d
--- /dev/null
+++ b/be/src/storage/adaptive_thread_pool_controller.cpp
@@ -0,0 +1,311 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "storage/adaptive_thread_pool_controller.h"
+
+#include <butil/time.h>
+
+#include <algorithm>
+#include <thread>
+
+#include "cloud/config.h"
+#include "common/config.h"
+#include "common/logging.h"
+#include "common/metrics/system_metrics.h"
+#include "common/status.h"
+#include "util/threadpool.h"
+#include "util/time.h"
+
+namespace doris {
+
+int AdaptiveThreadPoolController::PoolGroup::get_max_threads() const {
+ int num_cpus = std::thread::hardware_concurrency();
+ if (num_cpus <= 0) num_cpus = 1;
+ return static_cast<int>(num_cpus * max_threads_per_cpu);
+}
+
+int AdaptiveThreadPoolController::PoolGroup::get_min_threads() const {
+ int num_cpus = std::thread::hardware_concurrency();
+ if (num_cpus <= 0) num_cpus = 1;
+ return std::max(1, static_cast<int>(num_cpus * min_threads_per_cpu));
+}
+
+// Static callback registered with bthread_timer_add.
+// Runs in brpc TimerThread. Must be fast and non-blocking.
+void AdaptiveThreadPoolController::_on_timer(void* raw) {
+ auto* arg = static_cast<TimerArg*>(raw);
+
+ // Hold mu for the entire callback (fire + re-registration).
+ // cancel() acquires mu after bthread_timer_del, so this provides
+ // cancel-with-wait semantics without a dedicated thread.
+ std::lock_guard<std::mutex> lk(arg->mu);
+
+ if (arg->stopped.load(std::memory_order_acquire)) {
+ // cancel() set stopped before we took the lock.
+ // cancel() owns arg and will delete it after taking mu.
+ return;
+ }
+
+ arg->ctrl->_fire_group(arg->name);
+
+ if (arg->stopped.load(std::memory_order_acquire)) {
+ return; // cancel() will clean up
+ }
+
+ // Re-register the next one-shot timer.
+ bthread_timer_t tid;
+ if (bthread_timer_add(&tid,
butil::milliseconds_from_now(arg->interval_ms), _on_timer, arg) ==
+ 0) {
+ arg->timer_id.store(tid, std::memory_order_release);
+ } else {
+ LOG(WARNING) << "Adaptive: failed to re-register timer for group '" <<
arg->name << "'";
+ }
+}
+
+void AdaptiveThreadPoolController::init(SystemMetrics* system_metrics,
+ ThreadPool* s3_file_upload_pool) {
+ _system_metrics = system_metrics;
+ _s3_file_upload_pool = s3_file_upload_pool;
+}
+
+void AdaptiveThreadPoolController::stop() {
+ std::vector<std::string> names;
+ {
+ std::lock_guard<std::mutex> lk(_mutex);
+ for (const auto& [name, _] : _pool_groups) {
+ names.push_back(name);
+ }
+ }
+ for (const auto& name : names) {
+ cancel(name);
+ }
+}
+
+void AdaptiveThreadPoolController::add(std::string name,
std::vector<ThreadPool*> pools,
+ AdjustFunc adjust_func, double
max_threads_per_cpu,
+ double min_threads_per_cpu, int64_t
interval_ms) {
+ PoolGroup group;
+ group.name = name;
+ group.pools = std::move(pools);
+ group.adjust_func = std::move(adjust_func);
+ group.max_threads_per_cpu = max_threads_per_cpu;
+ group.min_threads_per_cpu = min_threads_per_cpu;
+ group.current_threads = group.get_max_threads();
+
+ int log_max = group.get_max_threads();
+ int log_min = group.get_min_threads();
+
+ auto* arg = new TimerArg();
+ arg->ctrl = this;
+ arg->name = name;
+ arg->interval_ms = interval_ms;
+
+ bthread_timer_t tid;
+ if (bthread_timer_add(&tid, butil::milliseconds_from_now(interval_ms),
_on_timer, arg) == 0) {
+ arg->timer_id.store(tid, std::memory_order_release);
+ } else {
+ LOG(WARNING) << "Adaptive: failed to register timer for pool group '"
<< name << "'";
+ }
+ group.timer_arg = arg;
+
+ {
+ std::lock_guard<std::mutex> lk(_mutex);
+ _pool_groups[name] = std::move(group);
+ }
+
+ LOG(INFO) << "Adaptive: added pool group '" << name << "'"
+ << ", max_threads=" << log_max << ", min_threads=" << log_min
+ << ", interval_ms=" << interval_ms;
+}
+
+void AdaptiveThreadPoolController::cancel(const std::string& name) {
+ TimerArg* arg = nullptr;
+ {
+ std::lock_guard<std::mutex> lk(_mutex);
+ auto it = _pool_groups.find(name);
+ if (it != _pool_groups.end()) {
+ arg = it->second.timer_arg;
+ _pool_groups.erase(it);
+ }
+ }
+
+ if (arg == nullptr) {
+ return;
+ }
+
+ // Signal the callback to stop re-registering.
+ arg->stopped.store(true, std::memory_order_release);
+
+ // Try to cancel a pending (not yet fired) timer. Read timer_id after
+ // setting stopped so any re-registration in a concurrent callback has
+ // already stored the latest id by now (it holds mu, which we haven't
+ // taken yet).
+ bthread_timer_t tid = arg->timer_id.load(std::memory_order_acquire);
+ bthread_timer_del(tid); // returns non-zero if already fired; that's fine
+
+ // Wait for any in-flight callback to finish. The callback holds mu while
+ // running _fire_group and re-registering, so acquiring mu here ensures
+ // we don't free arg while the callback is still executing.
+ { std::lock_guard<std::mutex> lk(arg->mu); }
+
+ delete arg;
+ LOG(INFO) << "Adaptive: cancelled pool group '" << name << "'";
+}
+
+// Called from _on_timer. No lock held on entry.
+void AdaptiveThreadPoolController::_fire_group(const std::string& name) {
+ if (!config::enable_adaptive_flush_threads) {
+ return;
+ }
+ // Phase 1: snapshot parameters under the lock.
+ AdjustFunc fn;
+ int current, min_t, max_t;
+ {
+ std::lock_guard<std::mutex> lk(_mutex);
+ auto it = _pool_groups.find(name);
+ if (it == _pool_groups.end()) return;
+ const PoolGroup& g = it->second;
+ fn = g.adjust_func;
+ current = g.current_threads;
+ min_t = g.get_min_threads();
+ max_t = g.get_max_threads();
+ }
+
+ // Phase 2: compute target — no lock held (adjust_func may call is_io_busy
etc.).
+ std::string reason;
+ int target = fn(current, min_t, max_t, reason);
+
+ // Phase 3: apply under lock; recheck in case cancel() raced with us.
+ std::lock_guard<std::mutex> lk(_mutex);
+ auto it = _pool_groups.find(name);
+ if (it == _pool_groups.end()) return;
+ _apply_thread_count(it->second, target, reason);
+}
+
+// Fire all groups once regardless of schedule. For testing.
+void AdaptiveThreadPoolController::adjust_once() {
+ std::vector<std::string> names;
+ {
+ std::lock_guard<std::mutex> lk(_mutex);
+ for (const auto& [name, _] : _pool_groups) {
+ names.push_back(name);
+ }
+ }
+ for (const auto& name : names) {
+ _fire_group(name);
+ }
+}
+
+void AdaptiveThreadPoolController::_apply_thread_count(PoolGroup& group, int
target_threads,
+ const std::string&
reason) {
+ int max_threads = group.get_max_threads();
+ int min_threads = group.get_min_threads();
+ target_threads = std::max(min_threads, std::min(max_threads,
target_threads));
+ if (target_threads == group.current_threads) return;
+
+ LOG(INFO) << "Adaptive[" << group.name << "]: adjusting threads from " <<
group.current_threads
+ << " to " << target_threads << " (min=" << min_threads << ",
max=" << max_threads
+ << ")" << (reason.empty() ? "" : " reason=[" + reason + "]");
+
+ bool all_success = true;
+ for (auto* pool : group.pools) {
+ if (pool == nullptr) continue;
+ // Always sync min_threads to guard against races with
update_memtable_flush_threads().
+ // Order matters: when increasing, set max first so max >= min is
always satisfied;
+ // when decreasing, set min first so the new max is never below min.
+ Status st;
+ if (target_threads >= group.current_threads) {
+ st = pool->set_max_threads(target_threads);
+ if (st.ok()) static_cast<void>(pool->set_min_threads(min_threads));
+ } else {
+ st = pool->set_min_threads(min_threads);
+ if (st.ok()) st = pool->set_max_threads(target_threads);
+ }
+ if (!st.ok()) {
+ all_success = false;
+ LOG(WARNING) << "Adaptive[" << group.name << "]: failed to set
threads: " << st;
+ }
+ }
+ if (all_success) {
+ group.current_threads = target_threads;
+ }
+}
+
+int AdaptiveThreadPoolController::get_current_threads(const std::string& name)
const {
+ std::lock_guard<std::mutex> lk(_mutex);
+ auto it = _pool_groups.find(name);
+ return it != _pool_groups.end() ? it->second.current_threads : 0;
+}
+
+bool AdaptiveThreadPoolController::is_io_busy() {
+ if (config::is_cloud_mode()) {
+ if (_s3_file_upload_pool == nullptr) return false;
+ int queue_size = _s3_file_upload_pool->get_queue_size();
+ return queue_size > kS3QueueBusyThreshold;
+ }
+
+ if (_system_metrics == nullptr) return false;
+
+ int64_t current_time_sec = MonotonicSeconds();
+ int64_t interval_sec = current_time_sec - _last_check_time_sec;
+ if (interval_sec <= 0) {
+ return _last_io_busy;
+ }
+
+ int64_t max_io_util = _system_metrics->get_max_io_util(_last_disk_io_time,
interval_sec);
+ _system_metrics->get_disks_io_time(&_last_disk_io_time);
+ _last_check_time_sec = current_time_sec;
+
+ _last_io_busy = max_io_util > kIOBusyThresholdPercent;
+ return _last_io_busy;
+}
+
+bool AdaptiveThreadPoolController::is_cpu_busy() {
+ if (_system_metrics == nullptr) return false;
+
+ double load_avg = _system_metrics->get_load_average_1_min();
+ int num_cpus = std::thread::hardware_concurrency();
+ if (num_cpus <= 0) return false;
+
+ double cpu_usage_percent = (load_avg / num_cpus) * 100.0;
+ return cpu_usage_percent > kCPUBusyThresholdPercent;
+}
+
+AdaptiveThreadPoolController::AdjustFunc
AdaptiveThreadPoolController::make_flush_adjust_func(
+ AdaptiveThreadPoolController* controller, ThreadPool* flush_pool) {
+ return [controller, flush_pool](int current, int min_t, int max_t,
std::string& reason) {
+ int target = current;
+ int queue_size = flush_pool->get_queue_size();
+ if (queue_size > kQueueThreshold) {
+ target = std::min(max_t, target + 1);
+ reason += "queue_size=" + std::to_string(queue_size) + ">" +
+ std::to_string(kQueueThreshold) + " -> target=" +
std::to_string(target) +
+ "; ";
+ }
+ if (controller->is_io_busy()) {
+ target = std::max(min_t, target - 2);
+ reason += "io_busy -> target=" + std::to_string(target) + "; ";
+ }
+ if (controller->is_cpu_busy()) {
+ target = std::max(min_t, target - 2);
+ reason += "cpu_busy -> target=" + std::to_string(target) + "; ";
+ }
+ return target;
+ };
+}
+
+} // namespace doris
diff --git a/be/src/storage/adaptive_thread_pool_controller.h
b/be/src/storage/adaptive_thread_pool_controller.h
new file mode 100644
index 00000000000..4a6f0096f62
--- /dev/null
+++ b/be/src/storage/adaptive_thread_pool_controller.h
@@ -0,0 +1,156 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <bthread/unstable.h>
+
+#include <atomic>
+#include <cstdint>
+#include <functional>
+#include <map>
+#include <mutex>
+#include <string>
+#include <vector>
+
+namespace doris {
+
+class ThreadPool;
+class SystemMetrics;
+class AdaptiveThreadPoolController;
+
+// Each pool group's timer state. Heap-allocated; shared between the controller
+// and the brpc TimerThread callback.
+struct TimerArg {
+ AdaptiveThreadPoolController* ctrl; // never null
+ std::string name;
+ int64_t interval_ms;
+
+ // Set by cancel() before calling bthread_timer_del. The callback checks
+ // this flag after acquiring `mu` and skips re-registration when true.
+ std::atomic<bool> stopped {false};
+
+ // Tracks the most recently registered timer id. Updated under `mu` by the
+ // callback after each re-registration; read by cancel() to call
+ // bthread_timer_del on the latest pending timer.
+ std::atomic<bthread_timer_t> timer_id {0};
+
+ // Held for the entire duration of the callback (fire + re-registration).
+ // cancel() acquires it after bthread_timer_del to wait for any in-flight
+ // invocation to complete before freeing `this`.
+ std::mutex mu;
+};
+
+// AdaptiveThreadPoolController dynamically adjusts thread pool sizes based on
+// system load (IO utilisation, CPU load average, flush queue depth).
+//
+// Each registered pool group runs as a one-shot bthread_timer_add chain: the
+// callback fires, adjusts the pool, then re-registers the next one-shot timer.
+// All groups share the single brpc TimerThread, keeping the overhead minimal.
+//
+// Usage:
+// AdaptiveThreadPoolController ctrl;
+// ctrl.init(system_metrics, s3_pool);
+// ctrl.add("flush", {pool1, pool2},
+// AdaptiveThreadPoolController::make_flush_adjust_func(&ctrl, pool1),
+// max_per_cpu, min_per_cpu);
+// // ... later ...
+// ctrl.cancel("flush"); // or ctrl.stop()
+class AdaptiveThreadPoolController {
+public:
+ using AdjustFunc =
+ std::function<int(int current, int min_threads, int max_threads,
std::string& reason)>;
+
+ static constexpr int kDefaultIntervalMs = 10000;
+
+ static constexpr int kQueueThreshold = 10;
+ static constexpr int kIOBusyThresholdPercent = 90;
+ static constexpr int kCPUBusyThresholdPercent = 90;
+ static constexpr int kS3QueueBusyThreshold = 100;
+
+ AdaptiveThreadPoolController() = default;
+ ~AdaptiveThreadPoolController() { stop(); }
+
+ // Initialize with system-level dependencies.
+ void init(SystemMetrics* system_metrics, ThreadPool* s3_file_upload_pool);
+
+ // Cancel all registered pool groups. Must be called before the pools are
destroyed.
+ void stop();
+
+ // Register a pool group and start a recurring bthread_timer_add chain.
+ void add(std::string name, std::vector<ThreadPool*> pools, AdjustFunc
adjust_func,
+ double max_threads_per_cpu, double min_threads_per_cpu,
+ int64_t interval_ms = kDefaultIntervalMs);
+
+ // Cancel the timer chain and remove the pool group. Blocks until any
+ // in-flight callback finishes, then returns. Safe to call before pool
teardown.
+ void cancel(const std::string& name);
+
+ // Fire all registered groups once, ignoring the schedule. For testing.
+ void adjust_once();
+
+ // Get current thread count for a named group. For testing/debugging.
+ int get_current_threads(const std::string& name) const;
+
+ // System-state helpers; safe to call from inside an AdjustFunc.
+ bool is_io_busy();
+ bool is_cpu_busy();
+
+ // Factory: standard flush-pool adjust function.
+ static AdjustFunc make_flush_adjust_func(AdaptiveThreadPoolController*
controller,
+ ThreadPool* flush_pool);
+
+ // Callback registered with bthread_timer_add. Public only for the C
linkage
+ // requirement; do not call directly.
+ static void _on_timer(void* arg);
+
+private:
+ struct PoolGroup {
+ std::string name;
+ std::vector<ThreadPool*> pools;
+ AdjustFunc adjust_func;
+ double max_threads_per_cpu = 4.0;
+ double min_threads_per_cpu = 0.5;
+ int current_threads = 0;
+ TimerArg* timer_arg = nullptr; // owned; freed by cancel()
+
+ int get_max_threads() const;
+ int get_min_threads() const;
+ };
+
+ // Run one group's adjustment. Called from _on_timer (no lock on entry).
+ void _fire_group(const std::string& name);
+
+ void _apply_thread_count(PoolGroup& group, int target_threads, const
std::string& reason);
+
+private:
+ SystemMetrics* _system_metrics = nullptr;
+ ThreadPool* _s3_file_upload_pool = nullptr;
+
+ mutable std::mutex _mutex;
+ std::map<std::string, PoolGroup> _pool_groups;
+
+ // Last successfully computed IO-busy result. Returned as-is when the
+ // measurement interval is too short to produce a valid new delta.
+ bool _last_io_busy = false;
+
+ // For disk IO util calculation (used by is_io_busy).
+ std::map<std::string, int64_t> _last_disk_io_time;
+ int64_t _last_check_time_sec = 0;
+};
+
+} // namespace doris
diff --git a/be/src/storage/olap_server.cpp b/be/src/storage/olap_server.cpp
index 498cc8e6c31..ffe9d97bd06 100644
--- a/be/src/storage/olap_server.cpp
+++ b/be/src/storage/olap_server.cpp
@@ -56,6 +56,7 @@
#include "cpp/sync_point.h"
#include "io/fs/file_writer.h" // IWYU pragma: keep
#include "io/fs/path.h"
+#include "load/memtable/memtable_flush_executor.h"
#include "runtime/memory/cache_manager.h"
#include "runtime/memory/global_memory_arbitrator.h"
#include "storage/compaction/cold_data_compaction.h"
@@ -357,6 +358,8 @@ Status
StorageEngine::start_bg_threads(std::shared_ptr<WorkloadGroup> wg_sptr) {
&_check_delete_bitmap_score_thread));
LOG(INFO) << "check tablet delete bitmap score thread started";
+ _start_adaptive_thread_controller();
+
LOG(INFO) << "all storage engine's background threads are started.";
return Status::OK();
}
diff --git a/be/src/storage/storage_engine.cpp
b/be/src/storage/storage_engine.cpp
index cb49a5ac149..aeda65e21ca 100644
--- a/be/src/storage/storage_engine.cpp
+++ b/be/src/storage/storage_engine.cpp
@@ -59,6 +59,7 @@
#include "io/fs/local_file_system.h"
#include "load/memtable/memtable_flush_executor.h"
#include "load/stream_load/stream_load_recorder.h"
+#include "runtime/exec_env.h"
#include "storage/binlog.h"
#include "storage/cache/schema_cache.h"
#include "storage/compaction/single_replica_compaction.h"
@@ -140,6 +141,27 @@ int64_t
BaseStorageEngine::memory_limitation_bytes_per_thread_for_schema_change(
config::memory_limitation_per_thread_for_schema_change_bytes);
}
+void BaseStorageEngine::_start_adaptive_thread_controller() {
+ if (!config::enable_adaptive_flush_threads) {
+ return;
+ }
+
+ auto* system_metrics = DorisMetrics::instance()->system_metrics();
+ auto* s3_upload_pool =
ExecEnv::GetInstance()->s3_file_upload_thread_pool();
+
+ _adaptive_thread_controller.init(system_metrics, s3_upload_pool);
+
+ if (_memtable_flush_executor) {
+ auto* flush_pool = _memtable_flush_executor->flush_pool();
+ auto* high_prio_pool =
_memtable_flush_executor->high_prio_flush_pool();
+ _adaptive_thread_controller.add("flush", {flush_pool, high_prio_pool},
+
AdaptiveThreadPoolController::make_flush_adjust_func(
+ &_adaptive_thread_controller,
flush_pool),
+ config::max_flush_thread_num_per_cpu,
+ config::min_flush_thread_num_per_cpu);
+ }
+}
+
Status BaseStorageEngine::init_stream_load_recorder(const std::string&
stream_load_record_path) {
LOG(INFO) << "stream load record path: " << stream_load_record_path;
// init stream load record rocksdb
@@ -769,6 +791,7 @@ void StorageEngine::stop() {
_cooldown_thread_pool->shutdown();
}
+ _adaptive_thread_controller.stop();
_memtable_flush_executor.reset(nullptr);
_calc_delete_bitmap_executor.reset(nullptr);
_calc_delete_bitmap_executor_for_load.reset();
diff --git a/be/src/storage/storage_engine.h b/be/src/storage/storage_engine.h
index 8b50d1c4d9b..b37aeaf475d 100644
--- a/be/src/storage/storage_engine.h
+++ b/be/src/storage/storage_engine.h
@@ -41,6 +41,7 @@
#include "common/config.h"
#include "common/status.h"
#include "runtime/heartbeat_flags.h"
+#include "storage/adaptive_thread_pool_controller.h"
#include "storage/compaction/compaction_permit_limiter.h"
#include "storage/delete/calc_delete_bitmap_executor.h"
#include "storage/olap_common.h"
@@ -140,6 +141,9 @@ public:
RowsetId next_rowset_id();
MemTableFlushExecutor* memtable_flush_executor() { return
_memtable_flush_executor.get(); }
+ AdaptiveThreadPoolController* adaptive_thread_controller() {
+ return &_adaptive_thread_controller;
+ }
CalcDeleteBitmapExecutor* calc_delete_bitmap_executor() {
return _calc_delete_bitmap_executor.get();
}
@@ -163,6 +167,7 @@ public:
}
protected:
+ void _start_adaptive_thread_controller();
void _evict_querying_rowset();
void _evict_quring_rowset_thread_callback();
bool _should_delay_large_task();
@@ -176,6 +181,7 @@ protected:
std::unique_ptr<RowsetIdGenerator> _rowset_id_generator;
std::unique_ptr<MemTableFlushExecutor> _memtable_flush_executor;
+ AdaptiveThreadPoolController _adaptive_thread_controller;
std::unique_ptr<CalcDeleteBitmapExecutor> _calc_delete_bitmap_executor;
std::unique_ptr<CalcDeleteBitmapExecutor>
_calc_delete_bitmap_executor_for_load;
CountDownLatch _stop_background_threads_latch;
diff --git a/be/test/load/memtable/memtable_flush_executor_test.cpp
b/be/test/load/memtable/memtable_flush_executor_test.cpp
index 864b0d7be62..8b5d2990c06 100644
--- a/be/test/load/memtable/memtable_flush_executor_test.cpp
+++ b/be/test/load/memtable/memtable_flush_executor_test.cpp
@@ -82,6 +82,7 @@ TEST(MemTableFlushExecutorTest, TestDynamicThreadPoolUpdate) {
int32_t original_high_priority_flush_thread_num =
config::high_priority_flush_thread_num_per_store;
int32_t original_max_flush_thread_num =
config::max_flush_thread_num_per_cpu;
+ bool original_adaptive = config::enable_adaptive_flush_threads;
// Test 1: Get initial thread pool sizes
int initial_max_threads = flush_executor->flush_pool()->max_threads();
@@ -89,6 +90,9 @@ TEST(MemTableFlushExecutorTest, TestDynamicThreadPoolUpdate) {
EXPECT_GT(initial_max_threads, 0);
EXPECT_GT(initial_min_threads, 0);
+ // Disable adaptive mode so flush_thread_num_per_store takes effect
+ config::enable_adaptive_flush_threads = false;
+
// Test 2: Update flush_thread_num_per_store and verify thread pool updates
config::flush_thread_num_per_store = 10;
flush_executor->update_memtable_flush_threads();
@@ -126,6 +130,7 @@ TEST(MemTableFlushExecutorTest,
TestDynamicThreadPoolUpdate) {
config::flush_thread_num_per_store = original_flush_thread_num;
config::high_priority_flush_thread_num_per_store =
original_high_priority_flush_thread_num;
config::max_flush_thread_num_per_cpu = original_max_flush_thread_num;
+ config::enable_adaptive_flush_threads = original_adaptive;
flush_executor->update_memtable_flush_threads();
// Cleanup
@@ -141,6 +146,11 @@ TEST(MemTableFlushExecutorTest, TestConfigUpdateTrigger) {
// Store original config values
int32_t original_flush_thread_num = config::flush_thread_num_per_store;
+ bool original_adaptive = config::enable_adaptive_flush_threads;
+
+ // Disable adaptive mode so flush_thread_num_per_store takes effect
+ config::enable_adaptive_flush_threads = false;
+ flush_executor->update_memtable_flush_threads();
// Get initial thread pool size
int initial_min_threads = flush_executor->flush_pool()->min_threads();
@@ -154,8 +164,9 @@ TEST(MemTableFlushExecutorTest, TestConfigUpdateTrigger) {
EXPECT_EQ(updated_min_threads, 15);
EXPECT_NE(updated_min_threads, initial_min_threads);
- // Restore original config value
+ // Restore original config values
config::flush_thread_num_per_store = original_flush_thread_num;
+ config::enable_adaptive_flush_threads = original_adaptive;
flush_executor->update_memtable_flush_threads();
// Cleanup
@@ -172,6 +183,10 @@ TEST(MemTableFlushExecutorTest,
TestThreadPoolMinMaxRelationship) {
// Store original config values
int32_t original_flush_thread_num = config::flush_thread_num_per_store;
int32_t original_max_flush_thread_num =
config::max_flush_thread_num_per_cpu;
+ bool original_adaptive = config::enable_adaptive_flush_threads;
+
+ // Disable adaptive mode so flush_thread_num_per_store takes effect
+ config::enable_adaptive_flush_threads = false;
// Test: Ensure min_threads <= max_threads always
config::flush_thread_num_per_store = 20;
@@ -185,6 +200,7 @@ TEST(MemTableFlushExecutorTest,
TestThreadPoolMinMaxRelationship) {
// Restore original config values
config::flush_thread_num_per_store = original_flush_thread_num;
config::max_flush_thread_num_per_cpu = original_max_flush_thread_num;
+ config::enable_adaptive_flush_threads = original_adaptive;
flush_executor->update_memtable_flush_threads();
// Cleanup
diff --git a/be/test/storage/adaptive_thread_pool_controller_test.cpp
b/be/test/storage/adaptive_thread_pool_controller_test.cpp
new file mode 100644
index 00000000000..b2d6464fcef
--- /dev/null
+++ b/be/test/storage/adaptive_thread_pool_controller_test.cpp
@@ -0,0 +1,283 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "storage/adaptive_thread_pool_controller.h"
+
+#include <gtest/gtest.h>
+
+#include <thread>
+
+#include "common/config.h"
+#include "util/threadpool.h"
+
+namespace doris {
+
+class AdaptiveThreadPoolControllerTest : public testing::Test {
+protected:
+ void SetUp() override {
+ _original_enable_adaptive = config::enable_adaptive_flush_threads;
+
+ ASSERT_TRUE(ThreadPoolBuilder("TestPool")
+ .set_min_threads(2)
+ .set_max_threads(64)
+ .build(&_pool)
+ .ok());
+
+ ASSERT_TRUE(ThreadPoolBuilder("TestPool2")
+ .set_min_threads(2)
+ .set_max_threads(64)
+ .build(&_pool2)
+ .ok());
+ }
+
+ void TearDown() override {
+ config::enable_adaptive_flush_threads = _original_enable_adaptive;
+ if (_pool) _pool->shutdown();
+ if (_pool2) _pool2->shutdown();
+ }
+
+ bool _original_enable_adaptive;
+ std::unique_ptr<ThreadPool> _pool;
+ std::unique_ptr<ThreadPool> _pool2;
+};
+
+// Test basic add and get_current_threads
+TEST_F(AdaptiveThreadPoolControllerTest, TestAddPoolGroup) {
+ AdaptiveThreadPoolController controller;
+ controller.init(nullptr, nullptr);
+
+ controller.add(
+ "test", {_pool.get()},
+ [](int current, int min_t, int max_t, std::string&) { return
current; }, 4, 0.5);
+
+ int num_cpus = std::thread::hardware_concurrency();
+ if (num_cpus <= 0) num_cpus = 1;
+
+ EXPECT_EQ(controller.get_current_threads("test"), num_cpus * 4);
+ EXPECT_EQ(controller.get_current_threads("nonexistent"), 0);
+}
+
+// Test adding multiple pool groups with different adjust logic
+TEST_F(AdaptiveThreadPoolControllerTest, TestMultiplePoolGroups) {
+ AdaptiveThreadPoolController controller;
+ controller.init(nullptr, nullptr);
+
+ controller.add(
+ "group_a", {_pool.get()},
+ [](int current, int min_t, int max_t, std::string&) { return
current; }, 4, 0.5);
+ controller.add(
+ "group_b", {_pool2.get()},
+ [](int current, int min_t, int max_t, std::string&) { return
current; }, 2, 0.5);
+
+ int num_cpus = std::thread::hardware_concurrency();
+ if (num_cpus <= 0) num_cpus = 1;
+
+ EXPECT_EQ(controller.get_current_threads("group_a"), num_cpus * 4);
+ EXPECT_EQ(controller.get_current_threads("group_b"), num_cpus * 2);
+}
+
+// Test that when adaptive is disabled, adjust_once is a no-op (config guard in _fire_group)
+TEST_F(AdaptiveThreadPoolControllerTest, TestAdaptiveDisabled) {
+ config::enable_adaptive_flush_threads = false;
+
+ AdaptiveThreadPoolController controller;
+ controller.init(nullptr, nullptr);
+ controller.add(
+ "test", {_pool.get()},
+ [](int current, int min_t, int max_t, std::string&) { return 1; },
4, 0.5);
+
+ int initial = controller.get_current_threads("test");
+ // bthread_timer callbacks check config::enable_adaptive_flush_threads before firing;
+ // adjust_once bypasses that check, so we only verify the group was registered.
+ EXPECT_GT(initial, 0);
+}
+
+// Test adjust_once calls the custom adjust function
+TEST_F(AdaptiveThreadPoolControllerTest, TestCustomAdjustFunc) {
+ config::enable_adaptive_flush_threads = true;
+
+ AdaptiveThreadPoolController controller;
+ controller.init(nullptr, nullptr);
+
+ int num_cpus = std::thread::hardware_concurrency();
+ if (num_cpus <= 0) num_cpus = 1;
+ int expected_min = std::max(1, static_cast<int>(num_cpus * 0.5));
+
+ // Adjust function always returns min
+ controller.add(
+ "test", {_pool.get()},
+ [](int current, int min_t, int max_t, std::string&) { return
min_t; }, 4, 0.5);
+
+ controller.adjust_once();
+
+ EXPECT_EQ(controller.get_current_threads("test"), expected_min);
+}
+
+// Test that result is clamped to [min, max]
+TEST_F(AdaptiveThreadPoolControllerTest, TestClampToMinMax) {
+ config::enable_adaptive_flush_threads = true;
+
+ AdaptiveThreadPoolController controller;
+ controller.init(nullptr, nullptr);
+
+ int num_cpus = std::thread::hardware_concurrency();
+ if (num_cpus <= 0) num_cpus = 1;
+ int expected_max = num_cpus * 2;
+ int expected_min = std::max(1, static_cast<int>(num_cpus * 0.5));
+
+ // Adjust function tries to return way beyond max
+ controller.add(
+ "test", {_pool.get()},
+ [](int current, int min_t, int max_t, std::string&) { return
99999; }, 2, 0.5);
+ controller.adjust_once();
+ EXPECT_EQ(controller.get_current_threads("test"), expected_max);
+
+ // Adjust function tries to return below min
+ controller.add(
+ "test2", {_pool2.get()},
+ [](int current, int min_t, int max_t, std::string&) { return -1;
}, 2, 0.5);
+ controller.adjust_once();
+ EXPECT_EQ(controller.get_current_threads("test2"), expected_min);
+}
+
+// Test adjust_once with no change keeps current threads
+TEST_F(AdaptiveThreadPoolControllerTest, TestAdjustOnceNoChange) {
+ config::enable_adaptive_flush_threads = true;
+
+ AdaptiveThreadPoolController controller;
+ controller.init(nullptr, nullptr);
+ controller.add(
+ "test", {_pool.get()},
+ [](int current, int min_t, int max_t, std::string&) { return
current; }, 4, 0.5);
+
+ int initial = controller.get_current_threads("test");
+ controller.adjust_once();
+ EXPECT_EQ(controller.get_current_threads("test"), initial);
+}
+
+// Test stop lifecycle
+TEST_F(AdaptiveThreadPoolControllerTest, TestStartStopLifecycle) {
+ config::enable_adaptive_flush_threads = true;
+
+ AdaptiveThreadPoolController controller;
+ controller.init(nullptr, nullptr);
+ controller.add(
+ "test", {_pool.get()},
+ [](int current, int min_t, int max_t, std::string&) { return
current; }, 4, 0.5);
+
+ std::this_thread::sleep_for(std::chrono::milliseconds(100));
+ controller.stop();
+
+ // Multiple stops should be safe
+ controller.stop();
+}
+
+// Test destructor stops the controller (cancels all timer events)
+TEST_F(AdaptiveThreadPoolControllerTest, TestDestructorStops) {
+ config::enable_adaptive_flush_threads = true;
+
+ {
+ AdaptiveThreadPoolController controller;
+ controller.init(nullptr, nullptr);
+ controller.add(
+ "test", {_pool.get()},
+ [](int current, int min_t, int max_t, std::string&) { return
current; }, 4, 0.5);
+ std::this_thread::sleep_for(std::chrono::milliseconds(100));
+ }
+ // Destructor called stop(), which cancelled all timer events. No UAF.
+ SUCCEED();
+}
+
+// Test constants are reasonable
+TEST_F(AdaptiveThreadPoolControllerTest, TestConstants) {
+ EXPECT_EQ(AdaptiveThreadPoolController::kDefaultIntervalMs, 10000);
+ EXPECT_EQ(AdaptiveThreadPoolController::kQueueThreshold, 10);
+ EXPECT_EQ(AdaptiveThreadPoolController::kIOBusyThresholdPercent, 90);
+ EXPECT_EQ(AdaptiveThreadPoolController::kCPUBusyThresholdPercent, 90);
+ EXPECT_EQ(AdaptiveThreadPoolController::kS3QueueBusyThreshold, 100);
+}
+
+// Test add after controller is already running
+TEST_F(AdaptiveThreadPoolControllerTest, TestAddAfterStart) {
+ config::enable_adaptive_flush_threads = true;
+
+ AdaptiveThreadPoolController controller;
+ controller.init(nullptr, nullptr);
+ controller.add(
+ "group_a", {_pool.get()},
+ [](int current, int min_t, int max_t, std::string&) { return
current; }, 4, 0.5);
+
+ // Add a second group while the controller is already running
+ controller.add(
+ "group_b", {_pool2.get()},
+ [](int current, int min_t, int max_t, std::string&) { return
current; }, 2, 0.5);
+
+ int num_cpus = std::thread::hardware_concurrency();
+ if (num_cpus <= 0) num_cpus = 1;
+ EXPECT_EQ(controller.get_current_threads("group_b"), num_cpus * 2);
+
+ controller.stop();
+}
+
+// Test is_io_busy and is_cpu_busy with null system_metrics
+TEST_F(AdaptiveThreadPoolControllerTest, TestIoBusyCpuBusyWithNullMetrics) {
+ AdaptiveThreadPoolController controller;
+ controller.init(nullptr, nullptr);
+
+ EXPECT_FALSE(controller.is_io_busy());
+ EXPECT_FALSE(controller.is_cpu_busy());
+}
+
+// Test adjust function that uses controller's is_io_busy/is_cpu_busy
+TEST_F(AdaptiveThreadPoolControllerTest, TestAdjustFuncWithControllerMethods) {
+ config::enable_adaptive_flush_threads = true;
+
+ AdaptiveThreadPoolController controller;
+ controller.init(nullptr, nullptr);
+
+ auto* ctrl = &controller;
+ controller.add(
+ "test", {_pool.get()},
+ [ctrl](int current, int min_t, int max_t, std::string&) {
+ int target = current;
+ if (ctrl->is_io_busy()) target = std::max(min_t, target - 1);
+ if (ctrl->is_cpu_busy()) target = std::max(min_t, target - 1);
+ return target;
+ },
+ 4, 0.5);
+
+ int initial = controller.get_current_threads("test");
+ controller.adjust_once();
+ // With null metrics, io_busy and cpu_busy return false, so no change
+ EXPECT_EQ(controller.get_current_threads("test"), initial);
+}
+
+// Test cancel removes the pool group
+TEST_F(AdaptiveThreadPoolControllerTest, TestCancel) {
+ AdaptiveThreadPoolController controller;
+ controller.init(nullptr, nullptr);
+
+ controller.add(
+ "test", {_pool.get()},
+ [](int current, int min_t, int max_t, std::string&) { return
current; }, 4, 0.5);
+ EXPECT_NE(controller.get_current_threads("test"), 0);
+
+ controller.cancel("test");
+ EXPECT_EQ(controller.get_current_threads("test"), 0);
+}
+
+} // namespace doris
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]