This is an automated email from the ASF dual-hosted git repository.

dataroaring pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new bd5a593403 [enhancement](memtracker)  Use proc/meminfo MemAvailable to control memory and optimize MemTracker log printing (#14335)
bd5a593403 is described below

commit bd5a593403e7af98c3a4a2541549d585167ba58f
Author: Xinyi Zou <zouxiny...@gmail.com>
AuthorDate: Thu Nov 17 22:46:07 2022 +0800

    [enhancement](memtracker)  Use proc/meminfo MemAvailable to control memory and optimize MemTracker log printing (#14335)
---
 be/src/runtime/buffer_control_block.h         |  2 +
 be/src/runtime/memory/mem_tracker_limiter.cpp | 15 ++++-
 be/src/runtime/memory/mem_tracker_limiter.h   | 27 +++++----
 be/src/service/doris_main.cpp                 |  5 +-
 be/src/util/mem_info.cpp                      | 86 +++++++++++++++++++--------
 be/src/util/mem_info.h                        | 19 +++++-
 be/src/vec/sink/vresult_sink.cpp              |  1 +
 7 files changed, 111 insertions(+), 44 deletions(-)

diff --git a/be/src/runtime/buffer_control_block.h b/be/src/runtime/buffer_control_block.h
index c5c898259a..8528f74164 100644
--- a/be/src/runtime/buffer_control_block.h
+++ b/be/src/runtime/buffer_control_block.h
@@ -91,6 +91,8 @@ public:
         }
     }
 
