xinyiZzz commented on code in PR #47462:
URL: https://github.com/apache/doris/pull/47462#discussion_r1967593129


##########
be/src/runtime/workload_group/workload_group_manager.cpp:
##########
@@ -287,6 +257,642 @@ void WorkloadGroupMgr::refresh_workload_group_metrics() {
     }
 }
 
+void WorkloadGroupMgr::add_paused_query(const std::shared_ptr<QueryContext>& 
query_ctx,
+                                        int64_t reserve_size, const Status& 
status) {
+    DCHECK(query_ctx != nullptr);
+    query_ctx->update_paused_reason(status);
+    query_ctx->set_low_memory_mode();
+    query_ctx->set_memory_sufficient(false);
+    std::lock_guard<std::mutex> lock(_paused_queries_lock);
+    auto wg = query_ctx->workload_group();
+    auto&& [it, inserted] = _paused_queries_list[wg].emplace(
+            query_ctx, 
doris::GlobalMemoryArbitrator::last_affected_cache_capacity_adjust_weighted,
+            doris::GlobalMemoryArbitrator::any_workload_group_exceed_limit, 
reserve_size);
+    // Check if this is an invalid reserve, for example, if the reserve size 
is too large, larger than the query limit
+    // if hard limit is enabled, then not need enable other queries hard limit.
+    if (inserted) {
+        LOG(INFO) << "Insert one new paused query: " << 
query_ctx->debug_string()
+                  << ", workload group: " << wg->debug_string();
+    }
+}
+
+/**
+ * Strategy 1: A revocable query should not have any running 
task(PipelineTask).
+ * strategy 2: If the workload group has any task exceed workload group 
memlimit, then set all queryctx's memlimit
+ * strategy 3: If any query exceed process memlimit, then should clear all 
caches.
+ * strategy 4: If any query exceed query's memlimit, then do spill disk or 
cancel it.
+ * strategy 5: If any query exceed process's memlimit and cache is zero, then 
do following:
+ */
+void WorkloadGroupMgr::handle_paused_queries() {
+    {
+        std::shared_lock<std::shared_mutex> r_lock(_group_mutex);
+        for (auto& [wg_id, wg] : _workload_groups) {
+            std::unique_lock<std::mutex> lock(_paused_queries_lock);
+            if (_paused_queries_list[wg].empty()) {
+                // Add an empty set to wg that not contains paused queries.
+            }
+        }
+    }
+
+    std::unique_lock<std::mutex> lock(_paused_queries_lock);
+    bool has_revoked_from_other_group = false;
+    bool has_query_exceed_process_memlimit = false;
+    for (auto it = _paused_queries_list.begin(); it != 
_paused_queries_list.end();) {
+        auto& queries_list = it->second;
+        auto query_count = queries_list.size();
+        const auto& wg = it->first;
+
+        if (query_count != 0) {
+            LOG_EVERY_T(INFO, 1) << "Paused queries count of wg " << 
wg->name() << ": "
+                                 << query_count;
+        }
+
+        bool has_changed_hard_limit = false;
+        int64_t flushed_memtable_bytes = 0;
+        // If the query is paused because its limit exceed the query itself's 
memlimit, then just spill disk.
+        // The query's memlimit is set using slot mechanism and its value is 
set using the user settings, not
+        // by weighted value. So if reserve failed, then it is actually exceed 
limit.
+        for (auto query_it = queries_list.begin(); query_it != 
queries_list.end();) {
+            auto query_ctx = query_it->query_ctx_.lock();
+            // The query is finished during in paused list.
+            if (query_ctx == nullptr) {
+                LOG(INFO) << "Query: " << query_it->query_id() << " is 
nullptr, erase it.";
+                query_it = queries_list.erase(query_it);
+                continue;
+            }
+            if (query_ctx->is_cancelled()) {
+                LOG(INFO) << "Query: " << print_id(query_ctx->query_id())
+                          << " was canceled, remove from paused list";
+                query_it = queries_list.erase(query_it);
+                continue;
+            }
+
+            if 
(query_ctx->paused_reason().is<ErrorCode::QUERY_MEMORY_EXCEEDED>()) {
+                // Streamload, kafka load, group commit will never have query 
memory exceeded error because
+                // their  query limit is very large.
+                bool spill_res =
+                        handle_single_query_(query_ctx, 
query_it->reserve_size_,
+                                             query_it->elapsed_time(), 
query_ctx->paused_reason());
+                if (!spill_res) {
+                    ++query_it;
+                    continue;
+                } else {
+                    VLOG_DEBUG << "Query: " << print_id(query_ctx->query_id())
+                               << " remove from paused list";
+                    query_it = queries_list.erase(query_it);
+                    continue;
+                }
+            } else if 
(query_ctx->paused_reason().is<ErrorCode::WORKLOAD_GROUP_MEMORY_EXCEEDED>()) {
+                // Only deal with non overcommit workload group.
+                if (wg->enable_memory_overcommit()) {
+                    // Soft limit wg will only reserve failed when process 
limit exceed. But in some corner case,
+                    // when reserve, the wg is hard limit, the query reserve 
failed, but when this loop run
+                    // the wg is converted to soft limit.
+                    // So that should resume the query.
+                    LOG(WARNING)
+                            << "Query: " << print_id(query_ctx->query_id())
+                            << " reserve memory failed because exceed workload 
group memlimit, it "
+                               "should not happen, resume it again. paused 
reason: "
+                            << query_ctx->paused_reason();
+                    query_ctx->set_memory_sufficient(true);
+                    query_it = queries_list.erase(query_it);
+                    continue;
+                }
+                // check if the reserve is too large, if it is too large,
+                // should set the query's limit only.
+                // Check the query's reserve with expected limit.
+                if (query_ctx->adjusted_mem_limit() <
+                    query_ctx->get_mem_tracker()->consumption() + 
query_it->reserve_size_) {
+                    query_ctx->set_mem_limit(query_ctx->adjusted_mem_limit());
+                    query_ctx->set_memory_sufficient(true);
+                    LOG(INFO) << "Workload group memory reserve failed because 
"
+                              << query_ctx->debug_string() << " reserve size "
+                              << 
PrettyPrinter::print_bytes(query_it->reserve_size_)
+                              << " is too large, set hard limit to "
+                              << 
PrettyPrinter::print_bytes(query_ctx->adjusted_mem_limit())
+                              << " and resume running.";
+                    query_it = queries_list.erase(query_it);
+                    continue;
+                }
+                if (flushed_memtable_bytes <= 0) {
+                    flushed_memtable_bytes =
+                            flush_memtable_from_current_group_(wg, 
query_it->reserve_size_);
+                }
+                if (flushed_memtable_bytes > 0) {
+                    // Flushed some memtable, just wait flush finished and not 
do anything more.
+                    wg->enable_write_buffer_limit(true);
+                    ++query_it;
+                    continue;
+                }
+                if (!has_changed_hard_limit) {
+                    update_queries_limit_(wg, true);
+                    has_changed_hard_limit = true;
+                    LOG(INFO) << "Query: " << print_id(query_ctx->query_id()) 
<< " reserve memory("
+                              << 
PrettyPrinter::print_bytes(query_it->reserve_size_)
+                              << ") failed due to workload group memory 
exceed, "
+                                 "should set the workload group work in memory 
insufficent mode, "
+                                 "so that other query will reduce their 
memory."
+                              << " Query mem limit: "
+                              << 
PrettyPrinter::print_bytes(query_ctx->get_mem_limit())
+                              << " mem usage: "
+                              << PrettyPrinter::print_bytes(
+                                         
query_ctx->get_mem_tracker()->consumption())
+                              << ", wg: " << wg->debug_string();
+                }
+                if (wg->slot_memory_policy() == TWgSlotMemoryPolicy::NONE) {
+                    // If not enable slot memory policy, then should spill 
directly
+                    // Maybe there are another query that use too much memory, 
but we
+                    // not encourage not enable slot memory.
+                    // TODO should kill the query that exceed limit.
+                    bool spill_res = handle_single_query_(query_ctx, 
query_it->reserve_size_,
+                                                          
query_it->elapsed_time(),
+                                                          
query_ctx->paused_reason());
+                    if (!spill_res) {
+                        ++query_it;
+                        continue;
+                    } else {
+                        VLOG_DEBUG << "Query: " << 
print_id(query_ctx->query_id())
+                                   << " remove from paused list";
+                        query_it = queries_list.erase(query_it);
+                        continue;
+                    }
+                } else {
+                    // Should not put the query back to task scheduler 
immediately, because when wg's memory not sufficient,
+                    // and then set wg's flag, other query may not free memory 
very quickly.
+                    if (query_it->elapsed_time() > 
config::spill_in_paused_queue_timeout_ms) {
+                        // set wg's memory to insufficent, then add it back to 
task scheduler to run.
+                        LOG(INFO) << "Query: " << 
print_id(query_ctx->query_id())
+                                  << " will be resume.";
+                        query_ctx->set_memory_sufficient(true);
+                        query_it = queries_list.erase(query_it);
+                        continue;
+                    } else {
+                        ++query_it;
+                        continue;
+                    }
+                }
+            } else {
+                has_query_exceed_process_memlimit = true;
+                // If wg's memlimit not exceed, but process memory exceed, it 
means cache or other metadata
+                // used too much memory. Should clean all cache here.
+                // 1. Check cache used, if cache is larger than > 0, then just 
return and wait for it to 0 to release some memory.
+                if 
(doris::GlobalMemoryArbitrator::last_affected_cache_capacity_adjust_weighted >
+                            0.05 &&
+                    
doris::GlobalMemoryArbitrator::last_wg_trigger_cache_capacity_adjust_weighted >
+                            0.05) {
+                    
doris::GlobalMemoryArbitrator::last_wg_trigger_cache_capacity_adjust_weighted =
+                            0.04;
+                    
doris::GlobalMemoryArbitrator::notify_cache_adjust_capacity();
+                    LOG(INFO) << "There are some queries need process memory, 
so that set cache "
+                                 "capacity "
+                                 "to 0 now";
+                }
+                // need to check config::disable_memory_gc here, if not, when 
config::disable_memory_gc == true,
+                // cache is not adjusted, query_it->cache_ratio_ will always 
be 1, and this if branch will nenver
+                // execute, this query will never be resumed, and will 
deadlock here
+                if ((!config::disable_memory_gc && query_it->cache_ratio_ < 
0.05) ||
+                    config::disable_memory_gc) {
+                    // 1. Check if could revoke some memory from memtable
+                    if (flushed_memtable_bytes <= 0) {
+                        flushed_memtable_bytes =
+                                flush_memtable_from_current_group_(wg, 
query_it->reserve_size_);
+                    }
+                    if (flushed_memtable_bytes > 0) {
+                        // Flushed some memtable, just wait flush finished and 
not do anything more.
+                        ++query_it;
+                        continue;
+                    }
+                    // TODO should wait here to check if the process has 
release revoked_size memory and then continue.
+                    if (!has_revoked_from_other_group) {
+                        int64_t revoked_size = revoke_memory_from_other_group_(
+                                query_ctx, wg->enable_memory_overcommit(), 
query_it->reserve_size_);
+                        if (revoked_size > 0) {
+                            has_revoked_from_other_group = true;
+                            query_ctx->set_memory_sufficient(true);
+                            VLOG_DEBUG << "Query: " << 
print_id(query_ctx->query_id())
+                                       << " is resumed after revoke memory 
from other group.";
+                            query_it = queries_list.erase(query_it);
+                            // Do not care if the revoked_size > reserve size, 
and try to run again.
+                            continue;
+                        } else {
+                            bool spill_res = handle_single_query_(
+                                    query_ctx, query_it->reserve_size_, 
query_it->elapsed_time(),
+                                    query_ctx->paused_reason());
+                            if (spill_res) {
+                                VLOG_DEBUG << "Query: " << 
print_id(query_ctx->query_id())
+                                           << " remove from paused list";
+                                query_it = queries_list.erase(query_it);
+                                continue;
+                            } else {
+                                ++query_it;
+                                continue;
+                            }
+                        }
+                    } else {
+                        // If any query is cancelled during process limit 
stage, should resume other query and
+                        // do not do any check now.
+                        query_ctx->set_memory_sufficient(true);
+                        VLOG_DEBUG << "Query: " << 
print_id(query_ctx->query_id())
+                                   << " remove from paused list";
+                        query_it = queries_list.erase(query_it);
+                        continue;
+                    }
+                }
+                if 
(doris::GlobalMemoryArbitrator::last_affected_cache_capacity_adjust_weighted <
+                            0.05 &&
+                    query_it->cache_ratio_ > 0.05) {
+                    LOG(INFO) << "Query: " << print_id(query_ctx->query_id())
+                              << " will be resume after cache adjust.";
+                    query_ctx->set_memory_sufficient(true);
+                    query_it = queries_list.erase(query_it);
+                    continue;
+                }
+                ++query_it;
+            }
+        }
+
+        bool is_low_watermark = false;
+        bool is_high_watermark = false;
+        wg->check_mem_used(&is_low_watermark, &is_high_watermark);
+        // Not need waiting flush memtable and below low watermark disable 
load buffer limit
+        if (flushed_memtable_bytes <= 0 && !is_low_watermark) {
+            wg->enable_write_buffer_limit(false);
+        }
+
+        if (queries_list.empty()) {
+            it = _paused_queries_list.erase(it);
+            continue;
+        } else {
+            // Finished deal with one workload group, and should deal with 
next one.
+            ++it;
+        }
+    }
+
+    if (has_query_exceed_process_memlimit) {

Review Comment:
   这里应该是 `if (!has_query_exceed_process_memlimit)` 吧,取反



##########
be/src/runtime/workload_group/workload_group_manager.cpp:
##########
@@ -287,6 +257,642 @@ void WorkloadGroupMgr::refresh_workload_group_metrics() {
     }
 }
 
+void WorkloadGroupMgr::add_paused_query(const std::shared_ptr<QueryContext>& 
query_ctx,
+                                        int64_t reserve_size, const Status& 
status) {
+    DCHECK(query_ctx != nullptr);
+    query_ctx->update_paused_reason(status);
+    query_ctx->set_low_memory_mode();
+    query_ctx->set_memory_sufficient(false);
+    std::lock_guard<std::mutex> lock(_paused_queries_lock);
+    auto wg = query_ctx->workload_group();
+    auto&& [it, inserted] = _paused_queries_list[wg].emplace(
+            query_ctx, 
doris::GlobalMemoryArbitrator::last_affected_cache_capacity_adjust_weighted,
+            doris::GlobalMemoryArbitrator::any_workload_group_exceed_limit, 
reserve_size);
+    // Check if this is an invalid reserve, for example, if the reserve size 
is too large, larger than the query limit
+    // if hard limit is enabled, then not need enable other queries hard limit.
+    if (inserted) {
+        LOG(INFO) << "Insert one new paused query: " << 
query_ctx->debug_string()
+                  << ", workload group: " << wg->debug_string();
+    }
+}
+
+/**
+ * Strategy 1: A revocable query should not have any running 
task(PipelineTask).
+ * strategy 2: If the workload group has any task exceed workload group 
memlimit, then set all queryctx's memlimit
+ * strategy 3: If any query exceed process memlimit, then should clear all 
caches.
+ * strategy 4: If any query exceed query's memlimit, then do spill disk or 
cancel it.
+ * strategy 5: If any query exceed process's memlimit and cache is zero, then 
do following:
+ */
+void WorkloadGroupMgr::handle_paused_queries() {
+    {
+        std::shared_lock<std::shared_mutex> r_lock(_group_mutex);
+        for (auto& [wg_id, wg] : _workload_groups) {
+            std::unique_lock<std::mutex> lock(_paused_queries_lock);
+            if (_paused_queries_list[wg].empty()) {
+                // Add an empty set to wg that not contains paused queries.
+            }
+        }
+    }
+
+    std::unique_lock<std::mutex> lock(_paused_queries_lock);
+    bool has_revoked_from_other_group = false;
+    bool has_query_exceed_process_memlimit = false;
+    for (auto it = _paused_queries_list.begin(); it != 
_paused_queries_list.end();) {
+        auto& queries_list = it->second;
+        auto query_count = queries_list.size();
+        const auto& wg = it->first;
+
+        if (query_count != 0) {
+            LOG_EVERY_T(INFO, 1) << "Paused queries count of wg " << 
wg->name() << ": "
+                                 << query_count;
+        }
+
+        bool has_changed_hard_limit = false;
+        int64_t flushed_memtable_bytes = 0;
+        // If the query is paused because its limit exceed the query itself's 
memlimit, then just spill disk.
+        // The query's memlimit is set using slot mechanism and its value is 
set using the user settings, not
+        // by weighted value. So if reserve failed, then it is actually exceed 
limit.
+        for (auto query_it = queries_list.begin(); query_it != 
queries_list.end();) {
+            auto query_ctx = query_it->query_ctx_.lock();
+            // The query is finished during in paused list.
+            if (query_ctx == nullptr) {
+                LOG(INFO) << "Query: " << query_it->query_id() << " is 
nullptr, erase it.";
+                query_it = queries_list.erase(query_it);
+                continue;
+            }
+            if (query_ctx->is_cancelled()) {
+                LOG(INFO) << "Query: " << print_id(query_ctx->query_id())
+                          << " was canceled, remove from paused list";
+                query_it = queries_list.erase(query_it);
+                continue;
+            }
+
+            if 
(query_ctx->paused_reason().is<ErrorCode::QUERY_MEMORY_EXCEEDED>()) {
+                // Streamload, kafka load, group commit will never have query 
memory exceeded error because
+                // their  query limit is very large.
+                bool spill_res =
+                        handle_single_query_(query_ctx, 
query_it->reserve_size_,
+                                             query_it->elapsed_time(), 
query_ctx->paused_reason());
+                if (!spill_res) {
+                    ++query_it;
+                    continue;
+                } else {
+                    VLOG_DEBUG << "Query: " << print_id(query_ctx->query_id())
+                               << " remove from paused list";
+                    query_it = queries_list.erase(query_it);
+                    continue;
+                }
+            } else if 
(query_ctx->paused_reason().is<ErrorCode::WORKLOAD_GROUP_MEMORY_EXCEEDED>()) {
+                // Only deal with non overcommit workload group.
+                if (wg->enable_memory_overcommit()) {
+                    // Soft limit wg will only reserve failed when process 
limit exceed. But in some corner case,
+                    // when reserve, the wg is hard limit, the query reserve 
failed, but when this loop run
+                    // the wg is converted to soft limit.
+                    // So that should resume the query.
+                    LOG(WARNING)
+                            << "Query: " << print_id(query_ctx->query_id())
+                            << " reserve memory failed because exceed workload 
group memlimit, it "
+                               "should not happen, resume it again. paused 
reason: "
+                            << query_ctx->paused_reason();
+                    query_ctx->set_memory_sufficient(true);
+                    query_it = queries_list.erase(query_it);
+                    continue;
+                }
+                // check if the reserve is too large, if it is too large,
+                // should set the query's limit only.
+                // Check the query's reserve with expected limit.
+                if (query_ctx->adjusted_mem_limit() <
+                    query_ctx->get_mem_tracker()->consumption() + 
query_it->reserve_size_) {
+                    query_ctx->set_mem_limit(query_ctx->adjusted_mem_limit());
+                    query_ctx->set_memory_sufficient(true);
+                    LOG(INFO) << "Workload group memory reserve failed because 
"
+                              << query_ctx->debug_string() << " reserve size "
+                              << 
PrettyPrinter::print_bytes(query_it->reserve_size_)
+                              << " is too large, set hard limit to "
+                              << 
PrettyPrinter::print_bytes(query_ctx->adjusted_mem_limit())
+                              << " and resume running.";
+                    query_it = queries_list.erase(query_it);
+                    continue;
+                }
+                if (flushed_memtable_bytes <= 0) {
+                    flushed_memtable_bytes =
+                            flush_memtable_from_current_group_(wg, 
query_it->reserve_size_);
+                }
+                if (flushed_memtable_bytes > 0) {
+                    // Flushed some memtable, just wait flush finished and not 
do anything more.
+                    wg->enable_write_buffer_limit(true);
+                    ++query_it;
+                    continue;
+                }
+                if (!has_changed_hard_limit) {
+                    update_queries_limit_(wg, true);
+                    has_changed_hard_limit = true;
+                    LOG(INFO) << "Query: " << print_id(query_ctx->query_id()) 
<< " reserve memory("
+                              << 
PrettyPrinter::print_bytes(query_it->reserve_size_)
+                              << ") failed due to workload group memory 
exceed, "
+                                 "should set the workload group work in memory 
insufficent mode, "
+                                 "so that other query will reduce their 
memory."
+                              << " Query mem limit: "
+                              << 
PrettyPrinter::print_bytes(query_ctx->get_mem_limit())
+                              << " mem usage: "
+                              << PrettyPrinter::print_bytes(
+                                         
query_ctx->get_mem_tracker()->consumption())
+                              << ", wg: " << wg->debug_string();
+                }
+                if (wg->slot_memory_policy() == TWgSlotMemoryPolicy::NONE) {
+                    // If not enable slot memory policy, then should spill 
directly
+                    // Maybe there are another query that use too much memory, 
but we
+                    // not encourage not enable slot memory.
+                    // TODO should kill the query that exceed limit.
+                    bool spill_res = handle_single_query_(query_ctx, 
query_it->reserve_size_,
+                                                          
query_it->elapsed_time(),
+                                                          
query_ctx->paused_reason());
+                    if (!spill_res) {
+                        ++query_it;
+                        continue;
+                    } else {
+                        VLOG_DEBUG << "Query: " << 
print_id(query_ctx->query_id())
+                                   << " remove from paused list";
+                        query_it = queries_list.erase(query_it);
+                        continue;
+                    }
+                } else {
+                    // Should not put the query back to task scheduler 
immediately, because when wg's memory not sufficient,
+                    // and then set wg's flag, other query may not free memory 
very quickly.
+                    if (query_it->elapsed_time() > 
config::spill_in_paused_queue_timeout_ms) {
+                        // set wg's memory to insufficent, then add it back to 
task scheduler to run.
+                        LOG(INFO) << "Query: " << 
print_id(query_ctx->query_id())
+                                  << " will be resume.";
+                        query_ctx->set_memory_sufficient(true);
+                        query_it = queries_list.erase(query_it);
+                        continue;
+                    } else {
+                        ++query_it;
+                        continue;
+                    }
+                }
+            } else {
+                has_query_exceed_process_memlimit = true;
+                // If wg's memlimit not exceed, but process memory exceed, it 
means cache or other metadata
+                // used too much memory. Should clean all cache here.
+                // 1. Check cache used, if cache is larger than > 0, then just 
return and wait for it to 0 to release some memory.
+                if 
(doris::GlobalMemoryArbitrator::last_affected_cache_capacity_adjust_weighted >
+                            0.05 &&
+                    
doris::GlobalMemoryArbitrator::last_wg_trigger_cache_capacity_adjust_weighted >
+                            0.05) {
+                    
doris::GlobalMemoryArbitrator::last_wg_trigger_cache_capacity_adjust_weighted =

Review Comment:
   TODO:对 cache 容量的调整放在这里不太好,后面看有没有更好的实现方式



##########
be/src/runtime/workload_group/workload_group_manager.cpp:
##########
@@ -287,6 +257,642 @@ void WorkloadGroupMgr::refresh_workload_group_metrics() {
     }
 }
 
+void WorkloadGroupMgr::add_paused_query(const std::shared_ptr<QueryContext>& 
query_ctx,
+                                        int64_t reserve_size, const Status& 
status) {
+    DCHECK(query_ctx != nullptr);
+    query_ctx->update_paused_reason(status);
+    query_ctx->set_low_memory_mode();
+    query_ctx->set_memory_sufficient(false);
+    std::lock_guard<std::mutex> lock(_paused_queries_lock);
+    auto wg = query_ctx->workload_group();
+    auto&& [it, inserted] = _paused_queries_list[wg].emplace(
+            query_ctx, 
doris::GlobalMemoryArbitrator::last_affected_cache_capacity_adjust_weighted,
+            doris::GlobalMemoryArbitrator::any_workload_group_exceed_limit, 
reserve_size);
+    // Check if this is an invalid reserve, for example, if the reserve size 
is too large, larger than the query limit
+    // if hard limit is enabled, then not need enable other queries hard limit.
+    if (inserted) {
+        LOG(INFO) << "Insert one new paused query: " << 
query_ctx->debug_string()
+                  << ", workload group: " << wg->debug_string();
+    }
+}
+
+/**
+ * Strategy 1: A revocable query should not have any running 
task(PipelineTask).
+ * strategy 2: If the workload group has any task exceed workload group 
memlimit, then set all queryctx's memlimit
+ * strategy 3: If any query exceed process memlimit, then should clear all 
caches.
+ * strategy 4: If any query exceed query's memlimit, then do spill disk or 
cancel it.
+ * strategy 5: If any query exceed process's memlimit and cache is zero, then 
do following:
+ */
+void WorkloadGroupMgr::handle_paused_queries() {
+    {
+        std::shared_lock<std::shared_mutex> r_lock(_group_mutex);
+        for (auto& [wg_id, wg] : _workload_groups) {
+            std::unique_lock<std::mutex> lock(_paused_queries_lock);
+            if (_paused_queries_list[wg].empty()) {
+                // Add an empty set to wg that not contains paused queries.
+            }
+        }
+    }
+
+    std::unique_lock<std::mutex> lock(_paused_queries_lock);
+    bool has_revoked_from_other_group = false;
+    bool has_query_exceed_process_memlimit = false;
+    for (auto it = _paused_queries_list.begin(); it != 
_paused_queries_list.end();) {
+        auto& queries_list = it->second;
+        auto query_count = queries_list.size();
+        const auto& wg = it->first;
+
+        if (query_count != 0) {
+            LOG_EVERY_T(INFO, 1) << "Paused queries count of wg " << 
wg->name() << ": "
+                                 << query_count;
+        }
+
+        bool has_changed_hard_limit = false;
+        int64_t flushed_memtable_bytes = 0;
+        // If the query is paused because its limit exceed the query itself's 
memlimit, then just spill disk.
+        // The query's memlimit is set using slot mechanism and its value is 
set using the user settings, not
+        // by weighted value. So if reserve failed, then it is actually exceed 
limit.
+        for (auto query_it = queries_list.begin(); query_it != 
queries_list.end();) {
+            auto query_ctx = query_it->query_ctx_.lock();
+            // The query is finished during in paused list.
+            if (query_ctx == nullptr) {
+                LOG(INFO) << "Query: " << query_it->query_id() << " is 
nullptr, erase it.";
+                query_it = queries_list.erase(query_it);
+                continue;
+            }
+            if (query_ctx->is_cancelled()) {
+                LOG(INFO) << "Query: " << print_id(query_ctx->query_id())
+                          << " was canceled, remove from paused list";
+                query_it = queries_list.erase(query_it);
+                continue;
+            }
+
+            if 
(query_ctx->paused_reason().is<ErrorCode::QUERY_MEMORY_EXCEEDED>()) {
+                // Streamload, kafka load, group commit will never have query 
memory exceeded error because
+                // their  query limit is very large.
+                bool spill_res =
+                        handle_single_query_(query_ctx, 
query_it->reserve_size_,
+                                             query_it->elapsed_time(), 
query_ctx->paused_reason());
+                if (!spill_res) {
+                    ++query_it;
+                    continue;
+                } else {
+                    VLOG_DEBUG << "Query: " << print_id(query_ctx->query_id())
+                               << " remove from paused list";
+                    query_it = queries_list.erase(query_it);
+                    continue;
+                }
+            } else if 
(query_ctx->paused_reason().is<ErrorCode::WORKLOAD_GROUP_MEMORY_EXCEEDED>()) {
+                // Only deal with non overcommit workload group.
+                if (wg->enable_memory_overcommit()) {
+                    // Soft limit wg will only reserve failed when process 
limit exceed. But in some corner case,
+                    // when reserve, the wg is hard limit, the query reserve 
failed, but when this loop run
+                    // the wg is converted to soft limit.
+                    // So that should resume the query.
+                    LOG(WARNING)
+                            << "Query: " << print_id(query_ctx->query_id())
+                            << " reserve memory failed because exceed workload 
group memlimit, it "
+                               "should not happen, resume it again. paused 
reason: "
+                            << query_ctx->paused_reason();
+                    query_ctx->set_memory_sufficient(true);
+                    query_it = queries_list.erase(query_it);
+                    continue;
+                }
+                // check if the reserve is too large, if it is too large,
+                // should set the query's limit only.
+                // Check the query's reserve with expected limit.
+                if (query_ctx->adjusted_mem_limit() <
+                    query_ctx->get_mem_tracker()->consumption() + 
query_it->reserve_size_) {
+                    query_ctx->set_mem_limit(query_ctx->adjusted_mem_limit());
+                    query_ctx->set_memory_sufficient(true);
+                    LOG(INFO) << "Workload group memory reserve failed because 
"
+                              << query_ctx->debug_string() << " reserve size "
+                              << 
PrettyPrinter::print_bytes(query_it->reserve_size_)
+                              << " is too large, set hard limit to "
+                              << 
PrettyPrinter::print_bytes(query_ctx->adjusted_mem_limit())
+                              << " and resume running.";
+                    query_it = queries_list.erase(query_it);
+                    continue;
+                }
+                if (flushed_memtable_bytes <= 0) {
+                    flushed_memtable_bytes =
+                            flush_memtable_from_current_group_(wg, 
query_it->reserve_size_);
+                }
+                if (flushed_memtable_bytes > 0) {
+                    // Flushed some memtable, just wait flush finished and not 
do anything more.
+                    wg->enable_write_buffer_limit(true);
+                    ++query_it;
+                    continue;
+                }
+                if (!has_changed_hard_limit) {
+                    update_queries_limit_(wg, true);
+                    has_changed_hard_limit = true;
+                    LOG(INFO) << "Query: " << print_id(query_ctx->query_id()) 
<< " reserve memory("
+                              << 
PrettyPrinter::print_bytes(query_it->reserve_size_)
+                              << ") failed due to workload group memory 
exceed, "
+                                 "should set the workload group work in memory 
insufficent mode, "
+                                 "so that other query will reduce their 
memory."
+                              << " Query mem limit: "
+                              << 
PrettyPrinter::print_bytes(query_ctx->get_mem_limit())
+                              << " mem usage: "
+                              << PrettyPrinter::print_bytes(
+                                         
query_ctx->get_mem_tracker()->consumption())
+                              << ", wg: " << wg->debug_string();
+                }
+                if (wg->slot_memory_policy() == TWgSlotMemoryPolicy::NONE) {
+                    // If not enable slot memory policy, then should spill 
directly
+                    // Maybe there are another query that use too much memory, 
but we
+                    // not encourage not enable slot memory.
+                    // TODO should kill the query that exceed limit.
+                    bool spill_res = handle_single_query_(query_ctx, 
query_it->reserve_size_,
+                                                          
query_it->elapsed_time(),
+                                                          
query_ctx->paused_reason());
+                    if (!spill_res) {
+                        ++query_it;
+                        continue;
+                    } else {
+                        VLOG_DEBUG << "Query: " << 
print_id(query_ctx->query_id())
+                                   << " remove from paused list";
+                        query_it = queries_list.erase(query_it);
+                        continue;
+                    }
+                } else {
+                    // Should not put the query back to task scheduler 
immediately, because when wg's memory not sufficient,
+                    // and then set wg's flag, other query may not free memory 
very quickly.
+                    if (query_it->elapsed_time() > 
config::spill_in_paused_queue_timeout_ms) {
+                        // set wg's memory to insufficent, then add it back to 
task scheduler to run.
+                        LOG(INFO) << "Query: " << 
print_id(query_ctx->query_id())
+                                  << " will be resume.";
+                        query_ctx->set_memory_sufficient(true);
+                        query_it = queries_list.erase(query_it);
+                        continue;
+                    } else {
+                        ++query_it;
+                        continue;
+                    }
+                }
+            } else {
+                has_query_exceed_process_memlimit = true;
+                // If wg's memlimit not exceed, but process memory exceed, it 
means cache or other metadata
+                // used too much memory. Should clean all cache here.
+                // 1. Check cache used, if cache is larger than > 0, then just 
return and wait for it to 0 to release some memory.
+                if 
(doris::GlobalMemoryArbitrator::last_affected_cache_capacity_adjust_weighted >
+                            0.05 &&
+                    
doris::GlobalMemoryArbitrator::last_wg_trigger_cache_capacity_adjust_weighted >
+                            0.05) {
+                    
doris::GlobalMemoryArbitrator::last_wg_trigger_cache_capacity_adjust_weighted =
+                            0.04;
+                    
doris::GlobalMemoryArbitrator::notify_cache_adjust_capacity();
+                    LOG(INFO) << "There are some queries need process memory, 
so that set cache "
+                                 "capacity "
+                                 "to 0 now";
+                }
+                // need to check config::disable_memory_gc here, if not, when 
config::disable_memory_gc == true,
+                // cache is not adjusted, query_it->cache_ratio_ will always 
be 1, and this if branch will nenver
+                // execute, this query will never be resumed, and will 
deadlock here
+                if ((!config::disable_memory_gc && query_it->cache_ratio_ < 
0.05) ||
+                    config::disable_memory_gc) {
+                    // 1. Check if could revoke some memory from memtable
+                    if (flushed_memtable_bytes <= 0) {
+                        flushed_memtable_bytes =
+                                flush_memtable_from_current_group_(wg, 
query_it->reserve_size_);
+                    }
+                    if (flushed_memtable_bytes > 0) {
+                        // Flushed some memtable, just wait flush finished and 
not do anything more.
+                        ++query_it;
+                        continue;
+                    }
+                    // TODO should wait here to check if the process has 
release revoked_size memory and then continue.
+                    if (!has_revoked_from_other_group) {
+                        int64_t revoked_size = revoke_memory_from_other_group_(
+                                query_ctx, wg->enable_memory_overcommit(), 
query_it->reserve_size_);
+                        if (revoked_size > 0) {
+                            has_revoked_from_other_group = true;
+                            query_ctx->set_memory_sufficient(true);
+                            VLOG_DEBUG << "Query: " << 
print_id(query_ctx->query_id())
+                                       << " is resumed after revoke memory 
from other group.";
+                            query_it = queries_list.erase(query_it);
+                            // Do not care if the revoked_size > reserve size, 
and try to run again.

Review Comment:
   这里不管 revoked_size 是否大于 reserve size,都继续执行 query,真的没问题么。
   
   因为没有让 adjusted_mem_limit 生效,所以 query 重新执行后不会因为 `QUERY_MEMORY_EXCEEDED` 进入暂停状态
   
   Query 再次进入暂停状态大概率还是因为 `process memory exceed`,那就会不断 revoke  overcommited 的 
wg,直到 revoke = 0,没有 overcommited 的 wg 后,才会去 spill 当前 query。看似合理,但总感觉线上会有坑,比如 
query 看起来`一卡一卡`的



##########
be/src/runtime/workload_group/workload_group_manager.cpp:
##########
@@ -287,6 +257,642 @@ void WorkloadGroupMgr::refresh_workload_group_metrics() {
     }
 }
 
+void WorkloadGroupMgr::add_paused_query(const std::shared_ptr<QueryContext>& 
query_ctx,
+                                        int64_t reserve_size, const Status& 
status) {
+    DCHECK(query_ctx != nullptr);
+    query_ctx->update_paused_reason(status);
+    query_ctx->set_low_memory_mode();
+    query_ctx->set_memory_sufficient(false);
+    std::lock_guard<std::mutex> lock(_paused_queries_lock);
+    auto wg = query_ctx->workload_group();
+    auto&& [it, inserted] = _paused_queries_list[wg].emplace(
+            query_ctx, 
doris::GlobalMemoryArbitrator::last_affected_cache_capacity_adjust_weighted,
+            doris::GlobalMemoryArbitrator::any_workload_group_exceed_limit, 
reserve_size);
+    // Check if this is an invalid reserve, for example, if the reserve size 
is too large, larger than the query limit
+    // if hard limit is enabled, then not need enable other queries hard limit.
+    if (inserted) {
+        LOG(INFO) << "Insert one new paused query: " << 
query_ctx->debug_string()
+                  << ", workload group: " << wg->debug_string();
+    }
+}
+
+/**
+ * Strategy 1: A revocable query should not have any running 
task(PipelineTask).
+ * strategy 2: If the workload group has any task exceed workload group 
memlimit, then set all queryctx's memlimit
+ * strategy 3: If any query exceed process memlimit, then should clear all 
caches.
+ * strategy 4: If any query exceed query's memlimit, then do spill disk or 
cancel it.
+ * strategy 5: If any query exceed process's memlimit and cache is zero, then 
do following:
+ */
+void WorkloadGroupMgr::handle_paused_queries() {
+    {
+        std::shared_lock<std::shared_mutex> r_lock(_group_mutex);
+        for (auto& [wg_id, wg] : _workload_groups) {
+            std::unique_lock<std::mutex> lock(_paused_queries_lock);
+            if (_paused_queries_list[wg].empty()) {
+                // Add an empty set to wg that not contains paused queries.
+            }
+        }
+    }
+
+    std::unique_lock<std::mutex> lock(_paused_queries_lock);
+    bool has_revoked_from_other_group = false;
+    bool has_query_exceed_process_memlimit = false;
+    for (auto it = _paused_queries_list.begin(); it != 
_paused_queries_list.end();) {
+        auto& queries_list = it->second;
+        auto query_count = queries_list.size();
+        const auto& wg = it->first;
+
+        if (query_count != 0) {
+            LOG_EVERY_T(INFO, 1) << "Paused queries count of wg " << 
wg->name() << ": "
+                                 << query_count;
+        }
+
+        bool has_changed_hard_limit = false;
+        int64_t flushed_memtable_bytes = 0;
+        // If the query is paused because its limit exceed the query itself's 
memlimit, then just spill disk.
+        // The query's memlimit is set using slot mechanism and its value is 
set using the user settings, not
+        // by weighted value. So if reserve failed, then it is actually exceed 
limit.
+        for (auto query_it = queries_list.begin(); query_it != 
queries_list.end();) {
+            auto query_ctx = query_it->query_ctx_.lock();
+            // The query is finished during in paused list.
+            if (query_ctx == nullptr) {
+                LOG(INFO) << "Query: " << query_it->query_id() << " is 
nullptr, erase it.";
+                query_it = queries_list.erase(query_it);
+                continue;
+            }
+            if (query_ctx->is_cancelled()) {
+                LOG(INFO) << "Query: " << print_id(query_ctx->query_id())
+                          << " was canceled, remove from paused list";
+                query_it = queries_list.erase(query_it);
+                continue;
+            }
+
+            if 
(query_ctx->paused_reason().is<ErrorCode::QUERY_MEMORY_EXCEEDED>()) {
+                // Streamload, kafka load, group commit will never have query 
memory exceeded error because
+                // their  query limit is very large.
+                bool spill_res =
+                        handle_single_query_(query_ctx, 
query_it->reserve_size_,
+                                             query_it->elapsed_time(), 
query_ctx->paused_reason());
+                if (!spill_res) {
+                    ++query_it;
+                    continue;
+                } else {
+                    VLOG_DEBUG << "Query: " << print_id(query_ctx->query_id())
+                               << " remove from paused list";
+                    query_it = queries_list.erase(query_it);
+                    continue;
+                }
+            } else if 
(query_ctx->paused_reason().is<ErrorCode::WORKLOAD_GROUP_MEMORY_EXCEEDED>()) {
+                // Only deal with non overcommit workload group.
+                if (wg->enable_memory_overcommit()) {
+                    // Soft limit wg will only reserve failed when process 
limit exceed. But in some corner case,
+                    // when reserve, the wg is hard limit, the query reserve 
failed, but when this loop run
+                    // the wg is converted to soft limit.
+                    // So that should resume the query.
+                    LOG(WARNING)
+                            << "Query: " << print_id(query_ctx->query_id())
+                            << " reserve memory failed because exceed workload 
group memlimit, it "
+                               "should not happen, resume it again. paused 
reason: "
+                            << query_ctx->paused_reason();
+                    query_ctx->set_memory_sufficient(true);
+                    query_it = queries_list.erase(query_it);
+                    continue;
+                }
+                // check if the reserve is too large, if it is too large,
+                // should set the query's limit only.
+                // Check the query's reserve with expected limit.
+                if (query_ctx->adjusted_mem_limit() <
+                    query_ctx->get_mem_tracker()->consumption() + 
query_it->reserve_size_) {
+                    query_ctx->set_mem_limit(query_ctx->adjusted_mem_limit());
+                    query_ctx->set_memory_sufficient(true);
+                    LOG(INFO) << "Workload group memory reserve failed because 
"
+                              << query_ctx->debug_string() << " reserve size "
+                              << 
PrettyPrinter::print_bytes(query_it->reserve_size_)
+                              << " is too large, set hard limit to "
+                              << 
PrettyPrinter::print_bytes(query_ctx->adjusted_mem_limit())
+                              << " and resume running.";
+                    query_it = queries_list.erase(query_it);
+                    continue;
+                }
+                if (flushed_memtable_bytes <= 0) {
+                    flushed_memtable_bytes =
+                            flush_memtable_from_current_group_(wg, 
query_it->reserve_size_);
+                }
+                if (flushed_memtable_bytes > 0) {
+                    // Flushed some memtable, just wait flush finished and not 
do anything more.
+                    wg->enable_write_buffer_limit(true);
+                    ++query_it;
+                    continue;
+                }
+                if (!has_changed_hard_limit) {
+                    update_queries_limit_(wg, true);
+                    has_changed_hard_limit = true;
+                    LOG(INFO) << "Query: " << print_id(query_ctx->query_id()) 
<< " reserve memory("
+                              << 
PrettyPrinter::print_bytes(query_it->reserve_size_)
+                              << ") failed due to workload group memory 
exceed, "
+                                 "should set the workload group work in memory 
insufficent mode, "
+                                 "so that other query will reduce their 
memory."
+                              << " Query mem limit: "
+                              << 
PrettyPrinter::print_bytes(query_ctx->get_mem_limit())
+                              << " mem usage: "
+                              << PrettyPrinter::print_bytes(
+                                         
query_ctx->get_mem_tracker()->consumption())
+                              << ", wg: " << wg->debug_string();
+                }
+                if (wg->slot_memory_policy() == TWgSlotMemoryPolicy::NONE) {
+                    // If not enable slot memory policy, then should spill 
directly
+                    // Maybe there are another query that use too much memory, 
but we
+                    // not encourage not enable slot memory.
+                    // TODO should kill the query that exceed limit.
+                    bool spill_res = handle_single_query_(query_ctx, 
query_it->reserve_size_,
+                                                          
query_it->elapsed_time(),
+                                                          
query_ctx->paused_reason());
+                    if (!spill_res) {
+                        ++query_it;
+                        continue;
+                    } else {
+                        VLOG_DEBUG << "Query: " << 
print_id(query_ctx->query_id())
+                                   << " remove from paused list";
+                        query_it = queries_list.erase(query_it);
+                        continue;
+                    }
+                } else {
+                    // Should not put the query back to task scheduler 
immediately, because when wg's memory not sufficient,
+                    // and then set wg's flag, other query may not free memory 
very quickly.
+                    if (query_it->elapsed_time() > 
config::spill_in_paused_queue_timeout_ms) {
+                        // set wg's memory to insufficent, then add it back to 
task scheduler to run.
+                        LOG(INFO) << "Query: " << 
print_id(query_ctx->query_id())
+                                  << " will be resume.";
+                        query_ctx->set_memory_sufficient(true);
+                        query_it = queries_list.erase(query_it);
+                        continue;
+                    } else {
+                        ++query_it;
+                        continue;
+                    }
+                }
+            } else {
+                has_query_exceed_process_memlimit = true;
+                // If wg's memlimit not exceed, but process memory exceed, it 
means cache or other metadata
+                // used too much memory. Should clean all cache here.
+                // 1. Check cache used, if cache is larger than > 0, then just 
return and wait for it to 0 to release some memory.
+                if 
(doris::GlobalMemoryArbitrator::last_affected_cache_capacity_adjust_weighted >
+                            0.05 &&
+                    
doris::GlobalMemoryArbitrator::last_wg_trigger_cache_capacity_adjust_weighted >
+                            0.05) {
+                    
doris::GlobalMemoryArbitrator::last_wg_trigger_cache_capacity_adjust_weighted =
+                            0.04;
+                    
doris::GlobalMemoryArbitrator::notify_cache_adjust_capacity();
+                    LOG(INFO) << "There are some queries need process memory, 
so that set cache "
+                                 "capacity "
+                                 "to 0 now";
+                }
+                // need to check config::disable_memory_gc here, if not, when 
config::disable_memory_gc == true,
+                // cache is not adjusted, query_it->cache_ratio_ will always 
be 1, and this if branch will nenver
+                // execute, this query will never be resumed, and will 
deadlock here
+                if ((!config::disable_memory_gc && query_it->cache_ratio_ < 
0.05) ||
+                    config::disable_memory_gc) {
+                    // 1. Check if could revoke some memory from memtable
+                    if (flushed_memtable_bytes <= 0) {
+                        flushed_memtable_bytes =
+                                flush_memtable_from_current_group_(wg, 
query_it->reserve_size_);
+                    }
+                    if (flushed_memtable_bytes > 0) {
+                        // Flushed some memtable, just wait flush finished and 
not do anything more.
+                        ++query_it;
+                        continue;
+                    }
+                    // TODO should wait here to check if the process has 
release revoked_size memory and then continue.
+                    if (!has_revoked_from_other_group) {
+                        int64_t revoked_size = revoke_memory_from_other_group_(
+                                query_ctx, wg->enable_memory_overcommit(), 
query_it->reserve_size_);
+                        if (revoked_size > 0) {
+                            has_revoked_from_other_group = true;
+                            query_ctx->set_memory_sufficient(true);
+                            VLOG_DEBUG << "Query: " << 
print_id(query_ctx->query_id())
+                                       << " is resumed after revoke memory 
from other group.";
+                            query_it = queries_list.erase(query_it);
+                            // Do not care if the revoked_size > reserve size, 
and try to run again.
+                            continue;
+                        } else {
+                            bool spill_res = handle_single_query_(
+                                    query_ctx, query_it->reserve_size_, 
query_it->elapsed_time(),
+                                    query_ctx->paused_reason());
+                            if (spill_res) {
+                                VLOG_DEBUG << "Query: " << 
print_id(query_ctx->query_id())
+                                           << " remove from paused list";
+                                query_it = queries_list.erase(query_it);
+                                continue;
+                            } else {
+                                ++query_it;
+                                continue;
+                            }
+                        }
+                    } else {

Review Comment:
   如果已经有 query A 去 revoke 了其他 overcommited wg,并将 has_revoked_from_other_group 
设为 true
   
   此处让其他 query 直接继续执行不合理,因为 query A  的 reserve size 可能很小,所以 revoke 其他 
overcommited wg 很小一块内存。
   
   其他 query 继续执行的语义是 "所有能释放的内存都释放了,所以不管三七二十一全跑起来吧! ",但可能还有 overcommited wg 
没有全部释放 overcommited 的部分。



##########
be/src/runtime/workload_group/workload_group_manager.cpp:
##########
@@ -287,6 +257,642 @@ void WorkloadGroupMgr::refresh_workload_group_metrics() {
     }
 }
 
+void WorkloadGroupMgr::add_paused_query(const std::shared_ptr<QueryContext>& 
query_ctx,
+                                        int64_t reserve_size, const Status& 
status) {
+    DCHECK(query_ctx != nullptr);
+    query_ctx->update_paused_reason(status);
+    query_ctx->set_low_memory_mode();
+    query_ctx->set_memory_sufficient(false);
+    std::lock_guard<std::mutex> lock(_paused_queries_lock);
+    auto wg = query_ctx->workload_group();
+    auto&& [it, inserted] = _paused_queries_list[wg].emplace(
+            query_ctx, 
doris::GlobalMemoryArbitrator::last_affected_cache_capacity_adjust_weighted,
+            doris::GlobalMemoryArbitrator::any_workload_group_exceed_limit, 
reserve_size);
+    // Check if this is an invalid reserve, for example, if the reserve size 
is too large, larger than the query limit
+    // if hard limit is enabled, then not need enable other queries hard limit.
+    if (inserted) {
+        LOG(INFO) << "Insert one new paused query: " << 
query_ctx->debug_string()
+                  << ", workload group: " << wg->debug_string();
+    }
+}
+
+/**
+ * Strategy 1: A revocable query should not have any running 
task(PipelineTask).
+ * strategy 2: If the workload group has any task exceed workload group 
memlimit, then set all queryctx's memlimit
+ * strategy 3: If any query exceed process memlimit, then should clear all 
caches.
+ * strategy 4: If any query exceed query's memlimit, then do spill disk or 
cancel it.
+ * strategy 5: If any query exceed process's memlimit and cache is zero, then 
do following:
+ */
+void WorkloadGroupMgr::handle_paused_queries() {
+    {
+        std::shared_lock<std::shared_mutex> r_lock(_group_mutex);
+        for (auto& [wg_id, wg] : _workload_groups) {
+            std::unique_lock<std::mutex> lock(_paused_queries_lock);
+            if (_paused_queries_list[wg].empty()) {
+                // Add an empty set to wg that not contains paused queries.
+            }
+        }
+    }
+
+    std::unique_lock<std::mutex> lock(_paused_queries_lock);
+    bool has_revoked_from_other_group = false;
+    bool has_query_exceed_process_memlimit = false;
+    for (auto it = _paused_queries_list.begin(); it != 
_paused_queries_list.end();) {
+        auto& queries_list = it->second;
+        auto query_count = queries_list.size();
+        const auto& wg = it->first;
+
+        if (query_count != 0) {
+            LOG_EVERY_T(INFO, 1) << "Paused queries count of wg " << 
wg->name() << ": "
+                                 << query_count;
+        }
+
+        bool has_changed_hard_limit = false;
+        int64_t flushed_memtable_bytes = 0;
+        // If the query is paused because its limit exceed the query itself's 
memlimit, then just spill disk.
+        // The query's memlimit is set using slot mechanism and its value is 
set using the user settings, not
+        // by weighted value. So if reserve failed, then it is actually exceed 
limit.
+        for (auto query_it = queries_list.begin(); query_it != 
queries_list.end();) {
+            auto query_ctx = query_it->query_ctx_.lock();
+            // The query is finished during in paused list.
+            if (query_ctx == nullptr) {
+                LOG(INFO) << "Query: " << query_it->query_id() << " is 
nullptr, erase it.";
+                query_it = queries_list.erase(query_it);
+                continue;
+            }
+            if (query_ctx->is_cancelled()) {
+                LOG(INFO) << "Query: " << print_id(query_ctx->query_id())
+                          << " was canceled, remove from paused list";
+                query_it = queries_list.erase(query_it);
+                continue;
+            }
+
+            if 
(query_ctx->paused_reason().is<ErrorCode::QUERY_MEMORY_EXCEEDED>()) {
+                // Streamload, kafka load, group commit will never have query 
memory exceeded error because
+                // their  query limit is very large.
+                bool spill_res =
+                        handle_single_query_(query_ctx, 
query_it->reserve_size_,
+                                             query_it->elapsed_time(), 
query_ctx->paused_reason());
+                if (!spill_res) {
+                    ++query_it;
+                    continue;
+                } else {
+                    VLOG_DEBUG << "Query: " << print_id(query_ctx->query_id())
+                               << " remove from paused list";
+                    query_it = queries_list.erase(query_it);
+                    continue;
+                }
+            } else if 
(query_ctx->paused_reason().is<ErrorCode::WORKLOAD_GROUP_MEMORY_EXCEEDED>()) {
+                // Only deal with non overcommit workload group.
+                if (wg->enable_memory_overcommit()) {
+                    // Soft limit wg will only reserve failed when process 
limit exceed. But in some corner case,
+                    // when reserve, the wg is hard limit, the query reserve 
failed, but when this loop run
+                    // the wg is converted to soft limit.
+                    // So that should resume the query.
+                    LOG(WARNING)
+                            << "Query: " << print_id(query_ctx->query_id())
+                            << " reserve memory failed because exceed workload 
group memlimit, it "
+                               "should not happen, resume it again. paused 
reason: "
+                            << query_ctx->paused_reason();
+                    query_ctx->set_memory_sufficient(true);
+                    query_it = queries_list.erase(query_it);
+                    continue;
+                }
+                // check if the reserve is too large, if it is too large,
+                // should set the query's limit only.
+                // Check the query's reserve with expected limit.
+                if (query_ctx->adjusted_mem_limit() <
+                    query_ctx->get_mem_tracker()->consumption() + 
query_it->reserve_size_) {
+                    query_ctx->set_mem_limit(query_ctx->adjusted_mem_limit());
+                    query_ctx->set_memory_sufficient(true);
+                    LOG(INFO) << "Workload group memory reserve failed because 
"
+                              << query_ctx->debug_string() << " reserve size "
+                              << 
PrettyPrinter::print_bytes(query_it->reserve_size_)
+                              << " is too large, set hard limit to "
+                              << 
PrettyPrinter::print_bytes(query_ctx->adjusted_mem_limit())
+                              << " and resume running.";
+                    query_it = queries_list.erase(query_it);
+                    continue;
+                }
+                if (flushed_memtable_bytes <= 0) {
+                    flushed_memtable_bytes =
+                            flush_memtable_from_current_group_(wg, 
query_it->reserve_size_);
+                }
+                if (flushed_memtable_bytes > 0) {
+                    // Flushed some memtable, just wait flush finished and not 
do anything more.
+                    wg->enable_write_buffer_limit(true);
+                    ++query_it;
+                    continue;
+                }
+                if (!has_changed_hard_limit) {
+                    update_queries_limit_(wg, true);
+                    has_changed_hard_limit = true;
+                    LOG(INFO) << "Query: " << print_id(query_ctx->query_id()) 
<< " reserve memory("
+                              << 
PrettyPrinter::print_bytes(query_it->reserve_size_)
+                              << ") failed due to workload group memory 
exceed, "
+                                 "should set the workload group work in memory 
insufficent mode, "
+                                 "so that other query will reduce their 
memory."
+                              << " Query mem limit: "
+                              << 
PrettyPrinter::print_bytes(query_ctx->get_mem_limit())
+                              << " mem usage: "
+                              << PrettyPrinter::print_bytes(
+                                         
query_ctx->get_mem_tracker()->consumption())
+                              << ", wg: " << wg->debug_string();
+                }
+                if (wg->slot_memory_policy() == TWgSlotMemoryPolicy::NONE) {
+                    // If not enable slot memory policy, then should spill 
directly
+                    // Maybe there are another query that use too much memory, 
but we
+                    // not encourage not enable slot memory.
+                    // TODO should kill the query that exceed limit.
+                    bool spill_res = handle_single_query_(query_ctx, 
query_it->reserve_size_,
+                                                          
query_it->elapsed_time(),
+                                                          
query_ctx->paused_reason());
+                    if (!spill_res) {
+                        ++query_it;
+                        continue;
+                    } else {
+                        VLOG_DEBUG << "Query: " << 
print_id(query_ctx->query_id())
+                                   << " remove from paused list";
+                        query_it = queries_list.erase(query_it);
+                        continue;
+                    }
+                } else {
+                    // Should not put the query back to task scheduler 
immediately, because when wg's memory not sufficient,
+                    // and then set wg's flag, other query may not free memory 
very quickly.
+                    if (query_it->elapsed_time() > 
config::spill_in_paused_queue_timeout_ms) {
+                        // set wg's memory to insufficent, then add it back to 
task scheduler to run.
+                        LOG(INFO) << "Query: " << 
print_id(query_ctx->query_id())
+                                  << " will be resume.";
+                        query_ctx->set_memory_sufficient(true);
+                        query_it = queries_list.erase(query_it);
+                        continue;
+                    } else {
+                        ++query_it;
+                        continue;
+                    }
+                }
+            } else {
+                has_query_exceed_process_memlimit = true;
+                // If wg's memlimit not exceed, but process memory exceed, it 
means cache or other metadata
+                // used too much memory. Should clean all cache here.
+                // 1. Check cache used, if cache is larger than > 0, then just 
return and wait for it to 0 to release some memory.
+                if 
(doris::GlobalMemoryArbitrator::last_affected_cache_capacity_adjust_weighted >
+                            0.05 &&
+                    
doris::GlobalMemoryArbitrator::last_wg_trigger_cache_capacity_adjust_weighted >
+                            0.05) {
+                    
doris::GlobalMemoryArbitrator::last_wg_trigger_cache_capacity_adjust_weighted =
+                            0.04;
+                    
doris::GlobalMemoryArbitrator::notify_cache_adjust_capacity();
+                    LOG(INFO) << "There are some queries need process memory, 
so that set cache "
+                                 "capacity "
+                                 "to 0 now";
+                }
+                // need to check config::disable_memory_gc here, if not, when 
config::disable_memory_gc == true,
+                // cache is not adjusted, query_it->cache_ratio_ will always 
be 1, and this if branch will nenver
+                // execute, this query will never be resumed, and will 
deadlock here
+                if ((!config::disable_memory_gc && query_it->cache_ratio_ < 
0.05) ||
+                    config::disable_memory_gc) {
+                    // 1. Check if could revoke some memory from memtable
+                    if (flushed_memtable_bytes <= 0) {
+                        flushed_memtable_bytes =
+                                flush_memtable_from_current_group_(wg, 
query_it->reserve_size_);
+                    }
+                    if (flushed_memtable_bytes > 0) {
+                        // Flushed some memtable, just wait flush finished and 
not do anything more.
+                        ++query_it;
+                        continue;
+                    }
+                    // TODO should wait here to check if the process has 
release revoked_size memory and then continue.
+                    if (!has_revoked_from_other_group) {
+                        int64_t revoked_size = revoke_memory_from_other_group_(
+                                query_ctx, wg->enable_memory_overcommit(), 
query_it->reserve_size_);
+                        if (revoked_size > 0) {
+                            has_revoked_from_other_group = true;
+                            query_ctx->set_memory_sufficient(true);
+                            VLOG_DEBUG << "Query: " << 
print_id(query_ctx->query_id())
+                                       << " is resumed after revoke memory 
from other group.";
+                            query_it = queries_list.erase(query_it);
+                            // Do not care if the revoked_size > reserve size, 
and try to run again.
+                            continue;
+                        } else {
+                            bool spill_res = handle_single_query_(
+                                    query_ctx, query_it->reserve_size_, 
query_it->elapsed_time(),
+                                    query_ctx->paused_reason());
+                            if (spill_res) {
+                                VLOG_DEBUG << "Query: " << 
print_id(query_ctx->query_id())
+                                           << " remove from paused list";
+                                query_it = queries_list.erase(query_it);
+                                continue;
+                            } else {
+                                ++query_it;
+                                continue;
+                            }
+                        }
+                    } else {
+                        // If any query is cancelled during process limit 
stage, should resume other query and
+                        // do not do any check now.
+                        query_ctx->set_memory_sufficient(true);
+                        VLOG_DEBUG << "Query: " << 
print_id(query_ctx->query_id())
+                                   << " remove from paused list";
+                        query_it = queries_list.erase(query_it);
+                        continue;
+                    }
+                }
+                if 
(doris::GlobalMemoryArbitrator::last_affected_cache_capacity_adjust_weighted <
+                            0.05 &&
+                    query_it->cache_ratio_ > 0.05) {

Review Comment:
   这里的问题和上面对 `cache_ratio_ < 0.05` 含义的分析一样,
   
   TODO:给 CacheManager 加个方法,返回所有 Cache 的使用率,若低于某个值,则认为 
"所有Cache释放完了,不必等待Cache释放内存了"



##########
be/src/runtime/workload_group/workload_group_manager.cpp:
##########
@@ -287,6 +257,642 @@ void WorkloadGroupMgr::refresh_workload_group_metrics() {
     }
 }
 
+void WorkloadGroupMgr::add_paused_query(const std::shared_ptr<QueryContext>& 
query_ctx,
+                                        int64_t reserve_size, const Status& 
status) {
+    DCHECK(query_ctx != nullptr);
+    query_ctx->update_paused_reason(status);
+    query_ctx->set_low_memory_mode();
+    query_ctx->set_memory_sufficient(false);
+    std::lock_guard<std::mutex> lock(_paused_queries_lock);
+    auto wg = query_ctx->workload_group();
+    auto&& [it, inserted] = _paused_queries_list[wg].emplace(
+            query_ctx, 
doris::GlobalMemoryArbitrator::last_affected_cache_capacity_adjust_weighted,
+            doris::GlobalMemoryArbitrator::any_workload_group_exceed_limit, 
reserve_size);
+    // Check if this is an invalid reserve, for example, if the reserve size 
is too large, larger than the query limit
+    // if hard limit is enabled, then not need enable other queries hard limit.
+    if (inserted) {
+        LOG(INFO) << "Insert one new paused query: " << 
query_ctx->debug_string()
+                  << ", workload group: " << wg->debug_string();
+    }
+}
+
+/**
+ * Strategy 1: A revocable query should not have any running 
task(PipelineTask).
+ * strategy 2: If the workload group has any task exceed workload group 
memlimit, then set all queryctx's memlimit
+ * strategy 3: If any query exceed process memlimit, then should clear all 
caches.
+ * strategy 4: If any query exceed query's memlimit, then do spill disk or 
cancel it.
+ * strategy 5: If any query exceed process's memlimit and cache is zero, then 
do following:
+ */
+void WorkloadGroupMgr::handle_paused_queries() {
+    {
+        std::shared_lock<std::shared_mutex> r_lock(_group_mutex);
+        for (auto& [wg_id, wg] : _workload_groups) {
+            std::unique_lock<std::mutex> lock(_paused_queries_lock);
+            if (_paused_queries_list[wg].empty()) {
+                // Add an empty set to wg that not contains paused queries.
+            }
+        }
+    }
+
+    std::unique_lock<std::mutex> lock(_paused_queries_lock);
+    bool has_revoked_from_other_group = false;
+    bool has_query_exceed_process_memlimit = false;
+    for (auto it = _paused_queries_list.begin(); it != 
_paused_queries_list.end();) {
+        auto& queries_list = it->second;
+        auto query_count = queries_list.size();
+        const auto& wg = it->first;
+
+        if (query_count != 0) {
+            LOG_EVERY_T(INFO, 1) << "Paused queries count of wg " << 
wg->name() << ": "
+                                 << query_count;
+        }
+
+        bool has_changed_hard_limit = false;
+        int64_t flushed_memtable_bytes = 0;
+        // If the query is paused because its limit exceed the query itself's 
memlimit, then just spill disk.
+        // The query's memlimit is set using slot mechanism and its value is 
set using the user settings, not
+        // by weighted value. So if reserve failed, then it is actually exceed 
limit.
+        for (auto query_it = queries_list.begin(); query_it != 
queries_list.end();) {
+            auto query_ctx = query_it->query_ctx_.lock();
+            // The query is finished during in paused list.
+            if (query_ctx == nullptr) {
+                LOG(INFO) << "Query: " << query_it->query_id() << " is 
nullptr, erase it.";
+                query_it = queries_list.erase(query_it);
+                continue;
+            }
+            if (query_ctx->is_cancelled()) {
+                LOG(INFO) << "Query: " << print_id(query_ctx->query_id())
+                          << " was canceled, remove from paused list";
+                query_it = queries_list.erase(query_it);
+                continue;
+            }
+
+            if 
(query_ctx->paused_reason().is<ErrorCode::QUERY_MEMORY_EXCEEDED>()) {
+                // Streamload, kafka load, group commit will never have query 
memory exceeded error because
+                // their  query limit is very large.
+                bool spill_res =
+                        handle_single_query_(query_ctx, 
query_it->reserve_size_,
+                                             query_it->elapsed_time(), 
query_ctx->paused_reason());
+                if (!spill_res) {
+                    ++query_it;
+                    continue;
+                } else {
+                    VLOG_DEBUG << "Query: " << print_id(query_ctx->query_id())
+                               << " remove from paused list";
+                    query_it = queries_list.erase(query_it);
+                    continue;
+                }
+            } else if 
(query_ctx->paused_reason().is<ErrorCode::WORKLOAD_GROUP_MEMORY_EXCEEDED>()) {
+                // Only deal with non overcommit workload group.
+                if (wg->enable_memory_overcommit()) {
+                    // Soft limit wg will only reserve failed when process 
limit exceed. But in some corner case,
+                    // when reserve, the wg is hard limit, the query reserve 
failed, but when this loop run
+                    // the wg is converted to soft limit.
+                    // So that should resume the query.
+                    LOG(WARNING)
+                            << "Query: " << print_id(query_ctx->query_id())
+                            << " reserve memory failed because exceed workload 
group memlimit, it "
+                               "should not happen, resume it again. paused 
reason: "
+                            << query_ctx->paused_reason();
+                    query_ctx->set_memory_sufficient(true);
+                    query_it = queries_list.erase(query_it);
+                    continue;
+                }
+                // check if the reserve is too large, if it is too large,
+                // should set the query's limit only.
+                // Check the query's reserve with expected limit.
+                if (query_ctx->adjusted_mem_limit() <
+                    query_ctx->get_mem_tracker()->consumption() + 
query_it->reserve_size_) {
+                    query_ctx->set_mem_limit(query_ctx->adjusted_mem_limit());
+                    query_ctx->set_memory_sufficient(true);
+                    LOG(INFO) << "Workload group memory reserve failed because 
"
+                              << query_ctx->debug_string() << " reserve size "
+                              << 
PrettyPrinter::print_bytes(query_it->reserve_size_)
+                              << " is too large, set hard limit to "
+                              << 
PrettyPrinter::print_bytes(query_ctx->adjusted_mem_limit())
+                              << " and resume running.";
+                    query_it = queries_list.erase(query_it);
+                    continue;
+                }
+                if (flushed_memtable_bytes <= 0) {
+                    flushed_memtable_bytes =
+                            flush_memtable_from_current_group_(wg, 
query_it->reserve_size_);
+                }
+                if (flushed_memtable_bytes > 0) {
+                    // Flushed some memtable, just wait flush finished and not 
do anything more.
+                    wg->enable_write_buffer_limit(true);
+                    ++query_it;
+                    continue;
+                }
+                if (!has_changed_hard_limit) {
+                    update_queries_limit_(wg, true);
+                    has_changed_hard_limit = true;
+                    LOG(INFO) << "Query: " << print_id(query_ctx->query_id()) 
<< " reserve memory("
+                              << 
PrettyPrinter::print_bytes(query_it->reserve_size_)
+                              << ") failed due to workload group memory 
exceed, "
+                                 "should set the workload group work in memory 
insufficent mode, "
+                                 "so that other query will reduce their 
memory."
+                              << " Query mem limit: "
+                              << 
PrettyPrinter::print_bytes(query_ctx->get_mem_limit())
+                              << " mem usage: "
+                              << PrettyPrinter::print_bytes(
+                                         
query_ctx->get_mem_tracker()->consumption())
+                              << ", wg: " << wg->debug_string();
+                }
+                if (wg->slot_memory_policy() == TWgSlotMemoryPolicy::NONE) {
+                    // If not enable slot memory policy, then should spill 
directly
+                    // Maybe there are another query that use too much memory, 
but we
+                    // not encourage not enable slot memory.
+                    // TODO should kill the query that exceed limit.
+                    bool spill_res = handle_single_query_(query_ctx, 
query_it->reserve_size_,
+                                                          
query_it->elapsed_time(),
+                                                          
query_ctx->paused_reason());
+                    if (!spill_res) {
+                        ++query_it;
+                        continue;
+                    } else {
+                        VLOG_DEBUG << "Query: " << 
print_id(query_ctx->query_id())
+                                   << " remove from paused list";
+                        query_it = queries_list.erase(query_it);
+                        continue;
+                    }
+                } else {
+                    // Should not put the query back to task scheduler 
immediately, because when wg's memory not sufficient,
+                    // and then set wg's flag, other query may not free memory 
very quickly.
+                    if (query_it->elapsed_time() > 
config::spill_in_paused_queue_timeout_ms) {
+                        // set wg's memory to insufficent, then add it back to 
task scheduler to run.
+                        LOG(INFO) << "Query: " << 
print_id(query_ctx->query_id())
+                                  << " will be resume.";
+                        query_ctx->set_memory_sufficient(true);
+                        query_it = queries_list.erase(query_it);
+                        continue;
+                    } else {
+                        ++query_it;
+                        continue;
+                    }
+                }
+            } else {
+                has_query_exceed_process_memlimit = true;
+                // If wg's memlimit not exceed, but process memory exceed, it 
means cache or other metadata
+                // used too much memory. Should clean all cache here.
+                // 1. Check cache used, if cache is larger than > 0, then just 
return and wait for it to 0 to release some memory.
+                if 
(doris::GlobalMemoryArbitrator::last_affected_cache_capacity_adjust_weighted >
+                            0.05 &&
+                    
doris::GlobalMemoryArbitrator::last_wg_trigger_cache_capacity_adjust_weighted >
+                            0.05) {
+                    
doris::GlobalMemoryArbitrator::last_wg_trigger_cache_capacity_adjust_weighted =
+                            0.04;
+                    
doris::GlobalMemoryArbitrator::notify_cache_adjust_capacity();
+                    LOG(INFO) << "There are some queries need process memory, 
so that set cache "
+                                 "capacity "
+                                 "to 0 now";
+                }
+                // need to check config::disable_memory_gc here, if not, when 
config::disable_memory_gc == true,
+                // cache is not adjusted, query_it->cache_ratio_ will always 
be 1, and this if branch will nenver
+                // execute, this query will never be resumed, and will 
deadlock here
+                if ((!config::disable_memory_gc && query_it->cache_ratio_ < 
0.05) ||

Review Comment:
   实际上 cache 释放需要时间,几十G的 page cache 全部释放需要接近1s,
   
   这里 `cache_ratio_ < 0.05` 期望的语义是 "当前所有Cache已经都释放完了,如果其他地方不能释放内存就 spill",但实际上 
Cache 可能并没有释放完。
   
   TODO:给 CacheManager 加个方法,返回所有 Cache 的使用率,若低于某个值,则认为 
"所有Cache释放完了,不必等待Cache释放内存了"



##########
be/src/runtime/workload_group/workload_group_manager.cpp:
##########
@@ -287,6 +257,642 @@ void WorkloadGroupMgr::refresh_workload_group_metrics() {
     }
 }
 
+void WorkloadGroupMgr::add_paused_query(const std::shared_ptr<QueryContext>& 
query_ctx,
+                                        int64_t reserve_size, const Status& 
status) {
+    DCHECK(query_ctx != nullptr);
+    query_ctx->update_paused_reason(status);
+    query_ctx->set_low_memory_mode();
+    query_ctx->set_memory_sufficient(false);
+    std::lock_guard<std::mutex> lock(_paused_queries_lock);
+    auto wg = query_ctx->workload_group();
+    auto&& [it, inserted] = _paused_queries_list[wg].emplace(
+            query_ctx, 
doris::GlobalMemoryArbitrator::last_affected_cache_capacity_adjust_weighted,
+            doris::GlobalMemoryArbitrator::any_workload_group_exceed_limit, 
reserve_size);
+    // Check if this is an invalid reserve, for example, if the reserve size 
is too large, larger than the query limit
+    // if hard limit is enabled, then not need enable other queries hard limit.
+    if (inserted) {
+        LOG(INFO) << "Insert one new paused query: " << 
query_ctx->debug_string()
+                  << ", workload group: " << wg->debug_string();
+    }
+}
+
+/**
+ * Strategy 1: A revocable query should not have any running 
task(PipelineTask).
+ * strategy 2: If the workload group has any task exceed workload group 
memlimit, then set all queryctx's memlimit
+ * strategy 3: If any query exceed process memlimit, then should clear all 
caches.
+ * strategy 4: If any query exceed query's memlimit, then do spill disk or 
cancel it.
+ * strategy 5: If any query exceed process's memlimit and cache is zero, then 
do following:
+ */
+void WorkloadGroupMgr::handle_paused_queries() {
+    {
+        std::shared_lock<std::shared_mutex> r_lock(_group_mutex);
+        for (auto& [wg_id, wg] : _workload_groups) {
+            std::unique_lock<std::mutex> lock(_paused_queries_lock);
+            if (_paused_queries_list[wg].empty()) {
+                // Add an empty set to wg that not contains paused queries.
+            }
+        }
+    }
+
+    std::unique_lock<std::mutex> lock(_paused_queries_lock);
+    bool has_revoked_from_other_group = false;
+    bool has_query_exceed_process_memlimit = false;
+    for (auto it = _paused_queries_list.begin(); it != 
_paused_queries_list.end();) {
+        auto& queries_list = it->second;
+        auto query_count = queries_list.size();
+        const auto& wg = it->first;
+
+        if (query_count != 0) {
+            LOG_EVERY_T(INFO, 1) << "Paused queries count of wg " << 
wg->name() << ": "
+                                 << query_count;
+        }
+
+        bool has_changed_hard_limit = false;
+        int64_t flushed_memtable_bytes = 0;
+        // If the query is paused because its limit exceed the query itself's 
memlimit, then just spill disk.
+        // The query's memlimit is set using slot mechanism and its value is 
set using the user settings, not
+        // by weighted value. So if reserve failed, then it is actually exceed 
limit.
+        for (auto query_it = queries_list.begin(); query_it != 
queries_list.end();) {
+            auto query_ctx = query_it->query_ctx_.lock();
+            // The query is finished during in paused list.
+            if (query_ctx == nullptr) {
+                LOG(INFO) << "Query: " << query_it->query_id() << " is 
nullptr, erase it.";
+                query_it = queries_list.erase(query_it);
+                continue;
+            }
+            if (query_ctx->is_cancelled()) {
+                LOG(INFO) << "Query: " << print_id(query_ctx->query_id())
+                          << " was canceled, remove from paused list";
+                query_it = queries_list.erase(query_it);
+                continue;
+            }
+
+            if 
(query_ctx->paused_reason().is<ErrorCode::QUERY_MEMORY_EXCEEDED>()) {
+                // Streamload, kafka load, group commit will never have query 
memory exceeded error because
+                // their  query limit is very large.
+                bool spill_res =
+                        handle_single_query_(query_ctx, 
query_it->reserve_size_,
+                                             query_it->elapsed_time(), 
query_ctx->paused_reason());
+                if (!spill_res) {
+                    ++query_it;
+                    continue;
+                } else {
+                    VLOG_DEBUG << "Query: " << print_id(query_ctx->query_id())
+                               << " remove from paused list";
+                    query_it = queries_list.erase(query_it);
+                    continue;
+                }
+            } else if 
(query_ctx->paused_reason().is<ErrorCode::WORKLOAD_GROUP_MEMORY_EXCEEDED>()) {
+                // Only deal with non overcommit workload group.
+                if (wg->enable_memory_overcommit()) {
+                    // Soft limit wg will only reserve failed when process 
limit exceed. But in some corner case,
+                    // when reserve, the wg is hard limit, the query reserve 
failed, but when this loop run
+                    // the wg is converted to soft limit.
+                    // So that should resume the query.
+                    LOG(WARNING)
+                            << "Query: " << print_id(query_ctx->query_id())
+                            << " reserve memory failed because exceed workload 
group memlimit, it "
+                               "should not happen, resume it again. paused 
reason: "
+                            << query_ctx->paused_reason();
+                    query_ctx->set_memory_sufficient(true);
+                    query_it = queries_list.erase(query_it);
+                    continue;
+                }
+                // check if the reserve is too large, if it is too large,
+                // should set the query's limit only.
+                // Check the query's reserve with expected limit.
+                if (query_ctx->adjusted_mem_limit() <
+                    query_ctx->get_mem_tracker()->consumption() + 
query_it->reserve_size_) {
+                    query_ctx->set_mem_limit(query_ctx->adjusted_mem_limit());
+                    query_ctx->set_memory_sufficient(true);
+                    LOG(INFO) << "Workload group memory reserve failed because 
"
+                              << query_ctx->debug_string() << " reserve size "
+                              << 
PrettyPrinter::print_bytes(query_it->reserve_size_)
+                              << " is too large, set hard limit to "
+                              << 
PrettyPrinter::print_bytes(query_ctx->adjusted_mem_limit())
+                              << " and resume running.";
+                    query_it = queries_list.erase(query_it);
+                    continue;
+                }
+                if (flushed_memtable_bytes <= 0) {
+                    flushed_memtable_bytes =
+                            flush_memtable_from_current_group_(wg, 
query_it->reserve_size_);
+                }
+                if (flushed_memtable_bytes > 0) {
+                    // Flushed some memtable, just wait flush finished and not 
do anything more.
+                    wg->enable_write_buffer_limit(true);
+                    ++query_it;
+                    continue;
+                }
+                if (!has_changed_hard_limit) {
+                    update_queries_limit_(wg, true);
+                    has_changed_hard_limit = true;
+                    LOG(INFO) << "Query: " << print_id(query_ctx->query_id()) 
<< " reserve memory("
+                              << 
PrettyPrinter::print_bytes(query_it->reserve_size_)
+                              << ") failed due to workload group memory 
exceed, "
+                                 "should set the workload group work in memory 
insufficent mode, "
+                                 "so that other query will reduce their 
memory."
+                              << " Query mem limit: "
+                              << 
PrettyPrinter::print_bytes(query_ctx->get_mem_limit())
+                              << " mem usage: "
+                              << PrettyPrinter::print_bytes(
+                                         
query_ctx->get_mem_tracker()->consumption())
+                              << ", wg: " << wg->debug_string();
+                }
+                if (wg->slot_memory_policy() == TWgSlotMemoryPolicy::NONE) {
+                    // If not enable slot memory policy, then should spill 
directly
+                    // Maybe there are another query that use too much memory, 
but we
+                    // not encourage not enable slot memory.
+                    // TODO should kill the query that exceed limit.
+                    bool spill_res = handle_single_query_(query_ctx, 
query_it->reserve_size_,
+                                                          
query_it->elapsed_time(),
+                                                          
query_ctx->paused_reason());
+                    if (!spill_res) {
+                        ++query_it;
+                        continue;
+                    } else {
+                        VLOG_DEBUG << "Query: " << 
print_id(query_ctx->query_id())
+                                   << " remove from paused list";
+                        query_it = queries_list.erase(query_it);
+                        continue;
+                    }
+                } else {
+                    // Should not put the query back to task scheduler 
immediately, because when wg's memory not sufficient,
+                    // and then set wg's flag, other query may not free memory 
very quickly.
+                    if (query_it->elapsed_time() > 
config::spill_in_paused_queue_timeout_ms) {
+                        // set wg's memory to insufficent, then add it back to 
task scheduler to run.
+                        LOG(INFO) << "Query: " << 
print_id(query_ctx->query_id())
+                                  << " will be resume.";
+                        query_ctx->set_memory_sufficient(true);
+                        query_it = queries_list.erase(query_it);
+                        continue;
+                    } else {
+                        ++query_it;
+                        continue;
+                    }
+                }
+            } else {
+                has_query_exceed_process_memlimit = true;
+                // If wg's memlimit not exceed, but process memory exceed, it 
means cache or other metadata
+                // used too much memory. Should clean all cache here.
+                // 1. Check cache used, if cache is larger than > 0, then just 
return and wait for it to 0 to release some memory.
+                if 
(doris::GlobalMemoryArbitrator::last_affected_cache_capacity_adjust_weighted >
+                            0.05 &&
+                    
doris::GlobalMemoryArbitrator::last_wg_trigger_cache_capacity_adjust_weighted >
+                            0.05) {
+                    
doris::GlobalMemoryArbitrator::last_wg_trigger_cache_capacity_adjust_weighted =
+                            0.04;
+                    
doris::GlobalMemoryArbitrator::notify_cache_adjust_capacity();
+                    LOG(INFO) << "There are some queries need process memory, 
so that set cache "
+                                 "capacity "
+                                 "to 0 now";
+                }
+                // need to check config::disable_memory_gc here, if not, when 
config::disable_memory_gc == true,
+                // cache is not adjusted, query_it->cache_ratio_ will always 
be 1, and this if branch will nenver
+                // execute, this query will never be resumed, and will 
deadlock here
+                if ((!config::disable_memory_gc && query_it->cache_ratio_ < 
0.05) ||
+                    config::disable_memory_gc) {
+                    // 1. Check if could revoke some memory from memtable
+                    if (flushed_memtable_bytes <= 0) {
+                        flushed_memtable_bytes =
+                                flush_memtable_from_current_group_(wg, 
query_it->reserve_size_);
+                    }
+                    if (flushed_memtable_bytes > 0) {
+                        // Flushed some memtable, just wait flush finished and 
not do anything more.
+                        ++query_it;
+                        continue;
+                    }
+                    // TODO should wait here to check if the process has 
release revoked_size memory and then continue.
+                    if (!has_revoked_from_other_group) {
+                        int64_t revoked_size = revoke_memory_from_other_group_(
+                                query_ctx, wg->enable_memory_overcommit(), 
query_it->reserve_size_);
+                        if (revoked_size > 0) {
+                            has_revoked_from_other_group = true;
+                            query_ctx->set_memory_sufficient(true);
+                            VLOG_DEBUG << "Query: " << 
print_id(query_ctx->query_id())
+                                       << " is resumed after revoke memory 
from other group.";
+                            query_it = queries_list.erase(query_it);
+                            // Do not care if the revoked_size > reserve size, 
and try to run again.
+                            continue;
+                        } else {
+                            bool spill_res = handle_single_query_(
+                                    query_ctx, query_it->reserve_size_, 
query_it->elapsed_time(),
+                                    query_ctx->paused_reason());
+                            if (spill_res) {
+                                VLOG_DEBUG << "Query: " << 
print_id(query_ctx->query_id())
+                                           << " remove from paused list";
+                                query_it = queries_list.erase(query_it);
+                                continue;
+                            } else {
+                                ++query_it;
+                                continue;
+                            }
+                        }
+                    } else {
+                        // If any query is cancelled during process limit 
stage, should resume other query and
+                        // do not do any check now.
+                        query_ctx->set_memory_sufficient(true);
+                        VLOG_DEBUG << "Query: " << 
print_id(query_ctx->query_id())
+                                   << " remove from paused list";
+                        query_it = queries_list.erase(query_it);
+                        continue;
+                    }
+                }
+                if 
(doris::GlobalMemoryArbitrator::last_affected_cache_capacity_adjust_weighted <
+                            0.05 &&
+                    query_it->cache_ratio_ > 0.05) {
+                    LOG(INFO) << "Query: " << print_id(query_ctx->query_id())
+                              << " will be resume after cache adjust.";
+                    query_ctx->set_memory_sufficient(true);
+                    query_it = queries_list.erase(query_it);
+                    continue;
+                }
+                ++query_it;
+            }
+        }
+
+        bool is_low_watermark = false;
+        bool is_high_watermark = false;
+        wg->check_mem_used(&is_low_watermark, &is_high_watermark);
+        // Not need waiting flush memtable and below low watermark disable 
load buffer limit
+        if (flushed_memtable_bytes <= 0 && !is_low_watermark) {
+            wg->enable_write_buffer_limit(false);
+        }
+
+        if (queries_list.empty()) {
+            it = _paused_queries_list.erase(it);
+            continue;
+        } else {
+            // Finished deal with one workload group, and should deal with 
next one.
+            ++it;
+        }
+    }
+
+    if (has_query_exceed_process_memlimit) {
+        // No query failed due to process exceed limit, so that enable cache 
now.
+        
doris::GlobalMemoryArbitrator::last_wg_trigger_cache_capacity_adjust_weighted = 
1;
+    }
+}
+
+// Return the expected free bytes if memtable could flush
+int64_t WorkloadGroupMgr::flush_memtable_from_current_group_(WorkloadGroupPtr 
wg,
+                                                             int64_t 
need_free_mem) {
+    // If there are a lot of memtable memory, then wait them flush finished.
+    MemTableMemoryLimiter* memtable_limiter =
+            doris::ExecEnv::GetInstance()->memtable_memory_limiter();
+    int64_t memtable_active_bytes = 0;
+    int64_t memtable_queue_bytes = 0;
+    int64_t memtable_flush_bytes = 0;
+    DCHECK(memtable_limiter != nullptr) << "memtable limiter is nullptr";
+    memtable_limiter->get_workload_group_memtable_usage(
+            wg->id(), &memtable_active_bytes, &memtable_queue_bytes, 
&memtable_flush_bytes);
+    // TODO: should add a signal in memtable limiter to prevent new batch
+    // For example, streamload, it will not reserve many memory, but it will 
occupy many memtable memory.
+    // TODO: 0.2 should be a workload group properties. For example, the group 
is optimized for load,then the value
+    // should be larged, if the group is optimized for query, then the value 
should be smaller.
+    int64_t max_wg_memtable_bytes = wg->write_buffer_limit();
+    if (memtable_active_bytes + memtable_queue_bytes + memtable_flush_bytes >
+        max_wg_memtable_bytes) {
+        // There are many table in flush queue, just waiting them flush 
finished.
+        if (memtable_active_bytes < (int64_t)(max_wg_memtable_bytes * 0.6)) {
+            LOG_EVERY_T(INFO, 60) << wg->name()
+                                  << " load memtable size is: " << 
memtable_active_bytes << ", "
+                                  << memtable_queue_bytes << ", " << 
memtable_flush_bytes
+                                  << ", load buffer limit is: " << 
max_wg_memtable_bytes
+                                  << " wait for flush finished to release more 
memory";
+            return memtable_queue_bytes + memtable_flush_bytes;
+        } else {
+            // Flush some memtables(currently written) to flush queue.
+            memtable_limiter->flush_workload_group_memtables(
+                    wg->id(), memtable_active_bytes - 
(int64_t)(max_wg_memtable_bytes * 0.6));
+            LOG_EVERY_T(INFO, 60) << wg->name()
+                                  << " load memtable size is: " << 
memtable_active_bytes << ", "
+                                  << memtable_queue_bytes << ", " << 
memtable_flush_bytes
+                                  << ", flush some active memtable to revoke 
memory";
+            return memtable_queue_bytes + memtable_flush_bytes + 
memtable_active_bytes -
+                   (int64_t)(max_wg_memtable_bytes * 0.6);
+        }
+    }
+    return 0;
+}
+
+int64_t 
WorkloadGroupMgr::revoke_memory_from_other_group_(std::shared_ptr<QueryContext> 
requestor,
+                                                          bool hard_limit, 
int64_t need_free_mem) {
+    int64_t total_freed_mem = 0;
+    std::unique_ptr<RuntimeProfile> profile = 
std::make_unique<RuntimeProfile>("RevokeMemory");
+    // 1. memtable like memory
+    // 2. query exceed workload group limit
+    int64_t freed_mem = revoke_overcommited_memory_(requestor, need_free_mem, 
profile.get());
+    total_freed_mem += freed_mem;
+    // The revoke process may kill current requestor, so should return now.
+    if (need_free_mem - total_freed_mem < 0 || requestor->is_cancelled()) {
+        return total_freed_mem;
+    }
+    if (hard_limit) {
+        freed_mem = cancel_top_query_in_overcommit_group_(need_free_mem - 
total_freed_mem,
+                                                          
doris::QUERY_MIN_MEMORY, profile.get());

Review Comment:
   这里为啥要判断 hard_limit,然后设置一个 cancel 的 `lower_bound` 呢



##########
be/src/runtime/workload_group/workload_group_manager.cpp:
##########
@@ -287,6 +257,642 @@ void WorkloadGroupMgr::refresh_workload_group_metrics() {
     }
 }
 
+void WorkloadGroupMgr::add_paused_query(const std::shared_ptr<QueryContext>& 
query_ctx,
+                                        int64_t reserve_size, const Status& 
status) {
+    DCHECK(query_ctx != nullptr);
+    query_ctx->update_paused_reason(status);
+    query_ctx->set_low_memory_mode();
+    query_ctx->set_memory_sufficient(false);
+    std::lock_guard<std::mutex> lock(_paused_queries_lock);
+    auto wg = query_ctx->workload_group();
+    auto&& [it, inserted] = _paused_queries_list[wg].emplace(
+            query_ctx, 
doris::GlobalMemoryArbitrator::last_affected_cache_capacity_adjust_weighted,
+            doris::GlobalMemoryArbitrator::any_workload_group_exceed_limit, 
reserve_size);
+    // Check if this is an invalid reserve, for example, if the reserve size 
is too large, larger than the query limit
+    // if hard limit is enabled, then not need enable other queries hard limit.
+    if (inserted) {
+        LOG(INFO) << "Insert one new paused query: " << 
query_ctx->debug_string()
+                  << ", workload group: " << wg->debug_string();
+    }
+}
+
+/**
+ * Strategy 1: A revocable query should not have any running 
task(PipelineTask).
+ * strategy 2: If the workload group has any task exceed workload group 
memlimit, then set all queryctx's memlimit
+ * strategy 3: If any query exceed process memlimit, then should clear all 
caches.
+ * strategy 4: If any query exceed query's memlimit, then do spill disk or 
cancel it.
+ * strategy 5: If any query exceed process's memlimit and cache is zero, then 
do following:
+ */
+void WorkloadGroupMgr::handle_paused_queries() {
+    {
+        std::shared_lock<std::shared_mutex> r_lock(_group_mutex);
+        for (auto& [wg_id, wg] : _workload_groups) {
+            std::unique_lock<std::mutex> lock(_paused_queries_lock);
+            if (_paused_queries_list[wg].empty()) {
+                // Add an empty set to wg that not contains paused queries.
+            }
+        }
+    }
+
+    std::unique_lock<std::mutex> lock(_paused_queries_lock);
+    bool has_revoked_from_other_group = false;
+    bool has_query_exceed_process_memlimit = false;
+    for (auto it = _paused_queries_list.begin(); it != 
_paused_queries_list.end();) {
+        auto& queries_list = it->second;
+        auto query_count = queries_list.size();
+        const auto& wg = it->first;
+
+        if (query_count != 0) {
+            LOG_EVERY_T(INFO, 1) << "Paused queries count of wg " << 
wg->name() << ": "
+                                 << query_count;
+        }
+
+        bool has_changed_hard_limit = false;
+        int64_t flushed_memtable_bytes = 0;
+        // If the query is paused because its limit exceed the query itself's 
memlimit, then just spill disk.
+        // The query's memlimit is set using slot mechanism and its value is 
set using the user settings, not
+        // by weighted value. So if reserve failed, then it is actually exceed 
limit.
+        for (auto query_it = queries_list.begin(); query_it != 
queries_list.end();) {
+            auto query_ctx = query_it->query_ctx_.lock();
+            // The query is finished during in paused list.
+            if (query_ctx == nullptr) {
+                LOG(INFO) << "Query: " << query_it->query_id() << " is 
nullptr, erase it.";
+                query_it = queries_list.erase(query_it);
+                continue;
+            }
+            if (query_ctx->is_cancelled()) {
+                LOG(INFO) << "Query: " << print_id(query_ctx->query_id())
+                          << " was canceled, remove from paused list";
+                query_it = queries_list.erase(query_it);
+                continue;
+            }
+
+            if 
(query_ctx->paused_reason().is<ErrorCode::QUERY_MEMORY_EXCEEDED>()) {
+                // Streamload, kafka load, group commit will never have query 
memory exceeded error because
+                // their  query limit is very large.
+                bool spill_res =
+                        handle_single_query_(query_ctx, 
query_it->reserve_size_,
+                                             query_it->elapsed_time(), 
query_ctx->paused_reason());
+                if (!spill_res) {
+                    ++query_it;
+                    continue;
+                } else {
+                    VLOG_DEBUG << "Query: " << print_id(query_ctx->query_id())
+                               << " remove from paused list";
+                    query_it = queries_list.erase(query_it);
+                    continue;
+                }
+            } else if 
(query_ctx->paused_reason().is<ErrorCode::WORKLOAD_GROUP_MEMORY_EXCEEDED>()) {
+                // Only deal with non overcommit workload group.
+                if (wg->enable_memory_overcommit()) {
+                    // Soft limit wg will only reserve failed when process 
limit exceed. But in some corner case,
+                    // when reserve, the wg is hard limit, the query reserve 
failed, but when this loop run
+                    // the wg is converted to soft limit.
+                    // So that should resume the query.
+                    LOG(WARNING)
+                            << "Query: " << print_id(query_ctx->query_id())
+                            << " reserve memory failed because exceed workload 
group memlimit, it "
+                               "should not happen, resume it again. paused 
reason: "
+                            << query_ctx->paused_reason();
+                    query_ctx->set_memory_sufficient(true);
+                    query_it = queries_list.erase(query_it);
+                    continue;
+                }
+                // check if the reserve is too large, if it is too large,
+                // should set the query's limit only.
+                // Check the query's reserve with expected limit.
+                if (query_ctx->adjusted_mem_limit() <
+                    query_ctx->get_mem_tracker()->consumption() + 
query_it->reserve_size_) {
+                    query_ctx->set_mem_limit(query_ctx->adjusted_mem_limit());
+                    query_ctx->set_memory_sufficient(true);
+                    LOG(INFO) << "Workload group memory reserve failed because 
"
+                              << query_ctx->debug_string() << " reserve size "
+                              << 
PrettyPrinter::print_bytes(query_it->reserve_size_)
+                              << " is too large, set hard limit to "
+                              << 
PrettyPrinter::print_bytes(query_ctx->adjusted_mem_limit())
+                              << " and resume running.";
+                    query_it = queries_list.erase(query_it);
+                    continue;
+                }
+                if (flushed_memtable_bytes <= 0) {
+                    flushed_memtable_bytes =
+                            flush_memtable_from_current_group_(wg, 
query_it->reserve_size_);
+                }
+                if (flushed_memtable_bytes > 0) {
+                    // Flushed some memtable, just wait flush finished and not 
do anything more.
+                    wg->enable_write_buffer_limit(true);
+                    ++query_it;
+                    continue;
+                }
+                if (!has_changed_hard_limit) {
+                    update_queries_limit_(wg, true);
+                    has_changed_hard_limit = true;
+                    LOG(INFO) << "Query: " << print_id(query_ctx->query_id()) 
<< " reserve memory("
+                              << 
PrettyPrinter::print_bytes(query_it->reserve_size_)
+                              << ") failed due to workload group memory 
exceed, "
+                                 "should set the workload group work in memory 
insufficent mode, "
+                                 "so that other query will reduce their 
memory."
+                              << " Query mem limit: "
+                              << 
PrettyPrinter::print_bytes(query_ctx->get_mem_limit())
+                              << " mem usage: "
+                              << PrettyPrinter::print_bytes(
+                                         
query_ctx->get_mem_tracker()->consumption())
+                              << ", wg: " << wg->debug_string();
+                }
+                if (wg->slot_memory_policy() == TWgSlotMemoryPolicy::NONE) {
+                    // If not enable slot memory policy, then should spill 
directly
+                    // Maybe there are another query that use too much memory, 
but we
+                    // not encourage not enable slot memory.
+                    // TODO should kill the query that exceed limit.
+                    bool spill_res = handle_single_query_(query_ctx, 
query_it->reserve_size_,
+                                                          
query_it->elapsed_time(),
+                                                          
query_ctx->paused_reason());
+                    if (!spill_res) {
+                        ++query_it;
+                        continue;
+                    } else {
+                        VLOG_DEBUG << "Query: " << 
print_id(query_ctx->query_id())
+                                   << " remove from paused list";
+                        query_it = queries_list.erase(query_it);
+                        continue;
+                    }
+                } else {
+                    // Should not put the query back to task scheduler 
immediately, because when wg's memory not sufficient,
+                    // and then set wg's flag, other query may not free memory 
very quickly.
+                    if (query_it->elapsed_time() > 
config::spill_in_paused_queue_timeout_ms) {
+                        // set wg's memory to insufficent, then add it back to 
task scheduler to run.
+                        LOG(INFO) << "Query: " << 
print_id(query_ctx->query_id())
+                                  << " will be resume.";
+                        query_ctx->set_memory_sufficient(true);
+                        query_it = queries_list.erase(query_it);
+                        continue;
+                    } else {
+                        ++query_it;
+                        continue;
+                    }
+                }
+            } else {
+                has_query_exceed_process_memlimit = true;
+                // If wg's memlimit not exceed, but process memory exceed, it 
means cache or other metadata
+                // used too much memory. Should clean all cache here.
+                // 1. Check cache used, if cache is larger than > 0, then just 
return and wait for it to 0 to release some memory.
+                if 
(doris::GlobalMemoryArbitrator::last_affected_cache_capacity_adjust_weighted >
+                            0.05 &&
+                    
doris::GlobalMemoryArbitrator::last_wg_trigger_cache_capacity_adjust_weighted >
+                            0.05) {
+                    
doris::GlobalMemoryArbitrator::last_wg_trigger_cache_capacity_adjust_weighted =
+                            0.04;
+                    
doris::GlobalMemoryArbitrator::notify_cache_adjust_capacity();
+                    LOG(INFO) << "There are some queries need process memory, 
so that set cache "
+                                 "capacity "
+                                 "to 0 now";
+                }
+                // need to check config::disable_memory_gc here, if not, when 
config::disable_memory_gc == true,
+                // cache is not adjusted, query_it->cache_ratio_ will always 
be 1, and this if branch will nenver
+                // execute, this query will never be resumed, and will 
deadlock here
+                if ((!config::disable_memory_gc && query_it->cache_ratio_ < 
0.05) ||
+                    config::disable_memory_gc) {
+                    // 1. Check if could revoke some memory from memtable
+                    if (flushed_memtable_bytes <= 0) {
+                        flushed_memtable_bytes =
+                                flush_memtable_from_current_group_(wg, 
query_it->reserve_size_);
+                    }
+                    if (flushed_memtable_bytes > 0) {
+                        // Flushed some memtable, just wait flush finished and 
not do anything more.
+                        ++query_it;
+                        continue;
+                    }
+                    // TODO should wait here to check if the process has 
release revoked_size memory and then continue.
+                    if (!has_revoked_from_other_group) {
+                        int64_t revoked_size = revoke_memory_from_other_group_(
+                                query_ctx, wg->enable_memory_overcommit(), 
query_it->reserve_size_);
+                        if (revoked_size > 0) {
+                            has_revoked_from_other_group = true;
+                            query_ctx->set_memory_sufficient(true);
+                            VLOG_DEBUG << "Query: " << 
print_id(query_ctx->query_id())
+                                       << " is resumed after revoke memory 
from other group.";
+                            query_it = queries_list.erase(query_it);
+                            // Do not care if the revoked_size > reserve size, 
and try to run again.
+                            continue;
+                        } else {
+                            bool spill_res = handle_single_query_(
+                                    query_ctx, query_it->reserve_size_, 
query_it->elapsed_time(),
+                                    query_ctx->paused_reason());
+                            if (spill_res) {
+                                VLOG_DEBUG << "Query: " << 
print_id(query_ctx->query_id())
+                                           << " remove from paused list";
+                                query_it = queries_list.erase(query_it);
+                                continue;
+                            } else {
+                                ++query_it;
+                                continue;
+                            }
+                        }
+                    } else {
+                        // If any query is cancelled during process limit 
stage, should resume other query and
+                        // do not do any check now.
+                        query_ctx->set_memory_sufficient(true);
+                        VLOG_DEBUG << "Query: " << 
print_id(query_ctx->query_id())
+                                   << " remove from paused list";
+                        query_it = queries_list.erase(query_it);
+                        continue;
+                    }
+                }
+                if 
(doris::GlobalMemoryArbitrator::last_affected_cache_capacity_adjust_weighted <
+                            0.05 &&
+                    query_it->cache_ratio_ > 0.05) {
+                    LOG(INFO) << "Query: " << print_id(query_ctx->query_id())
+                              << " will be resume after cache adjust.";
+                    query_ctx->set_memory_sufficient(true);
+                    query_it = queries_list.erase(query_it);
+                    continue;
+                }
+                ++query_it;
+            }
+        }
+
+        bool is_low_watermark = false;
+        bool is_high_watermark = false;
+        wg->check_mem_used(&is_low_watermark, &is_high_watermark);
+        // Not need waiting flush memtable and below low watermark disable 
load buffer limit
+        if (flushed_memtable_bytes <= 0 && !is_low_watermark) {
+            wg->enable_write_buffer_limit(false);
+        }
+
+        if (queries_list.empty()) {
+            it = _paused_queries_list.erase(it);
+            continue;
+        } else {
+            // Finished deal with one workload group, and should deal with 
next one.
+            ++it;
+        }
+    }
+
+    if (has_query_exceed_process_memlimit) {
+        // No query failed due to process exceed limit, so that enable cache 
now.
+        
doris::GlobalMemoryArbitrator::last_wg_trigger_cache_capacity_adjust_weighted = 
1;
+    }
+}
+
+// Return the expected free bytes if memtable could flush
+int64_t WorkloadGroupMgr::flush_memtable_from_current_group_(WorkloadGroupPtr 
wg,
+                                                             int64_t 
need_free_mem) {
+    // If there are a lot of memtable memory, then wait them flush finished.
+    MemTableMemoryLimiter* memtable_limiter =
+            doris::ExecEnv::GetInstance()->memtable_memory_limiter();
+    int64_t memtable_active_bytes = 0;
+    int64_t memtable_queue_bytes = 0;
+    int64_t memtable_flush_bytes = 0;
+    DCHECK(memtable_limiter != nullptr) << "memtable limiter is nullptr";
+    memtable_limiter->get_workload_group_memtable_usage(
+            wg->id(), &memtable_active_bytes, &memtable_queue_bytes, 
&memtable_flush_bytes);
+    // TODO: should add a signal in memtable limiter to prevent new batch
+    // For example, streamload, it will not reserve many memory, but it will 
occupy many memtable memory.
+    // TODO: 0.2 should be a workload group properties. For example, the group 
is optimized for load,then the value
+    // should be larged, if the group is optimized for query, then the value 
should be smaller.
+    int64_t max_wg_memtable_bytes = wg->write_buffer_limit();
+    if (memtable_active_bytes + memtable_queue_bytes + memtable_flush_bytes >
+        max_wg_memtable_bytes) {
+        // There are many table in flush queue, just waiting them flush 
finished.
+        if (memtable_active_bytes < (int64_t)(max_wg_memtable_bytes * 0.6)) {
+            LOG_EVERY_T(INFO, 60) << wg->name()
+                                  << " load memtable size is: " << 
memtable_active_bytes << ", "
+                                  << memtable_queue_bytes << ", " << 
memtable_flush_bytes
+                                  << ", load buffer limit is: " << 
max_wg_memtable_bytes
+                                  << " wait for flush finished to release more 
memory";
+            return memtable_queue_bytes + memtable_flush_bytes;
+        } else {
+            // Flush some memtables(currently written) to flush queue.
+            memtable_limiter->flush_workload_group_memtables(
+                    wg->id(), memtable_active_bytes - 
(int64_t)(max_wg_memtable_bytes * 0.6));
+            LOG_EVERY_T(INFO, 60) << wg->name()
+                                  << " load memtable size is: " << 
memtable_active_bytes << ", "
+                                  << memtable_queue_bytes << ", " << 
memtable_flush_bytes
+                                  << ", flush some active memtable to revoke 
memory";
+            return memtable_queue_bytes + memtable_flush_bytes + 
memtable_active_bytes -
+                   (int64_t)(max_wg_memtable_bytes * 0.6);
+        }
+    }
+    return 0;
+}
+
+int64_t 
WorkloadGroupMgr::revoke_memory_from_other_group_(std::shared_ptr<QueryContext> 
requestor,

Review Comment:
   所以支持 overcommit 的 wg 内存 overcommited 后,只能是 process memory 超过 soft limit 
后,由其他 wg 的 query 去释放这个 overcommited 的 wg 对吧。
   
   这个调用关系看似合理,但会不会导致频繁调用呢,比如 100个暂停的 query 都去 revoke 其他 wg。
   类似的需求,我第一反应在 GC 线程里通过一定策略去 revoke overcommited 的 wg。



##########
be/src/runtime/workload_group/workload_group_manager.cpp:
##########
@@ -287,6 +257,642 @@ void WorkloadGroupMgr::refresh_workload_group_metrics() {
     }
 }
 
+void WorkloadGroupMgr::add_paused_query(const std::shared_ptr<QueryContext>& 
query_ctx,
+                                        int64_t reserve_size, const Status& 
status) {
+    DCHECK(query_ctx != nullptr);
+    query_ctx->update_paused_reason(status);
+    query_ctx->set_low_memory_mode();
+    query_ctx->set_memory_sufficient(false);
+    std::lock_guard<std::mutex> lock(_paused_queries_lock);
+    auto wg = query_ctx->workload_group();
+    auto&& [it, inserted] = _paused_queries_list[wg].emplace(
+            query_ctx, 
doris::GlobalMemoryArbitrator::last_affected_cache_capacity_adjust_weighted,
+            doris::GlobalMemoryArbitrator::any_workload_group_exceed_limit, 
reserve_size);
+    // Check if this is an invalid reserve, for example, if the reserve size 
is too large, larger than the query limit
+    // if hard limit is enabled, then not need enable other queries hard limit.
+    if (inserted) {
+        LOG(INFO) << "Insert one new paused query: " << 
query_ctx->debug_string()
+                  << ", workload group: " << wg->debug_string();
+    }
+}
+
+/**
+ * Strategy 1: A revocable query should not have any running 
task(PipelineTask).
+ * strategy 2: If the workload group has any task exceed workload group 
memlimit, then set all queryctx's memlimit
+ * strategy 3: If any query exceed process memlimit, then should clear all 
caches.
+ * strategy 4: If any query exceed query's memlimit, then do spill disk or 
cancel it.
+ * strategy 5: If any query exceed process's memlimit and cache is zero, then 
do following:
+ */
+void WorkloadGroupMgr::handle_paused_queries() {
+    {
+        std::shared_lock<std::shared_mutex> r_lock(_group_mutex);
+        for (auto& [wg_id, wg] : _workload_groups) {
+            std::unique_lock<std::mutex> lock(_paused_queries_lock);
+            if (_paused_queries_list[wg].empty()) {
+                // Add an empty set to wg that not contains paused queries.
+            }
+        }
+    }
+
+    std::unique_lock<std::mutex> lock(_paused_queries_lock);
+    bool has_revoked_from_other_group = false;
+    bool has_query_exceed_process_memlimit = false;
+    for (auto it = _paused_queries_list.begin(); it != 
_paused_queries_list.end();) {
+        auto& queries_list = it->second;
+        auto query_count = queries_list.size();
+        const auto& wg = it->first;
+
+        if (query_count != 0) {
+            LOG_EVERY_T(INFO, 1) << "Paused queries count of wg " << 
wg->name() << ": "
+                                 << query_count;
+        }
+
+        bool has_changed_hard_limit = false;
+        int64_t flushed_memtable_bytes = 0;
+        // If the query is paused because its limit exceed the query itself's 
memlimit, then just spill disk.
+        // The query's memlimit is set using slot mechanism and its value is 
set using the user settings, not
+        // by weighted value. So if reserve failed, then it is actually exceed 
limit.
+        for (auto query_it = queries_list.begin(); query_it != 
queries_list.end();) {
+            auto query_ctx = query_it->query_ctx_.lock();
+            // The query is finished during in paused list.
+            if (query_ctx == nullptr) {
+                LOG(INFO) << "Query: " << query_it->query_id() << " is 
nullptr, erase it.";
+                query_it = queries_list.erase(query_it);
+                continue;
+            }
+            if (query_ctx->is_cancelled()) {
+                LOG(INFO) << "Query: " << print_id(query_ctx->query_id())
+                          << " was canceled, remove from paused list";
+                query_it = queries_list.erase(query_it);
+                continue;
+            }
+
+            if 
(query_ctx->paused_reason().is<ErrorCode::QUERY_MEMORY_EXCEEDED>()) {
+                // Streamload, kafka load, group commit will never have query 
memory exceeded error because
+                // their  query limit is very large.
+                bool spill_res =
+                        handle_single_query_(query_ctx, 
query_it->reserve_size_,
+                                             query_it->elapsed_time(), 
query_ctx->paused_reason());
+                if (!spill_res) {
+                    ++query_it;
+                    continue;
+                } else {
+                    VLOG_DEBUG << "Query: " << print_id(query_ctx->query_id())
+                               << " remove from paused list";
+                    query_it = queries_list.erase(query_it);
+                    continue;
+                }
+            } else if 
(query_ctx->paused_reason().is<ErrorCode::WORKLOAD_GROUP_MEMORY_EXCEEDED>()) {
+                // Only deal with non overcommit workload group.
+                if (wg->enable_memory_overcommit()) {
+                    // Soft limit wg will only reserve failed when process 
limit exceed. But in some corner case,
+                    // when reserve, the wg is hard limit, the query reserve 
failed, but when this loop run
+                    // the wg is converted to soft limit.
+                    // So that should resume the query.
+                    LOG(WARNING)
+                            << "Query: " << print_id(query_ctx->query_id())
+                            << " reserve memory failed because exceed workload 
group memlimit, it "
+                               "should not happen, resume it again. paused 
reason: "
+                            << query_ctx->paused_reason();
+                    query_ctx->set_memory_sufficient(true);
+                    query_it = queries_list.erase(query_it);
+                    continue;
+                }
+                // check if the reserve is too large, if it is too large,
+                // should set the query's limit only.
+                // Check the query's reserve with expected limit.
+                if (query_ctx->adjusted_mem_limit() <
+                    query_ctx->get_mem_tracker()->consumption() + 
query_it->reserve_size_) {
+                    query_ctx->set_mem_limit(query_ctx->adjusted_mem_limit());
+                    query_ctx->set_memory_sufficient(true);
+                    LOG(INFO) << "Workload group memory reserve failed because 
"
+                              << query_ctx->debug_string() << " reserve size "
+                              << 
PrettyPrinter::print_bytes(query_it->reserve_size_)
+                              << " is too large, set hard limit to "
+                              << 
PrettyPrinter::print_bytes(query_ctx->adjusted_mem_limit())
+                              << " and resume running.";
+                    query_it = queries_list.erase(query_it);
+                    continue;
+                }
+                if (flushed_memtable_bytes <= 0) {
+                    flushed_memtable_bytes =
+                            flush_memtable_from_current_group_(wg, 
query_it->reserve_size_);
+                }
+                if (flushed_memtable_bytes > 0) {
+                    // Flushed some memtable, just wait flush finished and not 
do anything more.
+                    wg->enable_write_buffer_limit(true);
+                    ++query_it;
+                    continue;
+                }
+                if (!has_changed_hard_limit) {
+                    update_queries_limit_(wg, true);
+                    has_changed_hard_limit = true;
+                    LOG(INFO) << "Query: " << print_id(query_ctx->query_id()) 
<< " reserve memory("
+                              << 
PrettyPrinter::print_bytes(query_it->reserve_size_)
+                              << ") failed due to workload group memory 
exceed, "
+                                 "should set the workload group work in memory 
insufficent mode, "
+                                 "so that other query will reduce their 
memory."
+                              << " Query mem limit: "
+                              << 
PrettyPrinter::print_bytes(query_ctx->get_mem_limit())
+                              << " mem usage: "
+                              << PrettyPrinter::print_bytes(
+                                         
query_ctx->get_mem_tracker()->consumption())
+                              << ", wg: " << wg->debug_string();
+                }
+                if (wg->slot_memory_policy() == TWgSlotMemoryPolicy::NONE) {
+                    // If not enable slot memory policy, then should spill 
directly
+                    // Maybe there are another query that use too much memory, 
but we
+                    // not encourage not enable slot memory.
+                    // TODO should kill the query that exceed limit.
+                    bool spill_res = handle_single_query_(query_ctx, 
query_it->reserve_size_,
+                                                          
query_it->elapsed_time(),
+                                                          
query_ctx->paused_reason());
+                    if (!spill_res) {
+                        ++query_it;
+                        continue;
+                    } else {
+                        VLOG_DEBUG << "Query: " << 
print_id(query_ctx->query_id())
+                                   << " remove from paused list";
+                        query_it = queries_list.erase(query_it);
+                        continue;
+                    }
+                } else {
+                    // Should not put the query back to task scheduler 
immediately, because when wg's memory not sufficient,
+                    // and then set wg's flag, other query may not free memory 
very quickly.
+                    if (query_it->elapsed_time() > 
config::spill_in_paused_queue_timeout_ms) {
+                        // set wg's memory to insufficent, then add it back to 
task scheduler to run.
+                        LOG(INFO) << "Query: " << 
print_id(query_ctx->query_id())
+                                  << " will be resume.";
+                        query_ctx->set_memory_sufficient(true);
+                        query_it = queries_list.erase(query_it);
+                        continue;
+                    } else {
+                        ++query_it;
+                        continue;
+                    }
+                }
+            } else {
+                has_query_exceed_process_memlimit = true;
+                // If wg's memlimit not exceed, but process memory exceed, it 
means cache or other metadata
+                // used too much memory. Should clean all cache here.
+                // 1. Check cache used, if cache is larger than > 0, then just 
return and wait for it to 0 to release some memory.
+                if 
(doris::GlobalMemoryArbitrator::last_affected_cache_capacity_adjust_weighted >
+                            0.05 &&
+                    
doris::GlobalMemoryArbitrator::last_wg_trigger_cache_capacity_adjust_weighted >
+                            0.05) {
+                    
doris::GlobalMemoryArbitrator::last_wg_trigger_cache_capacity_adjust_weighted =
+                            0.04;
+                    
doris::GlobalMemoryArbitrator::notify_cache_adjust_capacity();
+                    LOG(INFO) << "There are some queries need process memory, 
so that set cache "
+                                 "capacity "
+                                 "to 0 now";
+                }
+                // need to check config::disable_memory_gc here, if not, when 
config::disable_memory_gc == true,
+                // cache is not adjusted, query_it->cache_ratio_ will always 
be 1, and this if branch will nenver
+                // execute, this query will never be resumed, and will 
deadlock here
+                if ((!config::disable_memory_gc && query_it->cache_ratio_ < 
0.05) ||
+                    config::disable_memory_gc) {
+                    // 1. Check if could revoke some memory from memtable
+                    if (flushed_memtable_bytes <= 0) {
+                        flushed_memtable_bytes =
+                                flush_memtable_from_current_group_(wg, 
query_it->reserve_size_);
+                    }
+                    if (flushed_memtable_bytes > 0) {
+                        // Flushed some memtable, just wait flush finished and 
not do anything more.
+                        ++query_it;
+                        continue;
+                    }
+                    // TODO should wait here to check if the process has 
release revoked_size memory and then continue.
+                    if (!has_revoked_from_other_group) {
+                        int64_t revoked_size = revoke_memory_from_other_group_(
+                                query_ctx, wg->enable_memory_overcommit(), 
query_it->reserve_size_);
+                        if (revoked_size > 0) {
+                            has_revoked_from_other_group = true;
+                            query_ctx->set_memory_sufficient(true);
+                            VLOG_DEBUG << "Query: " << 
print_id(query_ctx->query_id())
+                                       << " is resumed after revoke memory 
from other group.";
+                            query_it = queries_list.erase(query_it);
+                            // Do not care if the revoked_size > reserve size, 
and try to run again.
+                            continue;
+                        } else {
+                            bool spill_res = handle_single_query_(
+                                    query_ctx, query_it->reserve_size_, 
query_it->elapsed_time(),
+                                    query_ctx->paused_reason());
+                            if (spill_res) {
+                                VLOG_DEBUG << "Query: " << 
print_id(query_ctx->query_id())
+                                           << " remove from paused list";
+                                query_it = queries_list.erase(query_it);
+                                continue;
+                            } else {
+                                ++query_it;
+                                continue;
+                            }
+                        }
+                    } else {
+                        // If any query is cancelled during process limit 
stage, should resume other query and
+                        // do not do any check now.
+                        query_ctx->set_memory_sufficient(true);
+                        VLOG_DEBUG << "Query: " << 
print_id(query_ctx->query_id())
+                                   << " remove from paused list";
+                        query_it = queries_list.erase(query_it);
+                        continue;
+                    }
+                }
+                if 
(doris::GlobalMemoryArbitrator::last_affected_cache_capacity_adjust_weighted <
+                            0.05 &&
+                    query_it->cache_ratio_ > 0.05) {
+                    LOG(INFO) << "Query: " << print_id(query_ctx->query_id())
+                              << " will be resume after cache adjust.";
+                    query_ctx->set_memory_sufficient(true);
+                    query_it = queries_list.erase(query_it);
+                    continue;
+                }
+                ++query_it;
+            }
+        }
+
+        bool is_low_watermark = false;
+        bool is_high_watermark = false;
+        wg->check_mem_used(&is_low_watermark, &is_high_watermark);
+        // Not need waiting flush memtable and below low watermark disable 
load buffer limit
+        if (flushed_memtable_bytes <= 0 && !is_low_watermark) {
+            wg->enable_write_buffer_limit(false);
+        }
+
+        if (queries_list.empty()) {
+            it = _paused_queries_list.erase(it);
+            continue;
+        } else {
+            // Finished deal with one workload group, and should deal with 
next one.
+            ++it;
+        }
+    }
+
+    if (has_query_exceed_process_memlimit) {
+        // No query failed due to process exceed limit, so that enable cache 
now.
+        
doris::GlobalMemoryArbitrator::last_wg_trigger_cache_capacity_adjust_weighted = 
1;
+    }
+}
+
+// Return the expected free bytes if memtable could flush
+int64_t WorkloadGroupMgr::flush_memtable_from_current_group_(WorkloadGroupPtr 
wg,
+                                                             int64_t 
need_free_mem) {
+    // If there are a lot of memtable memory, then wait them flush finished.
+    MemTableMemoryLimiter* memtable_limiter =
+            doris::ExecEnv::GetInstance()->memtable_memory_limiter();
+    int64_t memtable_active_bytes = 0;
+    int64_t memtable_queue_bytes = 0;
+    int64_t memtable_flush_bytes = 0;
+    DCHECK(memtable_limiter != nullptr) << "memtable limiter is nullptr";
+    memtable_limiter->get_workload_group_memtable_usage(
+            wg->id(), &memtable_active_bytes, &memtable_queue_bytes, 
&memtable_flush_bytes);
+    // TODO: should add a signal in memtable limiter to prevent new batch
+    // For example, streamload, it will not reserve many memory, but it will 
occupy many memtable memory.
+    // TODO: 0.2 should be a workload group properties. For example, the group 
is optimized for load,then the value
+    // should be larged, if the group is optimized for query, then the value 
should be smaller.
+    int64_t max_wg_memtable_bytes = wg->write_buffer_limit();
+    if (memtable_active_bytes + memtable_queue_bytes + memtable_flush_bytes >
+        max_wg_memtable_bytes) {
+        // There are many table in flush queue, just waiting them flush 
finished.
+        if (memtable_active_bytes < (int64_t)(max_wg_memtable_bytes * 0.6)) {
+            LOG_EVERY_T(INFO, 60) << wg->name()
+                                  << " load memtable size is: " << 
memtable_active_bytes << ", "
+                                  << memtable_queue_bytes << ", " << 
memtable_flush_bytes
+                                  << ", load buffer limit is: " << 
max_wg_memtable_bytes
+                                  << " wait for flush finished to release more 
memory";
+            return memtable_queue_bytes + memtable_flush_bytes;
+        } else {
+            // Flush some memtables(currently written) to flush queue.
+            memtable_limiter->flush_workload_group_memtables(
+                    wg->id(), memtable_active_bytes - 
(int64_t)(max_wg_memtable_bytes * 0.6));
+            LOG_EVERY_T(INFO, 60) << wg->name()
+                                  << " load memtable size is: " << 
memtable_active_bytes << ", "
+                                  << memtable_queue_bytes << ", " << 
memtable_flush_bytes
+                                  << ", flush some active memtable to revoke 
memory";
+            return memtable_queue_bytes + memtable_flush_bytes + 
memtable_active_bytes -
+                   (int64_t)(max_wg_memtable_bytes * 0.6);
+        }
+    }
+    return 0;
+}
+
+int64_t 
WorkloadGroupMgr::revoke_memory_from_other_group_(std::shared_ptr<QueryContext> 
requestor,
+                                                          bool hard_limit, 
int64_t need_free_mem) {
+    int64_t total_freed_mem = 0;
+    std::unique_ptr<RuntimeProfile> profile = 
std::make_unique<RuntimeProfile>("RevokeMemory");
+    // 1. memtable like memory
+    // 2. query exceed workload group limit
+    int64_t freed_mem = revoke_overcommited_memory_(requestor, need_free_mem, 
profile.get());
+    total_freed_mem += freed_mem;
+    // The revoke process may kill current requestor, so should return now.
+    if (need_free_mem - total_freed_mem < 0 || requestor->is_cancelled()) {
+        return total_freed_mem;
+    }
+    if (hard_limit) {
+        freed_mem = cancel_top_query_in_overcommit_group_(need_free_mem - 
total_freed_mem,
+                                                          
doris::QUERY_MIN_MEMORY, profile.get());
+    } else {
+        freed_mem = cancel_top_query_in_overcommit_group_(
+                need_free_mem - total_freed_mem, 
requestor->get_mem_tracker()->consumption(),
+                profile.get());
+    }
+    total_freed_mem += freed_mem;
+    // The revoke process may kill current requestor, so should return now.
+    if (need_free_mem - total_freed_mem < 0 || requestor->is_cancelled()) {
+        return total_freed_mem;
+    }
+    return total_freed_mem;
+}
+
+// Revoke memory from workload group that exceed it's limit. For example, if 
the wg's limit is 10g, but used 12g
+// then should revoke 2g from the group.
+int64_t 
WorkloadGroupMgr::revoke_overcommited_memory_(std::shared_ptr<QueryContext> 
requestor,
+                                                      int64_t need_free_mem,
+                                                      RuntimeProfile* profile) 
{
+    int64_t total_freed_mem = 0;
+    // 1. check memtable usage, and try to free them.
+    int64_t freed_mem = 
revoke_memtable_from_overcommited_groups_(need_free_mem, profile);
+    total_freed_mem += freed_mem;
+    // The revoke process may kill current requestor, so should return now.
+    if (need_free_mem - total_freed_mem < 0 || requestor->is_cancelled()) {
+        return total_freed_mem;
+    }
+    // 2. Cancel top usage query, one by one
+    using WorkloadGroupMem = std::pair<WorkloadGroupPtr, int64_t>;
+    auto cmp = [](WorkloadGroupMem left, WorkloadGroupMem right) {
+        return left.second < right.second;
+    };
+    std::priority_queue<WorkloadGroupMem, std::vector<WorkloadGroupMem>, 
decltype(cmp)> heap(cmp);
+    {
+        std::shared_lock<std::shared_mutex> r_lock(_group_mutex);
+        for (auto iter = _workload_groups.begin(); iter != 
_workload_groups.end(); iter++) {
+            if (requestor->workload_group() != nullptr &&
+                iter->second->id() == requestor->workload_group()->id()) {
+                continue;
+            }
+            heap.emplace(iter->second, iter->second->memory_used());
+        }
+    }
+    while (!heap.empty() && need_free_mem - total_freed_mem > 0 && 
!requestor->is_cancelled()) {
+        auto [wg, sort_mem] = heap.top();
+        heap.pop();
+        freed_mem = wg->free_overcommited_memory(need_free_mem - 
total_freed_mem, profile);
+        total_freed_mem += freed_mem;
+    }
+    return total_freed_mem;
+}
+
+// If the memtable is too large, then flush them and wait for finished.
+int64_t WorkloadGroupMgr::revoke_memtable_from_overcommited_groups_(int64_t 
need_free_mem,
+                                                                    
RuntimeProfile* profile) {
+    return 0;
+}
+
+// 1. Sort all memory limiter in all overcommit wg, and cancel the top usage 
task that with most memory.
+// 2. Maybe not valid because it's memory not exceed limit.
+int64_t WorkloadGroupMgr::cancel_top_query_in_overcommit_group_(int64_t 
need_free_mem,

Review Comment:
   TODO:我后面会实现,但如上所说,revoke overcommited wg 的触发位置有待商榷



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org


Reply via email to