This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch spill_and_reserve
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/spill_and_reserve by this push:
     new 25eb49082ae wait for clear cache before do spill to disk
25eb49082ae is described below

commit 25eb49082ae7ca4646af13cc4890c8954e6baa6f
Author: yiguolei <yiguo...@gmail.com>
AuthorDate: Thu Sep 19 11:33:11 2024 +0800

    wait for clear cache before do spill to disk
---
 be/src/common/daemon.cpp                           |  5 ++-
 be/src/runtime/memory/global_memory_arbitrator.cpp |  5 +++
 be/src/runtime/memory/global_memory_arbitrator.h   |  5 +++
 .../workload_group/workload_group_manager.cpp      | 45 ++++++++++++++++++----
 4 files changed, 52 insertions(+), 8 deletions(-)

diff --git a/be/src/common/daemon.cpp b/be/src/common/daemon.cpp
index c3f8d89de82..3bd0de7ebb6 100644
--- a/be/src/common/daemon.cpp
+++ b/be/src/common/daemon.cpp
@@ -487,7 +487,9 @@ void Daemon::cache_adjust_capacity_thread() {
             doris::GlobalMemoryArbitrator::cache_adjust_capacity_cv.wait_for(
                     l, std::chrono::seconds(1));
         }
-        double adjust_weighted = 
GlobalMemoryArbitrator::last_cache_capacity_adjust_weighted;
+        double adjust_weighted = std::min<double>(
+                GlobalMemoryArbitrator::last_cache_capacity_adjust_weighted,
+                
GlobalMemoryArbitrator::last_wg_trigger_cache_capacity_adjust_weighted);
         if (_stop_background_threads_latch.count() == 0) {
             break;
         }
@@ -502,6 +504,7 @@ void Daemon::cache_adjust_capacity_thread() {
         LOG(INFO) << fmt::format(
                 "[MemoryGC] refresh cache capacity end, free memory {}, 
details: {}",
                 PrettyPrinter::print(freed_mem, TUnit::BYTES), ss.str());
+        GlobalMemoryArbitrator::last_affected_cache_capacity_adjust_weighted = 
adjust_weighted;
         doris::GlobalMemoryArbitrator::cache_adjust_capacity_notify.store(
                 false, std::memory_order_relaxed);
     } while (true);
diff --git a/be/src/runtime/memory/global_memory_arbitrator.cpp 
b/be/src/runtime/memory/global_memory_arbitrator.cpp
index 45d7781786f..0c774187ff3 100644
--- a/be/src/runtime/memory/global_memory_arbitrator.cpp
+++ b/be/src/runtime/memory/global_memory_arbitrator.cpp
@@ -38,7 +38,12 @@ std::atomic<int64_t> 
GlobalMemoryArbitrator::refresh_interval_memory_growth = 0;
 std::mutex GlobalMemoryArbitrator::cache_adjust_capacity_lock;
 std::condition_variable GlobalMemoryArbitrator::cache_adjust_capacity_cv;
 std::atomic<bool> GlobalMemoryArbitrator::cache_adjust_capacity_notify {false};
+// This capacity is set by gc thread, it is running periodicity.
 std::atomic<double> 
GlobalMemoryArbitrator::last_cache_capacity_adjust_weighted {1};
+// This capacity is set by workload group spill disk thread
+std::atomic<double> 
GlobalMemoryArbitrator::last_wg_trigger_cache_capacity_adjust_weighted {1};
+// The value that take affect
+std::atomic<double> 
GlobalMemoryArbitrator::last_affected_cache_capacity_adjust_weighted {1};
 std::mutex GlobalMemoryArbitrator::memtable_memory_refresh_lock;
 std::condition_variable GlobalMemoryArbitrator::memtable_memory_refresh_cv;
 std::atomic<bool> GlobalMemoryArbitrator::memtable_memory_refresh_notify 
{false};
diff --git a/be/src/runtime/memory/global_memory_arbitrator.h 
b/be/src/runtime/memory/global_memory_arbitrator.h
index 1859f45391f..468d442b662 100644
--- a/be/src/runtime/memory/global_memory_arbitrator.h
+++ b/be/src/runtime/memory/global_memory_arbitrator.h
@@ -178,6 +178,11 @@ public:
     static std::condition_variable cache_adjust_capacity_cv;
     static std::atomic<bool> cache_adjust_capacity_notify;
     static std::atomic<double> last_cache_capacity_adjust_weighted;
