This is an automated email from the ASF dual-hosted git repository. yiguolei pushed a commit to branch branch-2.1 in repository https://gitbox.apache.org/repos/asf/doris.git
commit f8fcd17f33deab0605c9378850a21714293ef1b5 Author: Xinyi Zou <zouxiny...@gmail.com> AuthorDate: Mon May 27 18:11:37 2024 +0800 [fix](memory) Fix nested scoped tracker and nested reserve memory (#35257) SCOPED_ATTACH_TASK cannot be nested, but SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER can continue to be called, so attach_limiter_tracker may be nested. --- be/src/runtime/memory/thread_mem_tracker_mgr.cpp | 14 ++++++-- be/src/runtime/memory/thread_mem_tracker_mgr.h | 41 ++++++++++++++++-------- be/src/util/mem_info.cpp | 4 +-- 3 files changed, 41 insertions(+), 18 deletions(-) diff --git a/be/src/runtime/memory/thread_mem_tracker_mgr.cpp b/be/src/runtime/memory/thread_mem_tracker_mgr.cpp index 766ee643584..b37b35313d8 100644 --- a/be/src/runtime/memory/thread_mem_tracker_mgr.cpp +++ b/be/src/runtime/memory/thread_mem_tracker_mgr.cpp @@ -44,9 +44,16 @@ private: void ThreadMemTrackerMgr::attach_limiter_tracker( const std::shared_ptr<MemTrackerLimiter>& mem_tracker) { DCHECK(mem_tracker); - DCHECK(_reserved_mem == 0); CHECK(init()); flush_untracked_mem(); + _reserved_mem_stack.push_back(_reserved_mem); + if (_reserved_mem != 0) { + // _untracked_mem temporary store bytes that not synchronized to process reserved memory, + // but bytes have been subtracted from thread _reserved_mem. + doris::GlobalMemoryArbitrator::release_process_reserved_memory(_untracked_mem); + _reserved_mem = 0; + _untracked_mem = 0; + } _limiter_tracker = mem_tracker; _limiter_tracker_raw = mem_tracker.get(); } @@ -54,8 +61,11 @@ void ThreadMemTrackerMgr::attach_limiter_tracker( void ThreadMemTrackerMgr::detach_limiter_tracker( const std::shared_ptr<MemTrackerLimiter>& old_mem_tracker) { CHECK(init()); - release_reserved(); flush_untracked_mem(); + release_reserved(); + DCHECK(!_reserved_mem_stack.empty()); + _reserved_mem = _reserved_mem_stack.back(); + _reserved_mem_stack.pop_back(); _limiter_tracker = old_mem_tracker; _limiter_tracker_raw = old_mem_tracker.get(); } diff --git a/be/src/runtime/memory/thread_mem_tracker_mgr.h b/be/src/runtime/memory/thread_mem_tracker_mgr.h index 6081b013346..64c2190a149 100644 --- a/be/src/runtime/memory/thread_mem_tracker_mgr.h +++ b/be/src/runtime/memory/thread_mem_tracker_mgr.h @@ -48,6 +48,7 @@ public: ~ThreadMemTrackerMgr() { // if _init == false, exec env is not initialized when init(). and never consumed mem tracker once. if (_init) { + DCHECK(_reserved_mem == 0); flush_untracked_mem(); } } @@ -132,6 +133,9 @@ private: int64_t _old_untracked_mem = 0; int64_t _reserved_mem = 0; + // SCOPED_ATTACH_TASK cannot be nested, but SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER can continue to be used, + // so `attach_limiter_tracker` may be nested. + std::vector<int64_t> _reserved_mem_stack; bool _count_scope_mem = false; int64_t _scope_mem = 0; @@ -178,6 +182,7 @@ inline bool ThreadMemTrackerMgr::push_consumer_tracker(MemTracker* tracker) { inline void ThreadMemTrackerMgr::pop_consumer_tracker() { DCHECK(!_consumer_tracker_stack.empty()); + flush_untracked_mem(); _consumer_tracker_stack.back()->consume(_untracked_mem); _consumer_tracker_stack.back()->release(_reserved_mem); _consumer_tracker_stack.pop_back(); @@ -191,9 +196,13 @@ inline void ThreadMemTrackerMgr::consume(int64_t size, int skip_large_memory_che // subtract size from process global reserved memory, // because this part of the reserved memory has already been used by BE process. _reserved_mem -= size; - // store bytes that not synchronized to process reserved memory. + // temporary store bytes that not synchronized to process reserved memory. _untracked_mem += size; - if (_untracked_mem >= SYNC_PROC_RESERVED_INTERVAL_BYTES) { + // If _untracked_mem > 0, reserved memory that has been used, if _untracked_mem greater than + // SYNC_PROC_RESERVED_INTERVAL_BYTES, release process reserved memory. + // If _untracked_mem < 0, used reserved memory is returned, will increase reserved memory, + // if _untracked_mem less than -SYNC_PROC_RESERVED_INTERVAL_BYTES, increase process reserved memory. + if (std::abs(_untracked_mem) >= SYNC_PROC_RESERVED_INTERVAL_BYTES) { doris::GlobalMemoryArbitrator::release_process_reserved_memory(_untracked_mem); _untracked_mem = 0; } @@ -209,7 +218,9 @@ inline void ThreadMemTrackerMgr::consume(int64_t size, int skip_large_memory_che _untracked_mem = 0; } } + // store bytes that not consumed by thread mem tracker. _untracked_mem += size; + DCHECK(_reserved_mem == 0); if (!_init && !ExecEnv::ready()) { return; } @@ -217,9 +228,7 @@ inline void ThreadMemTrackerMgr::consume(int64_t size, int skip_large_memory_che // and some threads `_untracked_mem <= -config::mem_tracker_consume_min_size_bytes` trigger consumption(), // it will cause tracker->consumption to be temporarily less than 0. // After the jemalloc hook is loaded, before ExecEnv init, _limiter_tracker=nullptr. - if ((_untracked_mem >= config::mem_tracker_consume_min_size_bytes || - _untracked_mem <= -config::mem_tracker_consume_min_size_bytes) && - !_stop_consume) { + if (std::abs(_untracked_mem) >= config::mem_tracker_consume_min_size_bytes && !_stop_consume) { flush_untracked_mem(); } @@ -238,6 +247,12 @@ inline void ThreadMemTrackerMgr::consume(int64_t size, int skip_large_memory_che } inline void ThreadMemTrackerMgr::flush_untracked_mem() { + // if during reserve memory, _untracked_mem temporary store bytes that not synchronized + // to process reserved memory, but bytes have been subtracted from thread _reserved_mem. + // so not need flush untracked_mem to consume mem tracker. + if (_reserved_mem != 0) { + return; + } // Temporary memory may be allocated during the consumption of the mem tracker, which will lead to entering // the Memory Hook again, so suspend consumption to avoid falling into an infinite loop. if (_untracked_mem == 0 || !init()) { @@ -264,9 +279,7 @@ inline bool ThreadMemTrackerMgr::try_reserve(int64_t size) { CHECK(init()); // if _reserved_mem not equal to 0, repeat reserve, // _untracked_mem store bytes that not synchronized to process reserved memory. - if (_reserved_mem == 0) { - flush_untracked_mem(); - } + flush_untracked_mem(); if (!_limiter_tracker_raw->try_consume(size)) { return false; } @@ -281,21 +294,21 @@ inline bool ThreadMemTrackerMgr::try_reserve(int64_t size) { tracker->consume(size); } _reserved_mem += size; - DCHECK(_untracked_mem == 0); return true; } inline void ThreadMemTrackerMgr::release_reserved() { - flush_untracked_mem(); - if (_reserved_mem > 0) { - doris::GlobalMemoryArbitrator::release_process_reserved_memory(_reserved_mem); - _limiter_tracker_raw->consume(-_reserved_mem); + if (_reserved_mem != 0) { + doris::GlobalMemoryArbitrator::release_process_reserved_memory(_reserved_mem + + _untracked_mem); + _limiter_tracker_raw->release(_reserved_mem); if (_count_scope_mem) { _scope_mem -= _reserved_mem; } for (auto* tracker : _consumer_tracker_stack) { - tracker->consume(-_reserved_mem); + tracker->release(_reserved_mem); } + _untracked_mem = 0; _reserved_mem = 0; } } diff --git a/be/src/util/mem_info.cpp b/be/src/util/mem_info.cpp index 00796857d5a..72a47fa076a 100644 --- a/be/src/util/mem_info.cpp +++ b/be/src/util/mem_info.cpp @@ -75,8 +75,8 @@ int64_t MemInfo::_s_cgroup_mem_refresh_wait_times = 0; static std::unordered_map<std::string, int64_t> _mem_info_bytes; std::atomic<int64_t> MemInfo::_s_sys_mem_available = -1; -int64_t MemInfo::_s_sys_mem_available_low_water_mark = -1; -int64_t MemInfo::_s_sys_mem_available_warning_water_mark = -1; +int64_t MemInfo::_s_sys_mem_available_low_water_mark = std::numeric_limits<int64_t>::min(); +int64_t MemInfo::_s_sys_mem_available_warning_water_mark = std::numeric_limits<int64_t>::min(); std::atomic<int64_t> MemInfo::_s_process_minor_gc_size = -1; std::atomic<int64_t> MemInfo::_s_process_full_gc_size = -1; std::mutex MemInfo::je_purge_dirty_pages_lock; --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org