This is an automated email from the ASF dual-hosted git repository. dataroaring pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push: new bd5a593403 [enhancement](memtracker) Use proc/meminfo MemAvailable to control memory and optimize MemTracker log printing (#14335) bd5a593403 is described below commit bd5a593403e7af98c3a4a2541549d585167ba58f Author: Xinyi Zou <zouxiny...@gmail.com> AuthorDate: Thu Nov 17 22:46:07 2022 +0800 [enhancement](memtracker) Use proc/meminfo MemAvailable to control memory and optimize MemTracker log printing (#14335) --- be/src/runtime/buffer_control_block.h | 2 + be/src/runtime/memory/mem_tracker_limiter.cpp | 15 ++++- be/src/runtime/memory/mem_tracker_limiter.h | 27 +++++---- be/src/service/doris_main.cpp | 5 +- be/src/util/mem_info.cpp | 86 +++++++++++++++++++-------- be/src/util/mem_info.h | 19 +++++- be/src/vec/sink/vresult_sink.cpp | 1 + 7 files changed, 111 insertions(+), 44 deletions(-) diff --git a/be/src/runtime/buffer_control_block.h b/be/src/runtime/buffer_control_block.h index c5c898259a..8528f74164 100644 --- a/be/src/runtime/buffer_control_block.h +++ b/be/src/runtime/buffer_control_block.h @@ -91,6 +91,8 @@ public: } } + // TODO: The value of query peak mem usage in fe.audit.log comes from a random BE, + // not the BE with the largest peak mem usage void update_max_peak_memory_bytes() { if (_query_statistics.get() != nullptr) { int64_t max_peak_memory_bytes = _query_statistics->calculate_max_peak_memory_bytes(); diff --git a/be/src/runtime/memory/mem_tracker_limiter.cpp b/be/src/runtime/memory/mem_tracker_limiter.cpp index 2453dca631..9d40acbdd5 100644 --- a/be/src/runtime/memory/mem_tracker_limiter.cpp +++ b/be/src/runtime/memory/mem_tracker_limiter.cpp @@ -167,6 +167,12 @@ std::string MemTrackerLimiter::log_usage(MemTracker::Snapshot snapshot) { print_bytes(snapshot.peak_consumption), snapshot.peak_consumption); } +std::string MemTrackerLimiter::type_log_usage(MemTracker::Snapshot snapshot) { + return fmt::format("Type={}, Used={}({} B), Peak={}({} B)", snapshot.type, + print_bytes(snapshot.cur_consumption), snapshot.cur_consumption, + print_bytes(snapshot.peak_consumption), snapshot.peak_consumption); +} + void MemTrackerLimiter::print_log_usage(const std::string& msg) { if (_enable_print_log_usage) { _enable_print_log_usage = false; @@ -190,13 +196,16 @@ void MemTrackerLimiter::print_log_process_usage(const std::string& msg, bool wit if (MemTrackerLimiter::_enable_print_log_process_usage) { MemTrackerLimiter::_enable_print_log_process_usage = false; std::string detail = msg; - detail += "\n " + MemTrackerLimiter::process_mem_log_str(); - if (with_stacktrace) detail += "\n" + get_stack_trace(); + detail += "\nProcess Memory Summary:\n " + MemTrackerLimiter::process_mem_log_str(); + if (with_stacktrace) detail += "\nAlloc Stacktrace:\n" + get_stack_trace(); std::vector<MemTracker::Snapshot> snapshots; MemTrackerLimiter::make_process_snapshots(&snapshots); MemTrackerLimiter::make_type_snapshots(&snapshots, MemTrackerLimiter::Type::GLOBAL); + detail += "\nMemory Tracker Summary:"; for (const auto& snapshot : snapshots) { - if (snapshot.parent_label == "") { + if (snapshot.label == "" && snapshot.parent_label == "") { + detail += "\n " + MemTrackerLimiter::type_log_usage(snapshot); + } else if (snapshot.parent_label == "") { detail += "\n " + MemTrackerLimiter::log_usage(snapshot); } else { detail += "\n " + MemTracker::log_usage(snapshot); diff --git a/be/src/runtime/memory/mem_tracker_limiter.h b/be/src/runtime/memory/mem_tracker_limiter.h index c1c631e3ce..3c8876f408 100644 --- a/be/src/runtime/memory/mem_tracker_limiter.h +++ b/be/src/runtime/memory/mem_tracker_limiter.h @@ -87,11 +87,10 @@ public: // tcmalloc/jemalloc allocator cache does not participate in the mem check as part of the process physical memory. // because `new/malloc` will trigger mem hook when using tcmalloc/jemalloc allocator cache, // but it may not actually alloc physical memory, which is not expected in mem hook fail. - // - // TODO: In order to ensure no OOM, currently reserve 200M, and then use the free mem in /proc/meminfo to ensure no OOM. if (MemInfo::proc_mem_no_allocator_cache() + bytes >= MemInfo::mem_limit() || - PerfCounters::get_vm_rss() + bytes >= MemInfo::hard_mem_limit()) { - print_log_process_usage("sys mem exceed limit check faild"); + MemInfo::sys_mem_available() < MemInfo::sys_mem_available_low_water_mark()) { + print_log_process_usage( + fmt::format("System Mem Exceed Limit Check Faild, Try Alloc: {}", bytes)); return true; } return false; @@ -128,6 +127,7 @@ public: static std::string log_usage(MemTracker::Snapshot snapshot); std::string log_usage() { return log_usage(make_snapshot()); } + static std::string type_log_usage(MemTracker::Snapshot snapshot); void print_log_usage(const std::string& msg); void enable_print_log_usage() { _enable_print_log_usage = true; } static void enable_print_log_process_usage() { _enable_print_log_process_usage = true; } @@ -173,18 +173,23 @@ private: static std::string process_limit_exceeded_errmsg_str(int64_t bytes) { return fmt::format( - "process memory used {}, tc/jemalloc allocator cache {}, exceed limit {}, failed " - "alloc size {}", - PerfCounters::get_vm_rss_str(), MemInfo::allocator_cache_mem_str(), - MemInfo::mem_limit_str(), print_bytes(bytes)); + "process memory used {} exceed limit {} or sys mem available {} less than min " + "reserve {}, failed " + "alloc size {}, tc/jemalloc allocator cache {}", + PerfCounters::get_vm_rss_str(), MemInfo::mem_limit_str(), + MemInfo::sys_mem_available_str(), + PrettyPrinter::print(MemInfo::sys_mem_available_low_water_mark(), TUnit::BYTES), + print_bytes(bytes), MemInfo::allocator_cache_mem_str()); } static std::string process_mem_log_str() { return fmt::format( - "process memory used {}, limit {}, hard limit {}, tc/jemalloc " + "process memory used {} limit {}, sys mem available {} min reserve {}, tc/jemalloc " "allocator cache {}", PerfCounters::get_vm_rss_str(), MemInfo::mem_limit_str(), - print_bytes(MemInfo::hard_mem_limit()), MemInfo::allocator_cache_mem_str()); + MemInfo::sys_mem_available_str(), + PrettyPrinter::print(MemInfo::sys_mem_available_low_water_mark(), TUnit::BYTES), + MemInfo::allocator_cache_mem_str()); } private: @@ -246,10 +251,10 @@ inline bool MemTrackerLimiter::try_consume(int64_t bytes, std::string& failed_ms } inline Status MemTrackerLimiter::check_limit(int64_t bytes) { - if (bytes <= 0) return Status::OK(); if (sys_mem_exceed_limit_check(bytes)) { return Status::MemoryLimitExceeded(process_limit_exceeded_errmsg_str(bytes)); } + if (bytes <= 0) return Status::OK(); if (_limit > 0 && _consumption->current_value() + bytes > _limit) { return Status::MemoryLimitExceeded(tracker_limit_exceeded_errmsg_str(bytes, this)); } diff --git a/be/src/service/doris_main.cpp b/be/src/service/doris_main.cpp index b193f0d8a4..0570e3c319 100644 --- a/be/src/service/doris_main.cpp +++ b/be/src/service/doris_main.cpp @@ -499,12 +499,13 @@ int main(int argc, char** argv) { __lsan_do_leak_check(); #endif doris::PerfCounters::refresh_proc_status(); + doris::MemInfo::refresh_proc_meminfo(); doris::MemTrackerLimiter::refresh_global_counter(); doris::ExecEnv::GetInstance()->load_channel_mgr()->refresh_mem_tracker(); -#if !defined(ADDRESS_SANITIZER) && !defined(LEAK_SANITIZER) && !defined(THREAD_SANITIZER) && \ - !defined(USE_JEMALLOC) +#if !defined(ADDRESS_SANITIZER) && !defined(LEAK_SANITIZER) && !defined(THREAD_SANITIZER) doris::MemInfo::refresh_allocator_mem(); #endif + doris::MemInfo::refresh_proc_mem_no_allocator_cache(); if (doris::config::memory_debug) { doris::MemTrackerLimiter::print_log_process_usage("memory_debug", false); } diff --git a/be/src/util/mem_info.cpp b/be/src/util/mem_info.cpp index 9070b09527..aeaf73cb02 100644 --- a/be/src/util/mem_info.cpp +++ b/be/src/util/mem_info.cpp @@ -45,7 +45,6 @@ bool MemInfo::_s_initialized = false; int64_t MemInfo::_s_physical_mem = -1; int64_t MemInfo::_s_mem_limit = -1; std::string MemInfo::_s_mem_limit_str = ""; -int64_t MemInfo::_s_hard_mem_limit = -1; size_t MemInfo::_s_allocator_physical_mem = 0; size_t MemInfo::_s_pageheap_unmapped_bytes = 0; size_t MemInfo::_s_tcmalloc_pageheap_free_bytes = 0; @@ -57,36 +56,43 @@ std::string MemInfo::_s_allocator_cache_mem_str = ""; size_t MemInfo::_s_virtual_memory_used = 0; int64_t MemInfo::_s_proc_mem_no_allocator_cache = -1; +static std::unordered_map<std::string, int64_t> _mem_info_bytes; +int64_t MemInfo::_s_sys_mem_available = 0; +std::string MemInfo::_s_sys_mem_available_str = ""; +int64_t MemInfo::_s_sys_mem_available_low_water_mark = 0; + #ifndef __APPLE__ -void MemInfo::init() { - // Read from /proc/meminfo +void MemInfo::refresh_proc_meminfo() { std::ifstream meminfo("/proc/meminfo", std::ios::in); std::string line; while (meminfo.good() && !meminfo.eof()) { getline(meminfo, line); std::vector<std::string> fields = strings::Split(line, " ", strings::SkipWhitespace()); - - // We expect lines such as, e.g., 'MemTotal: 16129508 kB' - if (fields.size() < 3) { - continue; - } - - if (fields[0].compare("MemTotal:") != 0) { - continue; - } + if (fields.size() < 2) continue; + std::string key = fields[0].substr(0, fields[0].size() - 1); StringParser::ParseResult result; - int64_t mem_total_kb = + int64_t mem_value = StringParser::string_to_int<int64_t>(fields[1].data(), fields[1].size(), &result); if (result == StringParser::PARSE_SUCCESS) { - // Entries in /proc/meminfo are in KB. - _s_physical_mem = mem_total_kb * 1024L; + if (fields.size() == 2) { + _mem_info_bytes[key] = mem_value; + } else if (fields[2].compare("kB") == 0) { + _mem_info_bytes[key] = mem_value * 1024L; + } } - - break; } + if (meminfo.is_open()) meminfo.close(); + + _s_sys_mem_available = _mem_info_bytes["MemAvailable"]; + _s_sys_mem_available_str = PrettyPrinter::print(_s_sys_mem_available, TUnit::BYTES); +} + +void MemInfo::init() { + refresh_proc_meminfo(); + _s_physical_mem = _mem_info_bytes["MemTotal"]; int64_t cgroup_mem_limit = 0; Status status = CGroupUtil::find_cgroup_mem_limit(&cgroup_mem_limit); @@ -94,10 +100,6 @@ void MemInfo::init() { _s_physical_mem = std::min(_s_physical_mem, cgroup_mem_limit); } - if (meminfo.is_open()) { - meminfo.close(); - } - if (_s_physical_mem == -1) { LOG(WARNING) << "Could not determine amount of physical memory on this machine."; } @@ -115,15 +117,49 @@ void MemInfo::init() { _s_mem_limit = _s_physical_mem; } _s_mem_limit_str = PrettyPrinter::print(_s_mem_limit, TUnit::BYTES); - _s_hard_mem_limit = - _s_physical_mem - std::max<int64_t>(209715200L, _s_physical_mem / 10); // 200M + + std::string line; + int64_t _s_vm_min_free_kbytes = 0; + std::ifstream vminfo("/proc/sys/vm/min_free_kbytes", std::ios::in); + if (vminfo.good() && !vminfo.eof()) { + getline(vminfo, line); + boost::algorithm::trim(line); + StringParser::ParseResult result; + int64_t mem_value = StringParser::string_to_int<int64_t>(line.data(), line.size(), &result); + + if (result == StringParser::PARSE_SUCCESS) { + _s_vm_min_free_kbytes = mem_value * 1024L; + } + } + if (vminfo.is_open()) vminfo.close(); + + // MemAvailable = MemFree - LowWaterMark + (PageCache - min(PageCache / 2, LowWaterMark)) + // LowWaterMark = /proc/sys/vm/min_free_kbytes + // Ref: + // https://serverfault.com/questions/940196/why-is-memavailable-a-lot-less-than-memfreebufferscached + // https://git.kernel.org/pub/scm/linux/kernel/git/torvalds/linux.git/commit/?id=34e431b0ae398fc54ea69ff85ec700722c9da773 + // + // available_low_water_mark = p1 - p2 + // p1: max 3.2G, avoid wasting too much memory on machines with large memory larger than 32G. + // p2: vm/min_free_kbytes is usually 0.4% - 5% of the total memory, some cloud machines vm/min_free_kbytes is 5%, + // in order to avoid wasting too much memory, available_low_water_mark minus 1% at most. + int64_t p1 = std::min<int64_t>( + std::min<int64_t>(_s_physical_mem - _s_mem_limit, _s_physical_mem * 0.1), 3435973836L); + int64_t p2 = std::max<int64_t>(_s_vm_min_free_kbytes - _s_physical_mem * 0.01, 0); + _s_sys_mem_available_low_water_mark = std::max<int64_t>(p1 - p2, 0); LOG(INFO) << "Physical Memory: " << PrettyPrinter::print(_s_physical_mem, TUnit::BYTES) << ", Mem Limit: " << _s_mem_limit_str - << ", origin config value: " << config::mem_limit; + << ", origin config value: " << config::mem_limit + << ", System Mem Available Min Reserve: " + << PrettyPrinter::print(_s_sys_mem_available_low_water_mark, TUnit::BYTES) + << ", Vm Min Free KBytes: " + << PrettyPrinter::print(_s_vm_min_free_kbytes, TUnit::BYTES); _s_initialized = true; } #else +void MemInfo::refresh_proc_meminfo() {} + void MemInfo::init() { size_t size = sizeof(_s_physical_mem); if (sysctlbyname("hw.memsize", &_s_physical_mem, &size, nullptr, 0) != 0) { @@ -134,8 +170,6 @@ void MemInfo::init() { bool is_percent = true; _s_mem_limit = ParseUtil::parse_mem_spec(config::mem_limit, -1, _s_physical_mem, &is_percent); _s_mem_limit_str = PrettyPrinter::print(_s_mem_limit, TUnit::BYTES); - _s_hard_mem_limit = - _s_physical_mem - std::max<int64_t>(209715200L, _s_physical_mem / 10); // 200M LOG(INFO) << "Physical Memory: " << PrettyPrinter::print(_s_physical_mem, TUnit::BYTES); _s_initialized = true; diff --git a/be/src/util/mem_info.h b/be/src/util/mem_info.h index d730c4ae1b..a4f10394f7 100644 --- a/be/src/util/mem_info.h +++ b/be/src/util/mem_info.h @@ -46,6 +46,14 @@ public: return _s_physical_mem; } + static void refresh_proc_meminfo(); + + static inline int64_t sys_mem_available() { return _s_sys_mem_available; } + static inline std::string sys_mem_available_str() { return _s_sys_mem_available_str; } + static inline int64_t sys_mem_available_low_water_mark() { + return _s_sys_mem_available_low_water_mark; + } + static inline size_t current_mem() { return _s_allocator_physical_mem; } static inline size_t allocator_virtual_mem() { return _s_virtual_memory_used; } static inline size_t allocator_cache_mem() { return _s_allocator_cache_mem; } @@ -55,6 +63,7 @@ public: // Tcmalloc property `generic.total_physical_bytes` records the total length of the virtual memory // obtained by the process malloc, not the physical memory actually used by the process in the OS. static inline void refresh_allocator_mem() { +#if !defined(USE_JEMALLOC) MallocExtension::instance()->GetNumericProperty("generic.total_physical_bytes", &_s_allocator_physical_mem); MallocExtension::instance()->GetNumericProperty("tcmalloc.pageheap_unmapped_bytes", @@ -72,6 +81,10 @@ public: _s_allocator_cache_mem_str = PrettyPrinter::print(static_cast<uint64_t>(_s_allocator_cache_mem), TUnit::BYTES); _s_virtual_memory_used = _s_allocator_physical_mem + _s_pageheap_unmapped_bytes; +#endif + } + + static inline void refresh_proc_mem_no_allocator_cache() { _s_proc_mem_no_allocator_cache = PerfCounters::get_vm_rss() - static_cast<int64_t>(_s_allocator_cache_mem); } @@ -84,7 +97,6 @@ public: DCHECK(_s_initialized); return _s_mem_limit_str; } - static inline int64_t hard_mem_limit() { return _s_hard_mem_limit; } static std::string debug_string(); @@ -93,7 +105,6 @@ private: static int64_t _s_physical_mem; static int64_t _s_mem_limit; static std::string _s_mem_limit_str; - static int64_t _s_hard_mem_limit; static size_t _s_allocator_physical_mem; static size_t _s_pageheap_unmapped_bytes; static size_t _s_tcmalloc_pageheap_free_bytes; @@ -104,6 +115,10 @@ private: static std::string _s_allocator_cache_mem_str; static size_t _s_virtual_memory_used; static int64_t _s_proc_mem_no_allocator_cache; + + static int64_t _s_sys_mem_available; + static std::string _s_sys_mem_available_str; + static int64_t _s_sys_mem_available_low_water_mark; }; } // namespace doris diff --git a/be/src/vec/sink/vresult_sink.cpp b/be/src/vec/sink/vresult_sink.cpp index 9fe8cf8b66..77e64a8959 100644 --- a/be/src/vec/sink/vresult_sink.cpp +++ b/be/src/vec/sink/vresult_sink.cpp @@ -115,6 +115,7 @@ Status VResultSink::close(RuntimeState* state, Status exec_status) { // close sender, this is normal path end if (_sender) { if (_writer) _sender->update_num_written_rows(_writer->get_written_rows()); + _sender->update_max_peak_memory_bytes(); _sender->close(final_status); } state->exec_env()->result_mgr()->cancel_at_time( --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org