This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 83e7235bab6 [fix](memory) Add thread asynchronous purge jemalloc dirty pages (#28655)
83e7235bab6 is described below

commit 83e7235bab63417dd36aa20bd313369268c4da50
Author: Xinyi Zou <zouxiny...@gmail.com>
AuthorDate: Fri Dec 22 12:05:20 2023 +0800

    [fix](memory) Add thread asynchronous purge jemalloc dirty pages (#28655)

    jemallctl purge all arena dirty pages may take several seconds, which will block memory GC and cause OOM.
    So purge asynchronously in a thread.
---
 be/src/common/daemon.cpp | 18 ++++++++++++++++++
 be/src/common/daemon.h   |  1 +
 be/src/util/mem_info.cpp | 11 +++++++----
 be/src/util/mem_info.h   |  9 +++++++++
 4 files changed, 35 insertions(+), 4 deletions(-)

diff --git a/be/src/common/daemon.cpp b/be/src/common/daemon.cpp
index 3879ef9ff8b..e3bf1a738b8 100644
--- a/be/src/common/daemon.cpp
+++ b/be/src/common/daemon.cpp
@@ -352,6 +352,21 @@ void Daemon::block_spill_gc_thread() {
     }
 }
 
+void Daemon::je_purge_dirty_pages_thread() const {
+    do {
+        std::unique_lock<std::mutex> l(doris::MemInfo::je_purge_dirty_pages_lock);
+        while (_stop_background_threads_latch.count() != 0 &&
+               !doris::MemInfo::je_purge_dirty_pages_notify.load(std::memory_order_relaxed)) {
+            doris::MemInfo::je_purge_dirty_pages_cv.wait_for(l, std::chrono::seconds(1));
+        }
+        if (_stop_background_threads_latch.count() == 0) {
+            break;
+        }
+        doris::MemInfo::je_purge_all_arena_dirty_pages();
+        doris::MemInfo::je_purge_dirty_pages_notify.store(false, std::memory_order_relaxed);
+    } while (true);
+}
+
 void Daemon::start() {
     Status st;
     st = Thread::create(
@@ -381,6 +396,9 @@ void Daemon::start() {
     st = Thread::create(
             "Daemon", "block_spill_gc_thread", [this]() { this->block_spill_gc_thread(); },
             &_threads.emplace_back());
+    st = Thread::create(
+            "Daemon", "je_purge_dirty_pages_thread",
+            [this]() { this->je_purge_dirty_pages_thread(); }, &_threads.emplace_back());
     CHECK(st.ok()) << st;
 }
 
diff --git a/be/src/common/daemon.h b/be/src/common/daemon.h
index 139584ba93f..18f78cbe583 100644
--- a/be/src/common/daemon.h
+++ b/be/src/common/daemon.h
@@ -43,6 +43,7 @@ private:
     void memtable_memory_limiter_tracker_refresh_thread();
     void calculate_metrics_thread();
     void block_spill_gc_thread();
+    void je_purge_dirty_pages_thread() const;
 
     CountDownLatch _stop_background_threads_latch;
     std::vector<scoped_refptr<Thread>> _threads;
diff --git a/be/src/util/mem_info.cpp b/be/src/util/mem_info.cpp
index 416ae1ae200..ec79a8f1cc3 100644
--- a/be/src/util/mem_info.cpp
+++ b/be/src/util/mem_info.cpp
@@ -80,6 +80,9 @@ int64_t MemInfo::_s_sys_mem_available_low_water_mark = -1;
 int64_t MemInfo::_s_sys_mem_available_warning_water_mark = -1;
 int64_t MemInfo::_s_process_minor_gc_size = -1;
 int64_t MemInfo::_s_process_full_gc_size = -1;
+std::mutex MemInfo::je_purge_dirty_pages_lock;
+std::condition_variable MemInfo::je_purge_dirty_pages_cv;
+std::atomic<bool> MemInfo::je_purge_dirty_pages_notify {false};
 
 void MemInfo::refresh_allocator_mem() {
 #if defined(ADDRESS_SANITIZER) || defined(LEAK_SANITIZER) || defined(THREAD_SANITIZER)
@@ -129,7 +132,7 @@ bool MemInfo::process_minor_gc() {
     std::string pre_sys_mem_available = MemInfo::sys_mem_available_str();
 
     Defer defer {[&]() {
-        je_purge_all_arena_dirty_pages();
+        notify_je_purge_dirty_pages();
         std::stringstream ss;
         profile->pretty_print(&ss);
         LOG(INFO) << fmt::format(
@@ -139,7 +142,7 @@ bool MemInfo::process_minor_gc() {
     }};
 
     freed_mem += CacheManager::instance()->for_each_cache_prune_stale(profile.get());
-    je_purge_all_arena_dirty_pages();
+    notify_je_purge_dirty_pages();
     if (freed_mem > _s_process_minor_gc_size) {
         return true;
     }
@@ -180,7 +183,7 @@ bool MemInfo::process_full_gc() {
     std::string pre_sys_mem_available = MemInfo::sys_mem_available_str();
 
     Defer defer {[&]() {
-        je_purge_all_arena_dirty_pages();
+        notify_je_purge_dirty_pages();
         std::stringstream ss;
         profile->pretty_print(&ss);
         LOG(INFO) << fmt::format(
@@ -190,7 +193,7 @@ bool MemInfo::process_full_gc() {
     }};
 
     freed_mem += CacheManager::instance()->for_each_cache_prune_all(profile.get());
-    je_purge_all_arena_dirty_pages();
+    notify_je_purge_dirty_pages();
     if (freed_mem > _s_process_full_gc_size) {
         return true;
     }
diff --git a/be/src/util/mem_info.h b/be/src/util/mem_info.h
index 3691934b800..8d702ddf065 100644
--- a/be/src/util/mem_info.h
+++ b/be/src/util/mem_info.h
@@ -24,6 +24,7 @@
 #include <stdint.h>
 
 #include <atomic>
+#include <condition_variable>
 #include <string>
 
 #if !defined(__APPLE__) || !defined(_POSIX_C_SOURCE)
@@ -127,6 +128,14 @@ public:
 #endif
     }
 
+    static std::mutex je_purge_dirty_pages_lock;
+    static std::condition_variable je_purge_dirty_pages_cv;
+    static std::atomic<bool> je_purge_dirty_pages_notify;
+    static void notify_je_purge_dirty_pages() {
+        je_purge_dirty_pages_notify.store(true, std::memory_order_relaxed);
+        je_purge_dirty_pages_cv.notify_all();
+    }
+
     static inline size_t allocator_virtual_mem() {
         return _s_virtual_memory_used.load(std::memory_order_relaxed);
     }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org