This is an automated email from the ASF dual-hosted git repository. morningman pushed a commit to branch branch-1.2-lts in repository https://gitbox.apache.org/repos/asf/doris.git
commit a117c4d05097f90df2022cd5ae9d33e58aed3a97 Author: Xinyi Zou <zouxiny...@gmail.com> AuthorDate: Fri Dec 9 14:09:05 2022 +0800 [enhancement](memory) Support query memory overcommit #14948 Add conf enable_query_memroy_overcommit. If true, when the process does not exceed the soft mem limit, the query memory will not be limited; when the process memory exceeds the soft mem limit, the query with the largest ratio between the currently used memory and the exec_mem_limit will be canceled. If false, cancel query when the memory used exceeds exec_mem_limit, same as before. --- be/src/common/config.h | 6 ++ be/src/runtime/memory/mem_tracker_limiter.cpp | 80 ++++++++++++++++++++++----- be/src/runtime/memory/mem_tracker_limiter.h | 21 +++++-- be/src/util/mem_info.cpp | 6 +- 4 files changed, 93 insertions(+), 20 deletions(-) diff --git a/be/src/common/config.h b/be/src/common/config.h index 86b278b76b..3bc843e422 100644 --- a/be/src/common/config.h +++ b/be/src/common/config.h @@ -72,6 +72,12 @@ CONF_Int64(max_sys_mem_available_low_water_mark_bytes, "1717986918"); CONF_mString(process_minor_gc_size, "10%"); CONF_mString(process_full_gc_size, "20%"); +// If true, when the process does not exceed the soft mem limit, the query memory will not be limited; +// when the process memory exceeds the soft mem limit, the query with the largest ratio between the currently +// used memory and the exec_mem_limit will be canceled. +// If false, cancel query when the memory used exceeds exec_mem_limit, same as before. +CONF_mBool(enable_query_memroy_overcommit, "true"); + // The maximum time a thread waits for a full GC. Currently only query will wait for full gc. 
CONF_mInt32(thread_wait_gc_max_milliseconds, "1000"); diff --git a/be/src/runtime/memory/mem_tracker_limiter.cpp b/be/src/runtime/memory/mem_tracker_limiter.cpp index a228dcf5a7..a37a0be0f9 100644 --- a/be/src/runtime/memory/mem_tracker_limiter.cpp +++ b/be/src/runtime/memory/mem_tracker_limiter.cpp @@ -28,7 +28,6 @@ #include "runtime/thread_context.h" #include "util/pretty_printer.h" #include "util/stack_util.h" -#include "util/string_util.h" namespace doris { @@ -240,7 +239,7 @@ Status MemTrackerLimiter::fragment_mem_limit_exceeded(RuntimeState* state, const return Status::MemoryLimitExceeded(failed_msg); } -int64_t MemTrackerLimiter::free_top_query(int64_t min_free_mem) { +int64_t MemTrackerLimiter::free_top_memory_query(int64_t min_free_mem) { std::priority_queue<std::pair<int64_t, std::string>, std::vector<std::pair<int64_t, std::string>>, std::greater<std::pair<int64_t, std::string>>> @@ -248,16 +247,8 @@ int64_t MemTrackerLimiter::free_top_query(int64_t min_free_mem) { // After greater than min_free_mem, will not be modified. 
int64_t prepare_free_mem = 0; - auto label_to_queryid = [&](const std::string& label) -> TUniqueId { - auto queryid = split(label, "#Id=")[1]; - TUniqueId querytid; - parse_id(queryid, &querytid); - return querytid; - }; - - auto cancel_top_query = [&](auto min_pq, auto label_to_queryid) -> int64_t { + auto cancel_top_query = [&](auto min_pq) -> int64_t { std::vector<std::string> usage_strings; - bool had_cancel = false; int64_t freed_mem = 0; while (!min_pq.empty()) { TUniqueId cancelled_queryid = label_to_queryid(min_pq.top().second); @@ -276,10 +267,9 @@ int64_t MemTrackerLimiter::free_top_query(int64_t min_free_mem) { freed_mem += min_pq.top().first; usage_strings.push_back(fmt::format("{} memory usage {} Bytes", min_pq.top().second, min_pq.top().first)); - had_cancel = true; min_pq.pop(); } - if (had_cancel) { + if (!usage_strings.empty()) { LOG(INFO) << "Process GC Free Top Memory Usage Query: " << join(usage_strings, ","); } return freed_mem; @@ -297,7 +287,7 @@ int64_t MemTrackerLimiter::free_top_query(int64_t min_free_mem) { std::swap(min_pq, min_pq_null); min_pq.push( pair<int64_t, std::string>(tracker->consumption(), tracker->label())); - return cancel_top_query(min_pq, label_to_queryid); + return cancel_top_query(min_pq); } else if (tracker->consumption() + prepare_free_mem < min_free_mem) { min_pq.push( pair<int64_t, std::string>(tracker->consumption(), tracker->label())); @@ -311,7 +301,67 @@ int64_t MemTrackerLimiter::free_top_query(int64_t min_free_mem) { } } } - return cancel_top_query(min_pq, label_to_queryid); + return cancel_top_query(min_pq); +} + +int64_t MemTrackerLimiter::free_top_overcommit_query(int64_t min_free_mem) { + std::priority_queue<std::pair<int64_t, std::string>, + std::vector<std::pair<int64_t, std::string>>, + std::greater<std::pair<int64_t, std::string>>> + min_pq; + std::unordered_map<std::string, int64_t> query_consumption; + + for (unsigned i = 1; i < mem_tracker_limiter_pool.size(); ++i) { + std::lock_guard<std::mutex> 
l(mem_tracker_limiter_pool[i].group_lock); + for (auto tracker : mem_tracker_limiter_pool[i].trackers) { + if (tracker->type() == Type::QUERY) { + int64_t overcommit_ratio = + (static_cast<double>(tracker->consumption()) / tracker->limit()) * 10000; + if (overcommit_ratio == 0) { // Small query does not cancel + continue; + } + min_pq.push(pair<int64_t, std::string>(overcommit_ratio, tracker->label())); + query_consumption[tracker->label()] = tracker->consumption(); + } + } + } + + std::priority_queue<std::pair<int64_t, std::string>> max_pq; + // Min-heap to Max-heap. + while (!min_pq.empty()) { + max_pq.push(min_pq.top()); + min_pq.pop(); + } + + std::vector<std::string> usage_strings; + int64_t freed_mem = 0; + while (!max_pq.empty()) { + TUniqueId cancelled_queryid = label_to_queryid(max_pq.top().second); + int64_t query_mem = query_consumption[max_pq.top().second]; + ExecEnv::GetInstance()->fragment_mgr()->cancel_query( + cancelled_queryid, PPlanFragmentCancelReason::MEMORY_LIMIT_EXCEED, + fmt::format("Process has no memory available, cancel top memory usage query: " + "query memory tracker <{}> consumption {}, backend {} " + "process memory used {} exceed limit {} or sys mem available {} " + "less than low water mark {}. 
Execute again after enough memory, " + "details see be.INFO.", + max_pq.top().second, print_bytes(query_mem), + BackendOptions::get_localhost(), PerfCounters::get_vm_rss_str(), + MemInfo::mem_limit_str(), MemInfo::sys_mem_available_str(), + print_bytes(MemInfo::sys_mem_available_low_water_mark()))); + + usage_strings.push_back(fmt::format("{} memory usage {} Bytes, overcommit ratio: {}", + max_pq.top().second, query_mem, max_pq.top().first)); + freed_mem += query_mem; + if (freed_mem > min_free_mem) { + break; + } + max_pq.pop(); + } + if (!usage_strings.empty()) { + LOG(INFO) << "Process GC Free Top Memory Overcommit Query: " << join(usage_strings, ","); + } + return freed_mem; } } // namespace doris diff --git a/be/src/runtime/memory/mem_tracker_limiter.h b/be/src/runtime/memory/mem_tracker_limiter.h index 6415510315..617a7ffdee 100644 --- a/be/src/runtime/memory/mem_tracker_limiter.h +++ b/be/src/runtime/memory/mem_tracker_limiter.h @@ -25,6 +25,7 @@ #include "service/backend_options.h" #include "util/mem_info.h" #include "util/perf_counters.h" +#include "util/string_util.h" namespace doris { @@ -144,8 +145,18 @@ public: Status fragment_mem_limit_exceeded(RuntimeState* state, const std::string& msg, int64_t failed_allocation_size = 0); - // Start canceling from the query with the largest memory usage until the memory of min_free_mem size is released. - static int64_t free_top_query(int64_t min_free_mem); + // Start canceling from the query with the largest memory usage until the memory of min_free_mem size is freed. + static int64_t free_top_memory_query(int64_t min_free_mem); + // Start canceling from the query with the largest memory overcommit ratio until the memory + // of min_free_mem size is freed. + static int64_t free_top_overcommit_query(int64_t min_free_mem); + // only for Type::QUERY or Type::LOAD. 
+ static TUniqueId label_to_queryid(const std::string& label) { + auto queryid = split(label, "#Id=")[1]; + TUniqueId querytid; + parse_id(queryid, &querytid); + return querytid; + }; static std::string process_mem_log_str() { return fmt::format( @@ -254,7 +265,7 @@ inline bool MemTrackerLimiter::try_consume(int64_t bytes, std::string& failed_ms return false; } - if (_limit < 0) { + if (_limit < 0 || (_type == Type::QUERY && config::enable_query_memroy_overcommit)) { _consumption->add(bytes); // No limit at this tracker. } else { if (!_consumption->try_add(bytes, _limit)) { @@ -271,7 +282,9 @@ inline Status MemTrackerLimiter::check_limit(int64_t bytes) { if (sys_mem_exceed_limit_check(bytes)) { return Status::MemoryLimitExceeded(process_limit_exceeded_errmsg_str(bytes)); } - if (bytes <= 0) return Status::OK(); + if (bytes <= 0 || (_type == Type::QUERY && config::enable_query_memroy_overcommit)) { + return Status::OK(); + } if (_limit > 0 && _consumption->current_value() + bytes > _limit) { return Status::MemoryLimitExceeded(tracker_limit_exceeded_errmsg_str(bytes, this)); } diff --git a/be/src/util/mem_info.cpp b/be/src/util/mem_info.cpp index 453c1cd2bd..55500feea7 100644 --- a/be/src/util/mem_info.cpp +++ b/be/src/util/mem_info.cpp @@ -104,6 +104,10 @@ void MemInfo::process_minor_gc() { freed_mem += StoragePageCache::instance()->get_page_cache_mem_consumption(segment_v2::DATA_PAGE); StoragePageCache::instance()->prune(segment_v2::DATA_PAGE); + if (config::enable_query_memroy_overcommit) { + freed_mem += + MemTrackerLimiter::free_top_overcommit_query(_s_process_full_gc_size - freed_mem); + } } void MemInfo::process_full_gc() { @@ -122,7 +126,7 @@ void MemInfo::process_full_gc() { if (freed_mem > _s_process_full_gc_size) { return; } - freed_mem += MemTrackerLimiter::free_top_query(_s_process_full_gc_size - freed_mem); + freed_mem += MemTrackerLimiter::free_top_memory_query(_s_process_full_gc_size - freed_mem); } #ifndef __APPLE__ 
--------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org