This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git

commit f8fcd17f33deab0605c9378850a21714293ef1b5
Author: Xinyi Zou <zouxiny...@gmail.com>
AuthorDate: Mon May 27 18:11:37 2024 +0800

    [fix](memory) Fix nested scoped tracker and nested reserve memory (#35257)
    
    SCOPED_ATTACH_TASK cannot be nested, but
    SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER can still be called,
    so attach_limiter_tracker may be nested.
---
 be/src/runtime/memory/thread_mem_tracker_mgr.cpp | 14 ++++++--
 be/src/runtime/memory/thread_mem_tracker_mgr.h   | 41 ++++++++++++++++--------
 be/src/util/mem_info.cpp                         |  4 +--
 3 files changed, 41 insertions(+), 18 deletions(-)

diff --git a/be/src/runtime/memory/thread_mem_tracker_mgr.cpp 
b/be/src/runtime/memory/thread_mem_tracker_mgr.cpp
index 766ee643584..b37b35313d8 100644
--- a/be/src/runtime/memory/thread_mem_tracker_mgr.cpp
+++ b/be/src/runtime/memory/thread_mem_tracker_mgr.cpp
@@ -44,9 +44,16 @@ private:
 void ThreadMemTrackerMgr::attach_limiter_tracker(
         const std::shared_ptr<MemTrackerLimiter>& mem_tracker) {
     DCHECK(mem_tracker);
-    DCHECK(_reserved_mem == 0);
     CHECK(init());
     flush_untracked_mem();
+    _reserved_mem_stack.push_back(_reserved_mem);
+    if (_reserved_mem != 0) {
+        // _untracked_mem temporarily stores bytes not yet synchronized to process reserved memory,
+        // but those bytes have already been subtracted from the thread's _reserved_mem.
+        
doris::GlobalMemoryArbitrator::release_process_reserved_memory(_untracked_mem);
+        _reserved_mem = 0;
+        _untracked_mem = 0;
+    }
     _limiter_tracker = mem_tracker;
     _limiter_tracker_raw = mem_tracker.get();
 }
@@ -54,8 +61,11 @@ void ThreadMemTrackerMgr::attach_limiter_tracker(
 void ThreadMemTrackerMgr::detach_limiter_tracker(
         const std::shared_ptr<MemTrackerLimiter>& old_mem_tracker) {
     CHECK(init());
-    release_reserved();
     flush_untracked_mem();
+    release_reserved();
+    DCHECK(!_reserved_mem_stack.empty());
+    _reserved_mem = _reserved_mem_stack.back();
+    _reserved_mem_stack.pop_back();
     _limiter_tracker = old_mem_tracker;
     _limiter_tracker_raw = old_mem_tracker.get();
 }
diff --git a/be/src/runtime/memory/thread_mem_tracker_mgr.h 
b/be/src/runtime/memory/thread_mem_tracker_mgr.h
index 6081b013346..64c2190a149 100644
--- a/be/src/runtime/memory/thread_mem_tracker_mgr.h
+++ b/be/src/runtime/memory/thread_mem_tracker_mgr.h
@@ -48,6 +48,7 @@ public:
     ~ThreadMemTrackerMgr() {
         // if _init == false, exec env is not initialized when init(). and 
never consumed mem tracker once.
         if (_init) {
+            DCHECK(_reserved_mem == 0);
             flush_untracked_mem();
         }
     }
@@ -132,6 +133,9 @@ private:
     int64_t _old_untracked_mem = 0;
 
     int64_t _reserved_mem = 0;
+    // SCOPED_ATTACH_TASK cannot be nested, but 
SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER can continue to be used,
+    // so `attach_limiter_tracker` may be nested.
+    std::vector<int64_t> _reserved_mem_stack;
 
     bool _count_scope_mem = false;
     int64_t _scope_mem = 0;
@@ -178,6 +182,7 @@ inline bool 
ThreadMemTrackerMgr::push_consumer_tracker(MemTracker* tracker) {
 
 inline void ThreadMemTrackerMgr::pop_consumer_tracker() {
     DCHECK(!_consumer_tracker_stack.empty());
+    flush_untracked_mem();
     _consumer_tracker_stack.back()->consume(_untracked_mem);
     _consumer_tracker_stack.back()->release(_reserved_mem);
     _consumer_tracker_stack.pop_back();
@@ -191,9 +196,13 @@ inline void ThreadMemTrackerMgr::consume(int64_t size, int 
skip_large_memory_che
             // subtract size from process global reserved memory,
             // because this part of the reserved memory has already been used 
by BE process.
             _reserved_mem -= size;
-            // store bytes that not synchronized to process reserved memory.
+            // temporarily store bytes that are not yet synchronized to process reserved memory.
             _untracked_mem += size;
-            if (_untracked_mem >= SYNC_PROC_RESERVED_INTERVAL_BYTES) {
+            // If _untracked_mem > 0, reserved memory has been used; once _untracked_mem reaches
+            // SYNC_PROC_RESERVED_INTERVAL_BYTES, release that amount of process reserved memory.
+            // If _untracked_mem < 0, used reserved memory was returned; once _untracked_mem drops
+            // to -SYNC_PROC_RESERVED_INTERVAL_BYTES, increase process reserved memory accordingly.
+            if (std::abs(_untracked_mem) >= SYNC_PROC_RESERVED_INTERVAL_BYTES) 
{
                 
doris::GlobalMemoryArbitrator::release_process_reserved_memory(_untracked_mem);
                 _untracked_mem = 0;
             }
@@ -209,7 +218,9 @@ inline void ThreadMemTrackerMgr::consume(int64_t size, int 
skip_large_memory_che
             _untracked_mem = 0;
         }
     }
+    // store bytes that have not yet been consumed by the thread mem tracker.
     _untracked_mem += size;
+    DCHECK(_reserved_mem == 0);
     if (!_init && !ExecEnv::ready()) {
         return;
     }
@@ -217,9 +228,7 @@ inline void ThreadMemTrackerMgr::consume(int64_t size, int 
skip_large_memory_che
     // and some threads `_untracked_mem <= 
-config::mem_tracker_consume_min_size_bytes` trigger consumption(),
     // it will cause tracker->consumption to be temporarily less than 0.
     // After the jemalloc hook is loaded, before ExecEnv init, 
_limiter_tracker=nullptr.
-    if ((_untracked_mem >= config::mem_tracker_consume_min_size_bytes ||
-         _untracked_mem <= -config::mem_tracker_consume_min_size_bytes) &&
-        !_stop_consume) {
+    if (std::abs(_untracked_mem) >= config::mem_tracker_consume_min_size_bytes 
&& !_stop_consume) {
         flush_untracked_mem();
     }
 
@@ -238,6 +247,12 @@ inline void ThreadMemTrackerMgr::consume(int64_t size, int 
skip_large_memory_che
 }
 
 inline void ThreadMemTrackerMgr::flush_untracked_mem() {
+    // While memory is reserved, _untracked_mem temporarily stores bytes not yet synchronized
+    // to process reserved memory, but already subtracted from the thread's _reserved_mem,
+    // so there is no need to flush _untracked_mem into the mem tracker.
+    if (_reserved_mem != 0) {
+        return;
+    }
     // Temporary memory may be allocated during the consumption of the mem 
tracker, which will lead to entering
     // the Memory Hook again, so suspend consumption to avoid falling into an 
infinite loop.
     if (_untracked_mem == 0 || !init()) {
@@ -264,9 +279,7 @@ inline bool ThreadMemTrackerMgr::try_reserve(int64_t size) {
     CHECK(init());
     // if _reserved_mem not equal to 0, repeat reserve,
     // _untracked_mem store bytes that not synchronized to process reserved 
memory.
-    if (_reserved_mem == 0) {
-        flush_untracked_mem();
-    }
+    flush_untracked_mem();
     if (!_limiter_tracker_raw->try_consume(size)) {
         return false;
     }
@@ -281,21 +294,21 @@ inline bool ThreadMemTrackerMgr::try_reserve(int64_t 
size) {
         tracker->consume(size);
     }
     _reserved_mem += size;
-    DCHECK(_untracked_mem == 0);
     return true;
 }
 
 inline void ThreadMemTrackerMgr::release_reserved() {
-    flush_untracked_mem();
-    if (_reserved_mem > 0) {
-        
doris::GlobalMemoryArbitrator::release_process_reserved_memory(_reserved_mem);
-        _limiter_tracker_raw->consume(-_reserved_mem);
+    if (_reserved_mem != 0) {
+        
doris::GlobalMemoryArbitrator::release_process_reserved_memory(_reserved_mem +
+                                                                       
_untracked_mem);
+        _limiter_tracker_raw->release(_reserved_mem);
         if (_count_scope_mem) {
             _scope_mem -= _reserved_mem;
         }
         for (auto* tracker : _consumer_tracker_stack) {
-            tracker->consume(-_reserved_mem);
+            tracker->release(_reserved_mem);
         }
+        _untracked_mem = 0;
         _reserved_mem = 0;
     }
 }
diff --git a/be/src/util/mem_info.cpp b/be/src/util/mem_info.cpp
index 00796857d5a..72a47fa076a 100644
--- a/be/src/util/mem_info.cpp
+++ b/be/src/util/mem_info.cpp
@@ -75,8 +75,8 @@ int64_t MemInfo::_s_cgroup_mem_refresh_wait_times = 0;
 
 static std::unordered_map<std::string, int64_t> _mem_info_bytes;
 std::atomic<int64_t> MemInfo::_s_sys_mem_available = -1;
-int64_t MemInfo::_s_sys_mem_available_low_water_mark = -1;
-int64_t MemInfo::_s_sys_mem_available_warning_water_mark = -1;
+int64_t MemInfo::_s_sys_mem_available_low_water_mark = 
std::numeric_limits<int64_t>::min();
+int64_t MemInfo::_s_sys_mem_available_warning_water_mark = 
std::numeric_limits<int64_t>::min();
 std::atomic<int64_t> MemInfo::_s_process_minor_gc_size = -1;
 std::atomic<int64_t> MemInfo::_s_process_full_gc_size = -1;
 std::mutex MemInfo::je_purge_dirty_pages_lock;


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to