This is an automated email from the ASF dual-hosted git repository. yiguolei pushed a commit to branch spill_and_reserve in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/spill_and_reserve by this push: new a86d512321b Spill and reserve (#42611) a86d512321b is described below commit a86d512321b296522ae1f90c6c831b85415f4232 Author: yiguolei <676222...@qq.com> AuthorDate: Mon Oct 28 17:18:52 2024 +0800 Spill and reserve (#42611) ## Proposed changes Issue Number: close #xxx <!--Describe your changes.--> --------- Co-authored-by: yiguolei <yiguo...@gmail.com> --- be/src/olap/memtable_memory_limiter.cpp | 3 ++- be/src/runtime/memory/mem_tracker_limiter.h | 2 +- be/src/runtime/workload_group/workload_group.cpp | 10 +++++----- be/src/runtime/workload_group/workload_group.h | 6 ++++-- be/src/runtime/workload_group/workload_group_manager.cpp | 12 +++++++++--- 5 files changed, 21 insertions(+), 12 deletions(-) diff --git a/be/src/olap/memtable_memory_limiter.cpp b/be/src/olap/memtable_memory_limiter.cpp index fe483706127..75b2418372f 100644 --- a/be/src/olap/memtable_memory_limiter.cpp +++ b/be/src/olap/memtable_memory_limiter.cpp @@ -118,7 +118,8 @@ void MemTableMemoryLimiter::handle_workload_group_memtable_flush(WorkloadGroupPt // Should releae memory quickly. using namespace std::chrono_literals; int32_t sleep_times = 10; - while (wg != nullptr && wg->enable_write_buffer_limit() && sleep_times > 0) { + while (wg != nullptr && wg->enable_write_buffer_limit() && wg->exceed_write_buffer_limit() && + sleep_times > 0) { std::this_thread::sleep_for(100ms); --sleep_times; } diff --git a/be/src/runtime/memory/mem_tracker_limiter.h b/be/src/runtime/memory/mem_tracker_limiter.h index 1b03f2f2082..7e1a0e11c83 100644 --- a/be/src/runtime/memory/mem_tracker_limiter.h +++ b/be/src/runtime/memory/mem_tracker_limiter.h @@ -235,7 +235,7 @@ public: static void make_top_consumption_tasks_tracker_profile(RuntimeProfile* profile, int top_num); static void make_all_tasks_tracker_profile(RuntimeProfile* profile); - int64_t load_buffer_size() const { return _write_tracker->consumption(); } + int64_t write_buffer_size() const { return _write_tracker->consumption(); } std::shared_ptr<MemTrackerLimiter> write_tracker() { return _write_tracker; } diff --git a/be/src/runtime/workload_group/workload_group.cpp b/be/src/runtime/workload_group/workload_group.cpp index c0895e8f0fd..badc55073ab 100644 --- a/be/src/runtime/workload_group/workload_group.cpp +++ b/be/src/runtime/workload_group/workload_group.cpp @@ -94,7 +94,7 @@ std::string WorkloadGroup::debug_string() const { return fmt::format( "WorkloadGroup[id = {}, name = {}, version = {}, cpu_share = {}, " "total_query_slot_count={}, " - "memory_limit = {}, write_buffer_ratio= {}%" + "memory_limit = {}, write_buffer_ratio= {}%, " "enable_memory_overcommit = {}, total_mem_used = {}," "wg_refresh_interval_memory_growth = {}, mem_used_ratio = {}, spill_low_watermark = " "{}, spill_high_watermark = {},cpu_hard_limit = {}, scan_thread_num = " @@ -185,7 +185,7 @@ void WorkloadGroup::check_and_update(const WorkloadGroupInfo& tg_info) { // MemtrackerLimiter is not removed during query context release, so that should remove it here. int64_t WorkloadGroup::refresh_memory_usage() { int64_t used_memory = 0; - int64_t load_buffer_size = 0; + int64_t write_buffer_size = 0; for (auto& mem_tracker_group : _mem_tracker_limiter_pool) { std::lock_guard<std::mutex> l(mem_tracker_group.group_lock); for (auto trackerWptr = mem_tracker_group.trackers.begin(); @@ -195,14 +195,14 @@ int64_t WorkloadGroup::refresh_memory_usage() { trackerWptr = mem_tracker_group.trackers.erase(trackerWptr); } else { used_memory += tracker->consumption(); - load_buffer_size += tracker->load_buffer_size(); + write_buffer_size += tracker->write_buffer_size(); ++trackerWptr; } } } // refresh total memory used. - _total_mem_used = used_memory + load_buffer_size; - _load_buffer_size = load_buffer_size; + _total_mem_used = used_memory + write_buffer_size; + _write_buffer_size = write_buffer_size; // reserve memory is recorded in the query mem tracker // and _total_mem_used already contains all the current reserve memory. // so after refreshing _total_mem_used, reset _wg_refresh_interval_memory_growth. diff --git a/be/src/runtime/workload_group/workload_group.h b/be/src/runtime/workload_group/workload_group.h index ccdc6374ce8..ce95495b29a 100644 --- a/be/src/runtime/workload_group/workload_group.h +++ b/be/src/runtime/workload_group/workload_group.h @@ -80,12 +80,14 @@ public: int64_t total_mem_used() const { return _total_mem_used; } - int64_t write_buffer_size() const { return _load_buffer_size; } + int64_t write_buffer_size() const { return _write_buffer_size; } void enable_write_buffer_limit(bool enable_limit) { _enable_write_buffer_limit = enable_limit; } bool enable_write_buffer_limit() const { return _enable_write_buffer_limit; } + bool exceed_write_buffer_limit() const { return _write_buffer_size > write_buffer_limit(); } + // make memory snapshots and refresh total memory used at the same time. int64_t refresh_memory_usage(); int64_t memory_used(); @@ -213,7 +215,7 @@ private: std::atomic<bool> _enable_write_buffer_limit = false; std::atomic_int64_t _total_mem_used = 0; // bytes - std::atomic_int64_t _load_buffer_size = 0; + std::atomic_int64_t _write_buffer_size = 0; std::atomic_int64_t _wg_refresh_interval_memory_growth; bool _enable_memory_overcommit; std::atomic<uint64_t> _cpu_share; diff --git a/be/src/runtime/workload_group/workload_group_manager.cpp b/be/src/runtime/workload_group/workload_group_manager.cpp index d7130ae26e4..91a1438d2fb 100644 --- a/be/src/runtime/workload_group/workload_group_manager.cpp +++ b/be/src/runtime/workload_group/workload_group_manager.cpp @@ -270,6 +270,15 @@ void WorkloadGroupMgr::add_paused_query(const std::shared_ptr<QueryContext>& que * strategy 5: If any query exceed process's memlimit and cache is zero, then do following: */ void WorkloadGroupMgr::handle_paused_queries() { + { + std::shared_lock<std::shared_mutex> r_lock(_group_mutex); + for (auto& [wg_id, wg] : _workload_groups) { + std::unique_lock<std::mutex> lock(_paused_queries_lock); + if (_paused_queries_list[wg].empty()) { + // Add an empty set to wg that not contains paused queries. + } + } + } const int64_t TIMEOUT_IN_QUEUE = 1000L * 10; std::unique_lock<std::mutex> lock(_paused_queries_lock); bool has_revoked_from_other_group = false; @@ -353,9 +362,6 @@ void WorkloadGroupMgr::handle_paused_queries() { wg->enable_write_buffer_limit(true); ++query_it; continue; - } else { - // If could not revoke memory by flush memtable, then disable load buffer limit - wg->enable_write_buffer_limit(false); } if (!has_changed_hard_limit) { update_queries_limit_(wg, true); --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org