This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch spill_and_reserve
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/spill_and_reserve by this push:
     new a86d512321b Spill and reserve (#42611)
a86d512321b is described below

commit a86d512321b296522ae1f90c6c831b85415f4232
Author: yiguolei <676222...@qq.com>
AuthorDate: Mon Oct 28 17:18:52 2024 +0800

    Spill and reserve (#42611)
    
    ## Proposed changes
    
    Issue Number: close #xxx
    
    <!--Describe your changes.-->
    
    ---------
    
    Co-authored-by: yiguolei <yiguo...@gmail.com>
---
 be/src/olap/memtable_memory_limiter.cpp                  |  3 ++-
 be/src/runtime/memory/mem_tracker_limiter.h              |  2 +-
 be/src/runtime/workload_group/workload_group.cpp         | 10 +++++-----
 be/src/runtime/workload_group/workload_group.h           |  6 ++++--
 be/src/runtime/workload_group/workload_group_manager.cpp | 12 +++++++++---
 5 files changed, 21 insertions(+), 12 deletions(-)

diff --git a/be/src/olap/memtable_memory_limiter.cpp b/be/src/olap/memtable_memory_limiter.cpp
index fe483706127..75b2418372f 100644
--- a/be/src/olap/memtable_memory_limiter.cpp
+++ b/be/src/olap/memtable_memory_limiter.cpp
@@ -118,7 +118,8 @@ void MemTableMemoryLimiter::handle_workload_group_memtable_flush(WorkloadGroupPt
     // Should releae memory quickly.
     using namespace std::chrono_literals;
     int32_t sleep_times = 10;
-    while (wg != nullptr && wg->enable_write_buffer_limit() && sleep_times > 0) {
+    while (wg != nullptr && wg->enable_write_buffer_limit() && wg->exceed_write_buffer_limit() &&
+           sleep_times > 0) {
         std::this_thread::sleep_for(100ms);
         --sleep_times;
     }
diff --git a/be/src/runtime/memory/mem_tracker_limiter.h b/be/src/runtime/memory/mem_tracker_limiter.h
index 1b03f2f2082..7e1a0e11c83 100644
--- a/be/src/runtime/memory/mem_tracker_limiter.h
+++ b/be/src/runtime/memory/mem_tracker_limiter.h
@@ -235,7 +235,7 @@ public:
     static void make_top_consumption_tasks_tracker_profile(RuntimeProfile* profile, int top_num);
     static void make_all_tasks_tracker_profile(RuntimeProfile* profile);
 
-    int64_t load_buffer_size() const { return _write_tracker->consumption(); }
+    int64_t write_buffer_size() const { return _write_tracker->consumption(); }
 
     std::shared_ptr<MemTrackerLimiter> write_tracker() { return _write_tracker; }
 
diff --git a/be/src/runtime/workload_group/workload_group.cpp b/be/src/runtime/workload_group/workload_group.cpp
index c0895e8f0fd..badc55073ab 100644
--- a/be/src/runtime/workload_group/workload_group.cpp
+++ b/be/src/runtime/workload_group/workload_group.cpp
@@ -94,7 +94,7 @@ std::string WorkloadGroup::debug_string() const {
     return fmt::format(
             "WorkloadGroup[id = {}, name = {}, version = {}, cpu_share = {}, "
             "total_query_slot_count={}, "
-            "memory_limit = {}, write_buffer_ratio= {}%"
+            "memory_limit = {}, write_buffer_ratio= {}%, "
             "enable_memory_overcommit = {}, total_mem_used = {},"
             "wg_refresh_interval_memory_growth = {},  mem_used_ratio = {}, 
spill_low_watermark = "
             "{}, spill_high_watermark = {},cpu_hard_limit = {}, 
scan_thread_num = "
@@ -185,7 +185,7 @@ void WorkloadGroup::check_and_update(const WorkloadGroupInfo& tg_info) {
 // MemtrackerLimiter is not removed during query context release, so that should remove it here.
 int64_t WorkloadGroup::refresh_memory_usage() {
     int64_t used_memory = 0;
-    int64_t load_buffer_size = 0;
+    int64_t write_buffer_size = 0;
     for (auto& mem_tracker_group : _mem_tracker_limiter_pool) {
         std::lock_guard<std::mutex> l(mem_tracker_group.group_lock);
         for (auto trackerWptr = mem_tracker_group.trackers.begin();
@@ -195,14 +195,14 @@ int64_t WorkloadGroup::refresh_memory_usage() {
                 trackerWptr = mem_tracker_group.trackers.erase(trackerWptr);
             } else {
                 used_memory += tracker->consumption();
-                load_buffer_size += tracker->load_buffer_size();
+                write_buffer_size += tracker->write_buffer_size();
                 ++trackerWptr;
             }
         }
     }
     // refresh total memory used.
-    _total_mem_used = used_memory + load_buffer_size;
-    _load_buffer_size = load_buffer_size;
+    _total_mem_used = used_memory + write_buffer_size;
+    _write_buffer_size = write_buffer_size;
     // reserve memory is recorded in the query mem tracker
     // and _total_mem_used already contains all the current reserve memory.
     // so after refreshing _total_mem_used, reset _wg_refresh_interval_memory_growth.
diff --git a/be/src/runtime/workload_group/workload_group.h b/be/src/runtime/workload_group/workload_group.h
index ccdc6374ce8..ce95495b29a 100644
--- a/be/src/runtime/workload_group/workload_group.h
+++ b/be/src/runtime/workload_group/workload_group.h
@@ -80,12 +80,14 @@ public:
 
     int64_t total_mem_used() const { return _total_mem_used; }
 
-    int64_t write_buffer_size() const { return _load_buffer_size; }
+    int64_t write_buffer_size() const { return _write_buffer_size; }
 
     void enable_write_buffer_limit(bool enable_limit) { _enable_write_buffer_limit = enable_limit; }
 
     bool enable_write_buffer_limit() const { return _enable_write_buffer_limit; }
 
+    bool exceed_write_buffer_limit() const { return _write_buffer_size > write_buffer_limit(); }
+
     // make memory snapshots and refresh total memory used at the same time.
     int64_t refresh_memory_usage();
     int64_t memory_used();
@@ -213,7 +215,7 @@ private:
     std::atomic<bool> _enable_write_buffer_limit = false;
 
     std::atomic_int64_t _total_mem_used = 0; // bytes
-    std::atomic_int64_t _load_buffer_size = 0;
+    std::atomic_int64_t _write_buffer_size = 0;
     std::atomic_int64_t _wg_refresh_interval_memory_growth;
     bool _enable_memory_overcommit;
     std::atomic<uint64_t> _cpu_share;
diff --git a/be/src/runtime/workload_group/workload_group_manager.cpp b/be/src/runtime/workload_group/workload_group_manager.cpp
index d7130ae26e4..91a1438d2fb 100644
--- a/be/src/runtime/workload_group/workload_group_manager.cpp
+++ b/be/src/runtime/workload_group/workload_group_manager.cpp
@@ -270,6 +270,15 @@ void WorkloadGroupMgr::add_paused_query(const std::shared_ptr<QueryContext>& que
  * strategy 5: If any query exceed process's memlimit and cache is zero, then do following:
  */
 void WorkloadGroupMgr::handle_paused_queries() {
+    {
+        std::shared_lock<std::shared_mutex> r_lock(_group_mutex);
+        for (auto& [wg_id, wg] : _workload_groups) {
+            std::unique_lock<std::mutex> lock(_paused_queries_lock);
+            if (_paused_queries_list[wg].empty()) {
+                // Add an empty set for a wg that does not contain paused queries.
+            }
+        }
+    }
     const int64_t TIMEOUT_IN_QUEUE = 1000L * 10;
     std::unique_lock<std::mutex> lock(_paused_queries_lock);
     bool has_revoked_from_other_group = false;
@@ -353,9 +362,6 @@ void WorkloadGroupMgr::handle_paused_queries() {
                     wg->enable_write_buffer_limit(true);
                     ++query_it;
                     continue;
-                } else {
-                    // If could not revoke memory by flush memtable, then disable load buffer limit
-                    wg->enable_write_buffer_limit(false);
                 }
                 if (!has_changed_hard_limit) {
                     update_queries_limit_(wg, true);


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to