This is an automated email from the ASF dual-hosted git repository.
dataroaring pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new bd5a593403 [enhancement](memtracker) Use proc/meminfo MemAvailable to
control memory and optimize MemTracker log printing (#14335)
bd5a593403 is described below
commit bd5a593403e7af98c3a4a2541549d585167ba58f
Author: Xinyi Zou <[email protected]>
AuthorDate: Thu Nov 17 22:46:07 2022 +0800
[enhancement](memtracker) Use proc/meminfo MemAvailable to control memory
and optimize MemTracker log printing (#14335)
---
be/src/runtime/buffer_control_block.h | 2 +
be/src/runtime/memory/mem_tracker_limiter.cpp | 15 ++++-
be/src/runtime/memory/mem_tracker_limiter.h | 27 +++++----
be/src/service/doris_main.cpp | 5 +-
be/src/util/mem_info.cpp | 86 +++++++++++++++++++--------
be/src/util/mem_info.h | 19 +++++-
be/src/vec/sink/vresult_sink.cpp | 1 +
7 files changed, 111 insertions(+), 44 deletions(-)
diff --git a/be/src/runtime/buffer_control_block.h
b/be/src/runtime/buffer_control_block.h
index c5c898259a..8528f74164 100644
--- a/be/src/runtime/buffer_control_block.h
+++ b/be/src/runtime/buffer_control_block.h
@@ -91,6 +91,8 @@ public:
}
}
+ // TODO: The query peak memory usage reported in fe.audit.log comes from an arbitrary BE,
+ // not necessarily the BE with the largest peak memory usage.
void update_max_peak_memory_bytes() {
if (_query_statistics.get() != nullptr) {
int64_t max_peak_memory_bytes =
_query_statistics->calculate_max_peak_memory_bytes();
diff --git a/be/src/runtime/memory/mem_tracker_limiter.cpp
b/be/src/runtime/memory/mem_tracker_limiter.cpp
index 2453dca631..9d40acbdd5 100644
--- a/be/src/runtime/memory/mem_tracker_limiter.cpp
+++ b/be/src/runtime/memory/mem_tracker_limiter.cpp
@@ -167,6 +167,12 @@ std::string
MemTrackerLimiter::log_usage(MemTracker::Snapshot snapshot) {
print_bytes(snapshot.peak_consumption), snapshot.peak_consumption);
}
+std::string MemTrackerLimiter::type_log_usage(MemTracker::Snapshot snapshot) {
+ return fmt::format("Type={}, Used={}({} B), Peak={}({} B)", snapshot.type,
+ print_bytes(snapshot.cur_consumption),
snapshot.cur_consumption,
+ print_bytes(snapshot.peak_consumption),
snapshot.peak_consumption);
+}
+
void MemTrackerLimiter::print_log_usage(const std::string& msg) {
if (_enable_print_log_usage) {
_enable_print_log_usage = false;
@@ -190,13 +196,16 @@ void MemTrackerLimiter::print_log_process_usage(const
std::string& msg, bool wit
if (MemTrackerLimiter::_enable_print_log_process_usage) {
MemTrackerLimiter::_enable_print_log_process_usage = false;
std::string detail = msg;
- detail += "\n " + MemTrackerLimiter::process_mem_log_str();
- if (with_stacktrace) detail += "\n" + get_stack_trace();
+ detail += "\nProcess Memory Summary:\n " +
MemTrackerLimiter::process_mem_log_str();
+ if (with_stacktrace) detail += "\nAlloc Stacktrace:\n" +
get_stack_trace();
std::vector<MemTracker::Snapshot> snapshots;
MemTrackerLimiter::make_process_snapshots(&snapshots);
MemTrackerLimiter::make_type_snapshots(&snapshots,
MemTrackerLimiter::Type::GLOBAL);
+ detail += "\nMemory Tracker Summary:";
for (const auto& snapshot : snapshots) {
- if (snapshot.parent_label == "") {
+ if (snapshot.label == "" && snapshot.parent_label == "") {
+ detail += "\n " +
MemTrackerLimiter::type_log_usage(snapshot);
+ } else if (snapshot.parent_label == "") {
detail += "\n " + MemTrackerLimiter::log_usage(snapshot);
} else {
detail += "\n " + MemTracker::log_usage(snapshot);
diff --git a/be/src/runtime/memory/mem_tracker_limiter.h
b/be/src/runtime/memory/mem_tracker_limiter.h
index c1c631e3ce..3c8876f408 100644
--- a/be/src/runtime/memory/mem_tracker_limiter.h
+++ b/be/src/runtime/memory/mem_tracker_limiter.h
@@ -87,11 +87,10 @@ public:
// tcmalloc/jemalloc allocator cache does not participate in the mem
check as part of the process physical memory.
// because `new/malloc` will trigger mem hook when using
tcmalloc/jemalloc allocator cache,
// but it may not actually alloc physical memory, which is not
expected in mem hook fail.
- //
- // TODO: In order to ensure no OOM, currently reserve 200M, and then
use the free mem in /proc/meminfo to ensure no OOM.
if (MemInfo::proc_mem_no_allocator_cache() + bytes >=
MemInfo::mem_limit() ||
- PerfCounters::get_vm_rss() + bytes >= MemInfo::hard_mem_limit()) {
- print_log_process_usage("sys mem exceed limit check faild");
+ MemInfo::sys_mem_available() <
MemInfo::sys_mem_available_low_water_mark()) {
+ print_log_process_usage(
+ fmt::format("System Mem Exceed Limit Check Faild, Try
Alloc: {}", bytes));
return true;
}
return false;
@@ -128,6 +127,7 @@ public:
static std::string log_usage(MemTracker::Snapshot snapshot);
std::string log_usage() { return log_usage(make_snapshot()); }
+ static std::string type_log_usage(MemTracker::Snapshot snapshot);
void print_log_usage(const std::string& msg);
void enable_print_log_usage() { _enable_print_log_usage = true; }
static void enable_print_log_process_usage() {
_enable_print_log_process_usage = true; }
@@ -173,18 +173,23 @@ private:
static std::string process_limit_exceeded_errmsg_str(int64_t bytes) {
return fmt::format(
- "process memory used {}, tc/jemalloc allocator cache {},
exceed limit {}, failed "
- "alloc size {}",
- PerfCounters::get_vm_rss_str(),
MemInfo::allocator_cache_mem_str(),
- MemInfo::mem_limit_str(), print_bytes(bytes));
+ "process memory used {} exceed limit {} or sys mem available
{} less than min "
+ "reserve {}, failed "
+ "alloc size {}, tc/jemalloc allocator cache {}",
+ PerfCounters::get_vm_rss_str(), MemInfo::mem_limit_str(),
+ MemInfo::sys_mem_available_str(),
+
PrettyPrinter::print(MemInfo::sys_mem_available_low_water_mark(), TUnit::BYTES),
+ print_bytes(bytes), MemInfo::allocator_cache_mem_str());
}
static std::string process_mem_log_str() {
return fmt::format(
- "process memory used {}, limit {}, hard limit {}, tc/jemalloc "
+ "process memory used {} limit {}, sys mem available {} min
reserve {}, tc/jemalloc "
"allocator cache {}",
PerfCounters::get_vm_rss_str(), MemInfo::mem_limit_str(),
- print_bytes(MemInfo::hard_mem_limit()),
MemInfo::allocator_cache_mem_str());
+ MemInfo::sys_mem_available_str(),
+
PrettyPrinter::print(MemInfo::sys_mem_available_low_water_mark(), TUnit::BYTES),
+ MemInfo::allocator_cache_mem_str());
}
private:
@@ -246,10 +251,10 @@ inline bool MemTrackerLimiter::try_consume(int64_t bytes,
std::string& failed_ms
}
inline Status MemTrackerLimiter::check_limit(int64_t bytes) {
- if (bytes <= 0) return Status::OK();
if (sys_mem_exceed_limit_check(bytes)) {
return
Status::MemoryLimitExceeded(process_limit_exceeded_errmsg_str(bytes));
}
+ if (bytes <= 0) return Status::OK();
if (_limit > 0 && _consumption->current_value() + bytes > _limit) {
return
Status::MemoryLimitExceeded(tracker_limit_exceeded_errmsg_str(bytes, this));
}
diff --git a/be/src/service/doris_main.cpp b/be/src/service/doris_main.cpp
index b193f0d8a4..0570e3c319 100644
--- a/be/src/service/doris_main.cpp
+++ b/be/src/service/doris_main.cpp
@@ -499,12 +499,13 @@ int main(int argc, char** argv) {
__lsan_do_leak_check();
#endif
doris::PerfCounters::refresh_proc_status();
+ doris::MemInfo::refresh_proc_meminfo();
doris::MemTrackerLimiter::refresh_global_counter();
doris::ExecEnv::GetInstance()->load_channel_mgr()->refresh_mem_tracker();
-#if !defined(ADDRESS_SANITIZER) && !defined(LEAK_SANITIZER) &&
!defined(THREAD_SANITIZER) && \
- !defined(USE_JEMALLOC)
+#if !defined(ADDRESS_SANITIZER) && !defined(LEAK_SANITIZER) &&
!defined(THREAD_SANITIZER)
doris::MemInfo::refresh_allocator_mem();
#endif
+ doris::MemInfo::refresh_proc_mem_no_allocator_cache();
if (doris::config::memory_debug) {
doris::MemTrackerLimiter::print_log_process_usage("memory_debug",
false);
}
diff --git a/be/src/util/mem_info.cpp b/be/src/util/mem_info.cpp
index 9070b09527..aeaf73cb02 100644
--- a/be/src/util/mem_info.cpp
+++ b/be/src/util/mem_info.cpp
@@ -45,7 +45,6 @@ bool MemInfo::_s_initialized = false;
int64_t MemInfo::_s_physical_mem = -1;
int64_t MemInfo::_s_mem_limit = -1;
std::string MemInfo::_s_mem_limit_str = "";
-int64_t MemInfo::_s_hard_mem_limit = -1;
size_t MemInfo::_s_allocator_physical_mem = 0;
size_t MemInfo::_s_pageheap_unmapped_bytes = 0;
size_t MemInfo::_s_tcmalloc_pageheap_free_bytes = 0;
@@ -57,36 +56,43 @@ std::string MemInfo::_s_allocator_cache_mem_str = "";
size_t MemInfo::_s_virtual_memory_used = 0;
int64_t MemInfo::_s_proc_mem_no_allocator_cache = -1;
+static std::unordered_map<std::string, int64_t> _mem_info_bytes;
+int64_t MemInfo::_s_sys_mem_available = 0;
+std::string MemInfo::_s_sys_mem_available_str = "";
+int64_t MemInfo::_s_sys_mem_available_low_water_mark = 0;
+
#ifndef __APPLE__
-void MemInfo::init() {
- // Read from /proc/meminfo
+void MemInfo::refresh_proc_meminfo() {
std::ifstream meminfo("/proc/meminfo", std::ios::in);
std::string line;
while (meminfo.good() && !meminfo.eof()) {
getline(meminfo, line);
std::vector<std::string> fields = strings::Split(line, " ",
strings::SkipWhitespace());
-
- // We expect lines such as, e.g., 'MemTotal: 16129508 kB'
- if (fields.size() < 3) {
- continue;
- }
-
- if (fields[0].compare("MemTotal:") != 0) {
- continue;
- }
+ if (fields.size() < 2) continue;
+ std::string key = fields[0].substr(0, fields[0].size() - 1);
StringParser::ParseResult result;
- int64_t mem_total_kb =
+ int64_t mem_value =
StringParser::string_to_int<int64_t>(fields[1].data(),
fields[1].size(), &result);
if (result == StringParser::PARSE_SUCCESS) {
- // Entries in /proc/meminfo are in KB.
- _s_physical_mem = mem_total_kb * 1024L;
+ if (fields.size() == 2) {
+ _mem_info_bytes[key] = mem_value;
+ } else if (fields[2].compare("kB") == 0) {
+ _mem_info_bytes[key] = mem_value * 1024L;
+ }
}
-
- break;
}
+ if (meminfo.is_open()) meminfo.close();
+
+ _s_sys_mem_available = _mem_info_bytes["MemAvailable"];
+ _s_sys_mem_available_str = PrettyPrinter::print(_s_sys_mem_available,
TUnit::BYTES);
+}
+
+void MemInfo::init() {
+ refresh_proc_meminfo();
+ _s_physical_mem = _mem_info_bytes["MemTotal"];
int64_t cgroup_mem_limit = 0;
Status status = CGroupUtil::find_cgroup_mem_limit(&cgroup_mem_limit);
@@ -94,10 +100,6 @@ void MemInfo::init() {
_s_physical_mem = std::min(_s_physical_mem, cgroup_mem_limit);
}
- if (meminfo.is_open()) {
- meminfo.close();
- }
-
if (_s_physical_mem == -1) {
LOG(WARNING) << "Could not determine amount of physical memory on this
machine.";
}
@@ -115,15 +117,49 @@ void MemInfo::init() {
_s_mem_limit = _s_physical_mem;
}
_s_mem_limit_str = PrettyPrinter::print(_s_mem_limit, TUnit::BYTES);
- _s_hard_mem_limit =
- _s_physical_mem - std::max<int64_t>(209715200L, _s_physical_mem /
10); // 200M
+
+ std::string line;
+ int64_t _s_vm_min_free_kbytes = 0;
+ std::ifstream vminfo("/proc/sys/vm/min_free_kbytes", std::ios::in);
+ if (vminfo.good() && !vminfo.eof()) {
+ getline(vminfo, line);
+ boost::algorithm::trim(line);
+ StringParser::ParseResult result;
+ int64_t mem_value = StringParser::string_to_int<int64_t>(line.data(),
line.size(), &result);
+
+ if (result == StringParser::PARSE_SUCCESS) {
+ _s_vm_min_free_kbytes = mem_value * 1024L;
+ }
+ }
+ if (vminfo.is_open()) vminfo.close();
+
+ // Simplified kernel formula:
+ // MemAvailable = MemFree - LowWaterMark + (PageCache - min(PageCache / 2, LowWaterMark))
+ // LowWaterMark ~= /proc/sys/vm/min_free_kbytes (approximation; the kernel sums per-zone low watermarks)
+ // Ref:
+ // https://serverfault.com/questions/940196/why-is-memavailable-a-lot-less-than-memfreebufferscached
+ // https://git.kernel.org/pub/scm/linux/kernel/git/torvalds/linux.git/commit/?id=34e431b0ae398fc54ea69ff85ec700722c9da773
+ //
+ // available_low_water_mark = max(p1 - p2, 0)
+ // p1: min(physical_mem - mem_limit, 10% of physical_mem), capped at 3.2G so that
+ //     machines with more than 32G of memory do not reserve an excessive amount.
+ // p2: max(vm/min_free_kbytes - 1% of physical_mem, 0). vm/min_free_kbytes is usually
+ //     0.4% - 5% of total memory (5% on some cloud machines); to avoid wasting too much
+ //     memory, the part of it above 1% of physical memory is subtracted from p1.
+ int64_t p1 = std::min<int64_t>(
+ std::min<int64_t>(_s_physical_mem - _s_mem_limit, _s_physical_mem
* 0.1), 3435973836L);
+ int64_t p2 = std::max<int64_t>(_s_vm_min_free_kbytes - _s_physical_mem *
0.01, 0);
+ _s_sys_mem_available_low_water_mark = std::max<int64_t>(p1 - p2, 0);
LOG(INFO) << "Physical Memory: " << PrettyPrinter::print(_s_physical_mem,
TUnit::BYTES)
<< ", Mem Limit: " << _s_mem_limit_str
- << ", origin config value: " << config::mem_limit;
+ << ", origin config value: " << config::mem_limit
+ << ", System Mem Available Min Reserve: "
+ << PrettyPrinter::print(_s_sys_mem_available_low_water_mark,
TUnit::BYTES)
+ << ", Vm Min Free KBytes: "
+ << PrettyPrinter::print(_s_vm_min_free_kbytes, TUnit::BYTES);
_s_initialized = true;
}
#else
+void MemInfo::refresh_proc_meminfo() {}
+
void MemInfo::init() {
size_t size = sizeof(_s_physical_mem);
if (sysctlbyname("hw.memsize", &_s_physical_mem, &size, nullptr, 0) != 0) {
@@ -134,8 +170,6 @@ void MemInfo::init() {
bool is_percent = true;
_s_mem_limit = ParseUtil::parse_mem_spec(config::mem_limit, -1,
_s_physical_mem, &is_percent);
_s_mem_limit_str = PrettyPrinter::print(_s_mem_limit, TUnit::BYTES);
- _s_hard_mem_limit =
- _s_physical_mem - std::max<int64_t>(209715200L, _s_physical_mem /
10); // 200M
LOG(INFO) << "Physical Memory: " << PrettyPrinter::print(_s_physical_mem,
TUnit::BYTES);
_s_initialized = true;
diff --git a/be/src/util/mem_info.h b/be/src/util/mem_info.h
index d730c4ae1b..a4f10394f7 100644
--- a/be/src/util/mem_info.h
+++ b/be/src/util/mem_info.h
@@ -46,6 +46,14 @@ public:
return _s_physical_mem;
}
+ static void refresh_proc_meminfo();
+
+ static inline int64_t sys_mem_available() { return _s_sys_mem_available; }
+ static inline std::string sys_mem_available_str() { return
_s_sys_mem_available_str; }
+ static inline int64_t sys_mem_available_low_water_mark() {
+ return _s_sys_mem_available_low_water_mark;
+ }
+
static inline size_t current_mem() { return _s_allocator_physical_mem; }
static inline size_t allocator_virtual_mem() { return
_s_virtual_memory_used; }
static inline size_t allocator_cache_mem() { return
_s_allocator_cache_mem; }
@@ -55,6 +63,7 @@ public:
// Tcmalloc property `generic.total_physical_bytes` records the total
length of the virtual memory
// obtained by the process malloc, not the physical memory actually used
by the process in the OS.
static inline void refresh_allocator_mem() {
+#if !defined(USE_JEMALLOC)
MallocExtension::instance()->GetNumericProperty("generic.total_physical_bytes",
&_s_allocator_physical_mem);
MallocExtension::instance()->GetNumericProperty("tcmalloc.pageheap_unmapped_bytes",
@@ -72,6 +81,10 @@ public:
_s_allocator_cache_mem_str =
PrettyPrinter::print(static_cast<uint64_t>(_s_allocator_cache_mem),
TUnit::BYTES);
_s_virtual_memory_used = _s_allocator_physical_mem +
_s_pageheap_unmapped_bytes;
+#endif
+ }
+
+ static inline void refresh_proc_mem_no_allocator_cache() {
_s_proc_mem_no_allocator_cache =
PerfCounters::get_vm_rss() -
static_cast<int64_t>(_s_allocator_cache_mem);
}
@@ -84,7 +97,6 @@ public:
DCHECK(_s_initialized);
return _s_mem_limit_str;
}
- static inline int64_t hard_mem_limit() { return _s_hard_mem_limit; }
static std::string debug_string();
@@ -93,7 +105,6 @@ private:
static int64_t _s_physical_mem;
static int64_t _s_mem_limit;
static std::string _s_mem_limit_str;
- static int64_t _s_hard_mem_limit;
static size_t _s_allocator_physical_mem;
static size_t _s_pageheap_unmapped_bytes;
static size_t _s_tcmalloc_pageheap_free_bytes;
@@ -104,6 +115,10 @@ private:
static std::string _s_allocator_cache_mem_str;
static size_t _s_virtual_memory_used;
static int64_t _s_proc_mem_no_allocator_cache;
+
+ static int64_t _s_sys_mem_available;
+ static std::string _s_sys_mem_available_str;
+ static int64_t _s_sys_mem_available_low_water_mark;
};
} // namespace doris
diff --git a/be/src/vec/sink/vresult_sink.cpp b/be/src/vec/sink/vresult_sink.cpp
index 9fe8cf8b66..77e64a8959 100644
--- a/be/src/vec/sink/vresult_sink.cpp
+++ b/be/src/vec/sink/vresult_sink.cpp
@@ -115,6 +115,7 @@ Status VResultSink::close(RuntimeState* state, Status
exec_status) {
// close sender, this is normal path end
if (_sender) {
if (_writer)
_sender->update_num_written_rows(_writer->get_written_rows());
+ _sender->update_max_peak_memory_bytes();
_sender->close(final_status);
}
state->exec_env()->result_mgr()->cancel_at_time(
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]