This is an automated email from the ASF dual-hosted git repository.

luozenglin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 6c9c9e9765 [feature-wip](resource-group) Supports memory hard isolation of resource group (#19526)
6c9c9e9765 is described below

commit 6c9c9e97658859168ac7f43d54aa293491a8d257
Author: luozenglin <luozeng...@baidu.com>
AuthorDate: Mon May 15 22:45:46 2023 +0800

    [feature-wip](resource-group) Supports memory hard isolation of resource group (#19526)
---
 be/src/common/daemon.cpp                           |  19 ++-
 be/src/pipeline/task_queue.cpp                     |   6 +-
 be/src/pipeline/task_queue.h                       |  14 +-
 be/src/pipeline/task_scheduler.cpp                 |   9 +-
 be/src/pipeline/task_scheduler.h                   |   4 +-
 be/src/runtime/fragment_mgr.cpp                    |  37 ++---
 be/src/runtime/memory/mem_tracker.h                |   6 +-
 be/src/runtime/memory/mem_tracker_limiter.cpp      | 180 +++++++++++++--------
 be/src/runtime/memory/mem_tracker_limiter.h        |  66 +++++++-
 be/src/runtime/query_context.h                     |   3 +
 be/src/runtime/task_group/task_group.cpp           | 107 +++++++++---
 be/src/runtime/task_group/task_group.h             |  39 +++--
 be/src/runtime/task_group/task_group_manager.cpp   |  35 +++-
 be/src/runtime/task_group/task_group_manager.h     |   2 +
 .../java/org/apache/doris/qe/StmtExecutor.java     |   9 +-
 .../resource/resourcegroup/ResourceGroup.java      |  43 +++--
 .../resource/resourcegroup/ResourceGroupMgr.java   |  23 ++-
 .../resourcegroup/ResourceGroupMgrTest.java        |   6 +
 .../resource/resourcegroup/ResourceGroupTest.java  |  21 ++-
 19 files changed, 440 insertions(+), 189 deletions(-)
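
The patch below changes the backend GC path so that memory is reclaimed per resource group before the process-wide watermarks are evaluated: Daemon::memory_gc_thread() first calls TaskGroupManager::memory_limit_gc(), each task group cancels its largest queries until it is back under its memory_limit, and the freed bytes are added to sys_mem_available and subtracted from proc_mem_no_allocator_cache. What follows is a minimal standalone sketch of that per-group reclamation idea, not Doris code; the names (GroupSketch, TrackerSketch) are invented for illustration, and the real patch cancels queries through MemTrackerLimiter::free_top_overcommit_query / free_top_memory_query.

// Illustrative sketch only: a group sums its per-query trackers and, when it
// exceeds its hard limit, "frees" the largest consumers until enough memory
// is reclaimed. Compiles on its own; not part of the commit.
#include <algorithm>
#include <cstdint>
#include <string>
#include <vector>

struct TrackerSketch {
    std::string label;    // e.g. "Query#Id=..."
    int64_t consumption;  // bytes currently tracked for this query
};

struct GroupSketch {
    std::string name;
    int64_t memory_limit;                 // hard limit in bytes
    std::vector<TrackerSketch> trackers;  // one entry per query in the group

    // Mirrors the shape of TaskGroup::memory_limit_gc(): returns bytes freed.
    int64_t memory_limit_gc() {
        int64_t used = 0;
        for (const auto& t : trackers) used += t.consumption;
        if (used <= memory_limit) return 0;

        int64_t need_free = used - memory_limit;
        // Free the biggest consumers first.
        std::vector<TrackerSketch> by_size = trackers;
        std::sort(by_size.begin(), by_size.end(),
                  [](const TrackerSketch& a, const TrackerSketch& b) {
                      return a.consumption > b.consumption;
                  });
        int64_t freed = 0;
        for (const auto& t : by_size) {
            if (freed >= need_free) break;
            freed += t.consumption;  // stand-in for cancelling the query
        }
        return freed;
    }
};

The daemon-side effect of the returned value is then just the two adjustments visible in the first hunk of daemon.cpp: sys_mem_available += freed and proc_mem_no_allocator_cache -= freed.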

diff --git a/be/src/common/daemon.cpp b/be/src/common/daemon.cpp
index 5c832cfd77..592442b920 100644
--- a/be/src/common/daemon.cpp
+++ b/be/src/common/daemon.cpp
@@ -47,6 +47,7 @@
 #include "runtime/load_channel_mgr.h"
 #include "runtime/memory/mem_tracker.h"
 #include "runtime/memory/mem_tracker_limiter.h"
+#include "runtime/task_group/task_group_manager.h"
 #include "runtime/user_function_cache.h"
 #include "service/backend_options.h"
 #include "util/cpu_info.h"
@@ -230,10 +231,16 @@ void Daemon::memory_gc_thread() {
         if (!MemInfo::initialized() || !ExecEnv::GetInstance()->initialized()) {
             continue;
         }
+        auto sys_mem_available = doris::MemInfo::sys_mem_available();
+        auto proc_mem_no_allocator_cache = doris::MemInfo::proc_mem_no_allocator_cache();
+
+        auto tg_free_mem = taskgroup::TaskGroupManager::instance()->memory_limit_gc();
+        sys_mem_available += tg_free_mem;
+        proc_mem_no_allocator_cache -= tg_free_mem;
+
         if (memory_full_gc_sleep_time_ms <= 0 &&
-            (doris::MemInfo::sys_mem_available() <
-                     doris::MemInfo::sys_mem_available_low_water_mark() ||
-             doris::MemInfo::proc_mem_no_allocator_cache() >= doris::MemInfo::mem_limit())) {
+            (sys_mem_available < doris::MemInfo::sys_mem_available_low_water_mark() ||
+             proc_mem_no_allocator_cache >= doris::MemInfo::mem_limit())) {
             // No longer full gc and minor gc during sleep.
             memory_full_gc_sleep_time_ms = config::memory_gc_sleep_time_s * 1000;
             memory_minor_gc_sleep_time_ms = config::memory_gc_sleep_time_s * 1000;
@@ -243,10 +250,8 @@ void Daemon::memory_gc_thread() {
                 doris::MemTrackerLimiter::enable_print_log_process_usage();
             }
         } else if (memory_minor_gc_sleep_time_ms <= 0 &&
-                   (doris::MemInfo::sys_mem_available() <
-                            doris::MemInfo::sys_mem_available_warning_water_mark() ||
-                    doris::MemInfo::proc_mem_no_allocator_cache() >=
-                            doris::MemInfo::soft_mem_limit())) {
+                   (sys_mem_available < doris::MemInfo::sys_mem_available_warning_water_mark() ||
+                    proc_mem_no_allocator_cache >= doris::MemInfo::soft_mem_limit())) {
             // No minor gc during sleep, but full gc is possible.
             memory_minor_gc_sleep_time_ms = config::memory_gc_sleep_time_s * 1000;
             doris::MemTrackerLimiter::print_log_process_usage("process minor gc", false);
diff --git a/be/src/pipeline/task_queue.cpp b/be/src/pipeline/task_queue.cpp
index 8078d7b414..807a344cf9 100644
--- a/be/src/pipeline/task_queue.cpp
+++ b/be/src/pipeline/task_queue.cpp
@@ -350,8 +350,8 @@ void TaskGroupTaskQueue::update_statistics(PipelineTask* task, int64_t time_spen
     }
 }
 
-void TaskGroupTaskQueue::update_task_group(const taskgroup::TaskGroupInfo& task_group_info,
-                                           taskgroup::TaskGroupPtr& task_group) {
+void TaskGroupTaskQueue::update_tg_cpu_share(const taskgroup::TaskGroupInfo& task_group_info,
+                                             taskgroup::TaskGroupPtr task_group) {
     std::unique_lock<std::mutex> lock(_rs_mutex);
     auto* entity = task_group->task_entity();
     bool is_in_queue = _group_entities.find(entity) != _group_entities.end();
@@ -359,7 +359,7 @@ void TaskGroupTaskQueue::update_task_group(const taskgroup::TaskGroupInfo& task_
         _group_entities.erase(entity);
         _total_cpu_share -= entity->cpu_share();
     }
-    task_group->check_and_update(task_group_info);
+    task_group->update_cpu_share_unlock(task_group_info);
     if (is_in_queue) {
         _group_entities.emplace(entity);
         _total_cpu_share += entity->cpu_share();
diff --git a/be/src/pipeline/task_queue.h b/be/src/pipeline/task_queue.h
index 6b16b70f99..9956ba3cb9 100644
--- a/be/src/pipeline/task_queue.h
+++ b/be/src/pipeline/task_queue.h
@@ -53,8 +53,8 @@ public:
 
     virtual void update_statistics(PipelineTask* task, int64_t time_spent) {}
 
-    virtual void update_task_group(const taskgroup::TaskGroupInfo& task_group_info,
-                                   taskgroup::TaskGroupPtr& task_group) = 0;
+    virtual void update_tg_cpu_share(const taskgroup::TaskGroupInfo& task_group_info,
+                                     taskgroup::TaskGroupPtr task_group) = 0;
 
     int cores() const { return _core_size; }
 
@@ -154,9 +154,9 @@ public:
                                                                          time_spent);
     }
 
-    void update_task_group(const taskgroup::TaskGroupInfo& task_group_info,
-                           taskgroup::TaskGroupPtr& task_group) override {
-        LOG(FATAL) << "update_task_group not implemented";
+    void update_tg_cpu_share(const taskgroup::TaskGroupInfo& task_group_info,
+                             taskgroup::TaskGroupPtr task_group) override {
+        LOG(FATAL) << "update_tg_cpu_share not implemented";
     }
 
 private:
@@ -184,8 +184,8 @@ public:
 
     void update_statistics(PipelineTask* task, int64_t time_spent) override;
 
-    void update_task_group(const taskgroup::TaskGroupInfo& task_group_info,
-                           taskgroup::TaskGroupPtr& task_group) override;
+    void update_tg_cpu_share(const taskgroup::TaskGroupInfo& task_group_info,
+                             taskgroup::TaskGroupPtr task_group) override;
 
 private:
     template <bool from_executor>
diff --git a/be/src/pipeline/task_scheduler.cpp b/be/src/pipeline/task_scheduler.cpp
index cd219a25e0..53e9a4d868 100644
--- a/be/src/pipeline/task_scheduler.cpp
+++ b/be/src/pipeline/task_scheduler.cpp
@@ -354,12 +354,9 @@ void TaskScheduler::shutdown() {
     }
 }
 
-void TaskScheduler::try_update_task_group(const taskgroup::TaskGroupInfo& task_group_info,
-                                          taskgroup::TaskGroupPtr& task_group) {
-    if (!task_group->check_version(task_group_info._version)) {
-        return;
-    }
-    _task_queue->update_task_group(task_group_info, task_group);
+void TaskScheduler::update_tg_cpu_share(const taskgroup::TaskGroupInfo& task_group_info,
+                                        taskgroup::TaskGroupPtr task_group) {
+    _task_queue->update_tg_cpu_share(task_group_info, task_group);
 }
 
 } // namespace doris::pipeline
diff --git a/be/src/pipeline/task_scheduler.h b/be/src/pipeline/task_scheduler.h
index b2a665f034..1fcbe2c068 100644
--- a/be/src/pipeline/task_scheduler.h
+++ b/be/src/pipeline/task_scheduler.h
@@ -91,8 +91,8 @@ public:
 
     void shutdown();
 
-    void try_update_task_group(const taskgroup::TaskGroupInfo& task_group_info,
-                               taskgroup::TaskGroupPtr& task_group);
+    void update_tg_cpu_share(const taskgroup::TaskGroupInfo& task_group_info,
+                             taskgroup::TaskGroupPtr task_group);
 
 private:
     std::unique_ptr<ThreadPool> _fix_thread_pool;
diff --git a/be/src/runtime/fragment_mgr.cpp b/be/src/runtime/fragment_mgr.cpp
index 0cca7fa10f..c8e6a8b25e 100644
--- a/be/src/runtime/fragment_mgr.cpp
+++ b/be/src/runtime/fragment_mgr.cpp
@@ -56,7 +56,6 @@
 #include "io/fs/stream_load_pipe.h"
 #include "opentelemetry/trace/scope.h"
 #include "pipeline/pipeline_fragment_context.h"
-#include "pipeline/task_scheduler.h"
 #include "runtime/client_cache.h"
 #include "runtime/descriptors.h"
 #include "runtime/exec_env.h"
@@ -691,6 +690,25 @@ Status FragmentMgr::_get_query_ctx(const Params& params, TUniqueId query_id, boo
             query_ctx->query_mem_tracker->enable_print_log_usage();
         }
 
+        if constexpr (std::is_same_v<TPipelineFragmentParams, Params>) {
+            if (params.__isset.resource_groups && !params.resource_groups.empty()) {
+                taskgroup::TaskGroupInfo task_group_info;
+                auto status = taskgroup::TaskGroupInfo::parse_group_info(params.resource_groups[0],
+                                                                         &task_group_info);
+                if (status.ok()) {
+                    auto tg = taskgroup::TaskGroupManager::instance()->get_or_create_task_group(
+                            task_group_info);
+                    tg->add_mem_tracker_limiter(query_ctx->query_mem_tracker);
+                    query_ctx->set_task_group(tg);
+                    LOG(INFO) << "Query/load id: " << print_id(query_ctx->query_id)
+                              << " use task group: " << tg->debug_string();
+                }
+            } else {
+                VLOG_DEBUG << "Query/load id: " << print_id(query_ctx->query_id)
+                           << " does not use task group.";
+            }
+        }
+
         {
             // Find _query_ctx_map again, in case some other request has already
             // create the query fragments context.
@@ -803,23 +821,6 @@ Status FragmentMgr::exec_plan_fragment(const TPipelineFragmentParams& params,
     std::shared_ptr<QueryContext> query_ctx;
     RETURN_IF_ERROR(_get_query_ctx(params, params.query_id, true, query_ctx));
 
-    if (params.__isset.resource_groups && !params.resource_groups.empty()) {
-        taskgroup::TaskGroupInfo task_group_info;
-        auto status = taskgroup::TaskGroupInfo::parse_group_info(params.resource_groups[0],
-                                                                 &task_group_info);
-        if (status.ok()) {
-            auto tg = taskgroup::TaskGroupManager::instance()->get_or_create_task_group(
-                    task_group_info);
-            _exec_env->pipeline_task_group_scheduler()->try_update_task_group(task_group_info, tg);
-            query_ctx->set_task_group(tg);
-            LOG(INFO) << "Query/load id: " << print_id(query_ctx->query_id)
-                      << " use task group: " << tg->debug_string();
-        }
-    } else {
-        VLOG_DEBUG << "Query/load id: " << print_id(query_ctx->query_id)
-                   << " does not use task group.";
-    }
-
     for (size_t i = 0; i < params.local_params.size(); i++) {
         const auto& local_params = params.local_params[i];
 
diff --git a/be/src/runtime/memory/mem_tracker.h b/be/src/runtime/memory/mem_tracker.h
index 81f901879c..c21ef478e0 100644
--- a/be/src/runtime/memory/mem_tracker.h
+++ b/be/src/runtime/memory/mem_tracker.h
@@ -115,7 +115,7 @@ public:
     // For MemTrackerLimiter
     MemTracker() { _parent_group_num = -1; }
 
-    ~MemTracker();
+    virtual ~MemTracker();
 
     static std::string print_bytes(int64_t bytes) {
         return bytes >= 0 ? PrettyPrinter::print(bytes, TUnit::BYTES)
@@ -154,13 +154,13 @@ public:
     static void refresh_all_tracker_profile();
 
 public:
-    Snapshot make_snapshot() const;
+    virtual Snapshot make_snapshot() const;
     // Specify group_num from mem_tracker_pool to generate snapshot.
     static void make_group_snapshot(std::vector<Snapshot>* snapshots, int64_t group_num,
                                     std::string parent_label);
     static std::string log_usage(MemTracker::Snapshot snapshot);
 
-    std::string debug_string() {
+    virtual std::string debug_string() {
         std::stringstream msg;
         msg << "label: " << _label << "; "
             << "consumption: " << consumption() << "; "
diff --git a/be/src/runtime/memory/mem_tracker_limiter.cpp b/be/src/runtime/memory/mem_tracker_limiter.cpp
index d9974bef7f..076af90fba 100644
--- a/be/src/runtime/memory/mem_tracker_limiter.cpp
+++ b/be/src/runtime/memory/mem_tracker_limiter.cpp
@@ -29,6 +29,7 @@
 #include "runtime/exec_env.h"
 #include "runtime/fragment_mgr.h"
 #include "runtime/load_channel_mgr.h"
+#include "runtime/task_group/task_group.h"
 #include "service/backend_options.h"
 #include "util/mem_info.h"
 #include "util/perf_counters.h"
@@ -37,15 +38,10 @@
 
 namespace doris {
 
-struct TrackerLimiterGroup {
-    std::list<MemTrackerLimiter*> trackers;
-    std::mutex group_lock;
-};
-
 // Save all MemTrackerLimiters in use.
 // Each group corresponds to several MemTrackerLimiters and has a lock.
 // Multiple groups are used to reduce the impact of locks.
-static std::vector<TrackerLimiterGroup> mem_tracker_limiter_pool(1000);
+static std::vector<TrackerLimiterGroup> mem_tracker_limiter_pool(MEM_TRACKER_GROUP_NUM);
 
 std::atomic<bool> MemTrackerLimiter::_enable_print_log_process_usage {true};
 bool MemTrackerLimiter::_oom_avoidance {true};
@@ -94,7 +90,7 @@ MemTrackerLimiter::~MemTrackerLimiter() {
 
 MemTracker::Snapshot MemTrackerLimiter::make_snapshot() const {
     Snapshot snapshot;
-    snapshot.type = TypeString[_type];
+    snapshot.type = type_string(_type);
     snapshot.label = _label;
     snapshot.limit = _limit;
     snapshot.cur_consumption = _consumption->current_value();
@@ -131,7 +127,7 @@ void MemTrackerLimiter::make_process_snapshots(std::vector<MemTracker::Snapshot>
     int64_t process_mem_sum = 0;
     Snapshot snapshot;
     for (auto it : MemTrackerLimiter::TypeMemSum) {
-        snapshot.type = TypeString[it.first];
+        snapshot.type = type_string(it.first);
         snapshot.label = "";
         snapshot.limit = -1;
         snapshot.cur_consumption = it.second->current_value();
@@ -315,14 +311,35 @@ std::string MemTrackerLimiter::tracker_limit_exceeded_str(int64_t bytes) {
 int64_t MemTrackerLimiter::free_top_memory_query(int64_t min_free_mem,
                                                  const std::string& vm_rss_str,
                                                  const std::string& mem_available_str, Type type) {
-    std::priority_queue<std::pair<int64_t, std::string>,
-                        std::vector<std::pair<int64_t, std::string>>,
-                        std::greater<std::pair<int64_t, std::string>>>
-            min_pq;
+    return free_top_memory_query(
+            min_free_mem, type, mem_tracker_limiter_pool,
+            [&vm_rss_str, &mem_available_str, &type](int64_t mem_consumption,
+                                                     const std::string& label) {
+                return fmt::format(
+                        "Process has no memory available, cancel top memory usage {}: "
+                        "{} memory tracker <{}> consumption {}, backend {} "
+                        "process memory used {} exceed limit {} or sys mem available {} "
+                        "less than low water mark {}. Execute again after enough memory, "
+                        "details see be.INFO.",
+                        type_string(type), type_string(type), label, print_bytes(mem_consumption),
+                        BackendOptions::get_localhost(), vm_rss_str, MemInfo::mem_limit_str(),
+                        mem_available_str,
+                        print_bytes(MemInfo::sys_mem_available_low_water_mark()));
+            });
+}
+
+template <typename TrackerGroups>
+int64_t MemTrackerLimiter::free_top_memory_query(
+        int64_t min_free_mem, Type type, std::vector<TrackerGroups>& tracker_groups,
+        const std::function<std::string(int64_t, const std::string&)>& cancel_msg) {
+    using MemTrackerMinQueue = std::priority_queue<std::pair<int64_t, std::string>,
+                                                   std::vector<std::pair<int64_t, std::string>>,
+                                                   std::greater<std::pair<int64_t, std::string>>>;
+    MemTrackerMinQueue min_pq;
     // After greater than min_free_mem, will not be modified.
     int64_t prepare_free_mem = 0;
 
-    auto cancel_top_query = [&](auto min_pq) -> int64_t {
+    auto cancel_top_query = [&cancel_msg, type](auto& min_pq) -> int64_t {
         std::vector<std::string> usage_strings;
         int64_t freed_mem = 0;
         while (!min_pq.empty()) {
@@ -333,15 +350,7 @@ int64_t MemTrackerLimiter::free_top_memory_query(int64_t 
min_free_mem,
             }
             ExecEnv::GetInstance()->fragment_mgr()->cancel_query(
                     cancelled_queryid, PPlanFragmentCancelReason::MEMORY_LIMIT_EXCEED,
-                    fmt::format("Process has no memory available, cancel top memory usage {}: "
-                                "{} memory tracker <{}> consumption {}, backend {} "
-                                "process memory used {} exceed limit {} or sys mem available {} "
-                                "less than low water mark {}. Execute again after enough memory, "
-                                "details see be.INFO.",
-                                TypeString[type], TypeString[type], min_pq.top().second,
-                                print_bytes(min_pq.top().first), BackendOptions::get_localhost(),
-                                vm_rss_str, MemInfo::mem_limit_str(), mem_available_str,
-                                print_bytes(MemInfo::sys_mem_available_low_water_mark())));
+                    cancel_msg(min_pq.top().first, min_pq.top().second));
 
             freed_mem += min_pq.top().first;
             usage_strings.push_back(fmt::format("{} memory usage {} Bytes", min_pq.top().second,
@@ -349,38 +358,34 @@ int64_t MemTrackerLimiter::free_top_memory_query(int64_t min_free_mem,
             min_pq.pop();
         }
         if (!usage_strings.empty()) {
-            LOG(INFO) << "Process GC Free Top Memory Usage " << TypeString[type] << ": "
+            LOG(INFO) << "Process GC Free Top Memory Usage " << type_string(type) << ": "
                       << join(usage_strings, ",");
         }
         return freed_mem;
     };
 
-    for (unsigned i = 1; i < mem_tracker_limiter_pool.size(); ++i) {
-        std::lock_guard<std::mutex> l(mem_tracker_limiter_pool[i].group_lock);
-        for (auto tracker : mem_tracker_limiter_pool[i].trackers) {
+    for (unsigned i = 1; i < tracker_groups.size(); ++i) {
+        std::lock_guard<std::mutex> l(tracker_groups[i].group_lock);
+        for (auto tracker : tracker_groups[i].trackers) {
             if (tracker->type() == type) {
                 if (ExecEnv::GetInstance()->fragment_mgr()->query_is_canceled(
                             label_to_queryid(tracker->label()))) {
                     continue;
                 }
                 if (tracker->consumption() > min_free_mem) {
-                    std::priority_queue<std::pair<int64_t, std::string>,
-                                        std::vector<std::pair<int64_t, std::string>>,
-                                        std::greater<std::pair<int64_t, std::string>>>
-                            min_pq_null;
-                    std::swap(min_pq, min_pq_null);
-                    min_pq.push(std::pair<int64_t, std::string>(tracker->consumption(),
-                                                                tracker->label()));
-                    return cancel_top_query(min_pq);
+                    MemTrackerMinQueue min_pq_single;
+                    min_pq_single.emplace(tracker->consumption(), tracker->label());
+                    return cancel_top_query(min_pq_single);
                 } else if (tracker->consumption() + prepare_free_mem < min_free_mem) {
-                    min_pq.push(std::pair<int64_t, std::string>(tracker->consumption(),
-                                                                tracker->label()));
+                    min_pq.emplace(tracker->consumption(), tracker->label());
                     prepare_free_mem += tracker->consumption();
                 } else if (tracker->consumption() > min_pq.top().first) {
-                    // No need to modify prepare_free_mem, prepare_free_mem will always be greater than min_free_mem.
-                    min_pq.push(std::pair<int64_t, std::string>(tracker->consumption(),
-                                                                tracker->label()));
-                    min_pq.pop();
+                    min_pq.emplace(tracker->consumption(), tracker->label());
+                    prepare_free_mem += tracker->consumption();
+                    while (prepare_free_mem - min_pq.top().first > min_free_mem) {
+                        prepare_free_mem -= min_pq.top().first;
+                        min_pq.pop();
+                    }
                 }
             }
         }
@@ -392,15 +397,33 @@ int64_t MemTrackerLimiter::free_top_overcommit_query(int64_t min_free_mem,
                                                      const std::string& vm_rss_str,
                                                      const std::string& mem_available_str,
                                                      Type type) {
-    std::priority_queue<std::pair<int64_t, std::string>,
-                        std::vector<std::pair<int64_t, std::string>>,
-                        std::greater<std::pair<int64_t, std::string>>>
-            min_pq;
+    return free_top_overcommit_query(
+            min_free_mem, type, mem_tracker_limiter_pool,
+            [&vm_rss_str, &mem_available_str, &type](int64_t mem_consumption,
+                                                     const std::string& label) {
+                return fmt::format(
+                        "Process has less memory, cancel top memory overcommit {}: "
+                        "{} memory tracker <{}> consumption {}, backend {} "
+                        "process memory used {} exceed soft limit {} or sys mem available {} "
+                        "less than warning water mark {}. Execute again after enough memory, "
+                        "details see be.INFO.",
+                        type_string(type), type_string(type), label, print_bytes(mem_consumption),
+                        BackendOptions::get_localhost(), vm_rss_str, MemInfo::soft_mem_limit_str(),
+                        mem_available_str,
+                        print_bytes(MemInfo::sys_mem_available_warning_water_mark()));
+            });
+}
+
+template <typename TrackerGroups>
+int64_t MemTrackerLimiter::free_top_overcommit_query(
+        int64_t min_free_mem, Type type, std::vector<TrackerGroups>& tracker_groups,
+        const std::function<std::string(int64_t, const std::string&)>& cancel_msg) {
+    std::priority_queue<std::pair<int64_t, std::string>> max_pq;
     std::unordered_map<std::string, int64_t> query_consumption;
 
-    for (unsigned i = 1; i < mem_tracker_limiter_pool.size(); ++i) {
-        std::lock_guard<std::mutex> l(mem_tracker_limiter_pool[i].group_lock);
-        for (auto tracker : mem_tracker_limiter_pool[i].trackers) {
+    for (unsigned i = 1; i < tracker_groups.size(); ++i) {
+        std::lock_guard<std::mutex> l(tracker_groups[i].group_lock);
+        for (auto tracker : tracker_groups[i].trackers) {
             if (tracker->type() == type) {
                 if (tracker->consumption() <= 33554432) { // 32M small query does not cancel
                     continue;
@@ -411,7 +434,7 @@ int64_t MemTrackerLimiter::free_top_overcommit_query(int64_t min_free_mem,
                 }
                 int64_t overcommit_ratio =
                         (static_cast<double>(tracker->consumption()) / tracker->limit()) * 10000;
-                min_pq.push(std::pair<int64_t, std::string>(overcommit_ratio, tracker->label()));
+                max_pq.emplace(overcommit_ratio, tracker->label());
                 query_consumption[tracker->label()] = tracker->consumption();
             }
         }
@@ -422,13 +445,6 @@ int64_t MemTrackerLimiter::free_top_overcommit_query(int64_t min_free_mem,
         return 0;
     }
 
-    std::priority_queue<std::pair<int64_t, std::string>> max_pq;
-    // Min-heap to Max-heap.
-    while (!min_pq.empty()) {
-        max_pq.push(min_pq.top());
-        min_pq.pop();
-    }
-
     std::vector<std::string> usage_strings;
     int64_t freed_mem = 0;
     while (!max_pq.empty()) {
@@ -440,15 +456,7 @@ int64_t MemTrackerLimiter::free_top_overcommit_query(int64_t min_free_mem,
         int64_t query_mem = query_consumption[max_pq.top().second];
         ExecEnv::GetInstance()->fragment_mgr()->cancel_query(
                 cancelled_queryid, PPlanFragmentCancelReason::MEMORY_LIMIT_EXCEED,
-                fmt::format("Process has less memory, cancel top memory overcommit {}: "
-                            "{} memory tracker <{}> consumption {}, backend {} "
-                            "process memory used {} exceed soft limit {} or sys mem available {} "
-                            "less than warning water mark {}. Execute again after enough memory, "
-                            "details see be.INFO.",
-                            TypeString[type], TypeString[type], max_pq.top().second,
-                            print_bytes(query_mem), BackendOptions::get_localhost(), vm_rss_str,
-                            MemInfo::soft_mem_limit_str(), mem_available_str,
-                            print_bytes(MemInfo::sys_mem_available_warning_water_mark())));
+                cancel_msg(query_mem, max_pq.top().second));
 
         usage_strings.push_back(fmt::format("{} memory usage {} Bytes, overcommit ratio: {}",
                                             max_pq.top().second, query_mem, max_pq.top().first));
@@ -459,10 +467,52 @@ int64_t MemTrackerLimiter::free_top_overcommit_query(int64_t min_free_mem,
         max_pq.pop();
     }
     if (!usage_strings.empty()) {
-        LOG(INFO) << "Process GC Free Top Memory Overcommit " << TypeString[type] << ": "
+        LOG(INFO) << "Process GC Free Top Memory Overcommit " << type_string(type) << ": "
                   << join(usage_strings, ",");
     }
     return freed_mem;
 }
 
+int64_t MemTrackerLimiter::tg_memory_limit_gc(
+        uint64_t id, const std::string& name, int64_t memory_limit,
+        std::vector<taskgroup::TgTrackerLimiterGroup>& tracker_limiter_groups) {
+    int64_t used_memory = 0;
+    for (auto& mem_tracker_group : tracker_limiter_groups) {
+        std::lock_guard<std::mutex> l(mem_tracker_group.group_lock);
+        for (const auto& tracker : mem_tracker_group.trackers) {
+            used_memory += tracker->consumption();
+        }
+    }
+
+    if (used_memory <= memory_limit) {
+        return 0;
+    }
+
+    int64_t need_free_mem = used_memory - memory_limit;
+    int64_t freed_mem = 0;
+    constexpr auto query_type = MemTrackerLimiter::Type::QUERY;
+    auto cancel_str = [id, &name, memory_limit, used_memory](int64_t mem_consumption,
+                                                             const std::string& label) {
+        return fmt::format(
+                "Resource group id:{}, name:{} memory exceeded limit, cancel top memory {}: "
+                "memory tracker <{}> consumption {}, backend {}, "
+                "resource group memory used {}, memory limit {}.",
+                id, name, MemTrackerLimiter::type_string(query_type), label,
+                MemTracker::print_bytes(mem_consumption), BackendOptions::get_localhost(),
+                MemTracker::print_bytes(used_memory), MemTracker::print_bytes(memory_limit));
+    };
+    if (config::enable_query_memroy_overcommit) {
+        freed_mem += MemTrackerLimiter::free_top_overcommit_query(
+                need_free_mem - freed_mem, query_type, tracker_limiter_groups, cancel_str);
+    }
+    if (freed_mem < need_free_mem) {
+        freed_mem += MemTrackerLimiter::free_top_memory_query(need_free_mem - freed_mem, query_type,
+                                                              tracker_limiter_groups, cancel_str);
+    }
+    LOG(INFO) << fmt::format(
+            "task group {} finished gc, memory_limit: {}, used_memory: {}, freed_mem: {}.", name,
+            memory_limit, used_memory, freed_mem);
+    return freed_mem;
+}
+
 } // namespace doris
diff --git a/be/src/runtime/memory/mem_tracker_limiter.h b/be/src/runtime/memory/mem_tracker_limiter.h
index 7665a029f8..3cc7108b27 100644
--- a/be/src/runtime/memory/mem_tracker_limiter.h
+++ b/be/src/runtime/memory/mem_tracker_limiter.h
@@ -28,6 +28,7 @@
 #include <list>
 #include <memory>
 #include <ostream>
+#include <queue>
 #include <string>
 #include <unordered_map>
 #include <vector>
@@ -41,6 +42,21 @@
 
 namespace doris {
 
+constexpr auto MEM_TRACKER_GROUP_NUM = 1000;
+
+namespace taskgroup {
+struct TgTrackerLimiterGroup;
+class TaskGroup;
+using TaskGroupPtr = std::shared_ptr<TaskGroup>;
+} // namespace taskgroup
+
+class MemTrackerLimiter;
+
+struct TrackerLimiterGroup {
+    std::list<MemTrackerLimiter*> trackers;
+    std::mutex group_lock;
+};
+
 // Track and limit the memory usage of process and query.
 // Contains an limit, arranged into a tree structure.
 //
@@ -49,7 +65,7 @@ namespace doris {
 // will be recorded on this Query, otherwise it will be recorded in Orphan Tracker by default.
 class MemTrackerLimiter final : public MemTracker {
 public:
-    enum Type {
+    enum class Type {
         GLOBAL = 0,        // Life cycle is the same as the process, e.g. Cache and default Orphan
         QUERY = 1,         // Count the memory consumption of all Query tasks.
         LOAD = 2,          // Count the memory consumption of all Load tasks.
@@ -76,9 +92,6 @@ public:
                          {Type::EXPERIMENTAL,
                           std::make_shared<RuntimeProfile::HighWaterMarkCounter>(TUnit::BYTES)}};
 
-    inline static const std::string TypeString[] = {
-            "global", "query", "load", "compaction", "schema_change", "clone", "experimental"};
-
 public:
     // byte_limit equal to -1 means no consumption limit, only participate in process memory statistics.
     MemTrackerLimiter(Type type, const std::string& label = std::string(), int64_t byte_limit = -1,
@@ -87,6 +100,28 @@ public:
 
     ~MemTrackerLimiter();
 
+    static std::string type_string(Type type) {
+        switch (type) {
+        case Type::GLOBAL:
+            return "global";
+        case Type::QUERY:
+            return "query";
+        case Type::LOAD:
+            return "load";
+        case Type::COMPACTION:
+            return "compaction";
+        case Type::SCHEMA_CHANGE:
+            return "schema_change";
+        case Type::CLONE:
+            return "clone";
+        case Type::EXPERIMENTAL:
+            return "experimental";
+        default:
+            LOG(FATAL) << "not match type of mem tracker limiter :" << static_cast<int>(type);
+        }
+        __builtin_unreachable();
+    }
+
     static bool sys_mem_exceed_limit_check(int64_t bytes);
 
     void set_consumption() { LOG(FATAL) << "MemTrackerLimiter set_consumption not supported"; }
@@ -118,7 +153,7 @@ public:
     static void refresh_global_counter();
     static void refresh_all_tracker_profile();
 
-    Snapshot make_snapshot() const;
+    Snapshot make_snapshot() const override;
     // Returns a list of all the valid tracker snapshots.
     static void make_process_snapshots(std::vector<MemTracker::Snapshot>* snapshots);
     static void make_type_snapshots(std::vector<MemTracker::Snapshot>* snapshots, Type type);
@@ -137,6 +172,12 @@ public:
     static int64_t free_top_memory_query(int64_t min_free_mem, const std::string& vm_rss_str,
                                          const std::string& mem_available_str,
                                          Type type = Type::QUERY);
+
+    template <typename TrackerGroups>
+    static int64_t free_top_memory_query(
+            int64_t min_free_mem, Type type, std::vector<TrackerGroups>& tracker_groups,
+            const std::function<std::string(int64_t, const std::string&)>& cancel_msg);
+
     static int64_t free_top_memory_load(int64_t min_free_mem, const std::string& vm_rss_str,
                                         const std::string& mem_available_str) {
         return free_top_memory_query(min_free_mem, vm_rss_str, mem_available_str, Type::LOAD);
@@ -146,10 +187,21 @@ public:
     static int64_t free_top_overcommit_query(int64_t min_free_mem, const std::string& vm_rss_str,
                                              const std::string& mem_available_str,
                                              Type type = Type::QUERY);
+
+    template <typename TrackerGroups>
+    static int64_t free_top_overcommit_query(
+            int64_t min_free_mem, Type type, std::vector<TrackerGroups>& tracker_groups,
+            const std::function<std::string(int64_t, const std::string&)>& cancel_msg);
+
     static int64_t free_top_overcommit_load(int64_t min_free_mem, const std::string& vm_rss_str,
                                             const std::string& mem_available_str) {
         return free_top_overcommit_query(min_free_mem, vm_rss_str, mem_available_str, Type::LOAD);
     }
+
+    static int64_t tg_memory_limit_gc(
+            uint64_t id, const std::string& name, int64_t memory_limit,
+            std::vector<taskgroup::TgTrackerLimiterGroup>& tracker_limiter_groups);
+
     // only for Type::QUERY or Type::LOAD.
     static TUniqueId label_to_queryid(const std::string& label) {
         if (label.rfind("Query#Id=", 0) != 0 && label.rfind("Load#Id=", 0) != 0) {
@@ -170,12 +222,12 @@ public:
     std::string tracker_limit_exceeded_str();
     std::string tracker_limit_exceeded_str(int64_t bytes);
 
-    std::string debug_string() {
+    std::string debug_string() override {
         std::stringstream msg;
         msg << "limit: " << _limit << "; "
             << "consumption: " << _consumption->current_value() << "; "
             << "label: " << _label << "; "
-            << "type: " << TypeString[_type] << "; ";
+            << "type: " << type_string(_type) << "; ";
         return msg.str();
     }
 
diff --git a/be/src/runtime/query_context.h b/be/src/runtime/query_context.h
index 0909702c70..7158f8b054 100644
--- a/be/src/runtime/query_context.h
+++ b/be/src/runtime/query_context.h
@@ -73,6 +73,9 @@ public:
                     MemTracker::print_bytes(query_mem_tracker->consumption()),
                     MemTracker::print_bytes(query_mem_tracker->peak_consumption()));
         }
+        if (_task_group) {
+            _task_group->remove_mem_tracker_limiter(query_mem_tracker);
+        }
     }
 
     // Notice. For load fragments, the fragment_num sent by FE has a small probability of 0.
diff --git a/be/src/runtime/task_group/task_group.cpp b/be/src/runtime/task_group/task_group.cpp
index ebe775a485..16a6356f30 100644
--- a/be/src/runtime/task_group/task_group.cpp
+++ b/be/src/runtime/task_group/task_group.cpp
@@ -27,10 +27,19 @@
 #include <utility>
 
 #include "common/logging.h"
+#include "pipeline/task_scheduler.h"
+#include "runtime/exec_env.h"
+#include "runtime/memory/mem_tracker_limiter.h"
+#include "service/backend_options.h"
+#include "util/mem_info.h"
+#include "util/parse_util.h"
 
 namespace doris {
 namespace taskgroup {
 
+const static std::string CPU_SHARE = "cpu_share";
+const static std::string MEMORY_LIMIT = "memory_limit";
+
 pipeline::PipelineTask* TaskGroupEntity::take() {
     if (_queue.empty()) {
         return nullptr;
@@ -67,36 +76,76 @@ std::string TaskGroupEntity::debug_string() const {
                        cpu_share(), _queue.size(), _vruntime_ns);
 }
 
-TaskGroup::TaskGroup(uint64_t id, std::string name, uint64_t cpu_share, int64_t version)
-        : _id(id), _name(name), _cpu_share(cpu_share), _task_entity(this), _version(version) {}
+TaskGroup::TaskGroup(const TaskGroupInfo& tg_info)
+        : _id(tg_info.id),
+          _name(tg_info.name),
+          _cpu_share(tg_info.cpu_share),
+          _memory_limit(tg_info.memory_limit),
+          _version(tg_info.version),
+          _task_entity(this),
+          _mem_tracker_limiter_pool(MEM_TRACKER_GROUP_NUM) {}
 
 std::string TaskGroup::debug_string() const {
-    std::shared_lock<std::shared_mutex> rl {mutex};
-    return fmt::format("TG[id = {}, name = {}, cpu_share = {}, version = {}]", _id, _name,
-                       cpu_share(), _version);
-}
-
-bool TaskGroup::check_version(int64_t version) const {
-    std::shared_lock<std::shared_mutex> rl {mutex};
-    return version > _version;
+    std::shared_lock<std::shared_mutex> rl {_mutex};
+    return fmt::format("TG[id = {}, name = {}, cpu_share = {}, memory_limit = {}, version = {}]",
+                       _id, _name, cpu_share(), PrettyPrinter::print(_memory_limit, TUnit::BYTES),
+                       _version);
 }
 
 void TaskGroup::check_and_update(const TaskGroupInfo& tg_info) {
-    if (tg_info._id != _id) {
+    if (UNLIKELY(tg_info.id != _id)) {
         return;
     }
+    {
+        std::shared_lock<std::shared_mutex> rl {_mutex};
+        if (LIKELY(tg_info.version <= _version)) {
+            return;
+        }
+    }
+
+    std::lock_guard<std::shared_mutex> wl {_mutex};
+    if (tg_info.version > _version) {
+        _name = tg_info.name;
+        _version = tg_info.version;
+        _memory_limit = tg_info.memory_limit;
+        if (_cpu_share != tg_info.cpu_share) {
+            ExecEnv::GetInstance()->pipeline_task_group_scheduler()->update_tg_cpu_share(
+                    tg_info, shared_from_this());
+        }
+    }
+}
+
+void TaskGroup::update_cpu_share_unlock(const TaskGroupInfo& tg_info) {
+    _cpu_share = tg_info.cpu_share;
+}
 
-    std::lock_guard<std::shared_mutex> wl {mutex};
-    if (tg_info._version > _version) {
-        _name = tg_info._name;
-        _cpu_share = tg_info._cpu_share;
-        _version = tg_info._version;
+void TaskGroup::add_mem_tracker_limiter(std::shared_ptr<MemTrackerLimiter> mem_tracker_ptr) {
+    auto group_num = mem_tracker_ptr->group_num();
+    std::lock_guard<std::mutex> l(_mem_tracker_limiter_pool[group_num].group_lock);
+    _mem_tracker_limiter_pool[group_num].trackers.insert(mem_tracker_ptr);
+}
+
+void TaskGroup::remove_mem_tracker_limiter(std::shared_ptr<MemTrackerLimiter> mem_tracker_ptr) {
+    auto group_num = mem_tracker_ptr->group_num();
+    std::lock_guard<std::mutex> l(_mem_tracker_limiter_pool[group_num].group_lock);
+    _mem_tracker_limiter_pool[group_num].trackers.erase(mem_tracker_ptr);
+}
+
+int64_t TaskGroup::memory_limit_gc() {
+    std::string name;
+    int64_t memory_limit;
+    {
+        std::shared_lock<std::shared_mutex> rl {_mutex};
+        name = _name;
+        memory_limit = _memory_limit;
     }
+    return MemTrackerLimiter::tg_memory_limit_gc(_id, name, memory_limit,
+                                                 _mem_tracker_limiter_pool);
 }
 
 Status TaskGroupInfo::parse_group_info(const TPipelineResourceGroup& resource_group,
                                        TaskGroupInfo* task_group_info) {
-    if (!check_group_info(resource_group)) {
+    if (UNLIKELY(!check_group_info(resource_group))) {
         std::stringstream ss;
         ss << "incomplete resource group parameters: ";
         resource_group.printTo(ss);
@@ -108,17 +157,31 @@ Status TaskGroupInfo::parse_group_info(const TPipelineResourceGroup& resource_gr
     uint64_t share = 0;
     std::from_chars(iter->second.c_str(), iter->second.c_str() + iter->second.size(), share);
 
-    task_group_info->_id = resource_group.id;
-    task_group_info->_name = resource_group.name;
-    task_group_info->_version = resource_group.version;
-    task_group_info->_cpu_share = share;
+    task_group_info->id = resource_group.id;
+    task_group_info->name = resource_group.name;
+    task_group_info->version = resource_group.version;
+    task_group_info->cpu_share = share;
+
+    bool is_percent = true;
+    auto mem_limit_str = resource_group.properties.find(MEMORY_LIMIT)->second;
+    auto mem_limit =
+            ParseUtil::parse_mem_spec(mem_limit_str, -1, MemInfo::mem_limit(), &is_percent);
+    if (UNLIKELY(mem_limit <= 0)) {
+        std::stringstream ss;
+        ss << "parse memory limit from TPipelineResourceGroup error, " << MEMORY_LIMIT << ": "
+           << mem_limit_str;
+        LOG(WARNING) << ss.str();
+        return Status::InternalError(ss.str());
+    }
+    task_group_info->memory_limit = mem_limit;
     return Status::OK();
 }
 
 bool TaskGroupInfo::check_group_info(const TPipelineResourceGroup& resource_group) {
     return resource_group.__isset.id && resource_group.__isset.version &&
            resource_group.__isset.name && resource_group.__isset.properties &&
-           resource_group.properties.count(CPU_SHARE) > 0;
+           resource_group.properties.count(CPU_SHARE) > 0 &&
+           resource_group.properties.count(MEMORY_LIMIT) > 0;
 }
 
 } // namespace taskgroup
diff --git a/be/src/runtime/task_group/task_group.h b/be/src/runtime/task_group/task_group.h
index e66ef2e1b0..a7854158d2 100644
--- a/be/src/runtime/task_group/task_group.h
+++ b/be/src/runtime/task_group/task_group.h
@@ -25,6 +25,7 @@
 #include <queue>
 #include <shared_mutex>
 #include <string>
+#include <unordered_set>
 
 #include "common/status.h"
 
@@ -35,14 +36,13 @@ class PipelineTask;
 }
 
 class TPipelineResourceGroup;
+class MemTrackerLimiter;
 
 namespace taskgroup {
 
 class TaskGroup;
 struct TaskGroupInfo;
 
-const static std::string CPU_SHARE = "cpu_share";
-
 class TaskGroupEntity {
 public:
     explicit TaskGroupEntity(taskgroup::TaskGroup* ts) : _tg(ts) {}
@@ -72,9 +72,14 @@ private:
 
 using TGEntityPtr = TaskGroupEntity*;
 
-class TaskGroup {
+struct TgTrackerLimiterGroup {
+    std::unordered_set<std::shared_ptr<MemTrackerLimiter>> trackers;
+    std::mutex group_lock;
+};
+
+class TaskGroup : public std::enable_shared_from_this<TaskGroup> {
 public:
-    TaskGroup(uint64_t id, std::string name, uint64_t cpu_share, int64_t version);
+    explicit TaskGroup(const TaskGroupInfo& tg_info);
 
     TaskGroupEntity* task_entity() { return &_task_entity; }
 
@@ -84,26 +89,36 @@ public:
 
     std::string debug_string() const;
 
-    bool check_version(int64_t version) const;
-
     void check_and_update(const TaskGroupInfo& tg_info);
 
+    void update_cpu_share_unlock(const TaskGroupInfo& tg_info);
+
+    void add_mem_tracker_limiter(std::shared_ptr<MemTrackerLimiter> mem_tracker_ptr);
+
+    void remove_mem_tracker_limiter(std::shared_ptr<MemTrackerLimiter> mem_tracker_ptr);
+
+    int64_t memory_limit_gc();
+
 private:
-    mutable std::shared_mutex mutex;
+    mutable std::shared_mutex _mutex; // lock _name, _version, _cpu_share, _memory_limit
     const uint64_t _id;
     std::string _name;
     std::atomic<uint64_t> _cpu_share;
-    TaskGroupEntity _task_entity;
+    int64_t _memory_limit; // bytes
     int64_t _version;
+    TaskGroupEntity _task_entity;
+
+    std::vector<TgTrackerLimiterGroup> _mem_tracker_limiter_pool;
 };
 
 using TaskGroupPtr = std::shared_ptr<TaskGroup>;
 
 struct TaskGroupInfo {
-    uint64_t _id;
-    std::string _name;
-    uint64_t _cpu_share;
-    int64_t _version;
+    uint64_t id;
+    std::string name;
+    uint64_t cpu_share;
+    int64_t version;
+    int64_t memory_limit;
 
     static Status parse_group_info(const TPipelineResourceGroup& resource_group,
                                    TaskGroupInfo* task_group_info);
diff --git a/be/src/runtime/task_group/task_group_manager.cpp b/be/src/runtime/task_group/task_group_manager.cpp
index d393264983..c741a0bffd 100644
--- a/be/src/runtime/task_group/task_group_manager.cpp
+++ b/be/src/runtime/task_group/task_group_manager.cpp
@@ -20,6 +20,7 @@
 #include <memory>
 #include <mutex>
 
+#include "runtime/memory/mem_tracker_limiter.h"
 #include "runtime/task_group/task_group.h"
 
 namespace doris::taskgroup {
@@ -35,20 +36,38 @@ TaskGroupManager* TaskGroupManager::instance() {
 TaskGroupPtr TaskGroupManager::get_or_create_task_group(const TaskGroupInfo& task_group_info) {
     {
         std::shared_lock<std::shared_mutex> r_lock(_group_mutex);
-        if (_task_groups.count(task_group_info._id)) {
-            return _task_groups[task_group_info._id];
+        if (LIKELY(_task_groups.count(task_group_info.id))) {
+            auto task_group = _task_groups[task_group_info.id];
+            task_group->check_and_update(task_group_info);
+            return task_group;
         }
     }
 
-    auto new_task_group =
-            std::make_shared<TaskGroup>(task_group_info._id, task_group_info._name,
-                                        task_group_info._cpu_share, task_group_info._version);
+    auto new_task_group = std::make_shared<TaskGroup>(task_group_info);
     std::lock_guard<std::shared_mutex> w_lock(_group_mutex);
-    if (_task_groups.count(task_group_info._id)) {
-        return _task_groups[task_group_info._id];
+    if (_task_groups.count(task_group_info.id)) {
+        auto task_group = _task_groups[task_group_info.id];
+        task_group->check_and_update(task_group_info);
+        return task_group;
     }
-    _task_groups[task_group_info._id] = new_task_group;
+    _task_groups[task_group_info.id] = new_task_group;
     return new_task_group;
 }
 
+int64_t TaskGroupManager::memory_limit_gc() {
+    int64_t total_free_memory = 0;
+    std::vector<TaskGroupPtr> task_groups;
+    {
+        std::shared_lock<std::shared_mutex> r_lock(_group_mutex);
+        task_groups.reserve(_task_groups.size());
+        for (const auto& [id, task_group] : _task_groups) {
+            task_groups.push_back(task_group);
+        }
+    }
+    for (const auto& task_group : task_groups) {
+        total_free_memory += task_group->memory_limit_gc();
+    }
+    return total_free_memory;
+}
+
 } // namespace doris::taskgroup
diff --git a/be/src/runtime/task_group/task_group_manager.h b/be/src/runtime/task_group/task_group_manager.h
index 0f415498a0..c053a3a45d 100644
--- a/be/src/runtime/task_group/task_group_manager.h
+++ b/be/src/runtime/task_group/task_group_manager.h
@@ -33,6 +33,8 @@ public:
 
     TaskGroupPtr get_or_create_task_group(const TaskGroupInfo& task_group_info);
 
+    int64_t memory_limit_gc();
+
 private:
     std::shared_mutex _group_mutex;
     std::unordered_map<uint64_t, TaskGroupPtr> _task_groups;
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
index 3a02eb67c0..4531535983 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
@@ -871,10 +871,6 @@ public class StmtExecutor {
                 || (parsedStmt instanceof InsertStmt && !((InsertStmt) parsedStmt).needLoadManager())
                 || parsedStmt instanceof CreateTableAsSelectStmt
                 || parsedStmt instanceof InsertOverwriteTableStmt) {
-            if (Config.enable_resource_group && context.sessionVariable.enablePipelineEngine()) {
-                analyzer.setResourceGroups(analyzer.getEnv().getResourceGroupMgr()
-                        .getResourceGroup(context.sessionVariable.resourceGroup));
-            }
             Map<Long, TableIf> tableMap = Maps.newTreeMap();
             QueryStmt queryStmt;
             Set<String> parentViewNameSet = Sets.newHashSet();
@@ -1060,6 +1056,11 @@ public class StmtExecutor {
                     parsedStmt.setIsExplain(explainOptions);
                 }
             }
+            if (parsedStmt instanceof QueryStmt && Config.enable_resource_group
+                    && context.sessionVariable.enablePipelineEngine()) {
+                analyzer.setResourceGroups(analyzer.getEnv().getResourceGroupMgr()
+                        .getResourceGroup(context.sessionVariable.resourceGroup));
+            }
         }
         profile.getSummaryProfile().setQueryAnalysisFinishTime();
         planner = new OriginalPlanner(analyzer);
diff --git a/fe/fe-core/src/main/java/org/apache/doris/resource/resourcegroup/ResourceGroup.java b/fe/fe-core/src/main/java/org/apache/doris/resource/resourcegroup/ResourceGroup.java
index 538c064247..b859b95752 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/resource/resourcegroup/ResourceGroup.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/resource/resourcegroup/ResourceGroup.java
@@ -30,6 +30,8 @@ import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Lists;
 import com.google.gson.annotations.SerializedName;
 import org.apache.commons.lang3.StringUtils;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
 
 import java.io.DataInput;
 import java.io.DataOutput;
@@ -38,14 +40,17 @@ import java.util.HashMap;
 import java.util.Map;
 
 public class ResourceGroup implements Writable {
+    private static final Logger LOG = LogManager.getLogger(ResourceGroup.class);
 
     public static final String CPU_SHARE = "cpu_share";
 
+    public static final String MEMORY_LIMIT = "memory_limit";
+
     private static final ImmutableSet<String> REQUIRED_PROPERTIES_NAME = new ImmutableSet.Builder<String>().add(
-            CPU_SHARE).build();
+            CPU_SHARE).add(MEMORY_LIMIT).build();
 
     private static final ImmutableSet<String> ALL_PROPERTIES_NAME = new ImmutableSet.Builder<String>().add(
-            CPU_SHARE).build();
+            CPU_SHARE).add(MEMORY_LIMIT).build();
 
     @SerializedName(value = "id")
     private long id;
@@ -60,11 +65,10 @@ public class ResourceGroup implements Writable {
     @SerializedName(value = "version")
     private long version;
 
+    private double memoryLimitPercent;
+
     private ResourceGroup(long id, String name, Map<String, String> properties) {
-        this.id = id;
-        this.name = name;
-        this.properties = properties;
-        this.version = 0;
+        this(id, name, properties, 0);
     }
 
     private ResourceGroup(long id, String name, Map<String, String> properties, long version) {
@@ -72,6 +76,8 @@ public class ResourceGroup implements Writable {
         this.name = name;
         this.properties = properties;
         this.version = version;
+        String memoryLimitString = properties.get(MEMORY_LIMIT);
+        this.memoryLimitPercent = Double.parseDouble(memoryLimitString.substring(0, memoryLimitString.length() - 1));
     }
 
     public static ResourceGroup create(String name, Map<String, String> properties) throws DdlException {
@@ -79,10 +85,9 @@ public class ResourceGroup implements Writable {
         return new ResourceGroup(Env.getCurrentEnv().getNextId(), name, properties);
     }
 
-    public static ResourceGroup create(ResourceGroup resourceGroup, Map<String, String> updateProperties)
+    public static ResourceGroup copyAndUpdate(ResourceGroup resourceGroup, Map<String, String> updateProperties)
             throws DdlException {
-        Map<String, String> newProperties = new HashMap<>();
-        newProperties.putAll(resourceGroup.getProperties());
+        Map<String, String> newProperties = new HashMap<>(resourceGroup.getProperties());
         for (Map.Entry<String, String> kv : updateProperties.entrySet()) {
             if (!Strings.isNullOrEmpty(kv.getValue())) {
                 newProperties.put(kv.getKey(), kv.getValue());
@@ -108,7 +113,21 @@ public class ResourceGroup implements Writable {
 
         String cpuSchedulingWeight = properties.get(CPU_SHARE);
         if (!StringUtils.isNumeric(cpuSchedulingWeight) || Long.parseLong(cpuSchedulingWeight) <= 0) {
-            throw new DdlException(CPU_SHARE + " requires a positive integer.");
+            throw new DdlException(CPU_SHARE + " " + cpuSchedulingWeight + " requires a positive integer.");
+        }
+
+        String memoryLimit = properties.get(MEMORY_LIMIT);
+        if (!memoryLimit.endsWith("%")) {
+            throw new DdlException(MEMORY_LIMIT + " " + memoryLimit + " requires a percentage and ends with a '%'");
+        }
+        String memLimitErr = MEMORY_LIMIT + " " + memoryLimit + " requires a positive floating point number.";
+        try {
+            if (Double.parseDouble(memoryLimit.substring(0, memoryLimit.length() - 1)) <= 0) {
+                throw new DdlException(memLimitErr);
+            }
+        } catch (NumberFormatException  e) {
+            LOG.debug(memLimitErr, e);
+            throw new DdlException(memLimitErr);
         }
     }
 
@@ -128,6 +147,10 @@ public class ResourceGroup implements Writable {
         return version;
     }
 
+    public double getMemoryLimitPercent() {
+        return memoryLimitPercent;
+    }
+
     public void getProcNodeData(BaseProcResult result) {
         for (Map.Entry<String, String> entry : properties.entrySet()) {
             result.addRow(Lists.newArrayList(String.valueOf(id), name, entry.getKey(), entry.getValue()));
diff --git a/fe/fe-core/src/main/java/org/apache/doris/resource/resourcegroup/ResourceGroupMgr.java b/fe/fe-core/src/main/java/org/apache/doris/resource/resourcegroup/ResourceGroupMgr.java
index d4e824365c..d4d22790d7 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/resource/resourcegroup/ResourceGroupMgr.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/resource/resourcegroup/ResourceGroupMgr.java
@@ -46,6 +46,7 @@ import java.io.DataOutput;
 import java.io.IOException;
 import java.util.List;
 import java.util.Map;
+import java.util.Objects;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 
 public class ResourceGroupMgr implements Writable, GsonPostProcessable {
@@ -122,6 +123,7 @@ public class ResourceGroupMgr implements Writable, GsonPostProcessable {
             }
             Map<String, String> properties = Maps.newHashMap();
             properties.put(ResourceGroup.CPU_SHARE, "10");
+            properties.put(ResourceGroup.MEMORY_LIMIT, "100%");
             defaultResourceGroup = ResourceGroup.create(DEFAULT_GROUP_NAME, properties);
             nameToResourceGroup.put(DEFAULT_GROUP_NAME, defaultResourceGroup);
             idToResourceGroup.put(defaultResourceGroup.getId(), defaultResourceGroup);
@@ -141,12 +143,14 @@ public class ResourceGroupMgr implements Writable, GsonPostProcessable {
         String resourceGroupName = resourceGroup.getName();
         writeLock();
         try {
-            if (nameToResourceGroup.putIfAbsent(resourceGroupName, resourceGroup) != null) {
+            if (nameToResourceGroup.containsKey(resourceGroupName)) {
                 if (stmt.isIfNotExists()) {
                     return;
                 }
                 throw new DdlException("Resource group " + resourceGroupName + " already exist");
             }
+            checkGlobalUnlock(resourceGroup, null);
+            nameToResourceGroup.put(resourceGroupName, resourceGroup);
             idToResourceGroup.put(resourceGroup.getId(), resourceGroup);
             Env.getCurrentEnv().getEditLog().logCreateResourceGroup(resourceGroup);
         } finally {
@@ -155,6 +159,18 @@ public class ResourceGroupMgr implements Writable, GsonPostProcessable {
         LOG.info("Create resource group success: {}", resourceGroup);
     }
 
+    private void checkGlobalUnlock(ResourceGroup resourceGroup, ResourceGroup old) throws DdlException {
+        double totalMemoryLimit = idToResourceGroup.values().stream().mapToDouble(ResourceGroup::getMemoryLimitPercent)
+                .sum() + resourceGroup.getMemoryLimitPercent();
+        if (!Objects.isNull(old)) {
+            totalMemoryLimit -= old.getMemoryLimitPercent();
+        }
+        if (totalMemoryLimit > 100.0 + 1e-6) {
+            throw new DdlException(
+                    "The sum of all resource group " + ResourceGroup.MEMORY_LIMIT + " cannot be greater than 100.0%.");
+        }
+    }
+
     public void alterResourceGroup(AlterResourceGroupStmt stmt) throws DdlException {
         checkResourceGroupEnabled();
 
@@ -167,7 +183,8 @@ public class ResourceGroupMgr implements Writable, GsonPostProcessable {
                 throw new DdlException("Resource Group(" + resourceGroupName + ") does not exist.");
             }
             ResourceGroup resourceGroup = nameToResourceGroup.get(resourceGroupName);
-            newResourceGroup = ResourceGroup.create(resourceGroup, properties);
+            newResourceGroup = ResourceGroup.copyAndUpdate(resourceGroup, properties);
+            checkGlobalUnlock(newResourceGroup, resourceGroup);
             nameToResourceGroup.put(resourceGroupName, newResourceGroup);
             idToResourceGroup.put(newResourceGroup.getId(), newResourceGroup);
             Env.getCurrentEnv().getEditLog().logAlterResourceGroup(newResourceGroup);
@@ -181,7 +198,7 @@ public class ResourceGroupMgr implements Writable, GsonPostProcessable {
         checkResourceGroupEnabled();
 
         String resourceGroupName = stmt.getResourceGroupName();
-        if (resourceGroupName == DEFAULT_GROUP_NAME) {
+        if (DEFAULT_GROUP_NAME.equals(resourceGroupName)) {
             throw new DdlException("Dropping default resource group " + resourceGroupName + " is not allowed");
         }
 
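
For reference, checkGlobalUnlock above enforces a single global invariant: the memory_limit percentages of all resource groups, with an altered group counted at its new value rather than its old one, may not add up to more than 100%. A minimal standalone sketch of that rule follows (the class MemoryLimitCap, the map keyed by group id, and IllegalStateException in place of DdlException are illustrative assumptions, not part of the commit):

    import java.util.HashMap;
    import java.util.Map;

    public class MemoryLimitCap {
        // oldLimitPercent is null on create; on alter it is the group's previous
        // limit, subtracted so the group is only counted at its new value.
        public static void checkCap(Map<Long, Double> idToLimitPercent,
                                    double newLimitPercent, Double oldLimitPercent) {
            double total = idToLimitPercent.values().stream()
                    .mapToDouble(Double::doubleValue).sum() + newLimitPercent;
            if (oldLimitPercent != null) {
                total -= oldLimitPercent;
            }
            if (total > 100.0 + 1e-6) {
                throw new IllegalStateException(
                        "The sum of all resource group memory_limit cannot be greater than 100.0%.");
            }
        }

        public static void main(String[] args) {
            Map<Long, Double> existing = new HashMap<>();
            existing.put(1L, 30.0);
            existing.put(2L, 30.0);
            checkCap(existing, 30.0, null); // 90% in total: allowed
            checkCap(existing, 50.0, null); // 110% in total: throws
        }
    }
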
diff --git a/fe/fe-core/src/test/java/org/apache/doris/resource/resourcegroup/ResourceGroupMgrTest.java b/fe/fe-core/src/test/java/org/apache/doris/resource/resourcegroup/ResourceGroupMgrTest.java
index 4a0f18914a..f6562f584a 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/resource/resourcegroup/ResourceGroupMgrTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/resource/resourcegroup/ResourceGroupMgrTest.java
@@ -82,6 +82,7 @@ public class ResourceGroupMgrTest {
         ResourceGroupMgr resourceGroupMgr = new ResourceGroupMgr();
         Map<String, String> properties1 = Maps.newHashMap();
         properties1.put(ResourceGroup.CPU_SHARE, "10");
+        properties1.put(ResourceGroup.MEMORY_LIMIT, "30%");
         String name1 = "g1";
         CreateResourceGroupStmt stmt1 = new CreateResourceGroupStmt(false, name1, properties1);
         resourceGroupMgr.createResourceGroup(stmt1);
@@ -98,6 +99,7 @@ public class ResourceGroupMgrTest {
 
         Map<String, String> properties2 = Maps.newHashMap();
         properties2.put(ResourceGroup.CPU_SHARE, "20");
+        properties2.put(ResourceGroup.MEMORY_LIMIT, "30%");
         String name2 = "g2";
         CreateResourceGroupStmt stmt2 = new CreateResourceGroupStmt(false, name2, properties2);
         resourceGroupMgr.createResourceGroup(stmt2);
@@ -129,6 +131,7 @@ public class ResourceGroupMgrTest {
         ResourceGroupMgr resourceGroupMgr = new ResourceGroupMgr();
         Map<String, String> properties1 = Maps.newHashMap();
         properties1.put(ResourceGroup.CPU_SHARE, "10");
+        properties1.put(ResourceGroup.MEMORY_LIMIT, "30%");
         String name1 = "g1";
         CreateResourceGroupStmt stmt1 = new CreateResourceGroupStmt(false, name1, properties1);
         resourceGroupMgr.createResourceGroup(stmt1);
@@ -152,6 +155,7 @@ public class ResourceGroupMgrTest {
         ResourceGroupMgr resourceGroupMgr = new ResourceGroupMgr();
         Map<String, String> properties = Maps.newHashMap();
         properties.put(ResourceGroup.CPU_SHARE, "10");
+        properties.put(ResourceGroup.MEMORY_LIMIT, "30%");
         String name = "g1";
         CreateResourceGroupStmt createStmt = new CreateResourceGroupStmt(false, name, properties);
         resourceGroupMgr.createResourceGroup(createStmt);
@@ -188,11 +192,13 @@ public class ResourceGroupMgrTest {
         }
 
         properties.put(ResourceGroup.CPU_SHARE, "10");
+        properties.put(ResourceGroup.MEMORY_LIMIT, "30%");
         CreateResourceGroupStmt createStmt = new CreateResourceGroupStmt(false, name, properties);
         resourceGroupMgr.createResourceGroup(createStmt);
 
         Map<String, String> newProperties = Maps.newHashMap();
         newProperties.put(ResourceGroup.CPU_SHARE, "5");
+        newProperties.put(ResourceGroup.MEMORY_LIMIT, "30%");
         AlterResourceGroupStmt stmt2 = new AlterResourceGroupStmt(name, newProperties);
         resourceGroupMgr.alterResourceGroup(stmt2);
 
diff --git a/fe/fe-core/src/test/java/org/apache/doris/resource/resourcegroup/ResourceGroupTest.java b/fe/fe-core/src/test/java/org/apache/doris/resource/resourcegroup/ResourceGroupTest.java
index 978d28b8f1..9f174e201c 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/resource/resourcegroup/ResourceGroupTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/resource/resourcegroup/ResourceGroupTest.java
@@ -20,7 +20,6 @@ package org.apache.doris.resource.resourcegroup;
 import org.apache.doris.common.DdlException;
 import org.apache.doris.common.proc.BaseProcResult;
 
-import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import org.junit.Assert;
 import org.junit.Test;
@@ -34,17 +33,20 @@ public class ResourceGroupTest {
     public void testCreateNormal() throws DdlException {
         Map<String, String> properties1 = Maps.newHashMap();
         properties1.put(ResourceGroup.CPU_SHARE, "10");
+        properties1.put(ResourceGroup.MEMORY_LIMIT, "30%");
         String name1 = "g1";
         ResourceGroup group1 = ResourceGroup.create(name1, properties1);
         Assert.assertEquals(name1, group1.getName());
-        Assert.assertEquals(1, group1.getProperties().size());
+        Assert.assertEquals(2, group1.getProperties().size());
         Assert.assertTrue(group1.getProperties().containsKey(ResourceGroup.CPU_SHARE));
+        Assert.assertTrue(Math.abs(group1.getMemoryLimitPercent() - 30) < 1e-6);
     }
 
     @Test(expected = DdlException.class)
     public void testNotSupportProperty() throws DdlException {
         Map<String, String> properties1 = Maps.newHashMap();
         properties1.put(ResourceGroup.CPU_SHARE, "10");
+        properties1.put(ResourceGroup.MEMORY_LIMIT, "30%");
         properties1.put("share", "10");
         String name1 = "g1";
         ResourceGroup.create(name1, properties1);
@@ -61,12 +63,13 @@ public class ResourceGroupTest {
     public void testCpuShareValue() {
         Map<String, String> properties1 = Maps.newHashMap();
         properties1.put(ResourceGroup.CPU_SHARE, "0");
+        properties1.put(ResourceGroup.MEMORY_LIMIT, "30%");
         String name1 = "g1";
         try {
             ResourceGroup.create(name1, properties1);
             Assert.fail();
         } catch (DdlException e) {
-            Assert.assertTrue(e.getMessage().contains(ResourceGroup.CPU_SHARE + " requires a positive integer."));
+            Assert.assertTrue(e.getMessage().contains("requires a positive integer."));
         }
 
         properties1.put(ResourceGroup.CPU_SHARE, "cpu");
@@ -74,7 +77,7 @@ public class ResourceGroupTest {
             ResourceGroup.create(name1, properties1);
             Assert.fail();
         } catch (DdlException e) {
-            Assert.assertTrue(e.getMessage().contains(ResourceGroup.CPU_SHARE + " requires a positive integer."));
+            Assert.assertTrue(e.getMessage().contains("requires a positive integer."));
         }
     }
 
@@ -82,19 +85,13 @@ public class ResourceGroupTest {
     public void testGetProcNodeData() throws DdlException {
         Map<String, String> properties1 = Maps.newHashMap();
         properties1.put(ResourceGroup.CPU_SHARE, "10");
+        properties1.put(ResourceGroup.MEMORY_LIMIT, "30%");
         String name1 = "g1";
         ResourceGroup group1 = ResourceGroup.create(name1, properties1);
 
         BaseProcResult result = new BaseProcResult();
         group1.getProcNodeData(result);
         List<List<String>> rows = result.getRows();
-        Assert.assertEquals(1, rows.size());
-        List<List<String>> expectedRows = Lists.newArrayList();
-        expectedRows.add(Lists.newArrayList(String.valueOf(group1.getId()), name1, ResourceGroup.CPU_SHARE, "10"));
-        for (int i = 0; i < expectedRows.size(); ++i) {
-            for (int j = 0; j < expectedRows.get(i).size(); ++j) {
-                Assert.assertEquals(expectedRows.get(i).get(j), rows.get(i).get(j));
-            }
-        }
+        Assert.assertEquals(2, rows.size());
     }
 }
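
The tests above only add memory_limit to existing cases; the new global cap itself could be exercised with a test along these lines (a hypothetical addition to ResourceGroupMgrTest, reusing that class's existing imports and setup; it is not part of this commit):

    @Test(expected = DdlException.class)
    public void testMemoryLimitOverCommit() throws DdlException {
        ResourceGroupMgr resourceGroupMgr = new ResourceGroupMgr();

        Map<String, String> properties1 = Maps.newHashMap();
        properties1.put(ResourceGroup.CPU_SHARE, "10");
        properties1.put(ResourceGroup.MEMORY_LIMIT, "60%");
        resourceGroupMgr.createResourceGroup(new CreateResourceGroupStmt(false, "g1", properties1));

        Map<String, String> properties2 = Maps.newHashMap();
        properties2.put(ResourceGroup.CPU_SHARE, "10");
        properties2.put(ResourceGroup.MEMORY_LIMIT, "60%");
        // 60% + 60% exceeds 100%, so checkGlobalUnlock should reject the second group.
        resourceGroupMgr.createResourceGroup(new CreateResourceGroupStmt(false, "g2", properties2));
    }
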


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org
