This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-2.1 by this push:
     new 70304bffd20 [refactor](wg) move memory gc logic to workload group 
(#31334)
70304bffd20 is described below

commit 70304bffd20ba53e54bfffe6adb0b2107a373270
Author: yiguolei <676222...@qq.com>
AuthorDate: Fri Feb 23 22:18:09 2024 +0800

    [refactor](wg) move memory gc logic to workload group (#31334)
    
    
    
    ---------
    
    Co-authored-by: yiguolei <yiguo...@gmail.com>
---
 be/src/runtime/memory/mem_tracker_limiter.cpp | 100 +++++---------------------
 be/src/runtime/memory/mem_tracker_limiter.h   |  22 +++---
 be/src/runtime/task_group/task_group.cpp      |  88 ++++++++++++++++++++---
 be/src/runtime/task_group/task_group.h        |   9 +--
 be/src/util/mem_info.cpp                      |  17 ++---
 5 files changed, 118 insertions(+), 118 deletions(-)

diff --git a/be/src/runtime/memory/mem_tracker_limiter.cpp 
b/be/src/runtime/memory/mem_tracker_limiter.cpp
index f9980806f2c..680c2917cb8 100644
--- a/be/src/runtime/memory/mem_tracker_limiter.cpp
+++ b/be/src/runtime/memory/mem_tracker_limiter.cpp
@@ -395,6 +395,14 @@ int64_t MemTrackerLimiter::free_top_memory_query(int64_t 
min_free_mem,
             profile, GCType::PROCESS);
 }
 
+int64_t MemTrackerLimiter::tg_free_top_memory_query(
+        int64_t min_free_mem, Type type,
+        std::vector<taskgroup::TgTrackerLimiterGroup>& tracker_groups,
+        const std::function<std::string(int64_t, const std::string&)>& 
cancel_msg,
+        RuntimeProfile* profile, GCType gctype) {
+    return free_top_memory_query(min_free_mem, type, tracker_groups, 
cancel_msg, profile, gctype);
+}
+
 template <typename TrackerGroups>
 int64_t MemTrackerLimiter::free_top_memory_query(
         int64_t min_free_mem, Type type, std::vector<TrackerGroups>& 
tracker_groups,
@@ -522,6 +530,15 @@ int64_t 
MemTrackerLimiter::free_top_overcommit_query(int64_t min_free_mem,
             profile, GCType::PROCESS);
 }
 
+int64_t MemTrackerLimiter::tg_free_top_overcommit_query(
+        int64_t min_free_mem, Type type,
+        std::vector<taskgroup::TgTrackerLimiterGroup>& tracker_groups,
+        const std::function<std::string(int64_t, const std::string&)>& 
cancel_msg,
+        RuntimeProfile* profile, GCType gctype) {
+    return free_top_overcommit_query(min_free_mem, type, tracker_groups, 
cancel_msg, profile,
+                                     gctype);
+}
+
 template <typename TrackerGroups>
 int64_t MemTrackerLimiter::free_top_overcommit_query(
         int64_t min_free_mem, Type type, std::vector<TrackerGroups>& 
tracker_groups,
@@ -632,87 +649,4 @@ int64_t MemTrackerLimiter::free_top_overcommit_query(
     return freed_memory_counter->value();
 }
 
-int64_t MemTrackerLimiter::tg_memory_limit_gc(
-        int64_t need_free_mem, int64_t used_memory, uint64_t id, const 
std::string& name,
-        int64_t memory_limit, std::vector<taskgroup::TgTrackerLimiterGroup>& 
tracker_limiter_groups,
-        RuntimeProfile* profile) {
-    if (need_free_mem <= 0) {
-        return 0;
-    }
-
-    int64_t freed_mem = 0;
-
-    std::string cancel_str = fmt::format(
-            "work load group memory exceeded limit, group id:{}, name:{}, 
used:{}, limit:{}, "
-            "backend:{}.",
-            id, name, MemTracker::print_bytes(used_memory), 
MemTracker::print_bytes(memory_limit),
-            BackendOptions::get_localhost());
-    auto cancel_top_overcommit_str = [cancel_str](int64_t mem_consumption,
-                                                  const std::string& label) {
-        return fmt::format(
-                "{} cancel top memory overcommit tracker <{}> consumption {}. 
execute again after "
-                "enough memory, details see be.INFO.",
-                cancel_str, label, MemTracker::print_bytes(mem_consumption));
-    };
-    auto cancel_top_usage_str = [cancel_str](int64_t mem_consumption, const 
std::string& label) {
-        return fmt::format(
-                "{} cancel top memory used tracker <{}> consumption {}. 
execute again after "
-                "enough memory, details see be.INFO.",
-                cancel_str, label, MemTracker::print_bytes(mem_consumption));
-    };
-
-    LOG(INFO) << fmt::format(
-            "[MemoryGC] work load group start gc, id:{} name:{}, memory limit: 
{}, used: {}, "
-            "need_free_mem: {}.",
-            id, name, memory_limit, used_memory, need_free_mem);
-    Defer defer {[&]() {
-        LOG(INFO) << fmt::format(
-                "[MemoryGC] work load group finished gc, id:{} name:{}, memory 
limit: {}, used: "
-                "{}, need_free_mem: {}, freed memory: {}.",
-                id, name, memory_limit, used_memory, need_free_mem, freed_mem);
-    }};
-
-    // 1. free top overcommit query
-    if (config::enable_query_memory_overcommit) {
-        RuntimeProfile* tmq_profile = profile->create_child(
-                fmt::format("FreeGroupTopOvercommitQuery:Name {}", name), 
true, true);
-        freed_mem += MemTrackerLimiter::free_top_overcommit_query(
-                need_free_mem - freed_mem, MemTrackerLimiter::Type::QUERY, 
tracker_limiter_groups,
-                cancel_top_overcommit_str, tmq_profile, 
GCType::WORK_LOAD_GROUP);
-    }
-    if (freed_mem >= need_free_mem) {
-        return freed_mem;
-    }
-
-    // 2. free top usage query
-    RuntimeProfile* tmq_profile =
-            profile->create_child(fmt::format("FreeGroupTopUsageQuery:Name 
{}", name), true, true);
-    freed_mem += MemTrackerLimiter::free_top_memory_query(
-            need_free_mem - freed_mem, MemTrackerLimiter::Type::QUERY, 
tracker_limiter_groups,
-            cancel_top_usage_str, tmq_profile, GCType::WORK_LOAD_GROUP);
-    if (freed_mem >= need_free_mem) {
-        return freed_mem;
-    }
-
-    // 3. free top overcommit load
-    if (config::enable_query_memory_overcommit) {
-        tmq_profile = 
profile->create_child(fmt::format("FreeGroupTopOvercommitLoad:Name {}", name),
-                                            true, true);
-        freed_mem += MemTrackerLimiter::free_top_overcommit_query(
-                need_free_mem - freed_mem, MemTrackerLimiter::Type::LOAD, 
tracker_limiter_groups,
-                cancel_top_overcommit_str, tmq_profile, 
GCType::WORK_LOAD_GROUP);
-        if (freed_mem >= need_free_mem) {
-            return freed_mem;
-        }
-    }
-
-    // 4. free top usage load
-    tmq_profile =
-            profile->create_child(fmt::format("FreeGroupTopUsageLoad:Name {}", 
name), true, true);
-    freed_mem += MemTrackerLimiter::free_top_memory_query(
-            need_free_mem - freed_mem, MemTrackerLimiter::Type::LOAD, 
tracker_limiter_groups,
-            cancel_top_usage_str, tmq_profile, GCType::WORK_LOAD_GROUP);
-    return freed_mem;
-}
-
 } // namespace doris
diff --git a/be/src/runtime/memory/mem_tracker_limiter.h 
b/be/src/runtime/memory/mem_tracker_limiter.h
index df20448c010..9703981af63 100644
--- a/be/src/runtime/memory/mem_tracker_limiter.h
+++ b/be/src/runtime/memory/mem_tracker_limiter.h
@@ -192,7 +192,13 @@ public:
     static int64_t free_top_memory_query(
             int64_t min_free_mem, Type type, std::vector<TrackerGroups>& 
tracker_groups,
             const std::function<std::string(int64_t, const std::string&)>& 
cancel_msg,
-            RuntimeProfile* profile, GCType GCtype);
+            RuntimeProfile* profile, GCType gctype);
+
+    static int64_t tg_free_top_memory_query(
+            int64_t min_free_mem, Type type,
+            std::vector<taskgroup::TgTrackerLimiterGroup>& tracker_groups,
+            const std::function<std::string(int64_t, const std::string&)>& 
cancel_msg,
+            RuntimeProfile* profile, GCType gctype);
 
     static int64_t free_top_memory_load(int64_t min_free_mem, const 
std::string& vm_rss_str,
                                         const std::string& mem_available_str,
@@ -210,7 +216,13 @@ public:
     static int64_t free_top_overcommit_query(
             int64_t min_free_mem, Type type, std::vector<TrackerGroups>& 
tracker_groups,
             const std::function<std::string(int64_t, const std::string&)>& 
cancel_msg,
-            RuntimeProfile* profile, GCType GCtype);
+            RuntimeProfile* profile, GCType gctype);
+
+    static int64_t tg_free_top_overcommit_query(
+            int64_t min_free_mem, Type type,
+            std::vector<taskgroup::TgTrackerLimiterGroup>& tracker_groups,
+            const std::function<std::string(int64_t, const std::string&)>& 
cancel_msg,
+            RuntimeProfile* profile, GCType gctype);
 
     static int64_t free_top_overcommit_load(int64_t min_free_mem, const 
std::string& vm_rss_str,
                                             const std::string& 
mem_available_str,
@@ -219,12 +231,6 @@ public:
                                          Type::LOAD);
     }
 
-    static int64_t tg_memory_limit_gc(
-            int64_t request_free_memory, int64_t used_memory, uint64_t id, 
const std::string& name,
-            int64_t memory_limit,
-            std::vector<taskgroup::TgTrackerLimiterGroup>& 
tracker_limiter_groups,
-            RuntimeProfile* profile);
-
     // only for Type::QUERY or Type::LOAD.
     static TUniqueId label_to_queryid(const std::string& label) {
         if (label.rfind("Query#Id=", 0) != 0 && label.rfind("Load#Id=", 0) != 
0) {
diff --git a/be/src/runtime/task_group/task_group.cpp 
b/be/src/runtime/task_group/task_group.cpp
index ee1be702768..ddddf39dbc8 100644
--- a/be/src/runtime/task_group/task_group.cpp
+++ b/be/src/runtime/task_group/task_group.cpp
@@ -33,6 +33,7 @@
 #include "runtime/memory/mem_tracker_limiter.h"
 #include "util/mem_info.h"
 #include "util/parse_util.h"
+#include "util/runtime_profile.h"
 #include "vec/exec/scan/scanner_scheduler.h"
 
 namespace doris {
@@ -113,14 +114,85 @@ void 
TaskGroup::remove_mem_tracker_limiter(std::shared_ptr<MemTrackerLimiter> me
     _mem_tracker_limiter_pool[group_num].trackers.erase(mem_tracker_ptr);
 }
 
-void TaskGroup::task_group_info(TaskGroupInfo* tg_info) const {
-    std::shared_lock<std::shared_mutex> r_lock(_mutex);
-    tg_info->id = _id;
-    tg_info->name = _name;
-    tg_info->cpu_share = _cpu_share;
-    tg_info->memory_limit = _memory_limit;
-    tg_info->enable_memory_overcommit = _enable_memory_overcommit;
-    tg_info->version = _version;
+int64_t TaskGroup::gc_memory(int64_t need_free_mem, RuntimeProfile* profile) {
+    if (need_free_mem <= 0) {
+        return 0;
+    }
+    int64_t used_memory = memory_used();
+    int64_t freed_mem = 0;
+
+    std::string cancel_str = fmt::format(
+            "work load group memory exceeded limit, group id:{}, name:{}, 
used:{}, limit:{}, "
+            "backend:{}.",
+            _id, _name, MemTracker::print_bytes(used_memory),
+            MemTracker::print_bytes(_memory_limit), 
BackendOptions::get_localhost());
+    auto cancel_top_overcommit_str = [cancel_str](int64_t mem_consumption,
+                                                  const std::string& label) {
+        return fmt::format(
+                "{} cancel top memory overcommit tracker <{}> consumption {}. 
execute again after "
+                "enough memory, details see be.INFO.",
+                cancel_str, label, MemTracker::print_bytes(mem_consumption));
+    };
+    auto cancel_top_usage_str = [cancel_str](int64_t mem_consumption, const 
std::string& label) {
+        return fmt::format(
+                "{} cancel top memory used tracker <{}> consumption {}. 
execute again after "
+                "enough memory, details see be.INFO.",
+                cancel_str, label, MemTracker::print_bytes(mem_consumption));
+    };
+
+    LOG(INFO) << fmt::format(
+            "[MemoryGC] work load group start gc, id:{} name:{}, memory limit: 
{}, used: {}, "
+            "need_free_mem: {}.",
+            _id, _name, _memory_limit, used_memory, need_free_mem);
+    Defer defer {[&]() {
+        LOG(INFO) << fmt::format(
+                "[MemoryGC] work load group finished gc, id:{} name:{}, memory 
limit: {}, used: "
+                "{}, need_free_mem: {}, freed memory: {}.",
+                _id, _name, _memory_limit, used_memory, need_free_mem, 
freed_mem);
+    }};
+
+    // 1. free top overcommit query
+    if (config::enable_query_memory_overcommit) {
+        RuntimeProfile* tmq_profile = profile->create_child(
+                fmt::format("FreeGroupTopOvercommitQuery:Name {}", _name), 
true, true);
+        freed_mem += MemTrackerLimiter::tg_free_top_overcommit_query(
+                need_free_mem - freed_mem, MemTrackerLimiter::Type::QUERY,
+                _mem_tracker_limiter_pool, cancel_top_overcommit_str, 
tmq_profile,
+                MemTrackerLimiter::GCType::WORK_LOAD_GROUP);
+    }
+    if (freed_mem >= need_free_mem) {
+        return freed_mem;
+    }
+
+    // 2. free top usage query
+    RuntimeProfile* tmq_profile =
+            profile->create_child(fmt::format("FreeGroupTopUsageQuery:Name 
{}", _name), true, true);
+    freed_mem += MemTrackerLimiter::tg_free_top_memory_query(
+            need_free_mem - freed_mem, MemTrackerLimiter::Type::QUERY, 
_mem_tracker_limiter_pool,
+            cancel_top_usage_str, tmq_profile, 
MemTrackerLimiter::GCType::WORK_LOAD_GROUP);
+    if (freed_mem >= need_free_mem) {
+        return freed_mem;
+    }
+
+    // 3. free top overcommit load
+    if (config::enable_query_memory_overcommit) {
+        tmq_profile = profile->create_child(
+                fmt::format("FreeGroupTopOvercommitLoad:Name {}", _name), 
true, true);
+        freed_mem += MemTrackerLimiter::tg_free_top_overcommit_query(
+                need_free_mem - freed_mem, MemTrackerLimiter::Type::LOAD, 
_mem_tracker_limiter_pool,
+                cancel_top_overcommit_str, tmq_profile, 
MemTrackerLimiter::GCType::WORK_LOAD_GROUP);
+        if (freed_mem >= need_free_mem) {
+            return freed_mem;
+        }
+    }
+
+    // 4. free top usage load
+    tmq_profile =
+            profile->create_child(fmt::format("FreeGroupTopUsageLoad:Name {}", 
_name), true, true);
+    freed_mem += MemTrackerLimiter::tg_free_top_memory_query(
+            need_free_mem - freed_mem, MemTrackerLimiter::Type::LOAD, 
_mem_tracker_limiter_pool,
+            cancel_top_usage_str, tmq_profile, 
MemTrackerLimiter::GCType::WORK_LOAD_GROUP);
+    return freed_mem;
 }
 
 Status TaskGroupInfo::parse_topic_info(const TWorkloadGroupInfo& 
workload_group_info,
diff --git a/be/src/runtime/task_group/task_group.h 
b/be/src/runtime/task_group/task_group.h
index 3767731435a..7604ee45121 100644
--- a/be/src/runtime/task_group/task_group.h
+++ b/be/src/runtime/task_group/task_group.h
@@ -35,6 +35,7 @@
 namespace doris {
 
 class MemTrackerLimiter;
+class RuntimeProfile;
 
 namespace pipeline {
 class PipelineTask;
@@ -83,12 +84,6 @@ public:
 
     void remove_mem_tracker_limiter(std::shared_ptr<MemTrackerLimiter> 
mem_tracker_ptr);
 
-    void task_group_info(TaskGroupInfo* tg_info) const;
-
-    std::vector<TgTrackerLimiterGroup>& mem_tracker_limiter_pool() {
-        return _mem_tracker_limiter_pool;
-    }
-
     // when mem_limit <=0 , it's an invalid value, then current group not 
participating in memory GC
     // because mem_limit is not a required property
     bool is_mem_limit_valid() {
@@ -124,6 +119,8 @@ public:
         return _query_id_set.size();
     }
 
+    int64_t gc_memory(int64_t need_free_mem, RuntimeProfile* profile);
+
 private:
     mutable std::shared_mutex _mutex; // lock _name, _version, _cpu_share, 
_memory_limit
     const uint64_t _id;
diff --git a/be/src/util/mem_info.cpp b/be/src/util/mem_info.cpp
index 1eaa3eacaf5..3d9c4c4b062 100644
--- a/be/src/util/mem_info.cpp
+++ b/be/src/util/mem_info.cpp
@@ -256,9 +256,7 @@ int64_t MemInfo::tg_not_enable_overcommit_group_gc() {
 
     std::vector<taskgroup::TaskGroupPtr> task_groups_overcommit;
     for (const auto& task_group : task_groups) {
-        taskgroup::TaskGroupInfo tg_info;
-        task_group->task_group_info(&tg_info);
-        if (task_group->memory_used() > tg_info.memory_limit) {
+        if (task_group->memory_used() > task_group->memory_limit()) {
             task_groups_overcommit.push_back(task_group);
         }
     }
@@ -286,12 +284,9 @@ int64_t MemInfo::tg_not_enable_overcommit_group_gc() {
     }};
 
     for (const auto& task_group : task_groups_overcommit) {
-        taskgroup::TaskGroupInfo tg_info;
-        task_group->task_group_info(&tg_info);
         auto used = task_group->memory_used();
-        total_free_memory += MemTrackerLimiter::tg_memory_limit_gc(
-                used - tg_info.memory_limit, used, tg_info.id, tg_info.name, 
tg_info.memory_limit,
-                task_group->mem_tracker_limiter_pool(), tg_profile.get());
+        total_free_memory +=
+                task_group->gc_memory(used - task_group->memory_limit(), 
tg_profile.get());
     }
     return total_free_memory;
 }
@@ -362,11 +357,7 @@ int64_t MemInfo::tg_enable_overcommit_group_gc(int64_t 
request_free_memory,
                                 : static_cast<double>(exceeded_memorys[i]) / 
total_exceeded_memory *
                                           request_free_memory /* exceeded 
memory as a weight */;
         auto task_group = task_groups[i];
-        taskgroup::TaskGroupInfo tg_info;
-        task_group->task_group_info(&tg_info);
-        total_free_memory += MemTrackerLimiter::tg_memory_limit_gc(
-                tg_need_free_memory, used_memorys[i], tg_info.id, tg_info.name,
-                tg_info.memory_limit, task_group->mem_tracker_limiter_pool(), 
profile);
+        total_free_memory += task_group->gc_memory(tg_need_free_memory, 
profile);
     }
     return total_free_memory;
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to