This is an automated email from the ASF dual-hosted git repository.

zouxinyi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 0702277196 [improvement](tcmalloc) add moderate mode and avoid oom with a lot of cache (#14374)
0702277196 is described below

commit 07022771965376258f9500c06a43e0f048661674
Author: Yongqiang YANG <[email protected]>
AuthorDate: Mon Nov 28 20:17:51 2022 +0800

    [improvement](tcmalloc) add moderate mode and avoid oom with a lot of cache (#14374)
    
    ReleaseToSystem aggressively when there is little free memory.
---
 be/src/common/config.h                        |   2 +-
 be/src/common/daemon.cpp                      | 118 +++++++++++++++++++++-----
 be/src/runtime/memory/mem_tracker_limiter.cpp |   1 +
 be/src/runtime/memory/mem_tracker_limiter.h   |   6 ++
 be/src/service/doris_main.cpp                 |  19 ++---
 5 files changed, 114 insertions(+), 32 deletions(-)

diff --git a/be/src/common/config.h b/be/src/common/config.h
index 554735b7c5..9a6750067a 100644
--- a/be/src/common/config.h
+++ b/be/src/common/config.h
@@ -48,7 +48,7 @@ CONF_String(priority_networks, "");
 
 // memory mode
 // performance or compact
-CONF_String(memory_mode, "performance");
+CONF_String(memory_mode, "moderate");
 
 // process memory limit specified as number of bytes
 // ('<int>[bB]?'), megabytes ('<float>[mM]'), gigabytes ('<float>[gG]'),
diff --git a/be/src/common/daemon.cpp b/be/src/common/daemon.cpp
index 8e3cc663da..9a828b3d7a 100644
--- a/be/src/common/daemon.cpp
+++ b/be/src/common/daemon.cpp
@@ -72,31 +72,109 @@ void Daemon::tcmalloc_gc_thread() {
 #if !defined(ADDRESS_SANITIZER) && !defined(LEAK_SANITIZER) && 
!defined(THREAD_SANITIZER) && \
         !defined(USE_JEMALLOC)
 
-    size_t tc_use_memory_min = MemInfo::mem_limit();
+    // Limit size of tcmalloc cache via release_rate and max_cache_percent.
+    // We adjust release_rate according to memory_pressure, which is usage 
percent of memory.
+    int64_t max_cache_percent = 60;
+    double release_rates[10] = {1.0, 1.0, 1.0, 5.0, 5.0, 20.0, 50.0, 100.0, 
500.0, 2000.0};
+    int64_t pressure_limit = 90;
+    bool is_performance_mode = false;
+    size_t physical_limit_bytes = std::min(MemInfo::hard_mem_limit(), 
MemInfo::mem_limit());
+
     if (config::memory_mode == std::string("performance")) {
-        tc_use_memory_min = std::max(tc_use_memory_min / 10 * 9,
-                                     tc_use_memory_min - (size_t)10 * 1024 * 
1024 * 1024);
-    } else {
-        tc_use_memory_min >>= 1;
+        max_cache_percent = 100;
+        pressure_limit = 90;
+        is_performance_mode = true;
+        physical_limit_bytes = std::min(MemInfo::mem_limit(), 
MemInfo::physical_mem());
+    } else if (config::memory_mode == std::string("compact")) {
+        max_cache_percent = 20;
+        pressure_limit = 80;
     }
 
-    while (!_stop_background_threads_latch.wait_for(std::chrono::seconds(10))) 
{
-        size_t used_size = 0;
-        size_t free_size = 0;
-
+    int last_ms = 0;
+    const int kMaxLastMs = 30000;
+    const int kIntervalMs = 10;
+    size_t init_aggressive_decommit = 0;
+    size_t current_aggressive_decommit = 0;
+    size_t expected_aggressive_decommit = 0;
+    int64_t last_memory_pressure = 0;
+
+    
MallocExtension::instance()->GetNumericProperty("tcmalloc.aggressive_memory_decommit",
+                                                    &init_aggressive_decommit);
+    current_aggressive_decommit = init_aggressive_decommit;
+
+    while 
(!_stop_background_threads_latch.wait_for(std::chrono::milliseconds(kIntervalMs)))
 {
+        size_t tc_used_bytes = 0;
+        size_t tc_alloc_bytes = 0;
+        size_t rss = PerfCounters::get_vm_rss();
+
+        
MallocExtension::instance()->GetNumericProperty("generic.total_physical_bytes",
+                                                        &tc_alloc_bytes);
         
MallocExtension::instance()->GetNumericProperty("generic.current_allocated_bytes",
-                                                        &used_size);
-        
MallocExtension::instance()->GetNumericProperty("tcmalloc.pageheap_free_bytes", 
&free_size);
-        size_t alloc_size = used_size + free_size;
-        LOG(INFO) << "tcmalloc.pageheap_free_bytes " << free_size
-                  << ", generic.current_allocated_bytes " << used_size << ", 
tc_use_memory_min "
-                  << tc_use_memory_min;
-
-        if (alloc_size > tc_use_memory_min) {
-            size_t max_free_size = alloc_size * 20 / 100;
-            if (free_size > max_free_size) {
-                MallocExtension::instance()->ReleaseToSystem(free_size - 
max_free_size);
+                                                        &tc_used_bytes);
+        int64_t tc_cached_bytes = tc_alloc_bytes - tc_used_bytes;
+        int64_t to_free_bytes =
+                (int64_t)tc_cached_bytes - (tc_used_bytes * max_cache_percent 
/ 100);
+
+        int64_t memory_pressure = 0;
+        int64_t alloc_bytes = std::max(rss, tc_alloc_bytes);
+        memory_pressure = alloc_bytes * 100 / physical_limit_bytes;
+
+        expected_aggressive_decommit = init_aggressive_decommit;
+        if (memory_pressure > pressure_limit) {
+            // We are reaching oom, so release cache aggressively.
+            // Ideally, we should reuse cache and not allocate from system any 
more,
+            // however, it is hard to set limit on cache of tcmalloc and doris
+            // use mmap in vectorized mode.
+            if (last_memory_pressure <= pressure_limit) {
+                int64_t min_free_bytes = alloc_bytes - physical_limit_bytes * 
9 / 10;
+                to_free_bytes = std::max(to_free_bytes, min_free_bytes);
+                to_free_bytes = std::max(to_free_bytes, tc_cached_bytes * 30 / 
100);
+                to_free_bytes = std::min(to_free_bytes, tc_cached_bytes);
+                expected_aggressive_decommit = 1;
+            } else {
+                // release rate is enough.
+                to_free_bytes = 0;
+            }
+            last_ms = kMaxLastMs;
+        } else if (memory_pressure > (pressure_limit - 10)) {
+            if (last_memory_pressure <= (pressure_limit - 10)) {
+                to_free_bytes = std::max(to_free_bytes, tc_cached_bytes * 10 / 
100);
+            } else {
+                to_free_bytes = 0;
+            }
+        }
+
+        int release_rate_index = memory_pressure / 10;
+        double release_rate = 1.0;
+        if (release_rate_index >= sizeof(release_rates)) {
+            release_rate = 2000.0;
+        } else {
+            release_rate = release_rates[release_rate_index];
+        }
+        MallocExtension::instance()->SetMemoryReleaseRate(release_rate);
+
+        if ((current_aggressive_decommit != expected_aggressive_decommit) && 
!is_performance_mode) {
+            
MallocExtension::instance()->SetNumericProperty("tcmalloc.aggressive_memory_decommit",
+                                                            
expected_aggressive_decommit);
+            current_aggressive_decommit = expected_aggressive_decommit;
+        }
+
+        last_memory_pressure = memory_pressure;
+        if (to_free_bytes > 0) {
+            last_ms += kIntervalMs;
+            if (last_ms >= kMaxLastMs) {
+                LOG(INFO) << "generic.current_allocated_bytes " << 
tc_used_bytes
+                          << ", generic.total_physical_bytes " << 
tc_alloc_bytes << ", rss " << rss
+                          << ", max_cache_percent " << max_cache_percent << ", 
release_rate "
+                          << release_rate << ", memory_pressure " << 
memory_pressure
+                          << ", physical_limit_bytes " << physical_limit_bytes 
<< ", to_free_bytes "
+                          << to_free_bytes << ", current_aggressive_decommit "
+                          << current_aggressive_decommit;
+                MallocExtension::instance()->ReleaseToSystem(to_free_bytes);
+                last_ms = 0;
             }
+        } else {
+            last_ms = 0;
         }
     }
 #endif
diff --git a/be/src/runtime/memory/mem_tracker_limiter.cpp 
b/be/src/runtime/memory/mem_tracker_limiter.cpp
index 9d40acbdd5..c1ef640c7b 100644
--- a/be/src/runtime/memory/mem_tracker_limiter.cpp
+++ b/be/src/runtime/memory/mem_tracker_limiter.cpp
@@ -41,6 +41,7 @@ struct TrackerLimiterGroup {
 static std::vector<TrackerLimiterGroup> mem_tracker_limiter_pool(1000);
 
 std::atomic<bool> MemTrackerLimiter::_enable_print_log_process_usage {true};
+bool MemTrackerLimiter::_oom_avoidance {true};
 
 MemTrackerLimiter::MemTrackerLimiter(Type type, const std::string& label, 
int64_t byte_limit,
                                      RuntimeProfile* profile) {
diff --git a/be/src/runtime/memory/mem_tracker_limiter.h 
b/be/src/runtime/memory/mem_tracker_limiter.h
index 3c8876f408..8685cdf953 100644
--- a/be/src/runtime/memory/mem_tracker_limiter.h
+++ b/be/src/runtime/memory/mem_tracker_limiter.h
@@ -79,6 +79,9 @@ public:
     ~MemTrackerLimiter();
 
     static bool sys_mem_exceed_limit_check(int64_t bytes) {
+        if (!_oom_avoidance) {
+            return false;
+        }
         // Limit process memory usage using the actual physical memory of the 
process in `/proc/self/status`.
         // This is independent of the consumption value of the mem tracker, 
which counts the virtual memory
         // of the process malloc.
@@ -109,6 +112,8 @@ public:
     // this tracker limiter.
     int64_t spare_capacity() const { return _limit - consumption(); }
 
+    static void disable_oom_avoidance() { _oom_avoidance = false; }
+
 public:
     // If need to consume the tracker frequently, use it
     void cache_consume(int64_t bytes);
@@ -208,6 +213,7 @@ private:
     // Avoid frequent printing.
     bool _enable_print_log_usage = false;
     static std::atomic<bool> _enable_print_log_process_usage;
+    static bool _oom_avoidance;
 
     // Iterator into mem_tracker_limiter_pool for this object. Stored to have 
O(1) remove.
     std::list<MemTrackerLimiter*>::iterator _tracker_limiter_group_it;
diff --git a/be/src/service/doris_main.cpp b/be/src/service/doris_main.cpp
index f43d95eaa9..793fd4d4c8 100644
--- a/be/src/service/doris_main.cpp
+++ b/be/src/service/doris_main.cpp
@@ -323,21 +323,18 @@ int main(int argc, char** argv) {
 #if !defined(__SANITIZE_ADDRESS__) && !defined(ADDRESS_SANITIZER) && 
!defined(LEAK_SANITIZER) && \
         !defined(THREAD_SANITIZER) && !defined(USE_JEMALLOC)
     // Change the total TCMalloc thread cache size if necessary.
-    size_t total_thread_cache_bytes;
-    if 
(!MallocExtension::instance()->GetNumericProperty("tcmalloc.max_total_thread_cache_bytes",
-                                                         
&total_thread_cache_bytes)) {
-        fprintf(stderr, "Failed to get TCMalloc total thread cache size.\n");
-    }
     const size_t kDefaultTotalThreadCacheBytes = 1024 * 1024 * 1024;
-    if (total_thread_cache_bytes < kDefaultTotalThreadCacheBytes) {
-        if (!MallocExtension::instance()->SetNumericProperty(
-                    "tcmalloc.max_total_thread_cache_bytes", 
kDefaultTotalThreadCacheBytes)) {
-            fprintf(stderr, "Failed to change TCMalloc total thread cache 
size.\n");
-            return -1;
-        }
+    if 
(!MallocExtension::instance()->SetNumericProperty("tcmalloc.max_total_thread_cache_bytes",
+                                                         
kDefaultTotalThreadCacheBytes)) {
+        fprintf(stderr, "Failed to change TCMalloc total thread cache 
size.\n");
+        return -1;
     }
 #endif
 
+    if (doris::config::memory_mode == std::string("performance")) {
+        doris::MemTrackerLimiter::disable_oom_avoidance();
+    }
+
     std::vector<doris::StorePath> paths;
     auto olap_res = 
doris::parse_conf_store_paths(doris::config::storage_root_path, &paths);
     if (!olap_res) {


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to