This is an automated email from the ASF dual-hosted git repository.

zouxinyi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 1f3de0eae3 [fix](memory) fix invalid large memory check && fix memory info thread safety (#22027)
1f3de0eae3 is described below

commit 1f3de0eae3f56068a3ea23a066df25454fe67cf5
Author: Xinyi Zou <zouxiny...@gmail.com>
AuthorDate: Wed Jul 26 12:18:31 2023 +0800

    [fix](memory) fix invalid large memory check && fix memory info thread safety (#22027)
    
    fix invalid large memory check
    fix memory info thread safety
---
 be/src/common/config.cpp                       |  2 ++
 be/src/common/config.h                         |  5 +++
 be/src/runtime/memory/thread_mem_tracker_mgr.h | 17 ++++++----
 be/src/runtime/thread_context.h                | 27 ++++++++-------
 be/src/util/mem_info.cpp                       | 46 +++++++++++++++-----------
 be/src/util/mem_info.h                         | 31 +++++++++++------
 be/src/vec/common/allocator.cpp                | 20 +++++++++--
 be/src/vec/common/allocator.h                  |  7 ++--
 8 files changed, 102 insertions(+), 53 deletions(-)

diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp
index 9a0b2a0e3a..cb7f0f3698 100644
--- a/be/src/common/config.cpp
+++ b/be/src/common/config.cpp
@@ -128,6 +128,8 @@ DEFINE_mBool(enable_query_memory_overcommit, "true");
 
 DEFINE_mBool(disable_memory_gc, "false");
 
+DEFINE_mInt64(large_memory_check_bytes, "1073741824");
+
 // The maximum time a thread waits for a full GC. Currently only query will 
wait for full gc.
 DEFINE_mInt32(thread_wait_gc_max_milliseconds, "1000");
 
diff --git a/be/src/common/config.h b/be/src/common/config.h
index 169bbbac32..63cb352618 100644
--- a/be/src/common/config.h
+++ b/be/src/common/config.h
@@ -169,6 +169,11 @@ DECLARE_mBool(enable_query_memory_overcommit);
 // default gc strategy is conservative, if you want to exclude the 
interference of gc, let it be true
 DECLARE_mBool(disable_memory_gc);
 
+// malloc or new large memory larger than large_memory_check_bytes and Doris 
Allocator is not used,
+// will print a warning containing the stacktrace, but not prevent memory 
alloc.
+// large memory alloc looking forward to using Allocator.
+DECLARE_mInt64(large_memory_check_bytes);
+
 // The maximum time a thread waits for a full GC. Currently only query will 
wait for full gc.
 DECLARE_mInt32(thread_wait_gc_max_milliseconds);
 
diff --git a/be/src/runtime/memory/thread_mem_tracker_mgr.h 
b/be/src/runtime/memory/thread_mem_tracker_mgr.h
index ca4334ad45..2bf866cb9d 100644
--- a/be/src/runtime/memory/thread_mem_tracker_mgr.h
+++ b/be/src/runtime/memory/thread_mem_tracker_mgr.h
@@ -76,7 +76,7 @@ public:
     // such as calling LOG/iostream/sstream/stringstream/etc. related methods,
     // must increase the control to avoid entering infinite recursion, 
otherwise it may cause crash or stuck,
     // Returns whether the memory exceeds limit, and will consume mem trcker 
no matter whether the limit is exceeded.
-    void consume(int64_t size);
+    void consume(int64_t size, bool large_memory_check = false);
     void flush_untracked_mem();
 
     bool is_attach_query() { return _fragment_instance_id != TUniqueId(); }
@@ -160,23 +160,28 @@ inline void ThreadMemTrackerMgr::pop_consumer_tracker() {
     _consumer_tracker_stack.pop_back();
 }
 
-inline void ThreadMemTrackerMgr::consume(int64_t size) {
+inline void ThreadMemTrackerMgr::consume(int64_t size, bool 
large_memory_check) {
     _untracked_mem += size;
+    if (!ExecEnv::GetInstance()->initialized()) {
+        return;
+    }
     // When some threads `0 < _untracked_mem < 
config::mem_tracker_consume_min_size_bytes`
     // and some threads `_untracked_mem <= 
-config::mem_tracker_consume_min_size_bytes` trigger consumption(),
     // it will cause tracker->consumption to be temporarily less than 0.
     // After the jemalloc hook is loaded, before ExecEnv init, 
_limiter_tracker=nullptr.
     if ((_untracked_mem >= config::mem_tracker_consume_min_size_bytes ||
          _untracked_mem <= -config::mem_tracker_consume_min_size_bytes) &&
-        !_stop_consume && ExecEnv::GetInstance()->initialized()) {
+        !_stop_consume) {
         flush_untracked_mem();
     }
     // Large memory alloc should use allocator.h
     // Direct malloc or new large memory, unable to catch std::bad_alloc, BE 
may OOM.
-    if (size > 1024l * 1024 * 1024 && !doris::config::disable_memory_gc) { // 
1G
+    if (large_memory_check && size > doris::config::large_memory_check_bytes) {
         _stop_consume = true;
-        LOG(WARNING) << fmt::format("MemHook alloc large memory: {}, 
stacktrace:\n{}", size,
-                                    get_stack_trace());
+        LOG(WARNING) << fmt::format(
+                "malloc or new large memory: {}, looking forward to using 
Allocator, this is just "
+                "a warning, not prevent memory alloc, stacktrace:\n{}",
+                size, get_stack_trace());
         _stop_consume = false;
     }
 }
diff --git a/be/src/runtime/thread_context.h b/be/src/runtime/thread_context.h
index 2a46e22a18..c32a9d5653 100644
--- a/be/src/runtime/thread_context.h
+++ b/be/src/runtime/thread_context.h
@@ -86,11 +86,11 @@
 #define SCOPED_TRACK_MEMORY_TO_UNKNOWN() (void)0
 #endif
 
-#define SKIP_MEMORY_CHECK(...)                  \
-    do {                                        \
-        doris::skip_memory_check++;             \
-        DEFER({ doris::skip_memory_check--; }); \
-        __VA_ARGS__;                            \
+#define SKIP_MEMORY_CHECK(...)                                    \
+    do {                                                          \
+        doris::thread_context()->skip_memory_check++;             \
+        DEFER({ doris::thread_context()->skip_memory_check--; }); \
+        __VA_ARGS__;                                              \
     } while (0)
 
 namespace doris {
@@ -137,7 +137,6 @@ public:
 };
 
 inline thread_local ThreadContextPtr thread_context_ptr;
-inline thread_local int skip_memory_check = 0;
 
 // To avoid performance problems caused by frequently calling 
`bthread_getspecific` to obtain bthread TLS
 // in tcmalloc hook, cache the key and value of bthread TLS in pthread TLS.
@@ -201,7 +200,13 @@ public:
         return thread_mem_tracker_mgr->limiter_mem_tracker_raw();
     }
 
+    void consume_memory(const int64_t size) const {
+        thread_mem_tracker_mgr->consume(size, large_memory_check);
+    }
+
     int switch_bthread_local_count = 0;
+    int skip_memory_check = 0;
+    bool large_memory_check = true;
 
 private:
     TUniqueId _task_id;
@@ -364,10 +369,8 @@ private:
 // Basic macros for mem tracker, usually do not need to be modified and used.
 #ifdef USE_MEM_TRACKER
 // For the memory that cannot be counted by mem hook, manually count it into 
the mem tracker, such as mmap.
-#define CONSUME_THREAD_MEM_TRACKER(size) \
-    doris::thread_context()->thread_mem_tracker_mgr->consume(size)
-#define RELEASE_THREAD_MEM_TRACKER(size) \
-    doris::thread_context()->thread_mem_tracker_mgr->consume(-size)
+#define CONSUME_THREAD_MEM_TRACKER(size) 
doris::thread_context()->consume_memory(size)
+#define RELEASE_THREAD_MEM_TRACKER(size) 
doris::thread_context()->consume_memory(-size)
 
 // used to fix the tracking accuracy of caches.
 #define THREAD_MEM_TRACKER_TRANSFER_TO(size, tracker)                          
              \
@@ -385,7 +388,7 @@ private:
 #define CONSUME_MEM_TRACKER(size)                                              
                    \
     do {                                                                       
                    \
         if (doris::thread_context_ptr.init) {                                  
                    \
-            doris::thread_context()->thread_mem_tracker_mgr->consume(size);    
                    \
+            doris::thread_context()->consume_memory(size);                     
                    \
         } else if (doris::ExecEnv::GetInstance()->initialized()) {             
                    \
             
doris::ExecEnv::GetInstance()->orphan_mem_tracker_raw()->consume_no_update_peak(size);
 \
         }                                                                      
                    \
@@ -393,7 +396,7 @@ private:
 #define RELEASE_MEM_TRACKER(size)                                              
              \
     do {                                                                       
              \
         if (doris::thread_context_ptr.init) {                                  
              \
-            doris::thread_context()->thread_mem_tracker_mgr->consume(-size);   
              \
+            doris::thread_context()->consume_memory(-size);                    
              \
         } else if (doris::ExecEnv::GetInstance()->initialized()) {             
              \
             
doris::ExecEnv::GetInstance()->orphan_mem_tracker_raw()->consume_no_update_peak(
 \
                     -size);                                                    
              \
diff --git a/be/src/util/mem_info.cpp b/be/src/util/mem_info.cpp
index 29c58ca23e..79341c1e77 100644
--- a/be/src/util/mem_info.cpp
+++ b/be/src/util/mem_info.cpp
@@ -60,14 +60,14 @@ std::string MemInfo::_s_mem_limit_str = "";
 int64_t MemInfo::_s_soft_mem_limit = -1;
 std::string MemInfo::_s_soft_mem_limit_str = "";
 
-int64_t MemInfo::_s_allocator_cache_mem = 0;
+std::atomic<int64_t> MemInfo::_s_allocator_cache_mem = 0;
 std::string MemInfo::_s_allocator_cache_mem_str = "";
-int64_t MemInfo::_s_virtual_memory_used = 0;
-int64_t MemInfo::_s_proc_mem_no_allocator_cache = -1;
+std::atomic<int64_t> MemInfo::_s_virtual_memory_used = 0;
+std::atomic<int64_t> MemInfo::_s_proc_mem_no_allocator_cache = -1;
 std::atomic<int64_t> MemInfo::refresh_interval_memory_growth = 0;
 
 static std::unordered_map<std::string, int64_t> _mem_info_bytes;
-int64_t MemInfo::_s_sys_mem_available = -1;
+std::atomic<int64_t> MemInfo::_s_sys_mem_available = -1;
 std::string MemInfo::_s_sys_mem_available_str = "";
 int64_t MemInfo::_s_sys_mem_available_low_water_mark = -1;
 int64_t MemInfo::_s_sys_mem_available_warning_water_mark = -1;
@@ -86,21 +86,26 @@ void MemInfo::refresh_allocator_mem() {
 
     // https://jemalloc.net/jemalloc.3.html
     // https://www.bookstack.cn/read/aliyun-rds-core/4a0cdf677f62feb3.md
-    _s_allocator_cache_mem = get_je_all_arena_metrics("tcache_bytes") +
-                             get_je_metrics("stats.metadata") +
-                             get_je_all_arena_metrics("pdirty") * 
get_page_size();
-    _s_allocator_cache_mem_str =
-            
PrettyPrinter::print(static_cast<uint64_t>(_s_allocator_cache_mem), 
TUnit::BYTES);
-    _s_virtual_memory_used = get_je_metrics("stats.mapped");
+    _s_allocator_cache_mem.store(get_je_all_arena_metrics("tcache_bytes") +
+                                         get_je_metrics("stats.metadata") +
+                                         get_je_all_arena_metrics("pdirty") * 
get_page_size(),
+                                 std::memory_order_relaxed);
+    _s_allocator_cache_mem_str = PrettyPrinter::print(
+            
static_cast<uint64_t>(_s_allocator_cache_mem.load(std::memory_order_relaxed)),
+            TUnit::BYTES);
+    _s_virtual_memory_used.store(get_je_metrics("stats.mapped"), 
std::memory_order_relaxed);
 #else
-    _s_allocator_cache_mem = get_tc_metrics("tcmalloc.pageheap_free_bytes") +
-                             
get_tc_metrics("tcmalloc.central_cache_free_bytes") +
-                             
get_tc_metrics("tcmalloc.transfer_cache_free_bytes") +
-                             
get_tc_metrics("tcmalloc.thread_cache_free_bytes");
-    _s_allocator_cache_mem_str =
-            
PrettyPrinter::print(static_cast<uint64_t>(_s_allocator_cache_mem), 
TUnit::BYTES);
-    _s_virtual_memory_used = get_tc_metrics("generic.total_physical_bytes") +
-                             
get_tc_metrics("tcmalloc.pageheap_unmapped_bytes");
+    
_s_allocator_cache_mem.store(get_tc_metrics("tcmalloc.pageheap_free_bytes") +
+                                         
get_tc_metrics("tcmalloc.central_cache_free_bytes") +
+                                         
get_tc_metrics("tcmalloc.transfer_cache_free_bytes") +
+                                         
get_tc_metrics("tcmalloc.thread_cache_free_bytes"),
+                                 std::memory_order_relaxed);
+    _s_allocator_cache_mem_str = PrettyPrinter::print(
+            
static_cast<uint64_t>(_s_allocator_cache_mem.load(std::memory_order_relaxed)),
+            TUnit::BYTES);
+    
_s_virtual_memory_used.store(get_tc_metrics("generic.total_physical_bytes") +
+                                         
get_tc_metrics("tcmalloc.pageheap_unmapped_bytes"),
+                                 std::memory_order_relaxed);
 #endif
 }
 
@@ -307,8 +312,9 @@ void MemInfo::refresh_proc_meminfo() {
     if (meminfo.is_open()) meminfo.close();
 
     if (_mem_info_bytes.find("MemAvailable") != _mem_info_bytes.end()) {
-        _s_sys_mem_available = _mem_info_bytes["MemAvailable"];
-        _s_sys_mem_available_str = PrettyPrinter::print(_s_sys_mem_available, 
TUnit::BYTES);
+        _s_sys_mem_available.store(_mem_info_bytes["MemAvailable"], 
std::memory_order_relaxed);
+        _s_sys_mem_available_str = PrettyPrinter::print(
+                _s_sys_mem_available.load(std::memory_order_relaxed), 
TUnit::BYTES);
     }
 }
 
diff --git a/be/src/util/mem_info.h b/be/src/util/mem_info.h
index 179295fd69..98179dda2c 100644
--- a/be/src/util/mem_info.h
+++ b/be/src/util/mem_info.h
@@ -71,7 +71,8 @@ public:
     static void refresh_proc_meminfo();
 
     static inline int64_t sys_mem_available() {
-        return _s_sys_mem_available - refresh_interval_memory_growth;
+        return _s_sys_mem_available.load(std::memory_order_relaxed) -
+               refresh_interval_memory_growth;
     }
     static inline std::string sys_mem_available_str() { return 
_s_sys_mem_available_str; }
     static inline int64_t sys_mem_available_low_water_mark() {
@@ -121,11 +122,16 @@ public:
 #endif
     }
 
-    static inline size_t allocator_virtual_mem() { return 
_s_virtual_memory_used; }
-    static inline size_t allocator_cache_mem() { return 
_s_allocator_cache_mem; }
+    static inline size_t allocator_virtual_mem() {
+        return _s_virtual_memory_used.load(std::memory_order_relaxed);
+    }
+    static inline size_t allocator_cache_mem() {
+        return _s_allocator_cache_mem.load(std::memory_order_relaxed);
+    }
     static inline std::string allocator_cache_mem_str() { return 
_s_allocator_cache_mem_str; }
     static inline int64_t proc_mem_no_allocator_cache() {
-        return _s_proc_mem_no_allocator_cache + refresh_interval_memory_growth;
+        return _s_proc_mem_no_allocator_cache.load(std::memory_order_relaxed) +
+               refresh_interval_memory_growth;
     }
 
     // Tcmalloc property `generic.total_physical_bytes` records the total 
length of the virtual memory
@@ -140,8 +146,10 @@ public:
       * that can be used at anytime via jemalloc.
       */
     static inline void refresh_proc_mem_no_allocator_cache() {
-        _s_proc_mem_no_allocator_cache =
-                PerfCounters::get_vm_rss() - 
static_cast<int64_t>(_s_allocator_cache_mem);
+        _s_proc_mem_no_allocator_cache.store(
+                PerfCounters::get_vm_rss() - 
static_cast<int64_t>(_s_allocator_cache_mem.load(
+                                                     
std::memory_order_relaxed)),
+                std::memory_order_relaxed);
         refresh_interval_memory_growth = 0;
     }
 
@@ -162,7 +170,8 @@ public:
         return _s_soft_mem_limit_str;
     }
     static bool is_exceed_soft_mem_limit(int64_t bytes = 0) {
-        return proc_mem_no_allocator_cache() + bytes > soft_mem_limit();
+        return proc_mem_no_allocator_cache() + bytes >= soft_mem_limit() ||
+               sys_mem_available() < sys_mem_available_warning_water_mark();
     }
 
     static std::string debug_string();
@@ -185,12 +194,12 @@ private:
     static int64_t _s_soft_mem_limit;
     static std::string _s_soft_mem_limit_str;
 
-    static int64_t _s_allocator_cache_mem;
+    static std::atomic<int64_t> _s_allocator_cache_mem;
     static std::string _s_allocator_cache_mem_str;
-    static int64_t _s_virtual_memory_used;
-    static int64_t _s_proc_mem_no_allocator_cache;
+    static std::atomic<int64_t> _s_virtual_memory_used;
+    static std::atomic<int64_t> _s_proc_mem_no_allocator_cache;
 
-    static int64_t _s_sys_mem_available;
+    static std::atomic<int64_t> _s_sys_mem_available;
     static std::string _s_sys_mem_available_str;
     static int64_t _s_sys_mem_available_low_water_mark;
     static int64_t _s_sys_mem_available_warning_water_mark;
diff --git a/be/src/vec/common/allocator.cpp b/be/src/vec/common/allocator.cpp
index 9167c7df9f..ed3a2440ee 100644
--- a/be/src/vec/common/allocator.cpp
+++ b/be/src/vec/common/allocator.cpp
@@ -38,7 +38,7 @@
 
 template <bool clear_memory_, bool mmap_populate, bool use_mmap>
 void Allocator<clear_memory_, mmap_populate, 
use_mmap>::sys_memory_check(size_t size) const {
-    if (doris::skip_memory_check) return;
+    if (doris::thread_context()->skip_memory_check) return;
     if (doris::MemTrackerLimiter::sys_mem_exceed_limit_check(size)) {
         // Only thread attach query, and has not completely waited for 
thread_wait_gc_max_milliseconds,
         // will wait for gc, asynchronous cancel or throw bad::alloc.
@@ -116,7 +116,7 @@ void Allocator<clear_memory_, mmap_populate, 
use_mmap>::sys_memory_check(size_t
 
 template <bool clear_memory_, bool mmap_populate, bool use_mmap>
 void Allocator<clear_memory_, mmap_populate, 
use_mmap>::memory_tracker_check(size_t size) const {
-    if (doris::skip_memory_check) return;
+    if (doris::thread_context()->skip_memory_check) return;
     auto st = doris::thread_context()->thread_mem_tracker()->check_limit(size);
     if (!st) {
         auto err_msg =
@@ -175,6 +175,22 @@ void Allocator<clear_memory_, mmap_populate, 
use_mmap>::throw_bad_alloc(
     throw doris::Exception(doris::ErrorCode::MEM_ALLOC_FAILED, err);
 }
 
+template <bool clear_memory_, bool mmap_populate, bool use_mmap>
+void* Allocator<clear_memory_, mmap_populate, use_mmap>::alloc(size_t size, 
size_t alignment) {
+    doris::thread_context()->large_memory_check = false;
+    DEFER({ doris::thread_context()->large_memory_check = true; });
+    return alloc_impl(size, alignment);
+}
+
+template <bool clear_memory_, bool mmap_populate, bool use_mmap>
+void* Allocator<clear_memory_, mmap_populate, use_mmap>::realloc(void* buf, 
size_t old_size,
+                                                                 size_t 
new_size,
+                                                                 size_t 
alignment) {
+    doris::thread_context()->large_memory_check = false;
+    DEFER({ doris::thread_context()->large_memory_check = true; });
+    return realloc_impl(buf, old_size, new_size, alignment);
+}
+
 template class Allocator<true, true, true>;
 template class Allocator<true, true, false>;
 template class Allocator<true, false, true>;
diff --git a/be/src/vec/common/allocator.h b/be/src/vec/common/allocator.h
index ae29eb916c..24fef16290 100644
--- a/be/src/vec/common/allocator.h
+++ b/be/src/vec/common/allocator.h
@@ -92,8 +92,11 @@ public:
     void release_memory(size_t size) const;
     void throw_bad_alloc(const std::string& err) const;
 
+    void* alloc(size_t size, size_t alignment = 0);
+    void* realloc(void* buf, size_t old_size, size_t new_size, size_t 
alignment = 0);
+
     /// Allocate memory range.
-    void* alloc(size_t size, size_t alignment = 0) {
+    void* alloc_impl(size_t size, size_t alignment = 0) {
         memory_check(size);
         void* buf;
 
@@ -155,7 +158,7 @@ public:
       * Data from old range is moved to the beginning of new range.
       * Address of memory range could change.
       */
-    void* realloc(void* buf, size_t old_size, size_t new_size, size_t 
alignment = 0) {
+    void* realloc_impl(void* buf, size_t old_size, size_t new_size, size_t 
alignment = 0) {
         if (old_size == new_size) {
             /// nothing to do.
             /// BTW, it's not possible to change alignment while doing realloc.


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to