This is an automated email from the ASF dual-hosted git repository. yiguolei pushed a commit to branch branch-2.1 in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.1 by this push: new 70304bffd20 [refactor](wg) move memory gc logic to workload group (#31334) 70304bffd20 is described below commit 70304bffd20ba53e54bfffe6adb0b2107a373270 Author: yiguolei <676222...@qq.com> AuthorDate: Fri Feb 23 22:18:09 2024 +0800 [refactor](wg) move memory gc logic to workload group (#31334) --------- Co-authored-by: yiguolei <yiguo...@gmail.com> --- be/src/runtime/memory/mem_tracker_limiter.cpp | 100 +++++--------------------- be/src/runtime/memory/mem_tracker_limiter.h | 22 +++--- be/src/runtime/task_group/task_group.cpp | 88 ++++++++++++++++++++--- be/src/runtime/task_group/task_group.h | 9 +-- be/src/util/mem_info.cpp | 17 ++--- 5 files changed, 118 insertions(+), 118 deletions(-) diff --git a/be/src/runtime/memory/mem_tracker_limiter.cpp b/be/src/runtime/memory/mem_tracker_limiter.cpp index f9980806f2c..680c2917cb8 100644 --- a/be/src/runtime/memory/mem_tracker_limiter.cpp +++ b/be/src/runtime/memory/mem_tracker_limiter.cpp @@ -395,6 +395,14 @@ int64_t MemTrackerLimiter::free_top_memory_query(int64_t min_free_mem, profile, GCType::PROCESS); } +int64_t MemTrackerLimiter::tg_free_top_memory_query( + int64_t min_free_mem, Type type, + std::vector<taskgroup::TgTrackerLimiterGroup>& tracker_groups, + const std::function<std::string(int64_t, const std::string&)>& cancel_msg, + RuntimeProfile* profile, GCType gctype) { + return free_top_memory_query(min_free_mem, type, tracker_groups, cancel_msg, profile, gctype); +} + template <typename TrackerGroups> int64_t MemTrackerLimiter::free_top_memory_query( int64_t min_free_mem, Type type, std::vector<TrackerGroups>& tracker_groups, @@ -522,6 +530,15 @@ int64_t MemTrackerLimiter::free_top_overcommit_query(int64_t min_free_mem, profile, GCType::PROCESS); } +int64_t MemTrackerLimiter::tg_free_top_overcommit_query( + int64_t min_free_mem, Type type, + std::vector<taskgroup::TgTrackerLimiterGroup>& tracker_groups, + const 
std::function<std::string(int64_t, const std::string&)>& cancel_msg, + RuntimeProfile* profile, GCType gctype) { + return free_top_overcommit_query(min_free_mem, type, tracker_groups, cancel_msg, profile, + gctype); +} + template <typename TrackerGroups> int64_t MemTrackerLimiter::free_top_overcommit_query( int64_t min_free_mem, Type type, std::vector<TrackerGroups>& tracker_groups, @@ -632,87 +649,4 @@ int64_t MemTrackerLimiter::free_top_overcommit_query( return freed_memory_counter->value(); } -int64_t MemTrackerLimiter::tg_memory_limit_gc( - int64_t need_free_mem, int64_t used_memory, uint64_t id, const std::string& name, - int64_t memory_limit, std::vector<taskgroup::TgTrackerLimiterGroup>& tracker_limiter_groups, - RuntimeProfile* profile) { - if (need_free_mem <= 0) { - return 0; - } - - int64_t freed_mem = 0; - - std::string cancel_str = fmt::format( - "work load group memory exceeded limit, group id:{}, name:{}, used:{}, limit:{}, " - "backend:{}.", - id, name, MemTracker::print_bytes(used_memory), MemTracker::print_bytes(memory_limit), - BackendOptions::get_localhost()); - auto cancel_top_overcommit_str = [cancel_str](int64_t mem_consumption, - const std::string& label) { - return fmt::format( - "{} cancel top memory overcommit tracker <{}> consumption {}. execute again after " - "enough memory, details see be.INFO.", - cancel_str, label, MemTracker::print_bytes(mem_consumption)); - }; - auto cancel_top_usage_str = [cancel_str](int64_t mem_consumption, const std::string& label) { - return fmt::format( - "{} cancel top memory used tracker <{}> consumption {}. 
execute again after " - "enough memory, details see be.INFO.", - cancel_str, label, MemTracker::print_bytes(mem_consumption)); - }; - - LOG(INFO) << fmt::format( - "[MemoryGC] work load group start gc, id:{} name:{}, memory limit: {}, used: {}, " - "need_free_mem: {}.", - id, name, memory_limit, used_memory, need_free_mem); - Defer defer {[&]() { - LOG(INFO) << fmt::format( - "[MemoryGC] work load group finished gc, id:{} name:{}, memory limit: {}, used: " - "{}, need_free_mem: {}, freed memory: {}.", - id, name, memory_limit, used_memory, need_free_mem, freed_mem); - }}; - - // 1. free top overcommit query - if (config::enable_query_memory_overcommit) { - RuntimeProfile* tmq_profile = profile->create_child( - fmt::format("FreeGroupTopOvercommitQuery:Name {}", name), true, true); - freed_mem += MemTrackerLimiter::free_top_overcommit_query( - need_free_mem - freed_mem, MemTrackerLimiter::Type::QUERY, tracker_limiter_groups, - cancel_top_overcommit_str, tmq_profile, GCType::WORK_LOAD_GROUP); - } - if (freed_mem >= need_free_mem) { - return freed_mem; - } - - // 2. free top usage query - RuntimeProfile* tmq_profile = - profile->create_child(fmt::format("FreeGroupTopUsageQuery:Name {}", name), true, true); - freed_mem += MemTrackerLimiter::free_top_memory_query( - need_free_mem - freed_mem, MemTrackerLimiter::Type::QUERY, tracker_limiter_groups, - cancel_top_usage_str, tmq_profile, GCType::WORK_LOAD_GROUP); - if (freed_mem >= need_free_mem) { - return freed_mem; - } - - // 3. free top overcommit load - if (config::enable_query_memory_overcommit) { - tmq_profile = profile->create_child(fmt::format("FreeGroupTopOvercommitLoad:Name {}", name), - true, true); - freed_mem += MemTrackerLimiter::free_top_overcommit_query( - need_free_mem - freed_mem, MemTrackerLimiter::Type::LOAD, tracker_limiter_groups, - cancel_top_overcommit_str, tmq_profile, GCType::WORK_LOAD_GROUP); - if (freed_mem >= need_free_mem) { - return freed_mem; - } - } - - // 4. 
free top usage load - tmq_profile = - profile->create_child(fmt::format("FreeGroupTopUsageLoad:Name {}", name), true, true); - freed_mem += MemTrackerLimiter::free_top_memory_query( - need_free_mem - freed_mem, MemTrackerLimiter::Type::LOAD, tracker_limiter_groups, - cancel_top_usage_str, tmq_profile, GCType::WORK_LOAD_GROUP); - return freed_mem; -} - } // namespace doris diff --git a/be/src/runtime/memory/mem_tracker_limiter.h b/be/src/runtime/memory/mem_tracker_limiter.h index df20448c010..9703981af63 100644 --- a/be/src/runtime/memory/mem_tracker_limiter.h +++ b/be/src/runtime/memory/mem_tracker_limiter.h @@ -192,7 +192,13 @@ public: static int64_t free_top_memory_query( int64_t min_free_mem, Type type, std::vector<TrackerGroups>& tracker_groups, const std::function<std::string(int64_t, const std::string&)>& cancel_msg, - RuntimeProfile* profile, GCType GCtype); + RuntimeProfile* profile, GCType gctype); + + static int64_t tg_free_top_memory_query( + int64_t min_free_mem, Type type, + std::vector<taskgroup::TgTrackerLimiterGroup>& tracker_groups, + const std::function<std::string(int64_t, const std::string&)>& cancel_msg, + RuntimeProfile* profile, GCType gctype); static int64_t free_top_memory_load(int64_t min_free_mem, const std::string& vm_rss_str, const std::string& mem_available_str, @@ -210,7 +216,13 @@ public: static int64_t free_top_overcommit_query( int64_t min_free_mem, Type type, std::vector<TrackerGroups>& tracker_groups, const std::function<std::string(int64_t, const std::string&)>& cancel_msg, - RuntimeProfile* profile, GCType GCtype); + RuntimeProfile* profile, GCType gctype); + + static int64_t tg_free_top_overcommit_query( + int64_t min_free_mem, Type type, + std::vector<taskgroup::TgTrackerLimiterGroup>& tracker_groups, + const std::function<std::string(int64_t, const std::string&)>& cancel_msg, + RuntimeProfile* profile, GCType gctype); static int64_t free_top_overcommit_load(int64_t min_free_mem, const std::string& vm_rss_str, const 
std::string& mem_available_str, @@ -219,12 +231,6 @@ public: Type::LOAD); } - static int64_t tg_memory_limit_gc( - int64_t request_free_memory, int64_t used_memory, uint64_t id, const std::string& name, - int64_t memory_limit, - std::vector<taskgroup::TgTrackerLimiterGroup>& tracker_limiter_groups, - RuntimeProfile* profile); - // only for Type::QUERY or Type::LOAD. static TUniqueId label_to_queryid(const std::string& label) { if (label.rfind("Query#Id=", 0) != 0 && label.rfind("Load#Id=", 0) != 0) { diff --git a/be/src/runtime/task_group/task_group.cpp b/be/src/runtime/task_group/task_group.cpp index ee1be702768..ddddf39dbc8 100644 --- a/be/src/runtime/task_group/task_group.cpp +++ b/be/src/runtime/task_group/task_group.cpp @@ -33,6 +33,7 @@ #include "runtime/memory/mem_tracker_limiter.h" #include "util/mem_info.h" #include "util/parse_util.h" +#include "util/runtime_profile.h" #include "vec/exec/scan/scanner_scheduler.h" namespace doris { @@ -113,14 +114,85 @@ void TaskGroup::remove_mem_tracker_limiter(std::shared_ptr<MemTrackerLimiter> me _mem_tracker_limiter_pool[group_num].trackers.erase(mem_tracker_ptr); } -void TaskGroup::task_group_info(TaskGroupInfo* tg_info) const { - std::shared_lock<std::shared_mutex> r_lock(_mutex); - tg_info->id = _id; - tg_info->name = _name; - tg_info->cpu_share = _cpu_share; - tg_info->memory_limit = _memory_limit; - tg_info->enable_memory_overcommit = _enable_memory_overcommit; - tg_info->version = _version; +int64_t TaskGroup::gc_memory(int64_t need_free_mem, RuntimeProfile* profile) { + if (need_free_mem <= 0) { + return 0; + } + int64_t used_memory = memory_used(); + int64_t freed_mem = 0; + + std::string cancel_str = fmt::format( + "work load group memory exceeded limit, group id:{}, name:{}, used:{}, limit:{}, " + "backend:{}.", + _id, _name, MemTracker::print_bytes(used_memory), + MemTracker::print_bytes(_memory_limit), BackendOptions::get_localhost()); + auto cancel_top_overcommit_str = [cancel_str](int64_t mem_consumption, 
+ const std::string& label) { + return fmt::format( + "{} cancel top memory overcommit tracker <{}> consumption {}. execute again after " + "enough memory, details see be.INFO.", + cancel_str, label, MemTracker::print_bytes(mem_consumption)); + }; + auto cancel_top_usage_str = [cancel_str](int64_t mem_consumption, const std::string& label) { + return fmt::format( + "{} cancel top memory used tracker <{}> consumption {}. execute again after " + "enough memory, details see be.INFO.", + cancel_str, label, MemTracker::print_bytes(mem_consumption)); + }; + + LOG(INFO) << fmt::format( + "[MemoryGC] work load group start gc, id:{} name:{}, memory limit: {}, used: {}, " + "need_free_mem: {}.", + _id, _name, _memory_limit, used_memory, need_free_mem); + Defer defer {[&]() { + LOG(INFO) << fmt::format( + "[MemoryGC] work load group finished gc, id:{} name:{}, memory limit: {}, used: " + "{}, need_free_mem: {}, freed memory: {}.", + _id, _name, _memory_limit, used_memory, need_free_mem, freed_mem); + }}; + + // 1. free top overcommit query + if (config::enable_query_memory_overcommit) { + RuntimeProfile* tmq_profile = profile->create_child( + fmt::format("FreeGroupTopOvercommitQuery:Name {}", _name), true, true); + freed_mem += MemTrackerLimiter::tg_free_top_overcommit_query( + need_free_mem - freed_mem, MemTrackerLimiter::Type::QUERY, + _mem_tracker_limiter_pool, cancel_top_overcommit_str, tmq_profile, + MemTrackerLimiter::GCType::WORK_LOAD_GROUP); + } + if (freed_mem >= need_free_mem) { + return freed_mem; + } + + // 2. free top usage query + RuntimeProfile* tmq_profile = + profile->create_child(fmt::format("FreeGroupTopUsageQuery:Name {}", _name), true, true); + freed_mem += MemTrackerLimiter::tg_free_top_memory_query( + need_free_mem - freed_mem, MemTrackerLimiter::Type::QUERY, _mem_tracker_limiter_pool, + cancel_top_usage_str, tmq_profile, MemTrackerLimiter::GCType::WORK_LOAD_GROUP); + if (freed_mem >= need_free_mem) { + return freed_mem; + } + + // 3. 
free top overcommit load + if (config::enable_query_memory_overcommit) { + tmq_profile = profile->create_child( + fmt::format("FreeGroupTopOvercommitLoad:Name {}", _name), true, true); + freed_mem += MemTrackerLimiter::tg_free_top_overcommit_query( + need_free_mem - freed_mem, MemTrackerLimiter::Type::LOAD, _mem_tracker_limiter_pool, + cancel_top_overcommit_str, tmq_profile, MemTrackerLimiter::GCType::WORK_LOAD_GROUP); + if (freed_mem >= need_free_mem) { + return freed_mem; + } + } + + // 4. free top usage load + tmq_profile = + profile->create_child(fmt::format("FreeGroupTopUsageLoad:Name {}", _name), true, true); + freed_mem += MemTrackerLimiter::tg_free_top_memory_query( + need_free_mem - freed_mem, MemTrackerLimiter::Type::LOAD, _mem_tracker_limiter_pool, + cancel_top_usage_str, tmq_profile, MemTrackerLimiter::GCType::WORK_LOAD_GROUP); + return freed_mem; } Status TaskGroupInfo::parse_topic_info(const TWorkloadGroupInfo& workload_group_info, diff --git a/be/src/runtime/task_group/task_group.h b/be/src/runtime/task_group/task_group.h index 3767731435a..7604ee45121 100644 --- a/be/src/runtime/task_group/task_group.h +++ b/be/src/runtime/task_group/task_group.h @@ -35,6 +35,7 @@ namespace doris { class MemTrackerLimiter; +class RuntimeProfile; namespace pipeline { class PipelineTask; @@ -83,12 +84,6 @@ public: void remove_mem_tracker_limiter(std::shared_ptr<MemTrackerLimiter> mem_tracker_ptr); - void task_group_info(TaskGroupInfo* tg_info) const; - - std::vector<TgTrackerLimiterGroup>& mem_tracker_limiter_pool() { - return _mem_tracker_limiter_pool; - } - // when mem_limit <=0 , it's an invalid value, then current group not participating in memory GC // because mem_limit is not a required property bool is_mem_limit_valid() { @@ -124,6 +119,8 @@ public: return _query_id_set.size(); } + int64_t gc_memory(int64_t need_free_mem, RuntimeProfile* profile); + private: mutable std::shared_mutex _mutex; // lock _name, _version, _cpu_share, _memory_limit const uint64_t _id; 
diff --git a/be/src/util/mem_info.cpp b/be/src/util/mem_info.cpp index 1eaa3eacaf5..3d9c4c4b062 100644 --- a/be/src/util/mem_info.cpp +++ b/be/src/util/mem_info.cpp @@ -256,9 +256,7 @@ int64_t MemInfo::tg_not_enable_overcommit_group_gc() { std::vector<taskgroup::TaskGroupPtr> task_groups_overcommit; for (const auto& task_group : task_groups) { - taskgroup::TaskGroupInfo tg_info; - task_group->task_group_info(&tg_info); - if (task_group->memory_used() > tg_info.memory_limit) { + if (task_group->memory_used() > task_group->memory_limit()) { task_groups_overcommit.push_back(task_group); } } @@ -286,12 +284,9 @@ int64_t MemInfo::tg_not_enable_overcommit_group_gc() { }}; for (const auto& task_group : task_groups_overcommit) { - taskgroup::TaskGroupInfo tg_info; - task_group->task_group_info(&tg_info); auto used = task_group->memory_used(); - total_free_memory += MemTrackerLimiter::tg_memory_limit_gc( - used - tg_info.memory_limit, used, tg_info.id, tg_info.name, tg_info.memory_limit, - task_group->mem_tracker_limiter_pool(), tg_profile.get()); + total_free_memory += + task_group->gc_memory(used - task_group->memory_limit(), tg_profile.get()); } return total_free_memory; } @@ -362,11 +357,7 @@ int64_t MemInfo::tg_enable_overcommit_group_gc(int64_t request_free_memory, : static_cast<double>(exceeded_memorys[i]) / total_exceeded_memory * request_free_memory /* exceeded memory as a weight */; auto task_group = task_groups[i]; - taskgroup::TaskGroupInfo tg_info; - task_group->task_group_info(&tg_info); - total_free_memory += MemTrackerLimiter::tg_memory_limit_gc( - tg_need_free_memory, used_memorys[i], tg_info.id, tg_info.name, - tg_info.memory_limit, task_group->mem_tracker_limiter_pool(), profile); + total_free_memory += task_group->gc_memory(tg_need_free_memory, profile); } return total_free_memory; } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: 
commits-help@doris.apache.org