This is an automated email from the ASF dual-hosted git repository. yiguolei pushed a commit to branch spill_and_reserve in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/spill_and_reserve by this push: new 25eb49082ae wait for clear cache before do spill to disk 25eb49082ae is described below commit 25eb49082ae7ca4646af13cc4890c8954e6baa6f Author: yiguolei <yiguo...@gmail.com> AuthorDate: Thu Sep 19 11:33:11 2024 +0800 wait for clear cache before do spill to disk --- be/src/common/daemon.cpp | 5 ++- be/src/runtime/memory/global_memory_arbitrator.cpp | 5 +++ be/src/runtime/memory/global_memory_arbitrator.h | 5 +++ .../workload_group/workload_group_manager.cpp | 45 ++++++++++++++++++---- 4 files changed, 52 insertions(+), 8 deletions(-) diff --git a/be/src/common/daemon.cpp b/be/src/common/daemon.cpp index c3f8d89de82..3bd0de7ebb6 100644 --- a/be/src/common/daemon.cpp +++ b/be/src/common/daemon.cpp @@ -487,7 +487,9 @@ void Daemon::cache_adjust_capacity_thread() { doris::GlobalMemoryArbitrator::cache_adjust_capacity_cv.wait_for( l, std::chrono::seconds(1)); } - double adjust_weighted = GlobalMemoryArbitrator::last_cache_capacity_adjust_weighted; + double adjust_weighted = std::min<double>( + GlobalMemoryArbitrator::last_cache_capacity_adjust_weighted, + GlobalMemoryArbitrator::last_wg_trigger_cache_capacity_adjust_weighted); if (_stop_background_threads_latch.count() == 0) { break; } @@ -502,6 +504,7 @@ void Daemon::cache_adjust_capacity_thread() { LOG(INFO) << fmt::format( "[MemoryGC] refresh cache capacity end, free memory {}, details: {}", PrettyPrinter::print(freed_mem, TUnit::BYTES), ss.str()); + GlobalMemoryArbitrator::last_affected_cache_capacity_adjust_weighted = adjust_weighted; doris::GlobalMemoryArbitrator::cache_adjust_capacity_notify.store( false, std::memory_order_relaxed); } while (true); diff --git a/be/src/runtime/memory/global_memory_arbitrator.cpp b/be/src/runtime/memory/global_memory_arbitrator.cpp index 45d7781786f..0c774187ff3 100644 --- a/be/src/runtime/memory/global_memory_arbitrator.cpp +++ b/be/src/runtime/memory/global_memory_arbitrator.cpp @@ -38,7 +38,12 @@ 
std::atomic<int64_t> GlobalMemoryArbitrator::refresh_interval_memory_growth = 0; std::mutex GlobalMemoryArbitrator::cache_adjust_capacity_lock; std::condition_variable GlobalMemoryArbitrator::cache_adjust_capacity_cv; std::atomic<bool> GlobalMemoryArbitrator::cache_adjust_capacity_notify {false}; +// This capacity is set by the GC thread, which runs periodically. std::atomic<double> GlobalMemoryArbitrator::last_cache_capacity_adjust_weighted {1}; +// This capacity is set by workload group spill disk thread +std::atomic<double> GlobalMemoryArbitrator::last_wg_trigger_cache_capacity_adjust_weighted {1}; +// The value that takes effect +std::atomic<double> GlobalMemoryArbitrator::last_affected_cache_capacity_adjust_weighted {1}; std::mutex GlobalMemoryArbitrator::memtable_memory_refresh_lock; std::condition_variable GlobalMemoryArbitrator::memtable_memory_refresh_cv; std::atomic<bool> GlobalMemoryArbitrator::memtable_memory_refresh_notify {false}; diff --git a/be/src/runtime/memory/global_memory_arbitrator.h b/be/src/runtime/memory/global_memory_arbitrator.h index 1859f45391f..468d442b662 100644 --- a/be/src/runtime/memory/global_memory_arbitrator.h +++ b/be/src/runtime/memory/global_memory_arbitrator.h @@ -178,6 +178,11 @@ public: static std::condition_variable cache_adjust_capacity_cv; static std::atomic<bool> cache_adjust_capacity_notify; static std::atomic<double> last_cache_capacity_adjust_weighted; + // This capacity is set by workload group spill disk thread + static std::atomic<double> last_wg_trigger_cache_capacity_adjust_weighted; + // The value that takes effect + static std::atomic<double> last_affected_cache_capacity_adjust_weighted; + static void notify_cache_adjust_capacity() { cache_adjust_capacity_notify.store(true, std::memory_order_relaxed); cache_adjust_capacity_cv.notify_all(); diff --git a/be/src/runtime/workload_group/workload_group_manager.cpp b/be/src/runtime/workload_group/workload_group_manager.cpp index c17b55f8956..6fe8c7a51eb 100644 --- 
a/be/src/runtime/workload_group/workload_group_manager.cpp +++ b/be/src/runtime/workload_group/workload_group_manager.cpp @@ -233,6 +233,10 @@ void WorkloadGroupMgr::refresh_wg_weighted_memory_limit() { continue; } + // If the wg enables memory overcommit, there is no need to update the query memlimit + if (wg.second->enable_memory_overcommit()) { + continue; + } int32_t total_used_slot_count = 0; int32_t total_slot_count = wg.second->total_query_slot_count(); // calculate total used slot count @@ -335,6 +339,11 @@ void WorkloadGroupMgr::add_paused_query(const std::shared_ptr<QueryContext>& que void WorkloadGroupMgr::handle_paused_queries() { std::unique_lock<std::mutex> lock(_paused_queries_lock); if (_paused_queries_list.empty()) { + if (doris::GlobalMemoryArbitrator::last_wg_trigger_cache_capacity_adjust_weighted != 1) { + doris::GlobalMemoryArbitrator::last_wg_trigger_cache_capacity_adjust_weighted = 1; + doris::GlobalMemoryArbitrator::notify_cache_adjust_capacity(); + LOG(INFO) << "There are no queries in paused list, so that set cache capacity to 1 now"; + } return; } @@ -371,13 +380,10 @@ void WorkloadGroupMgr::handle_paused_queries() { ++it; } - std::shared_ptr<QueryContext> max_revocable_query; - std::shared_ptr<QueryContext> max_memory_usage_query; - std::shared_ptr<QueryContext> running_query; - bool has_running_query = false; - size_t max_revocable_size = 0; - size_t max_memory_usage = 0; - auto it_to_remove = queries_list.end(); + // If the wg's query list is empty, there is nothing to do + if (queries_list.empty()) { + continue; + } // TODO: should check buffer type memory first, if could release many these memory, then not need do spill disk // Buffer Memory are: @@ -387,6 +393,31 @@ void WorkloadGroupMgr::handle_paused_queries() { // 4. streaming aggs. // If we could not recycle memory from these buffers(< 10%), then do spill disk. + // 1. Check cache usage: if the cache is larger than 0, then just return and wait for it to drop to 0 to release some memory.
+ if (doris::GlobalMemoryArbitrator::last_affected_cache_capacity_adjust_weighted > 0 && + doris::GlobalMemoryArbitrator::last_wg_trigger_cache_capacity_adjust_weighted > 0) { + doris::GlobalMemoryArbitrator::last_wg_trigger_cache_capacity_adjust_weighted = 0; + doris::GlobalMemoryArbitrator::notify_cache_adjust_capacity(); + LOG(INFO) << "There are some queries need memory, so that set cache capacity to 0 now"; + // The cache may still hold memory, so return now; only consider spilling to disk once the cache capacity has dropped to 0. + return; + } + + // 2. If memtable size is larger than 10% of the wg's limit, then flush memtable and wait. + + // If the wg enables memory overcommit, do not spill; just cancel the query. + if (wg->enable_memory_overcommit()) { + continue; + } + + std::shared_ptr<QueryContext> max_revocable_query; + std::shared_ptr<QueryContext> max_memory_usage_query; + std::shared_ptr<QueryContext> running_query; + bool has_running_query = false; + size_t max_revocable_size = 0; + size_t max_memory_usage = 0; + auto it_to_remove = queries_list.end(); + for (auto query_it = queries_list.begin(); query_it != queries_list.end();) { const auto query_ctx = query_it->query_ctx_.lock(); // The query is finished during in paused list. --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org