This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 83e7235bab6 [fix](memory) Add thread asynchronous purge jemalloc dirty 
pages (#28655)
83e7235bab6 is described below

commit 83e7235bab63417dd36aa20bd313369268c4da50
Author: Xinyi Zou <zouxiny...@gmail.com>
AuthorDate: Fri Dec 22 12:05:20 2023 +0800

    [fix](memory) Add thread asynchronous purge jemalloc dirty pages (#28655)
    
    jemallctl purge all arena dirty pages may take several seconds, which will 
block memory GC and cause OOM.
    So purge asynchronously in a thread.
---
 be/src/common/daemon.cpp | 18 ++++++++++++++++++
 be/src/common/daemon.h   |  1 +
 be/src/util/mem_info.cpp | 11 +++++++----
 be/src/util/mem_info.h   |  9 +++++++++
 4 files changed, 35 insertions(+), 4 deletions(-)

diff --git a/be/src/common/daemon.cpp b/be/src/common/daemon.cpp
index 3879ef9ff8b..e3bf1a738b8 100644
--- a/be/src/common/daemon.cpp
+++ b/be/src/common/daemon.cpp
@@ -352,6 +352,21 @@ void Daemon::block_spill_gc_thread() {
     }
 }
 
+void Daemon::je_purge_dirty_pages_thread() const {
+    do {
+        std::unique_lock<std::mutex> 
l(doris::MemInfo::je_purge_dirty_pages_lock);
+        while (_stop_background_threads_latch.count() != 0 &&
+               
!doris::MemInfo::je_purge_dirty_pages_notify.load(std::memory_order_relaxed)) {
+            doris::MemInfo::je_purge_dirty_pages_cv.wait_for(l, 
std::chrono::seconds(1));
+        }
+        if (_stop_background_threads_latch.count() == 0) {
+            break;
+        }
+        doris::MemInfo::je_purge_all_arena_dirty_pages();
+        doris::MemInfo::je_purge_dirty_pages_notify.store(false, 
std::memory_order_relaxed);
+    } while (true);
+}
+
 void Daemon::start() {
     Status st;
     st = Thread::create(
@@ -381,6 +396,9 @@ void Daemon::start() {
     st = Thread::create(
             "Daemon", "block_spill_gc_thread", [this]() { 
this->block_spill_gc_thread(); },
             &_threads.emplace_back());
+    st = Thread::create(
+            "Daemon", "je_purge_dirty_pages_thread",
+            [this]() { this->je_purge_dirty_pages_thread(); }, 
&_threads.emplace_back());
     CHECK(st.ok()) << st;
 }
 
diff --git a/be/src/common/daemon.h b/be/src/common/daemon.h
index 139584ba93f..18f78cbe583 100644
--- a/be/src/common/daemon.h
+++ b/be/src/common/daemon.h
@@ -43,6 +43,7 @@ private:
     void memtable_memory_limiter_tracker_refresh_thread();
     void calculate_metrics_thread();
     void block_spill_gc_thread();
+    void je_purge_dirty_pages_thread() const;
 
     CountDownLatch _stop_background_threads_latch;
     std::vector<scoped_refptr<Thread>> _threads;
diff --git a/be/src/util/mem_info.cpp b/be/src/util/mem_info.cpp
index 416ae1ae200..ec79a8f1cc3 100644
--- a/be/src/util/mem_info.cpp
+++ b/be/src/util/mem_info.cpp
@@ -80,6 +80,9 @@ int64_t MemInfo::_s_sys_mem_available_low_water_mark = -1;
 int64_t MemInfo::_s_sys_mem_available_warning_water_mark = -1;
 int64_t MemInfo::_s_process_minor_gc_size = -1;
 int64_t MemInfo::_s_process_full_gc_size = -1;
+std::mutex MemInfo::je_purge_dirty_pages_lock;
+std::condition_variable MemInfo::je_purge_dirty_pages_cv;
+std::atomic<bool> MemInfo::je_purge_dirty_pages_notify {false};
 
 void MemInfo::refresh_allocator_mem() {
 #if defined(ADDRESS_SANITIZER) || defined(LEAK_SANITIZER) || 
defined(THREAD_SANITIZER)
@@ -129,7 +132,7 @@ bool MemInfo::process_minor_gc() {
     std::string pre_sys_mem_available = MemInfo::sys_mem_available_str();
 
     Defer defer {[&]() {
-        je_purge_all_arena_dirty_pages();
+        notify_je_purge_dirty_pages();
         std::stringstream ss;
         profile->pretty_print(&ss);
         LOG(INFO) << fmt::format(
@@ -139,7 +142,7 @@ bool MemInfo::process_minor_gc() {
     }};
 
     freed_mem += 
CacheManager::instance()->for_each_cache_prune_stale(profile.get());
-    je_purge_all_arena_dirty_pages();
+    notify_je_purge_dirty_pages();
     if (freed_mem > _s_process_minor_gc_size) {
         return true;
     }
@@ -180,7 +183,7 @@ bool MemInfo::process_full_gc() {
     std::string pre_sys_mem_available = MemInfo::sys_mem_available_str();
 
     Defer defer {[&]() {
-        je_purge_all_arena_dirty_pages();
+        notify_je_purge_dirty_pages();
         std::stringstream ss;
         profile->pretty_print(&ss);
         LOG(INFO) << fmt::format(
@@ -190,7 +193,7 @@ bool MemInfo::process_full_gc() {
     }};
 
     freed_mem += 
CacheManager::instance()->for_each_cache_prune_all(profile.get());
-    je_purge_all_arena_dirty_pages();
+    notify_je_purge_dirty_pages();
     if (freed_mem > _s_process_full_gc_size) {
         return true;
     }
diff --git a/be/src/util/mem_info.h b/be/src/util/mem_info.h
index 3691934b800..8d702ddf065 100644
--- a/be/src/util/mem_info.h
+++ b/be/src/util/mem_info.h
@@ -24,6 +24,7 @@
 #include <stdint.h>
 
 #include <atomic>
+#include <condition_variable>
 #include <string>
 
 #if !defined(__APPLE__) || !defined(_POSIX_C_SOURCE)
@@ -127,6 +128,14 @@ public:
 #endif
     }
 
+    static std::mutex je_purge_dirty_pages_lock;
+    static std::condition_variable je_purge_dirty_pages_cv;
+    static std::atomic<bool> je_purge_dirty_pages_notify;
+    static void notify_je_purge_dirty_pages() {
+        je_purge_dirty_pages_notify.store(true, std::memory_order_relaxed);
+        je_purge_dirty_pages_cv.notify_all();
+    }
+
     static inline size_t allocator_virtual_mem() {
         return _s_virtual_memory_used.load(std::memory_order_relaxed);
     }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to