github-actions[bot] commented on code in PR #60617:
URL: https://github.com/apache/doris/pull/60617#discussion_r2988979556


##########
be/src/storage/adaptive_thread_pool_controller.cpp:
##########
@@ -0,0 +1,244 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "storage/adaptive_thread_pool_controller.h"
+
+#include <algorithm>
+#include <thread>
+
+#include "cloud/config.h"
+#include "common/config.h"
+#include "common/logging.h"
+#include "common/metrics/system_metrics.h"
+#include "common/status.h"
+#include "util/threadpool.h"
+#include "util/time.h"
+
+namespace doris {
+
+int AdaptiveThreadPoolController::PoolGroup::get_max_threads() const {
+    int num_cpus = std::thread::hardware_concurrency();
+    if (num_cpus <= 0) num_cpus = 1;
+    return static_cast<int>(num_cpus * max_threads_per_cpu);
+}
+
+int AdaptiveThreadPoolController::PoolGroup::get_min_threads() const {
+    int num_cpus = std::thread::hardware_concurrency();
+    if (num_cpus <= 0) num_cpus = 1;
+    return std::max(1, static_cast<int>(num_cpus * min_threads_per_cpu));
+}
+
+void AdaptiveThreadPoolController::init(SystemMetrics* system_metrics,
+                                        ThreadPool* s3_file_upload_pool, 
Timer* timer) {
+    _system_metrics = system_metrics;
+    _s3_file_upload_pool = s3_file_upload_pool;
+    _timer = timer;
+}
+
+void AdaptiveThreadPoolController::stop() {
+    std::vector<std::string> names;
+    {
+        std::lock_guard<std::mutex> lk(_mutex);
+        for (const auto& [name, _] : _pool_groups) {
+            names.push_back(name);
+        }
+    }
+    for (const auto& name : names) {
+        cancel(name);
+    }
+}
+
+void AdaptiveThreadPoolController::add(std::string name, 
std::vector<ThreadPool*> pools,
+                                       AdjustFunc adjust_func, double 
max_threads_per_cpu,
+                                       double min_threads_per_cpu, 
Timer::Duration interval) {
+    PoolGroup group;
+    group.name = name;
+    group.pools = std::move(pools);
+    group.adjust_func = std::move(adjust_func);
+    group.max_threads_per_cpu = max_threads_per_cpu;
+    group.min_threads_per_cpu = min_threads_per_cpu;
+    group.current_threads = group.get_max_threads();
+
+    // Schedule the recurring adjustment on the shared timer.
+    Timer::EventId event_id =
+            _timer->schedule_recurring(interval, [this, name] { 
_fire_group(name); });
+
+    {
+        std::lock_guard<std::mutex> lk(_mutex);
+        _pool_groups[name] = std::move(group);
+        _event_ids[name] = event_id;
+    }
+
+    LOG(INFO) << "Adaptive: added pool group '" << name << "'"
+              << ", max_threads=" << _pool_groups[name].get_max_threads()

Review Comment:
   **[Bug / Data Race]** `_pool_groups[name]` is accessed here **after** 
releasing `_mutex` (the lock scope ends at line 84). Another thread could call 
`cancel(name)` which erases the entry from `_pool_groups` concurrently, or the 
timer callback could modify `current_threads`. This is undefined behavior under 
the C++ memory model.
   
   Fix: capture the values inside the lock scope before logging:
   ```cpp
   int log_max, log_min;
   {
       std::lock_guard<std::mutex> lk(_mutex);
       _pool_groups[name] = std::move(group);
       _event_ids[name] = event_id;
       log_max = _pool_groups[name].get_max_threads();
       log_min = _pool_groups[name].get_min_threads();
   }
   LOG(INFO) << ... << log_max << ... << log_min << ...;
   ```



##########
be/test/storage/adaptive_thread_pool_controller_test.cpp:
##########
@@ -0,0 +1,286 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <gtest/gtest.h>
+
+#include <thread>
+
+#include "common/config.h"
+#include "storage/adaptive_thread_controller.h"
+#include "util/threadpool.h"

Review Comment:
   **[Critical / Compilation Error]** This include path does not exist. The 
actual header is `"storage/adaptive_thread_pool_controller.h"`. Additionally, 
throughout this entire test file:
   
   1. `AdaptiveThreadController` should be `AdaptiveThreadPoolController`
   2. `register_pool_group(...)` should be `add(...)`
   3. `unregister_pool_group(...)` should be `cancel(...)`
   4. `kCheckIntervalMs` should be `kDefaultIntervalMs` (and its value is 
`10000`, not `60000`)
   5. `controller.init(nullptr, nullptr)` has wrong arity — the actual 
signature is `init(SystemMetrics*, ThreadPool*, Timer*)`
   6. `controller.start()` / `controller.stop()` do not exist on 
`AdaptiveThreadPoolController` (the controller has no internal thread — 
scheduling is delegated to `Timer`)
   
   This test file appears to be written against a different/earlier version of 
the API. It **will not compile** and needs to be completely rewritten to match 
the actual `AdaptiveThreadPoolController` interface.



##########
be/src/runtime/workload_group/workload_group.cpp:
##########
@@ -743,6 +755,14 @@ void WorkloadGroup::try_stop_schedulers() {
         _remote_scan_task_sched->stop();
     }
     if (_memtable_flush_pool) {
+        // Unregister from adaptive controller before destroying the pool to 
avoid UAF:
+        // the adjustment loop holds raw ThreadPool* pointers and must not 
access them
+        // after the pool is gone.
+        if (config::enable_adaptive_flush_threads) {
+            auto* controller =
+                    
ExecEnv::GetInstance()->storage_engine().adaptive_thread_controller();
+            controller->cancel("flush_wg_" + std::to_string(_id));
+        }

Review Comment:
   **[Bug / Potential UAF]** `cancel()` is **non-blocking** with respect to 
in-flight timer callbacks. The timer's `cancel()` only prevents rescheduling — 
it does not wait for a currently-executing callback to finish.
   
   Race scenario:
   1. Timer thread fires `_fire_group("flush_wg_42")`, copies the `AdjustFunc` 
(which captures raw `flush_pool*`), releases the controller's lock.
   2. This thread calls `cancel("flush_wg_42")`, which returns immediately.
   3. This thread proceeds to `_memtable_flush_pool->shutdown()` and eventually 
destroys the pool.
   4. The timer thread is still executing `flush_pool->get_queue_size()` on the 
now-destroyed pool → **use-after-free**.
   
   To fix this, either:
   - Make `Timer::cancel()` synchronous (wait for in-flight callback to 
complete), or
   - Use `shared_ptr<ThreadPool>` in the captured lambda and the PoolGroup, or
   - Add a synchronization mechanism (e.g., a per-group `std::shared_mutex` or 
a "drain" API) that blocks until the in-flight callback completes.



##########
be/src/storage/adaptive_thread_pool_controller.cpp:
##########
@@ -0,0 +1,244 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "storage/adaptive_thread_pool_controller.h"
+
+#include <algorithm>
+#include <thread>
+
+#include "cloud/config.h"
+#include "common/config.h"
+#include "common/logging.h"
+#include "common/metrics/system_metrics.h"
+#include "common/status.h"
+#include "util/threadpool.h"
+#include "util/time.h"
+
+namespace doris {
+
+int AdaptiveThreadPoolController::PoolGroup::get_max_threads() const {
+    int num_cpus = std::thread::hardware_concurrency();
+    if (num_cpus <= 0) num_cpus = 1;
+    return static_cast<int>(num_cpus * max_threads_per_cpu);
+}
+
+int AdaptiveThreadPoolController::PoolGroup::get_min_threads() const {
+    int num_cpus = std::thread::hardware_concurrency();
+    if (num_cpus <= 0) num_cpus = 1;
+    return std::max(1, static_cast<int>(num_cpus * min_threads_per_cpu));
+}
+
+void AdaptiveThreadPoolController::init(SystemMetrics* system_metrics,
+                                        ThreadPool* s3_file_upload_pool, 
Timer* timer) {
+    _system_metrics = system_metrics;
+    _s3_file_upload_pool = s3_file_upload_pool;
+    _timer = timer;
+}
+
+void AdaptiveThreadPoolController::stop() {
+    std::vector<std::string> names;
+    {
+        std::lock_guard<std::mutex> lk(_mutex);
+        for (const auto& [name, _] : _pool_groups) {
+            names.push_back(name);
+        }
+    }
+    for (const auto& name : names) {
+        cancel(name);
+    }
+}
+
+void AdaptiveThreadPoolController::add(std::string name, 
std::vector<ThreadPool*> pools,
+                                       AdjustFunc adjust_func, double 
max_threads_per_cpu,
+                                       double min_threads_per_cpu, 
Timer::Duration interval) {
+    PoolGroup group;
+    group.name = name;
+    group.pools = std::move(pools);
+    group.adjust_func = std::move(adjust_func);
+    group.max_threads_per_cpu = max_threads_per_cpu;
+    group.min_threads_per_cpu = min_threads_per_cpu;
+    group.current_threads = group.get_max_threads();
+
+    // Schedule the recurring adjustment on the shared timer.
+    Timer::EventId event_id =
+            _timer->schedule_recurring(interval, [this, name] { 
_fire_group(name); });
+
+    {
+        std::lock_guard<std::mutex> lk(_mutex);
+        _pool_groups[name] = std::move(group);
+        _event_ids[name] = event_id;
+    }
+
+    LOG(INFO) << "Adaptive: added pool group '" << name << "'"
+              << ", max_threads=" << _pool_groups[name].get_max_threads()
+              << ", min_threads=" << _pool_groups[name].get_min_threads() << 
", interval_ms="
+              << 
std::chrono::duration_cast<std::chrono::milliseconds>(interval).count();
+}
+
+void AdaptiveThreadPoolController::cancel(const std::string& name) {
+    Timer::EventId event_id = Timer::kInvalidId;
+    {
+        std::lock_guard<std::mutex> lk(_mutex);
+        auto it = _event_ids.find(name);
+        if (it != _event_ids.end()) {
+            event_id = it->second;
+            _event_ids.erase(it);
+        }
+        _pool_groups.erase(name);
+    }
+    if (event_id != Timer::kInvalidId) {
+        _timer->cancel(event_id);
+    }
+    LOG(INFO) << "Adaptive: cancelled pool group '" << name << "'";
+}
+
+// Called from the Timer thread. No lock held on entry.
+void AdaptiveThreadPoolController::_fire_group(const std::string& name) {
+    if (!config::enable_adaptive_flush_threads) {
+        return;
+    }
+    // Phase 1: snapshot parameters under the lock.
+    AdjustFunc fn;
+    int current, min_t, max_t;
+    {
+        std::lock_guard<std::mutex> lk(_mutex);
+        auto it = _pool_groups.find(name);
+        if (it == _pool_groups.end()) return;
+        const PoolGroup& g = it->second;
+        fn = g.adjust_func;
+        current = g.current_threads;
+        min_t = g.get_min_threads();
+        max_t = g.get_max_threads();
+    }
+
+    // Phase 2: compute target — no lock held (adjust_func may call is_io_busy 
etc.).
+    std::string reason;
+    int target = fn(current, min_t, max_t, reason);
+
+    // Phase 3: apply under lock; recheck in case cancel() raced with us.
+    std::lock_guard<std::mutex> lk(_mutex);
+    auto it = _pool_groups.find(name);
+    if (it == _pool_groups.end()) return;
+    _apply_thread_count(it->second, target, reason);
+}
+
+// Fire all groups once regardless of schedule. For testing.
+void AdaptiveThreadPoolController::adjust_once() {
+    std::vector<std::string> names;
+    {
+        std::lock_guard<std::mutex> lk(_mutex);
+        for (const auto& [name, _] : _pool_groups) {
+            names.push_back(name);
+        }
+    }
+    for (const auto& name : names) {
+        _fire_group(name);
+    }
+}
+
+void AdaptiveThreadPoolController::_apply_thread_count(PoolGroup& group, int 
target_threads,
+                                                       const std::string& 
reason) {
+    int max_threads = group.get_max_threads();
+    int min_threads = group.get_min_threads();
+    target_threads = std::max(min_threads, std::min(max_threads, 
target_threads));
+    if (target_threads == group.current_threads) return;
+
+    LOG(INFO) << "Adaptive[" << group.name << "]: adjusting threads from " << 
group.current_threads
+              << " to " << target_threads << " (min=" << min_threads << ", 
max=" << max_threads
+              << ")" << (reason.empty() ? "" : " reason=[" + reason + "]");
+
+    bool all_success = true;
+    for (auto* pool : group.pools) {
+        if (pool == nullptr) continue;
+        Status st = pool->set_max_threads(target_threads);
+        if (!st.ok()) {

Review Comment:
   **[Bug / Missing `set_min_threads`]** `_apply_thread_count` only calls 
`set_max_threads()` but never `set_min_threads()`. This causes failures when 
the target is below the ThreadPool's internal `_min_threads`.
   
   `ThreadPool::set_max_threads()` returns `InternalError` if `_min_threads > 
max_threads`. Since other code paths 
(`MemTableFlushExecutor::update_memtable_flush_threads()`, 
`WorkloadGroup::update_memtable_flush_threads()`) independently call 
`set_min_threads()` on the same pools, the ThreadPool's `_min_threads` may 
diverge from what the adaptive controller expects.
   
   For example, if `update_memtable_flush_threads()` sets `_min_threads = 12`, 
then the adaptive controller trying `set_max_threads(8)` will fail silently 
(logged as warning, but the pool is NOT adjusted).
   
   Fix: When reducing threads, first call `set_min_threads(min_t)`, then 
`set_max_threads(target)`. When increasing threads, reverse the order. This 
mirrors the pattern already used in `update_memtable_flush_threads()`.



##########
be/src/runtime/exec_env_init.cpp:
##########
@@ -827,6 +828,7 @@ void ExecEnv::destroy() {
     SAFE_STOP(_external_scan_context_mgr);
     SAFE_STOP(_fragment_mgr);
     SAFE_STOP(_runtime_filter_timer_queue);
+    _timer.stop();
     // NewLoadStreamMgr should be destoried before storage_engine & after 
fragment_mgr stopped.

Review Comment:
   **[Warning / Fragile Lifecycle Ordering]** `_timer.stop()` is called 
**before** `SAFE_STOP(_storage_engine)`. This means the timer (infrastructure) 
is destroyed before the adaptive controller (consumer) is stopped.
   
   This currently works because `Timer::stop()` is synchronous — it joins the 
timer thread, guaranteeing all in-flight callbacks complete before returning. 
But the ordering is semantically inverted: the controller should be stopped 
before its scheduling infrastructure.
   
   If `Timer::stop()` were ever made asynchronous, or if the controller's 
`stop()` (called inside `SAFE_STOP(_storage_engine)`) tries to cancel events on 
an already-stopped timer, the behavior would silently degrade.
   
   Recommendation: Move `_timer.stop()` **after** `SAFE_STOP(_storage_engine)` 
and `_storage_engine.reset()`, or at minimum add a comment documenting this 
ordering dependency.



##########
be/src/storage/adaptive_thread_pool_controller.cpp:
##########
@@ -0,0 +1,244 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "storage/adaptive_thread_pool_controller.h"
+
+#include <algorithm>
+#include <thread>
+
+#include "cloud/config.h"
+#include "common/config.h"
+#include "common/logging.h"
+#include "common/metrics/system_metrics.h"
+#include "common/status.h"
+#include "util/threadpool.h"
+#include "util/time.h"
+
+namespace doris {
+
+int AdaptiveThreadPoolController::PoolGroup::get_max_threads() const {
+    int num_cpus = std::thread::hardware_concurrency();
+    if (num_cpus <= 0) num_cpus = 1;
+    return static_cast<int>(num_cpus * max_threads_per_cpu);
+}
+
+int AdaptiveThreadPoolController::PoolGroup::get_min_threads() const {
+    int num_cpus = std::thread::hardware_concurrency();
+    if (num_cpus <= 0) num_cpus = 1;
+    return std::max(1, static_cast<int>(num_cpus * min_threads_per_cpu));
+}
+
+void AdaptiveThreadPoolController::init(SystemMetrics* system_metrics,
+                                        ThreadPool* s3_file_upload_pool, 
Timer* timer) {
+    _system_metrics = system_metrics;
+    _s3_file_upload_pool = s3_file_upload_pool;
+    _timer = timer;
+}
+
+void AdaptiveThreadPoolController::stop() {
+    std::vector<std::string> names;
+    {
+        std::lock_guard<std::mutex> lk(_mutex);
+        for (const auto& [name, _] : _pool_groups) {
+            names.push_back(name);
+        }
+    }
+    for (const auto& name : names) {
+        cancel(name);
+    }
+}
+
+void AdaptiveThreadPoolController::add(std::string name, 
std::vector<ThreadPool*> pools,
+                                       AdjustFunc adjust_func, double 
max_threads_per_cpu,
+                                       double min_threads_per_cpu, 
Timer::Duration interval) {
+    PoolGroup group;
+    group.name = name;
+    group.pools = std::move(pools);
+    group.adjust_func = std::move(adjust_func);
+    group.max_threads_per_cpu = max_threads_per_cpu;
+    group.min_threads_per_cpu = min_threads_per_cpu;
+    group.current_threads = group.get_max_threads();
+
+    // Schedule the recurring adjustment on the shared timer.
+    Timer::EventId event_id =
+            _timer->schedule_recurring(interval, [this, name] { 
_fire_group(name); });
+
+    {
+        std::lock_guard<std::mutex> lk(_mutex);
+        _pool_groups[name] = std::move(group);
+        _event_ids[name] = event_id;
+    }
+
+    LOG(INFO) << "Adaptive: added pool group '" << name << "'"
+              << ", max_threads=" << _pool_groups[name].get_max_threads()
+              << ", min_threads=" << _pool_groups[name].get_min_threads() << 
", interval_ms="
+              << 
std::chrono::duration_cast<std::chrono::milliseconds>(interval).count();
+}
+
+void AdaptiveThreadPoolController::cancel(const std::string& name) {
+    Timer::EventId event_id = Timer::kInvalidId;
+    {
+        std::lock_guard<std::mutex> lk(_mutex);
+        auto it = _event_ids.find(name);
+        if (it != _event_ids.end()) {
+            event_id = it->second;
+            _event_ids.erase(it);
+        }
+        _pool_groups.erase(name);
+    }
+    if (event_id != Timer::kInvalidId) {
+        _timer->cancel(event_id);
+    }
+    LOG(INFO) << "Adaptive: cancelled pool group '" << name << "'";
+}
+
+// Called from the Timer thread. No lock held on entry.
+void AdaptiveThreadPoolController::_fire_group(const std::string& name) {
+    if (!config::enable_adaptive_flush_threads) {
+        return;
+    }
+    // Phase 1: snapshot parameters under the lock.
+    AdjustFunc fn;
+    int current, min_t, max_t;
+    {
+        std::lock_guard<std::mutex> lk(_mutex);
+        auto it = _pool_groups.find(name);
+        if (it == _pool_groups.end()) return;
+        const PoolGroup& g = it->second;
+        fn = g.adjust_func;
+        current = g.current_threads;
+        min_t = g.get_min_threads();
+        max_t = g.get_max_threads();
+    }
+
+    // Phase 2: compute target — no lock held (adjust_func may call is_io_busy 
etc.).
+    std::string reason;
+    int target = fn(current, min_t, max_t, reason);
+
+    // Phase 3: apply under lock; recheck in case cancel() raced with us.
+    std::lock_guard<std::mutex> lk(_mutex);
+    auto it = _pool_groups.find(name);
+    if (it == _pool_groups.end()) return;
+    _apply_thread_count(it->second, target, reason);
+}
+
+// Fire all groups once regardless of schedule. For testing.
+void AdaptiveThreadPoolController::adjust_once() {
+    std::vector<std::string> names;
+    {
+        std::lock_guard<std::mutex> lk(_mutex);
+        for (const auto& [name, _] : _pool_groups) {
+            names.push_back(name);
+        }
+    }
+    for (const auto& name : names) {
+        _fire_group(name);
+    }
+}
+
+void AdaptiveThreadPoolController::_apply_thread_count(PoolGroup& group, int 
target_threads,
+                                                       const std::string& 
reason) {
+    int max_threads = group.get_max_threads();
+    int min_threads = group.get_min_threads();
+    target_threads = std::max(min_threads, std::min(max_threads, 
target_threads));
+    if (target_threads == group.current_threads) return;
+
+    LOG(INFO) << "Adaptive[" << group.name << "]: adjusting threads from " << 
group.current_threads
+              << " to " << target_threads << " (min=" << min_threads << ", 
max=" << max_threads
+              << ")" << (reason.empty() ? "" : " reason=[" + reason + "]");
+
+    bool all_success = true;
+    for (auto* pool : group.pools) {
+        if (pool == nullptr) continue;
+        Status st = pool->set_max_threads(target_threads);
+        if (!st.ok()) {
+            all_success = false;
+            LOG(WARNING) << "Adaptive[" << group.name << "]: failed to set max 
threads: " << st;
+        }
+    }
+    if (all_success) {
+        group.current_threads = target_threads;
+    }
+}
+
+int AdaptiveThreadPoolController::get_current_threads(const std::string& name) 
const {
+    std::lock_guard<std::mutex> lk(_mutex);
+    auto it = _pool_groups.find(name);
+    return it != _pool_groups.end() ? it->second.current_threads : 0;
+}
+
+bool AdaptiveThreadPoolController::is_io_busy() {
+    if (config::is_cloud_mode()) {
+        if (_s3_file_upload_pool == nullptr) return false;
+        int queue_size = _s3_file_upload_pool->get_queue_size();
+        return queue_size > kS3QueueBusyThreshold;
+    }
+
+    if (_system_metrics == nullptr) return false;
+
+    int64_t current_time_sec = MonotonicSeconds();
+    int64_t interval_sec = current_time_sec - _last_check_time_sec;
+    if (interval_sec <= 0) {
+        // Interval too short to compute a valid IO delta (e.g. two groups 
fired
+        // back-to-back). Reuse the last known result rather than returning 
false,
+        // which would incorrectly signal "not busy".
+        return _last_io_busy;
+    }
+
+    int64_t max_io_util = _system_metrics->get_max_io_util(_last_disk_io_time, 
interval_sec);
+    _system_metrics->get_disks_io_time(&_last_disk_io_time);
+    _last_check_time_sec = current_time_sec;
+
+    _last_io_busy = max_io_util > kIOBusyThresholdPercent;
+    return _last_io_busy;
+}
+
+bool AdaptiveThreadPoolController::is_cpu_busy() {
+    if (_system_metrics == nullptr) return false;
+
+    double load_avg = _system_metrics->get_load_average_1_min();
+    int num_cpus = std::thread::hardware_concurrency();
+    if (num_cpus <= 0) return false;
+
+    double cpu_usage_percent = (load_avg / num_cpus) * 100.0;
+    return cpu_usage_percent > kCPUBusyThresholdPercent;
+}
+
+AdaptiveThreadPoolController::AdjustFunc 
AdaptiveThreadPoolController::make_flush_adjust_func(
+        AdaptiveThreadPoolController* controller, ThreadPool* flush_pool) {
+    return [controller, flush_pool](int current, int min_t, int max_t, 
std::string& reason) {
+        int target = current;
+        int queue_size = flush_pool->get_queue_size();
+        if (queue_size > kQueueThreshold) {
+            target = std::min(max_t, target + 1);
+            reason += "queue_size=" + std::to_string(queue_size) + ">" +

Review Comment:
   **[Design / Adjustment Logic]** The scaling logic has several concerns:
   
   1. **IO/CPU busyness unconditionally overrides queue backlog**: If 
`queue_size > 10` AND IO is busy AND CPU is busy, the net effect is `current + 
1 - 2 - 2 = current - 3`. This reduces threads even when work is piling up, 
which will exacerbate memory pressure from accumulated memtables.
   
   2. **Scale-up is always +1 per interval**: Even if `queue_size = 1000`, only 
1 thread is added per 10s cycle. On a 16-core machine (max=64, min=8), 
recovering from min to max takes 56 intervals = 560 seconds.
   
   3. **Step sizes (+1, -2) are hardcoded literals**: Unlike the thresholds 
(`kQueueThreshold`, `kIOBusyThresholdPercent`), the step sizes are not named 
constants or configurable.
   
   Consider:
   - Proportional scale-up based on queue depth (e.g., `+max(1, queue_size / 
kQueueThreshold)`)
   - A "critical queue" threshold above which IO/CPU busyness does not reduce 
threads
   - Extracting step sizes as named constants



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to