This is an automated email from the ASF dual-hosted git repository. zouxinyi pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push: new 1f3de0eae3 [fix](memory) fix invalid large memory check && fix memory info thread safety (#22027) 1f3de0eae3 is described below commit 1f3de0eae3f56068a3ea23a066df25454fe67cf5 Author: Xinyi Zou <zouxiny...@gmail.com> AuthorDate: Wed Jul 26 12:18:31 2023 +0800 [fix](memory) fix invalid large memory check && fix memory info thread safety (#22027) fix invalid large memory check fix memory info thread safety --- be/src/common/config.cpp | 2 ++ be/src/common/config.h | 5 +++ be/src/runtime/memory/thread_mem_tracker_mgr.h | 17 ++++++---- be/src/runtime/thread_context.h | 27 ++++++++------- be/src/util/mem_info.cpp | 46 +++++++++++++++----------- be/src/util/mem_info.h | 31 +++++++++++------ be/src/vec/common/allocator.cpp | 20 +++++++++-- be/src/vec/common/allocator.h | 7 ++-- 8 files changed, 102 insertions(+), 53 deletions(-) diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp index 9a0b2a0e3a..cb7f0f3698 100644 --- a/be/src/common/config.cpp +++ b/be/src/common/config.cpp @@ -128,6 +128,8 @@ DEFINE_mBool(enable_query_memory_overcommit, "true"); DEFINE_mBool(disable_memory_gc, "false"); +DEFINE_mInt64(large_memory_check_bytes, "1073741824"); + // The maximum time a thread waits for a full GC. Currently only query will wait for full gc. DEFINE_mInt32(thread_wait_gc_max_milliseconds, "1000"); diff --git a/be/src/common/config.h b/be/src/common/config.h index 169bbbac32..63cb352618 100644 --- a/be/src/common/config.h +++ b/be/src/common/config.h @@ -169,6 +169,11 @@ DECLARE_mBool(enable_query_memory_overcommit); // default gc strategy is conservative, if you want to exclude the interference of gc, let it be true DECLARE_mBool(disable_memory_gc); +// malloc or new large memory larger than large_memory_check_bytes and Doris Allocator is not used, +// will print a warning containing the stacktrace, but not prevent memory alloc. +// large memory alloc looking forward to using Allocator. +DECLARE_mInt64(large_memory_check_bytes); + // The maximum time a thread waits for a full GC. Currently only query will wait for full gc. DECLARE_mInt32(thread_wait_gc_max_milliseconds); diff --git a/be/src/runtime/memory/thread_mem_tracker_mgr.h b/be/src/runtime/memory/thread_mem_tracker_mgr.h index ca4334ad45..2bf866cb9d 100644 --- a/be/src/runtime/memory/thread_mem_tracker_mgr.h +++ b/be/src/runtime/memory/thread_mem_tracker_mgr.h @@ -76,7 +76,7 @@ public: // such as calling LOG/iostream/sstream/stringstream/etc. related methods, // must increase the control to avoid entering infinite recursion, otherwise it may cause crash or stuck, // Returns whether the memory exceeds limit, and will consume mem trcker no matter whether the limit is exceeded. - void consume(int64_t size); + void consume(int64_t size, bool large_memory_check = false); void flush_untracked_mem(); bool is_attach_query() { return _fragment_instance_id != TUniqueId(); } @@ -160,23 +160,28 @@ inline void ThreadMemTrackerMgr::pop_consumer_tracker() { _consumer_tracker_stack.pop_back(); } -inline void ThreadMemTrackerMgr::consume(int64_t size) { +inline void ThreadMemTrackerMgr::consume(int64_t size, bool large_memory_check) { _untracked_mem += size; + if (!ExecEnv::GetInstance()->initialized()) { + return; + } // When some threads `0 < _untracked_mem < config::mem_tracker_consume_min_size_bytes` // and some threads `_untracked_mem <= -config::mem_tracker_consume_min_size_bytes` trigger consumption(), // it will cause tracker->consumption to be temporarily less than 0. // After the jemalloc hook is loaded, before ExecEnv init, _limiter_tracker=nullptr. if ((_untracked_mem >= config::mem_tracker_consume_min_size_bytes || _untracked_mem <= -config::mem_tracker_consume_min_size_bytes) && - !_stop_consume && ExecEnv::GetInstance()->initialized()) { + !_stop_consume) { flush_untracked_mem(); } // Large memory alloc should use allocator.h // Direct malloc or new large memory, unable to catch std::bad_alloc, BE may OOM. - if (size > 1024l * 1024 * 1024 && !doris::config::disable_memory_gc) { // 1G + if (large_memory_check && size > doris::config::large_memory_check_bytes) { _stop_consume = true; - LOG(WARNING) << fmt::format("MemHook alloc large memory: {}, stacktrace:\n{}", size, - get_stack_trace()); + LOG(WARNING) << fmt::format( + "malloc or new large memory: {}, looking forward to using Allocator, this is just " + "a warning, not prevent memory alloc, stacktrace:\n{}", + size, get_stack_trace()); _stop_consume = false; } } diff --git a/be/src/runtime/thread_context.h b/be/src/runtime/thread_context.h index 2a46e22a18..c32a9d5653 100644 --- a/be/src/runtime/thread_context.h +++ b/be/src/runtime/thread_context.h @@ -86,11 +86,11 @@ #define SCOPED_TRACK_MEMORY_TO_UNKNOWN() (void)0 #endif -#define SKIP_MEMORY_CHECK(...) \ - do { \ - doris::skip_memory_check++; \ - DEFER({ doris::skip_memory_check--; }); \ - __VA_ARGS__; \ +#define SKIP_MEMORY_CHECK(...) \ + do { \ + doris::thread_context()->skip_memory_check++; \ + DEFER({ doris::thread_context()->skip_memory_check--; }); \ + __VA_ARGS__; \ } while (0) namespace doris { @@ -137,7 +137,6 @@ public: }; inline thread_local ThreadContextPtr thread_context_ptr; -inline thread_local int skip_memory_check = 0; // To avoid performance problems caused by frequently calling `bthread_getspecific` to obtain bthread TLS // in tcmalloc hook, cache the key and value of bthread TLS in pthread TLS. @@ -201,7 +200,13 @@ public: return thread_mem_tracker_mgr->limiter_mem_tracker_raw(); } + void consume_memory(const int64_t size) const { + thread_mem_tracker_mgr->consume(size, large_memory_check); + } + int switch_bthread_local_count = 0; + int skip_memory_check = 0; + bool large_memory_check = true; private: TUniqueId _task_id; @@ -364,10 +369,8 @@ private: // Basic macros for mem tracker, usually do not need to be modified and used. #ifdef USE_MEM_TRACKER // For the memory that cannot be counted by mem hook, manually count it into the mem tracker, such as mmap. -#define CONSUME_THREAD_MEM_TRACKER(size) \ - doris::thread_context()->thread_mem_tracker_mgr->consume(size) -#define RELEASE_THREAD_MEM_TRACKER(size) \ - doris::thread_context()->thread_mem_tracker_mgr->consume(-size) +#define CONSUME_THREAD_MEM_TRACKER(size) doris::thread_context()->consume_memory(size) +#define RELEASE_THREAD_MEM_TRACKER(size) doris::thread_context()->consume_memory(-size) // used to fix the tracking accuracy of caches. #define THREAD_MEM_TRACKER_TRANSFER_TO(size, tracker) \ @@ -385,7 +388,7 @@ private: #define CONSUME_MEM_TRACKER(size) \ do { \ if (doris::thread_context_ptr.init) { \ - doris::thread_context()->thread_mem_tracker_mgr->consume(size); \ + doris::thread_context()->consume_memory(size); \ } else if (doris::ExecEnv::GetInstance()->initialized()) { \ doris::ExecEnv::GetInstance()->orphan_mem_tracker_raw()->consume_no_update_peak(size); \ } \ @@ -393,7 +396,7 @@ private: #define RELEASE_MEM_TRACKER(size) \ do { \ if (doris::thread_context_ptr.init) { \ - doris::thread_context()->thread_mem_tracker_mgr->consume(-size); \ + doris::thread_context()->consume_memory(-size); \ } else if (doris::ExecEnv::GetInstance()->initialized()) { \ doris::ExecEnv::GetInstance()->orphan_mem_tracker_raw()->consume_no_update_peak( \ -size); \ diff --git a/be/src/util/mem_info.cpp b/be/src/util/mem_info.cpp index 29c58ca23e..79341c1e77 100644 --- a/be/src/util/mem_info.cpp +++ b/be/src/util/mem_info.cpp @@ -60,14 +60,14 @@ std::string MemInfo::_s_mem_limit_str = ""; int64_t MemInfo::_s_soft_mem_limit = -1; std::string MemInfo::_s_soft_mem_limit_str = ""; -int64_t MemInfo::_s_allocator_cache_mem = 0; +std::atomic<int64_t> MemInfo::_s_allocator_cache_mem = 0; std::string MemInfo::_s_allocator_cache_mem_str = ""; -int64_t MemInfo::_s_virtual_memory_used = 0; -int64_t MemInfo::_s_proc_mem_no_allocator_cache = -1; +std::atomic<int64_t> MemInfo::_s_virtual_memory_used = 0; +std::atomic<int64_t> MemInfo::_s_proc_mem_no_allocator_cache = -1; std::atomic<int64_t> MemInfo::refresh_interval_memory_growth = 0; static std::unordered_map<std::string, int64_t> _mem_info_bytes; -int64_t MemInfo::_s_sys_mem_available = -1; +std::atomic<int64_t> MemInfo::_s_sys_mem_available = -1; std::string MemInfo::_s_sys_mem_available_str = ""; int64_t MemInfo::_s_sys_mem_available_low_water_mark = -1; int64_t MemInfo::_s_sys_mem_available_warning_water_mark = -1; @@ -86,21 +86,26 @@ void MemInfo::refresh_allocator_mem() { // https://jemalloc.net/jemalloc.3.html // https://www.bookstack.cn/read/aliyun-rds-core/4a0cdf677f62feb3.md - _s_allocator_cache_mem = get_je_all_arena_metrics("tcache_bytes") + - get_je_metrics("stats.metadata") + - get_je_all_arena_metrics("pdirty") * get_page_size(); - _s_allocator_cache_mem_str = - PrettyPrinter::print(static_cast<uint64_t>(_s_allocator_cache_mem), TUnit::BYTES); - _s_virtual_memory_used = get_je_metrics("stats.mapped"); + _s_allocator_cache_mem.store(get_je_all_arena_metrics("tcache_bytes") + + get_je_metrics("stats.metadata") + + get_je_all_arena_metrics("pdirty") * get_page_size(), + std::memory_order_relaxed); + _s_allocator_cache_mem_str = PrettyPrinter::print( + static_cast<uint64_t>(_s_allocator_cache_mem.load(std::memory_order_relaxed)), + TUnit::BYTES); + _s_virtual_memory_used.store(get_je_metrics("stats.mapped"), std::memory_order_relaxed); #else - _s_allocator_cache_mem = get_tc_metrics("tcmalloc.pageheap_free_bytes") + - get_tc_metrics("tcmalloc.central_cache_free_bytes") + - get_tc_metrics("tcmalloc.transfer_cache_free_bytes") + - get_tc_metrics("tcmalloc.thread_cache_free_bytes"); - _s_allocator_cache_mem_str = - PrettyPrinter::print(static_cast<uint64_t>(_s_allocator_cache_mem), TUnit::BYTES); - _s_virtual_memory_used = get_tc_metrics("generic.total_physical_bytes") + - get_tc_metrics("tcmalloc.pageheap_unmapped_bytes"); + _s_allocator_cache_mem.store(get_tc_metrics("tcmalloc.pageheap_free_bytes") + + get_tc_metrics("tcmalloc.central_cache_free_bytes") + + get_tc_metrics("tcmalloc.transfer_cache_free_bytes") + + get_tc_metrics("tcmalloc.thread_cache_free_bytes"), + std::memory_order_relaxed); + _s_allocator_cache_mem_str = PrettyPrinter::print( + static_cast<uint64_t>(_s_allocator_cache_mem.load(std::memory_order_relaxed)), + TUnit::BYTES); + _s_virtual_memory_used.store(get_tc_metrics("generic.total_physical_bytes") + + get_tc_metrics("tcmalloc.pageheap_unmapped_bytes"), + std::memory_order_relaxed); #endif } @@ -307,8 +312,9 @@ void MemInfo::refresh_proc_meminfo() { if (meminfo.is_open()) meminfo.close(); if (_mem_info_bytes.find("MemAvailable") != _mem_info_bytes.end()) { - _s_sys_mem_available = _mem_info_bytes["MemAvailable"]; - _s_sys_mem_available_str = PrettyPrinter::print(_s_sys_mem_available, TUnit::BYTES); + _s_sys_mem_available.store(_mem_info_bytes["MemAvailable"], std::memory_order_relaxed); + _s_sys_mem_available_str = PrettyPrinter::print( + _s_sys_mem_available.load(std::memory_order_relaxed), TUnit::BYTES); } } diff --git a/be/src/util/mem_info.h b/be/src/util/mem_info.h index 179295fd69..98179dda2c 100644 --- a/be/src/util/mem_info.h +++ b/be/src/util/mem_info.h @@ -71,7 +71,8 @@ public: static void refresh_proc_meminfo(); static inline int64_t sys_mem_available() { - return _s_sys_mem_available - refresh_interval_memory_growth; + return _s_sys_mem_available.load(std::memory_order_relaxed) - + refresh_interval_memory_growth; } static inline std::string sys_mem_available_str() { return _s_sys_mem_available_str; } static inline int64_t sys_mem_available_low_water_mark() { @@ -121,11 +122,16 @@ public: #endif } - static inline size_t allocator_virtual_mem() { return _s_virtual_memory_used; } - static inline size_t allocator_cache_mem() { return _s_allocator_cache_mem; } + static inline size_t allocator_virtual_mem() { + return _s_virtual_memory_used.load(std::memory_order_relaxed); + } + static inline size_t allocator_cache_mem() { + return _s_allocator_cache_mem.load(std::memory_order_relaxed); + } static inline std::string allocator_cache_mem_str() { return _s_allocator_cache_mem_str; } static inline int64_t proc_mem_no_allocator_cache() { - return _s_proc_mem_no_allocator_cache + refresh_interval_memory_growth; + return _s_proc_mem_no_allocator_cache.load(std::memory_order_relaxed) + + refresh_interval_memory_growth; } // Tcmalloc property `generic.total_physical_bytes` records the total length of the virtual memory @@ -140,8 +146,10 @@ public: * that can be used at anytime via jemalloc. */ static inline void refresh_proc_mem_no_allocator_cache() { - _s_proc_mem_no_allocator_cache = - PerfCounters::get_vm_rss() - static_cast<int64_t>(_s_allocator_cache_mem); + _s_proc_mem_no_allocator_cache.store( + PerfCounters::get_vm_rss() - static_cast<int64_t>(_s_allocator_cache_mem.load( + std::memory_order_relaxed)), + std::memory_order_relaxed); refresh_interval_memory_growth = 0; } @@ -162,7 +170,8 @@ public: return _s_soft_mem_limit_str; } static bool is_exceed_soft_mem_limit(int64_t bytes = 0) { - return proc_mem_no_allocator_cache() + bytes > soft_mem_limit(); + return proc_mem_no_allocator_cache() + bytes >= soft_mem_limit() || + sys_mem_available() < sys_mem_available_warning_water_mark(); } static std::string debug_string(); @@ -185,12 +194,12 @@ private: static int64_t _s_soft_mem_limit; static std::string _s_soft_mem_limit_str; - static int64_t _s_allocator_cache_mem; + static std::atomic<int64_t> _s_allocator_cache_mem; static std::string _s_allocator_cache_mem_str; - static int64_t _s_virtual_memory_used; - static int64_t _s_proc_mem_no_allocator_cache; + static std::atomic<int64_t> _s_virtual_memory_used; + static std::atomic<int64_t> _s_proc_mem_no_allocator_cache; - static int64_t _s_sys_mem_available; + static std::atomic<int64_t> _s_sys_mem_available; static std::string _s_sys_mem_available_str; static int64_t _s_sys_mem_available_low_water_mark; static int64_t _s_sys_mem_available_warning_water_mark; diff --git a/be/src/vec/common/allocator.cpp b/be/src/vec/common/allocator.cpp index 9167c7df9f..ed3a2440ee 100644 --- a/be/src/vec/common/allocator.cpp +++ b/be/src/vec/common/allocator.cpp @@ -38,7 +38,7 @@ template <bool clear_memory_, bool mmap_populate, bool use_mmap> void Allocator<clear_memory_, mmap_populate, use_mmap>::sys_memory_check(size_t size) const { - if (doris::skip_memory_check) return; + if (doris::thread_context()->skip_memory_check) return; if (doris::MemTrackerLimiter::sys_mem_exceed_limit_check(size)) { // Only thread attach query, and has not completely waited for thread_wait_gc_max_milliseconds, // will wait for gc, asynchronous cancel or throw bad::alloc. @@ -116,7 +116,7 @@ void Allocator<clear_memory_, mmap_populate, use_mmap>::sys_memory_check(size_t template <bool clear_memory_, bool mmap_populate, bool use_mmap> void Allocator<clear_memory_, mmap_populate, use_mmap>::memory_tracker_check(size_t size) const { - if (doris::skip_memory_check) return; + if (doris::thread_context()->skip_memory_check) return; auto st = doris::thread_context()->thread_mem_tracker()->check_limit(size); if (!st) { auto err_msg = @@ -175,6 +175,22 @@ void Allocator<clear_memory_, mmap_populate, use_mmap>::throw_bad_alloc( throw doris::Exception(doris::ErrorCode::MEM_ALLOC_FAILED, err); } +template <bool clear_memory_, bool mmap_populate, bool use_mmap> +void* Allocator<clear_memory_, mmap_populate, use_mmap>::alloc(size_t size, size_t alignment) { + doris::thread_context()->large_memory_check = false; + DEFER({ doris::thread_context()->large_memory_check = true; }); + return alloc_impl(size, alignment); +} + +template <bool clear_memory_, bool mmap_populate, bool use_mmap> +void* Allocator<clear_memory_, mmap_populate, use_mmap>::realloc(void* buf, size_t old_size, + size_t new_size, + size_t alignment) { + doris::thread_context()->large_memory_check = false; + DEFER({ doris::thread_context()->large_memory_check = true; }); + return realloc_impl(buf, old_size, new_size, alignment); +} + template class Allocator<true, true, true>; template class Allocator<true, true, false>; template class Allocator<true, false, true>; diff --git a/be/src/vec/common/allocator.h b/be/src/vec/common/allocator.h index ae29eb916c..24fef16290 100644 --- a/be/src/vec/common/allocator.h +++ b/be/src/vec/common/allocator.h @@ -92,8 +92,11 @@ public: void release_memory(size_t size) const; void throw_bad_alloc(const std::string& err) const; + void* alloc(size_t size, size_t alignment = 0); + void* realloc(void* buf, size_t old_size, size_t new_size, size_t alignment = 0); + /// Allocate memory range. - void* alloc(size_t size, size_t alignment = 0) { + void* alloc_impl(size_t size, size_t alignment = 0) { memory_check(size); void* buf; @@ -155,7 +158,7 @@ public: * Data from old range is moved to the beginning of new range. * Address of memory range could change. */ - void* realloc(void* buf, size_t old_size, size_t new_size, size_t alignment = 0) { + void* realloc_impl(void* buf, size_t old_size, size_t new_size, size_t alignment = 0) { if (old_size == new_size) { /// nothing to do. /// BTW, it's not possible to change alignment while doing realloc. --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org