+    // This capacity is set by workload group spill disk thread
+    static std::atomic<double> last_wg_trigger_cache_capacity_adjust_weighted;
+    // The value that take affect
+    static std::atomic<double> last_affected_cache_capacity_adjust_weighted;
+
     static void notify_cache_adjust_capacity() {
         cache_adjust_capacity_notify.store(true, std::memory_order_relaxed);
         cache_adjust_capacity_cv.notify_all();
diff --git a/be/src/runtime/workload_group/workload_group_manager.cpp 
b/be/src/runtime/workload_group/workload_group_manager.cpp
index c17b55f8956..6fe8c7a51eb 100644
--- a/be/src/runtime/workload_group/workload_group_manager.cpp
+++ b/be/src/runtime/workload_group/workload_group_manager.cpp
@@ -233,6 +233,10 @@ void WorkloadGroupMgr::refresh_wg_weighted_memory_limit() {
             continue;
         }
 
+        // If the wg enable over commit memory, then it is no need to update 
query memlimit
+        if (wg.second->enable_memory_overcommit()) {
+            continue;
+        }
         int32_t total_used_slot_count = 0;
         int32_t total_slot_count = wg.second->total_query_slot_count();
         // calculate total used slot count
@@ -335,6 +339,11 @@ void WorkloadGroupMgr::add_paused_query(const 
std::shared_ptr<QueryContext>& que
 void WorkloadGroupMgr::handle_paused_queries() {
     std::unique_lock<std::mutex> lock(_paused_queries_lock);
     if (_paused_queries_list.empty()) {
+        if 
(doris::GlobalMemoryArbitrator::last_wg_trigger_cache_capacity_adjust_weighted 
!= 1) {
+            
doris::GlobalMemoryArbitrator::last_wg_trigger_cache_capacity_adjust_weighted = 
1;
+            doris::GlobalMemoryArbitrator::notify_cache_adjust_capacity();
+            LOG(INFO) << "There are no queries in paused list, so that set 
cache capacity to 1 now";
+        }
         return;
     }
 
@@ -371,13 +380,10 @@ void WorkloadGroupMgr::handle_paused_queries() {
             ++it;
         }
 
-        std::shared_ptr<QueryContext> max_revocable_query;
-        std::shared_ptr<QueryContext> max_memory_usage_query;
-        std::shared_ptr<QueryContext> running_query;
-        bool has_running_query = false;
-        size_t max_revocable_size = 0;
-        size_t max_memory_usage = 0;
-        auto it_to_remove = queries_list.end();
+        // If the wg's query list is empty, then should do nothing
+        if (queries_list.empty()) {
+            continue;
+        }
 
         // TODO: should check buffer type memory first, if could release many 
these memory, then not need do spill disk
         // Buffer Memory are:
@@ -387,6 +393,31 @@ void WorkloadGroupMgr::handle_paused_queries() {
         // 4. streaming aggs.
         // If we could not recycle memory from these buffers(< 10%), then do 
spill disk.
 
+        // 1. Check cache used, if cache is larger than > 0, then just return 
and wait for it to 0 to release some memory.
+        if 
(doris::GlobalMemoryArbitrator::last_affected_cache_capacity_adjust_weighted > 
0 &&
+            
doris::GlobalMemoryArbitrator::last_wg_trigger_cache_capacity_adjust_weighted > 
0) {
+            
doris::GlobalMemoryArbitrator::last_wg_trigger_cache_capacity_adjust_weighted = 
0;
+            doris::GlobalMemoryArbitrator::notify_cache_adjust_capacity();
+            LOG(INFO) << "There are some queries need memory, so that set 
cache capacity to 0 now";
+            // If there is cache, then return, only check to do spill disk 
when cache is larger than 0.
+            return;
+        }
+
+        // 2. If memtable size larger than 10% of wg's limit, then flush 
memtable and wait.
+
+        // If the wg enable memory overcommit, then not spill, just cancel 
query.
+        if (wg->enable_memory_overcommit()) {
+            continue;
+        }
+
+        std::shared_ptr<QueryContext> max_revocable_query;
+        std::shared_ptr<QueryContext> max_memory_usage_query;
+        std::shared_ptr<QueryContext> running_query;
+        bool has_running_query = false;
+        size_t max_revocable_size = 0;
+        size_t max_memory_usage = 0;
+        auto it_to_remove = queries_list.end();
+
         for (auto query_it = queries_list.begin(); query_it != 
queries_list.end();) {
             const auto query_ctx = query_it->query_ctx_.lock();
             // The query is finished during in paused list.


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to