This is an automated email from the ASF dual-hosted git repository. mrhhsg pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push: new b61cad3e451 [enhancement](memory) Record value of MemTracker reserve memory (#36412) b61cad3e451 is described below commit b61cad3e45145f34b3930ec13bd95ed4fafcc7e2 Author: Xinyi Zou <zouxiny...@gmail.com> AuthorDate: Wed Jun 19 11:08:09 2024 +0800 [enhancement](memory) Record value of MemTracker reserve memory (#36412) ## Proposed changes Track the memory reserved by each MemTracker so that reserved-memory usage can be analyzed. --- be/src/http/default_path_handlers.cpp | 3 ++ be/src/runtime/memory/global_memory_arbitrator.cpp | 45 ++++++++++++++++++++++ be/src/runtime/memory/global_memory_arbitrator.h | 38 +++++++++--------- 3 files changed, 66 insertions(+), 20 deletions(-) diff --git a/be/src/http/default_path_handlers.cpp b/be/src/http/default_path_handlers.cpp index f9003373f82..f303ddc8c8f 100644 --- a/be/src/http/default_path_handlers.cpp +++ b/be/src/http/default_path_handlers.cpp @@ -42,6 +42,7 @@ #include "gutil/strings/substitute.h" #include "http/action/tablets_info_action.h" #include "http/web_page_handler.h" +#include "runtime/memory/global_memory_arbitrator.h" #include "runtime/memory/mem_tracker.h" #include "runtime/memory/mem_tracker_limiter.h" #include "util/easy_json.h" @@ -161,6 +162,8 @@ void mem_tracker_handler(const WebPageHandler::ArgumentMap& args, std::stringstr MemTrackerLimiter::Type::SCHEMA_CHANGE); } else if (iter->second == "other") { MemTrackerLimiter::make_type_snapshots(&snapshots, MemTrackerLimiter::Type::OTHER); + } else if (iter->second == "reserved_memory") { + GlobalMemoryArbitrator::make_reserved_memory_snapshots(&snapshots); } } else { (*output) << "<h4>*Notice:</h4>\n"; diff --git a/be/src/runtime/memory/global_memory_arbitrator.cpp b/be/src/runtime/memory/global_memory_arbitrator.cpp index be25d541df9..35fa350987f 100644 --- a/be/src/runtime/memory/global_memory_arbitrator.cpp +++ b/be/src/runtime/memory/global_memory_arbitrator.cpp @@ -19,8 +19,13 @@ #include 
<bvar/bvar.h> +#include "runtime/thread_context.h" + namespace doris { +std::mutex GlobalMemoryArbitrator::_reserved_trackers_lock; +std::unordered_map<std::string, MemTracker::MemCounter> GlobalMemoryArbitrator::_reserved_trackers; + bvar::PassiveStatus<int64_t> g_vm_rss_sub_allocator_cache( "meminfo_vm_rss_sub_allocator_cache", [](void*) { return GlobalMemoryArbitrator::vm_rss_sub_allocator_cache(); }, nullptr); @@ -34,4 +39,44 @@ bvar::PassiveStatus<int64_t> g_sys_mem_avail( std::atomic<int64_t> GlobalMemoryArbitrator::_s_process_reserved_memory = 0; std::atomic<int64_t> GlobalMemoryArbitrator::refresh_interval_memory_growth = 0; +bool GlobalMemoryArbitrator::try_reserve_process_memory(int64_t bytes) { + if (sys_mem_available() - bytes < MemInfo::sys_mem_available_low_water_mark()) { + return false; + } + int64_t old_reserved_mem = _s_process_reserved_memory.load(std::memory_order_relaxed); + int64_t new_reserved_mem = 0; + do { + new_reserved_mem = old_reserved_mem + bytes; + if (UNLIKELY(vm_rss_sub_allocator_cache() + + refresh_interval_memory_growth.load(std::memory_order_relaxed) + + new_reserved_mem >= + MemInfo::mem_limit())) { + return false; + } + } while (!_s_process_reserved_memory.compare_exchange_weak(old_reserved_mem, new_reserved_mem, + std::memory_order_relaxed)); + { + std::lock_guard<std::mutex> l(_reserved_trackers_lock); + _reserved_trackers[doris::thread_context()->thread_mem_tracker()->label()].add(bytes); + } + return true; +} + +void GlobalMemoryArbitrator::release_process_reserved_memory(int64_t bytes) { + _s_process_reserved_memory.fetch_sub(bytes, std::memory_order_relaxed); + { + std::lock_guard<std::mutex> l(_reserved_trackers_lock); + auto label = doris::thread_context()->thread_mem_tracker()->label(); + auto it = _reserved_trackers.find(label); + if (it == _reserved_trackers.end()) { + DCHECK(false) << "release unknown reserved memory " << label << ", bytes: " << bytes; + return; + } + _reserved_trackers[label].sub(bytes); + if 
(_reserved_trackers[label].current_value() == 0) { + _reserved_trackers.erase(it); + } + } +} + } // namespace doris diff --git a/be/src/runtime/memory/global_memory_arbitrator.h b/be/src/runtime/memory/global_memory_arbitrator.h index 19b0d483081..f8fda18d0e9 100644 --- a/be/src/runtime/memory/global_memory_arbitrator.h +++ b/be/src/runtime/memory/global_memory_arbitrator.h @@ -17,6 +17,7 @@ #pragma once +#include "runtime/memory/mem_tracker.h" #include "util/mem_info.h" namespace doris { @@ -102,27 +103,21 @@ public: return msg; } - static inline bool try_reserve_process_memory(int64_t bytes) { - if (sys_mem_available() - bytes < MemInfo::sys_mem_available_low_water_mark()) { - return false; + static bool try_reserve_process_memory(int64_t bytes); + static void release_process_reserved_memory(int64_t bytes); + + static inline void make_reserved_memory_snapshots( + std::vector<MemTracker::Snapshot>* snapshots) { + std::lock_guard<std::mutex> l(_reserved_trackers_lock); + for (const auto& pair : _reserved_trackers) { + MemTracker::Snapshot snapshot; + snapshot.type = "reserved_memory"; + snapshot.label = pair.first; + snapshot.limit = -1; + snapshot.cur_consumption = pair.second.current_value(); + snapshot.peak_consumption = pair.second.peak_value(); + (*snapshots).emplace_back(snapshot); } - int64_t old_reserved_mem = _s_process_reserved_memory.load(std::memory_order_relaxed); - int64_t new_reserved_mem = 0; - do { - new_reserved_mem = old_reserved_mem + bytes; - if (UNLIKELY(vm_rss_sub_allocator_cache() + - refresh_interval_memory_growth.load(std::memory_order_relaxed) + - new_reserved_mem >= - MemInfo::mem_limit())) { - return false; - } - } while (!_s_process_reserved_memory.compare_exchange_weak( - old_reserved_mem, new_reserved_mem, std::memory_order_relaxed)); - return true; - } - - static inline void release_process_reserved_memory(int64_t bytes) { - _s_process_reserved_memory.fetch_sub(bytes, std::memory_order_relaxed); } static inline int64_t 
process_reserved_memory() { @@ -180,6 +175,9 @@ public: private: static std::atomic<int64_t> _s_process_reserved_memory; + + static std::mutex _reserved_trackers_lock; + static std::unordered_map<std::string, MemTracker::MemCounter> _reserved_trackers; }; } // namespace doris --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org