+    // TODO: The value of query peak mem usage in fe.audit.log comes from a random BE,
+    // not the BE with the largest peak mem usage
     void update_max_peak_memory_bytes() {
         if (_query_statistics.get() != nullptr) {
             int64_t max_peak_memory_bytes = 
_query_statistics->calculate_max_peak_memory_bytes();
diff --git a/be/src/runtime/memory/mem_tracker_limiter.cpp b/be/src/runtime/memory/mem_tracker_limiter.cpp
index 2453dca631..9d40acbdd5 100644
--- a/be/src/runtime/memory/mem_tracker_limiter.cpp
+++ b/be/src/runtime/memory/mem_tracker_limiter.cpp
@@ -167,6 +167,12 @@ std::string MemTrackerLimiter::log_usage(MemTracker::Snapshot snapshot) {
             print_bytes(snapshot.peak_consumption), snapshot.peak_consumption);
 }
 
+std::string MemTrackerLimiter::type_log_usage(MemTracker::Snapshot snapshot) {
+    return fmt::format("Type={}, Used={}({} B), Peak={}({} B)", snapshot.type,
+                       print_bytes(snapshot.cur_consumption), 
snapshot.cur_consumption,
+                       print_bytes(snapshot.peak_consumption), 
snapshot.peak_consumption);
+}
+
 void MemTrackerLimiter::print_log_usage(const std::string& msg) {
     if (_enable_print_log_usage) {
         _enable_print_log_usage = false;
@@ -190,13 +196,16 @@ void MemTrackerLimiter::print_log_process_usage(const std::string& msg, bool wit
     if (MemTrackerLimiter::_enable_print_log_process_usage) {
         MemTrackerLimiter::_enable_print_log_process_usage = false;
         std::string detail = msg;
-        detail += "\n    " + MemTrackerLimiter::process_mem_log_str();
-        if (with_stacktrace) detail += "\n" + get_stack_trace();
+        detail += "\nProcess Memory Summary:\n    " + 
MemTrackerLimiter::process_mem_log_str();
+        if (with_stacktrace) detail += "\nAlloc Stacktrace:\n" + 
get_stack_trace();
         std::vector<MemTracker::Snapshot> snapshots;
         MemTrackerLimiter::make_process_snapshots(&snapshots);
         MemTrackerLimiter::make_type_snapshots(&snapshots, 
MemTrackerLimiter::Type::GLOBAL);
+        detail += "\nMemory Tracker Summary:";
         for (const auto& snapshot : snapshots) {
-            if (snapshot.parent_label == "") {
+            if (snapshot.label == "" && snapshot.parent_label == "") {
+                detail += "\n    " + 
MemTrackerLimiter::type_log_usage(snapshot);
+            } else if (snapshot.parent_label == "") {
                 detail += "\n    " + MemTrackerLimiter::log_usage(snapshot);
             } else {
                 detail += "\n    " + MemTracker::log_usage(snapshot);
diff --git a/be/src/runtime/memory/mem_tracker_limiter.h b/be/src/runtime/memory/mem_tracker_limiter.h
index c1c631e3ce..3c8876f408 100644
--- a/be/src/runtime/memory/mem_tracker_limiter.h
+++ b/be/src/runtime/memory/mem_tracker_limiter.h
@@ -87,11 +87,10 @@ public:
         // tcmalloc/jemalloc allocator cache does not participate in the mem check as part of the process physical memory.
         // because `new/malloc` will trigger mem hook when using tcmalloc/jemalloc allocator cache,
         // but it may not actually alloc physical memory, which is not expected in mem hook fail.
-        //
-        // TODO: In order to ensure no OOM, currently reserve 200M, and then use the free mem in /proc/meminfo to ensure no OOM.
         if (MemInfo::proc_mem_no_allocator_cache() + bytes >= 
MemInfo::mem_limit() ||
-            PerfCounters::get_vm_rss() + bytes >= MemInfo::hard_mem_limit()) {
-            print_log_process_usage("sys mem exceed limit check faild");
+            MemInfo::sys_mem_available() < 
MemInfo::sys_mem_available_low_water_mark()) {
+            print_log_process_usage(
+                    fmt::format("System Mem Exceed Limit Check Faild, Try 
Alloc: {}", bytes));
             return true;
         }
         return false;
@@ -128,6 +127,7 @@ public:
 
     static std::string log_usage(MemTracker::Snapshot snapshot);
     std::string log_usage() { return log_usage(make_snapshot()); }
+    static std::string type_log_usage(MemTracker::Snapshot snapshot);
     void print_log_usage(const std::string& msg);
     void enable_print_log_usage() { _enable_print_log_usage = true; }
     static void enable_print_log_process_usage() { 
_enable_print_log_process_usage = true; }
@@ -173,18 +173,23 @@ private:
 
     static std::string process_limit_exceeded_errmsg_str(int64_t bytes) {
         return fmt::format(
-                "process memory used {}, tc/jemalloc allocator cache {}, 
exceed limit {}, failed "
-                "alloc size {}",
-                PerfCounters::get_vm_rss_str(), 
MemInfo::allocator_cache_mem_str(),
-                MemInfo::mem_limit_str(), print_bytes(bytes));
+                "process memory used {} exceed limit {} or sys mem available 
{} less than min "
+                "reserve {}, failed "
+                "alloc size {}, tc/jemalloc allocator cache {}",
+                PerfCounters::get_vm_rss_str(), MemInfo::mem_limit_str(),
+                MemInfo::sys_mem_available_str(),
+                
PrettyPrinter::print(MemInfo::sys_mem_available_low_water_mark(), TUnit::BYTES),
+                print_bytes(bytes), MemInfo::allocator_cache_mem_str());
     }
 
     static std::string process_mem_log_str() {
         return fmt::format(
-                "process memory used {}, limit {}, hard limit {}, tc/jemalloc "
+                "process memory used {} limit {}, sys mem available {} min 
reserve {}, tc/jemalloc "
                 "allocator cache {}",
                 PerfCounters::get_vm_rss_str(), MemInfo::mem_limit_str(),
-                print_bytes(MemInfo::hard_mem_limit()), 
MemInfo::allocator_cache_mem_str());
+                MemInfo::sys_mem_available_str(),
+                
PrettyPrinter::print(MemInfo::sys_mem_available_low_water_mark(), TUnit::BYTES),
+                MemInfo::allocator_cache_mem_str());
     }
 
 private:
@@ -246,10 +251,10 @@ inline bool MemTrackerLimiter::try_consume(int64_t bytes, std::string& failed_ms
 }
 
 inline Status MemTrackerLimiter::check_limit(int64_t bytes) {
-    if (bytes <= 0) return Status::OK();
     if (sys_mem_exceed_limit_check(bytes)) {
         return 
Status::MemoryLimitExceeded(process_limit_exceeded_errmsg_str(bytes));
     }
+    if (bytes <= 0) return Status::OK();
     if (_limit > 0 && _consumption->current_value() + bytes > _limit) {
         return 
Status::MemoryLimitExceeded(tracker_limit_exceeded_errmsg_str(bytes, this));
     }
diff --git a/be/src/service/doris_main.cpp b/be/src/service/doris_main.cpp
index b193f0d8a4..0570e3c319 100644
--- a/be/src/service/doris_main.cpp
+++ b/be/src/service/doris_main.cpp
@@ -499,12 +499,13 @@ int main(int argc, char** argv) {
         __lsan_do_leak_check();
 #endif
         doris::PerfCounters::refresh_proc_status();
+        doris::MemInfo::refresh_proc_meminfo();
         doris::MemTrackerLimiter::refresh_global_counter();
         
doris::ExecEnv::GetInstance()->load_channel_mgr()->refresh_mem_tracker();
-#if !defined(ADDRESS_SANITIZER) && !defined(LEAK_SANITIZER) && 
!defined(THREAD_SANITIZER) && \
-        !defined(USE_JEMALLOC)
+#if !defined(ADDRESS_SANITIZER) && !defined(LEAK_SANITIZER) && 
!defined(THREAD_SANITIZER)
         doris::MemInfo::refresh_allocator_mem();
 #endif
+        doris::MemInfo::refresh_proc_mem_no_allocator_cache();
         if (doris::config::memory_debug) {
             doris::MemTrackerLimiter::print_log_process_usage("memory_debug", 
false);
         }
diff --git a/be/src/util/mem_info.cpp b/be/src/util/mem_info.cpp
index 9070b09527..aeaf73cb02 100644
--- a/be/src/util/mem_info.cpp
+++ b/be/src/util/mem_info.cpp
@@ -45,7 +45,6 @@ bool MemInfo::_s_initialized = false;
 int64_t MemInfo::_s_physical_mem = -1;
 int64_t MemInfo::_s_mem_limit = -1;
 std::string MemInfo::_s_mem_limit_str = "";
-int64_t MemInfo::_s_hard_mem_limit = -1;
 size_t MemInfo::_s_allocator_physical_mem = 0;
 size_t MemInfo::_s_pageheap_unmapped_bytes = 0;
 size_t MemInfo::_s_tcmalloc_pageheap_free_bytes = 0;
@@ -57,36 +56,43 @@ std::string MemInfo::_s_allocator_cache_mem_str = "";
 size_t MemInfo::_s_virtual_memory_used = 0;
 int64_t MemInfo::_s_proc_mem_no_allocator_cache = -1;
 
+static std::unordered_map<std::string, int64_t> _mem_info_bytes;
+int64_t MemInfo::_s_sys_mem_available = 0;
+std::string MemInfo::_s_sys_mem_available_str = "";
+int64_t MemInfo::_s_sys_mem_available_low_water_mark = 0;
+
 #ifndef __APPLE__
-void MemInfo::init() {
-    // Read from /proc/meminfo
+void MemInfo::refresh_proc_meminfo() {
     std::ifstream meminfo("/proc/meminfo", std::ios::in);
     std::string line;
 
     while (meminfo.good() && !meminfo.eof()) {
         getline(meminfo, line);
         std::vector<std::string> fields = strings::Split(line, " ", 
strings::SkipWhitespace());
-
-        // We expect lines such as, e.g., 'MemTotal: 16129508 kB'
-        if (fields.size() < 3) {
-            continue;
-        }
-
-        if (fields[0].compare("MemTotal:") != 0) {
-            continue;
-        }
+        if (fields.size() < 2) continue;
+        std::string key = fields[0].substr(0, fields[0].size() - 1);
 
         StringParser::ParseResult result;
-        int64_t mem_total_kb =
+        int64_t mem_value =
                 StringParser::string_to_int<int64_t>(fields[1].data(), 
fields[1].size(), &result);
 
         if (result == StringParser::PARSE_SUCCESS) {
-            // Entries in /proc/meminfo are in KB.
-            _s_physical_mem = mem_total_kb * 1024L;
+            if (fields.size() == 2) {
+                _mem_info_bytes[key] = mem_value;
+            } else if (fields[2].compare("kB") == 0) {
+                _mem_info_bytes[key] = mem_value * 1024L;
+            }
         }
-
-        break;
     }
+    if (meminfo.is_open()) meminfo.close();
+
+    _s_sys_mem_available = _mem_info_bytes["MemAvailable"];
+    _s_sys_mem_available_str = PrettyPrinter::print(_s_sys_mem_available, 
TUnit::BYTES);
+}
+
+void MemInfo::init() {
+    refresh_proc_meminfo();
+    _s_physical_mem = _mem_info_bytes["MemTotal"];
 
     int64_t cgroup_mem_limit = 0;
     Status status = CGroupUtil::find_cgroup_mem_limit(&cgroup_mem_limit);
@@ -94,10 +100,6 @@ void MemInfo::init() {
         _s_physical_mem = std::min(_s_physical_mem, cgroup_mem_limit);
     }
 
-    if (meminfo.is_open()) {
-        meminfo.close();
-    }
-
     if (_s_physical_mem == -1) {
         LOG(WARNING) << "Could not determine amount of physical memory on this 
machine.";
     }
@@ -115,15 +117,49 @@ void MemInfo::init() {
         _s_mem_limit = _s_physical_mem;
     }
     _s_mem_limit_str = PrettyPrinter::print(_s_mem_limit, TUnit::BYTES);
-    _s_hard_mem_limit =
-            _s_physical_mem - std::max<int64_t>(209715200L, _s_physical_mem / 
10); // 200M
+
+    std::string line;
+    int64_t _s_vm_min_free_kbytes = 0;
+    std::ifstream vminfo("/proc/sys/vm/min_free_kbytes", std::ios::in);
+    if (vminfo.good() && !vminfo.eof()) {
+        getline(vminfo, line);
+        boost::algorithm::trim(line);
+        StringParser::ParseResult result;
+        int64_t mem_value = StringParser::string_to_int<int64_t>(line.data(), 
line.size(), &result);
+
+        if (result == StringParser::PARSE_SUCCESS) {
+            _s_vm_min_free_kbytes = mem_value * 1024L;
+        }
+    }
+    if (vminfo.is_open()) vminfo.close();
+
+    // MemAvailable = MemFree - LowWaterMark + (PageCache - min(PageCache / 2, LowWaterMark))
+    // LowWaterMark = /proc/sys/vm/min_free_kbytes
+    // Ref:
+    // https://serverfault.com/questions/940196/why-is-memavailable-a-lot-less-than-memfreebufferscached
+    // https://git.kernel.org/pub/scm/linux/kernel/git/torvalds/linux.git/commit/?id=34e431b0ae398fc54ea69ff85ec700722c9da773
+    //
+    // available_low_water_mark = p1 - p2
+    // p1: max 3.2G, avoid wasting too much memory on machines with large memory larger than 32G.
+    // p2: vm/min_free_kbytes is usually 0.4% - 5% of the total memory, some cloud machines vm/min_free_kbytes is 5%,
+    //     in order to avoid wasting too much memory, available_low_water_mark minus 1% at most.
+    int64_t p1 = std::min<int64_t>(
+            std::min<int64_t>(_s_physical_mem - _s_mem_limit, _s_physical_mem 
* 0.1), 3435973836L);
+    int64_t p2 = std::max<int64_t>(_s_vm_min_free_kbytes - _s_physical_mem * 
0.01, 0);
+    _s_sys_mem_available_low_water_mark = std::max<int64_t>(p1 - p2, 0);
 
     LOG(INFO) << "Physical Memory: " << PrettyPrinter::print(_s_physical_mem, 
TUnit::BYTES)
               << ", Mem Limit: " << _s_mem_limit_str
-              << ", origin config value: " << config::mem_limit;
+              << ", origin config value: " << config::mem_limit
+              << ", System Mem Available Min Reserve: "
+              << PrettyPrinter::print(_s_sys_mem_available_low_water_mark, 
TUnit::BYTES)
+              << ", Vm Min Free KBytes: "
+              << PrettyPrinter::print(_s_vm_min_free_kbytes, TUnit::BYTES);
     _s_initialized = true;
 }
 #else
+void MemInfo::refresh_proc_meminfo() {}
+
 void MemInfo::init() {
     size_t size = sizeof(_s_physical_mem);
     if (sysctlbyname("hw.memsize", &_s_physical_mem, &size, nullptr, 0) != 0) {
@@ -134,8 +170,6 @@ void MemInfo::init() {
     bool is_percent = true;
     _s_mem_limit = ParseUtil::parse_mem_spec(config::mem_limit, -1, 
_s_physical_mem, &is_percent);
     _s_mem_limit_str = PrettyPrinter::print(_s_mem_limit, TUnit::BYTES);
-    _s_hard_mem_limit =
-            _s_physical_mem - std::max<int64_t>(209715200L, _s_physical_mem / 
10); // 200M
 
     LOG(INFO) << "Physical Memory: " << PrettyPrinter::print(_s_physical_mem, 
TUnit::BYTES);
     _s_initialized = true;
diff --git a/be/src/util/mem_info.h b/be/src/util/mem_info.h
index d730c4ae1b..a4f10394f7 100644
--- a/be/src/util/mem_info.h
+++ b/be/src/util/mem_info.h
@@ -46,6 +46,14 @@ public:
         return _s_physical_mem;
     }
 
+    static void refresh_proc_meminfo();
+
+    static inline int64_t sys_mem_available() { return _s_sys_mem_available; }
+    static inline std::string sys_mem_available_str() { return 
_s_sys_mem_available_str; }
+    static inline int64_t sys_mem_available_low_water_mark() {
+        return _s_sys_mem_available_low_water_mark;
+    }
+
     static inline size_t current_mem() { return _s_allocator_physical_mem; }
     static inline size_t allocator_virtual_mem() { return 
_s_virtual_memory_used; }
     static inline size_t allocator_cache_mem() { return 
_s_allocator_cache_mem; }
@@ -55,6 +63,7 @@ public:
     // Tcmalloc property `generic.total_physical_bytes` records the total length of the virtual memory
     // obtained by the process malloc, not the physical memory actually used by the process in the OS.
     static inline void refresh_allocator_mem() {
+#if !defined(USE_JEMALLOC)
         
MallocExtension::instance()->GetNumericProperty("generic.total_physical_bytes",
                                                         
&_s_allocator_physical_mem);
         
MallocExtension::instance()->GetNumericProperty("tcmalloc.pageheap_unmapped_bytes",
@@ -72,6 +81,10 @@ public:
         _s_allocator_cache_mem_str =
                 
PrettyPrinter::print(static_cast<uint64_t>(_s_allocator_cache_mem), 
TUnit::BYTES);
         _s_virtual_memory_used = _s_allocator_physical_mem + 
_s_pageheap_unmapped_bytes;
+#endif
+    }
+
+    static inline void refresh_proc_mem_no_allocator_cache() {
         _s_proc_mem_no_allocator_cache =
                 PerfCounters::get_vm_rss() - 
static_cast<int64_t>(_s_allocator_cache_mem);
     }
@@ -84,7 +97,6 @@ public:
         DCHECK(_s_initialized);
         return _s_mem_limit_str;
     }
-    static inline int64_t hard_mem_limit() { return _s_hard_mem_limit; }
 
     static std::string debug_string();
 
@@ -93,7 +105,6 @@ private:
     static int64_t _s_physical_mem;
     static int64_t _s_mem_limit;
     static std::string _s_mem_limit_str;
-    static int64_t _s_hard_mem_limit;
     static size_t _s_allocator_physical_mem;
     static size_t _s_pageheap_unmapped_bytes;
     static size_t _s_tcmalloc_pageheap_free_bytes;
@@ -104,6 +115,10 @@ private:
     static std::string _s_allocator_cache_mem_str;
     static size_t _s_virtual_memory_used;
     static int64_t _s_proc_mem_no_allocator_cache;
+
+    static int64_t _s_sys_mem_available;
+    static std::string _s_sys_mem_available_str;
+    static int64_t _s_sys_mem_available_low_water_mark;
 };
 
 } // namespace doris
diff --git a/be/src/vec/sink/vresult_sink.cpp b/be/src/vec/sink/vresult_sink.cpp
index 9fe8cf8b66..77e64a8959 100644
--- a/be/src/vec/sink/vresult_sink.cpp
+++ b/be/src/vec/sink/vresult_sink.cpp
@@ -115,6 +115,7 @@ Status VResultSink::close(RuntimeState* state, Status exec_status) {
     // close sender, this is normal path end
     if (_sender) {
         if (_writer) 
_sender->update_num_written_rows(_writer->get_written_rows());
+        _sender->update_max_peak_memory_bytes();
         _sender->close(final_status);
     }
     state->exec_env()->result_mgr()->cancel_at_time(


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to