This is an automated email from the ASF dual-hosted git repository.

mrhhsg pushed a commit to branch spill_and_reserve
in repository https://gitbox.apache.org/repos/asf/doris.git

commit 4204f49ce4e3acec2383b438cbf4c63cc87e93c1
Author: yiguolei <676222...@qq.com>
AuthorDate: Fri Oct 25 18:10:47 2024 +0800

    block memtable when memory is not enough (#42418)
    
    Issue Number: close #xxx
    
    <!--Describe your changes.-->
    
    ---------
    
    Co-authored-by: yiguolei <yiguo...@gmail.com>
    Co-authored-by: github-actions[bot] 
<41898282+github-actions[bot]@users.noreply.github.com>
---
 be/src/olap/memtable.cpp                           |   1 +
 be/src/olap/memtable_memory_limiter.cpp            |  56 ++-
 be/src/olap/memtable_memory_limiter.h              |   9 +-
 be/src/pipeline/exec/file_scan_operator.cpp        |   2 +-
 be/src/runtime/load_channel.h                      |   2 +
 be/src/runtime/load_channel_mgr.cpp                |   3 +-
 be/src/runtime/memory/mem_tracker_limiter.cpp      |  15 +
 be/src/runtime/memory/mem_tracker_limiter.h        |  26 +-
 be/src/runtime/memory/thread_mem_tracker_mgr.h     |  18 +-
 be/src/runtime/query_context.cpp                   |   1 +
 be/src/runtime/query_context.h                     |   7 +-
 be/src/runtime/runtime_state.cpp                   |   4 +
 be/src/runtime/runtime_state.h                     |   3 +
 be/src/runtime/workload_group/workload_group.cpp   | 145 ++++++--
 be/src/runtime/workload_group/workload_group.h     |  49 +--
 .../workload_group/workload_group_manager.cpp      | 406 +++++++++++----------
 .../workload_group/workload_group_manager.h        |  20 +-
 be/src/vec/sink/writer/vtablet_writer_v2.cpp       |   3 +-
 .../java/org/apache/doris/qe/SessionVariable.java  |  14 +-
 .../resource/workloadgroup/WorkloadGroup.java      |  51 ++-
 gensrc/thrift/BackendService.thrift                |   1 +
 gensrc/thrift/PaloInternalService.thrift           |   2 +-
 22 files changed, 523 insertions(+), 315 deletions(-)

diff --git a/be/src/olap/memtable.cpp b/be/src/olap/memtable.cpp
index e0f19b1624d..facbc90c450 100644
--- a/be/src/olap/memtable.cpp
+++ b/be/src/olap/memtable.cpp
@@ -78,6 +78,7 @@ MemTable::MemTable(int64_t tablet_id, 
std::shared_ptr<TabletSchema> tablet_schem
     // TODO: Support ZOrderComparator in the future
     _init_columns_offset_by_slot_descs(slot_descs, tuple_desc);
     _mem_tracker = std::make_shared<MemTracker>();
+    _query_thread_context.query_mem_tracker->push_load_buffer(_mem_tracker);
 }
 
 void MemTable::_init_columns_offset_by_slot_descs(const 
std::vector<SlotDescriptor*>* slot_descs,
diff --git a/be/src/olap/memtable_memory_limiter.cpp 
b/be/src/olap/memtable_memory_limiter.cpp
index 7a220ef87ac..2071f814768 100644
--- a/be/src/olap/memtable_memory_limiter.cpp
+++ b/be/src/olap/memtable_memory_limiter.cpp
@@ -111,6 +111,21 @@ int64_t MemTableMemoryLimiter::_need_flush() {
     return need_flush - _queue_mem_usage;
 }
 
+void 
MemTableMemoryLimiter::handle_workload_group_memtable_flush(WorkloadGroupPtr 
wg) {
+    // It means some query is pending on here to flush memtable and to 
continue running.
+    // So that should wait here.
+    // Wait at most 1s, because this code is not aware of the cancel flag. If the 
load task is cancelled,
+    // it should release memory quickly.
+    using namespace std::chrono_literals;
+    int32_t sleep_times = 10;
+    while (wg != nullptr && wg->enable_load_buffer_limit() && sleep_times > 0) 
{
+        std::this_thread::sleep_for(100ms);
+        --sleep_times;
+    }
+    // Check process memory again.
+    handle_memtable_flush();
+}
+
 void MemTableMemoryLimiter::handle_memtable_flush() {
     // Check the soft limit.
     DCHECK(_load_soft_mem_limit > 0);
@@ -150,20 +165,39 @@ void MemTableMemoryLimiter::handle_memtable_flush() {
     LOG(INFO) << "waited " << time_ms << " ms for memtable memory limit";
 }
 
-void MemTableMemoryLimiter::flush_workload_group_memtables(uint64_t wg_id,
-                                                           int64_t 
need_flush_bytes) {
+int64_t MemTableMemoryLimiter::flush_workload_group_memtables(uint64_t wg_id, 
int64_t need_flush) {
     std::unique_lock<std::mutex> l(_lock);
-    _flush_active_memtables(wg_id, need_flush_bytes);
+    return _flush_active_memtables(wg_id, need_flush);
+}
+
+void MemTableMemoryLimiter::get_workload_group_memtable_usage(uint64_t wg_id, 
int64_t* active_bytes,
+                                                              int64_t* 
queue_bytes,
+                                                              int64_t* 
flush_bytes) {
+    std::unique_lock<std::mutex> l(_lock);
+    *active_bytes = 0;
+    *queue_bytes = 0;
+    *flush_bytes = 0;
+    for (auto it = _writers.begin(); it != _writers.end(); ++it) {
+        if (auto writer = it->lock()) {
+            // Skip writers that belong to a different workload group.
+            if (writer->workload_group_id() != wg_id) {
+                continue;
+            }
+            *active_bytes += writer->active_memtable_mem_consumption();
+            *queue_bytes += writer->mem_consumption(MemType::WRITE_FINISHED);
+            *flush_bytes += writer->mem_consumption(MemType::FLUSH);
+        }
+    }
 }
 
-void MemTableMemoryLimiter::_flush_active_memtables(uint64_t wg_id, int64_t 
need_flush) {
+int64_t MemTableMemoryLimiter::_flush_active_memtables(uint64_t wg_id, int64_t 
need_flush) {
     if (need_flush <= 0) {
-        return;
+        return 0;
     }
 
     _refresh_mem_tracker();
     if (_active_writers.size() == 0) {
-        return;
+        return 0;
     }
 
     using WriterMem = std::pair<std::weak_ptr<MemTableWriter>, int64_t>;
@@ -211,6 +245,7 @@ void 
MemTableMemoryLimiter::_flush_active_memtables(uint64_t wg_id, int64_t need
     }
     LOG(INFO) << "flushed " << num_flushed << " out of " << 
_active_writers.size()
               << " active writers, flushed size: " << 
PrettyPrinter::print_bytes(mem_flushed);
+    return mem_flushed;
 }
 
 void MemTableMemoryLimiter::refresh_mem_tracker() {
@@ -245,29 +280,20 @@ void MemTableMemoryLimiter::_refresh_mem_tracker() {
     _flush_mem_usage = 0;
     _queue_mem_usage = 0;
     _active_mem_usage = 0;
-    std::map<uint64_t, doris::MemtableUsage> wg_mem_usages;
     _active_writers.clear();
     for (auto it = _writers.begin(); it != _writers.end();) {
         if (auto writer = it->lock()) {
-            if (wg_mem_usages.find(writer->workload_group_id()) == 
wg_mem_usages.end()) {
-                wg_mem_usages.insert({writer->workload_group_id(), {0, 0, 0}});
-            }
-            auto& wg_mem_usage = 
wg_mem_usages.find(writer->workload_group_id())->second;
-
             // The memtable is currently used by writer to insert blocks.
             auto active_usage = writer->active_memtable_mem_consumption();
-            wg_mem_usage.active_mem_usage += active_usage;
             _active_mem_usage += active_usage;
             if (active_usage > 0) {
                 _active_writers.push_back(writer);
             }
 
             auto flush_usage = writer->mem_consumption(MemType::FLUSH);
-            wg_mem_usage.flush_mem_usage += flush_usage;
             _flush_mem_usage += flush_usage;
 
             auto write_usage = 
writer->mem_consumption(MemType::WRITE_FINISHED);
-            wg_mem_usage.queue_mem_usage += write_usage;
             _queue_mem_usage += write_usage;
             ++it;
         } else {
diff --git a/be/src/olap/memtable_memory_limiter.h 
b/be/src/olap/memtable_memory_limiter.h
index 143edaa8fe5..de2fb802165 100644
--- a/be/src/olap/memtable_memory_limiter.h
+++ b/be/src/olap/memtable_memory_limiter.h
@@ -21,6 +21,7 @@
 
 #include "common/status.h"
 #include "runtime/memory/mem_tracker.h"
+#include "runtime/workload_group/workload_group.h"
 #include "util/countdown_latch.h"
 #include "util/stopwatch.hpp"
 
@@ -37,13 +38,17 @@ public:
 
     Status init(int64_t process_mem_limit);
 
+    void handle_workload_group_memtable_flush(WorkloadGroupPtr wg);
     // check if the total mem consumption exceeds limit.
     // If yes, it will flush memtable to try to reduce memory consumption.
     // Every write operation will call this API to check if need flush 
memtable OR hang
     // when memory is not available.
     void handle_memtable_flush();
 
-    void flush_workload_group_memtables(uint64_t wg_id, int64_t 
need_flush_bytes);
+    int64_t flush_workload_group_memtables(uint64_t wg_id, int64_t 
need_flush_bytes);
+
+    void get_workload_group_memtable_usage(uint64_t wg_id, int64_t* 
active_bytes,
+                                           int64_t* queue_bytes, int64_t* 
flush_bytes);
 
     void register_writer(std::weak_ptr<MemTableWriter> writer);
 
@@ -61,7 +66,7 @@ private:
     bool _hard_limit_reached();
     bool _load_usage_low();
     int64_t _need_flush();
-    void _flush_active_memtables(uint64_t wg_id, int64_t need_flush);
+    int64_t _flush_active_memtables(uint64_t wg_id, int64_t need_flush);
     void _refresh_mem_tracker();
 
     std::mutex _lock;
diff --git a/be/src/pipeline/exec/file_scan_operator.cpp 
b/be/src/pipeline/exec/file_scan_operator.cpp
index 1c8db8b446a..1571b585545 100644
--- a/be/src/pipeline/exec/file_scan_operator.cpp
+++ b/be/src/pipeline/exec/file_scan_operator.cpp
@@ -63,7 +63,7 @@ void FileScanLocalState::set_scan_ranges(RuntimeState* state,
     auto wg_ptr = state->get_query_ctx()->workload_group();
     _max_scanners =
             config::doris_scanner_thread_pool_thread_num / 
state->query_parallel_instance_num();
-    if (wg_ptr && state->get_query_ctx()->enable_query_slot_hard_limit()) {
+    if (wg_ptr && !state->get_query_ctx()->enable_mem_overcommit()) {
         const auto total_slots = wg_ptr->total_query_slot_count();
         const auto query_slots = state->get_query_ctx()->get_slot_count();
         _max_scanners = _max_scanners * query_slots / total_slots;
diff --git a/be/src/runtime/load_channel.h b/be/src/runtime/load_channel.h
index 36a8f363ba9..c9ed66f8c74 100644
--- a/be/src/runtime/load_channel.h
+++ b/be/src/runtime/load_channel.h
@@ -69,6 +69,8 @@ public:
 
     bool is_high_priority() const { return _is_high_priority; }
 
+    WorkloadGroupPtr workload_group() const { return 
_query_thread_context.wg_wptr.lock(); }
+
     RuntimeProfile::Counter* get_mgr_add_batch_timer() { return 
_mgr_add_batch_timer; }
     RuntimeProfile::Counter* get_handle_mem_limit_timer() { return 
_handle_mem_limit_timer; }
 
diff --git a/be/src/runtime/load_channel_mgr.cpp 
b/be/src/runtime/load_channel_mgr.cpp
index c53cade466b..55db6564488 100644
--- a/be/src/runtime/load_channel_mgr.cpp
+++ b/be/src/runtime/load_channel_mgr.cpp
@@ -150,7 +150,8 @@ Status LoadChannelMgr::add_batch(const 
PTabletWriterAddBlockRequest& request,
         // If this is a high priority load task, do not handle this.
         // because this may block for a while, which may lead to rpc timeout.
         SCOPED_TIMER(channel->get_handle_mem_limit_timer());
-        
ExecEnv::GetInstance()->memtable_memory_limiter()->handle_memtable_flush();
+        
ExecEnv::GetInstance()->memtable_memory_limiter()->handle_workload_group_memtable_flush(
+                channel->workload_group());
     }
 
     // 3. add batch to load channel
diff --git a/be/src/runtime/memory/mem_tracker_limiter.cpp 
b/be/src/runtime/memory/mem_tracker_limiter.cpp
index 5c79e0d37a6..487b7d48fae 100644
--- a/be/src/runtime/memory/mem_tracker_limiter.cpp
+++ b/be/src/runtime/memory/mem_tracker_limiter.cpp
@@ -257,6 +257,21 @@ void MemTrackerLimiter::clean_tracker_limiter_group() {
 #endif
 }
 
+void MemTrackerLimiter::update_load_buffer_size() {
+    std::lock_guard l(_load_buffer_lock);
+    int64_t total_buf_size = 0;
+    for (auto memtable_tracker = _load_buffers.begin(); memtable_tracker != 
_load_buffers.end();) {
+        auto m = memtable_tracker->lock();
+        if (m == nullptr) {
+            memtable_tracker = _load_buffers.erase(memtable_tracker);
+        } else {
+            total_buf_size += m->consumption();
+            ++memtable_tracker;
+        }
+    }
+    _load_buffer_size = total_buf_size;
+}
+
 void MemTrackerLimiter::make_type_trackers_profile(RuntimeProfile* profile,
                                                    MemTrackerLimiter::Type 
type) {
     if (type == Type::GLOBAL) {
diff --git a/be/src/runtime/memory/mem_tracker_limiter.h 
b/be/src/runtime/memory/mem_tracker_limiter.h
index bca16290c17..6f0ecdbe975 100644
--- a/be/src/runtime/memory/mem_tracker_limiter.h
+++ b/be/src/runtime/memory/mem_tracker_limiter.h
@@ -35,6 +35,7 @@
 #include "common/config.h"
 #include "common/status.h"
 #include "runtime/memory/mem_counter.h"
+#include "runtime/memory/mem_tracker.h"
 #include "runtime/query_statistics.h"
 #include "util/string_util.h"
 #include "util/uid_util.h"
@@ -45,6 +46,7 @@ class RuntimeProfile;
 class MemTrackerLimiter;
 
 constexpr size_t MEM_TRACKER_GROUP_NUM = 1000;
+constexpr size_t QUERY_MIN_MEMORY = 32 * 1024 * 1024;
 
 struct TrackerLimiterGroup {
     // Note! in order to enable ExecEnv::mem_tracker_limiter_pool support 
resize,
@@ -132,6 +134,7 @@ public:
     ~MemTrackerLimiter();
 
     Type type() const { return _type; }
+    void set_overcommit(bool enable) { _enable_overcommit = enable; }
     const std::string& label() const { return _label; }
     std::shared_ptr<QueryStatistics> get_query_statistics() { return 
_query_statistics; }
     int64_t group_num() const { return _group_num; }
@@ -141,7 +144,6 @@ public:
     Status check_limit(int64_t bytes = 0);
     // Log the memory usage when memory limit is exceeded.
     std::string tracker_limit_exceeded_str();
-    bool is_overcommit_tracker() const { return type() == Type::QUERY || 
type() == Type::LOAD; }
     void set_limit(int64_t new_mem_limit) { _limit = new_mem_limit; }
     bool is_query_cancelled() { return _is_query_cancelled; }
     void set_is_query_cancelled(bool is_cancelled) { 
_is_query_cancelled.store(is_cancelled); }
@@ -208,7 +210,8 @@ public:
         if (UNLIKELY(bytes == 0)) {
             return true;
         }
-        bool rt = _mem_counter.try_add(bytes, _limit);
+        // Reserve will check limit, should ignore load buffer size.
+        bool rt = _mem_counter.try_add(bytes - _load_buffer_size, _limit);
         if (rt && _query_statistics) {
             _query_statistics->set_max_peak_memory_bytes(peak_consumption());
             _query_statistics->set_current_used_memory_bytes(consumption());
@@ -234,6 +237,15 @@ public:
     static void make_top_consumption_tasks_tracker_profile(RuntimeProfile* 
profile, int top_num);
     static void make_all_tasks_tracker_profile(RuntimeProfile* profile);
 
+    void push_load_buffer(std::shared_ptr<MemTracker> memtable_tracker) {
+        std::lock_guard l(_load_buffer_lock);
+        _load_buffers.push_back(memtable_tracker);
+    }
+
+    void update_load_buffer_size();
+
+    int64_t load_buffer_size() const { return _load_buffer_size; }
+
     void print_log_usage(const std::string& msg);
     void enable_print_log_usage() { _enable_print_log_usage = true; }
 
@@ -300,6 +312,7 @@ private:
     */
 
     Type _type;
+    bool _enable_overcommit = true;
 
     // label used in the make snapshot, not guaranteed unique.
     std::string _label;
@@ -327,6 +340,10 @@ private:
 
     std::shared_ptr<QueryStatistics> _query_statistics = nullptr;
 
+    std::mutex _load_buffer_lock;
+    std::vector<std::weak_ptr<MemTracker>> _load_buffers;
+    std::atomic<int64_t> _load_buffer_size = 0;
+
     struct AddressSanitizer {
         size_t size;
         std::string stack_trace;
@@ -356,10 +373,11 @@ inline void MemTrackerLimiter::cache_consume(int64_t 
bytes) {
 }
 
 inline Status MemTrackerLimiter::check_limit(int64_t bytes) {
-    if (bytes <= 0 || (is_overcommit_tracker() && 
config::enable_query_memory_overcommit)) {
+    if (bytes <= 0 || _enable_overcommit) {
         return Status::OK();
     }
-    if (_limit > 0 && consumption() + bytes > _limit) {
+    // check limit should ignore memtable size, because it is treated as a 
cache
+    if (_limit > 0 && consumption() - _load_buffer_size + bytes > _limit) {
         return Status::MemoryLimitExceeded(fmt::format("failed alloc size {}, 
{}",
                                                        
MemCounter::print_bytes(bytes),
                                                        
tracker_limit_exceeded_str()));
diff --git a/be/src/runtime/memory/thread_mem_tracker_mgr.h 
b/be/src/runtime/memory/thread_mem_tracker_mgr.h
index 9a316032bc9..83caf753aed 100644
--- a/be/src/runtime/memory/thread_mem_tracker_mgr.h
+++ b/be/src/runtime/memory/thread_mem_tracker_mgr.h
@@ -286,21 +286,6 @@ inline doris::Status 
ThreadMemTrackerMgr::try_reserve(int64_t size) {
     // _untracked_mem store bytes that not synchronized to process reserved 
memory.
     flush_untracked_mem();
     auto wg_ptr = _wg_wptr.lock();
-    // For wg with overcommit, the limit will only task affect when memory > 
soft limit
-    // wg mgr will change wg's hard limit property.
-    if (wg_ptr != nullptr && wg_ptr->enable_memory_overcommit() &&
-        !wg_ptr->has_changed_to_hard_limit()) {
-        // Only do a check here, do not real reserve. If we could reserve it, 
it is better, but the logic is too complicated.
-        if (!doris::GlobalMemoryArbitrator::try_reserve_process_memory(size)) {
-            return doris::Status::Error<ErrorCode::PROCESS_MEMORY_EXCEEDED>(
-                    "reserve memory failed, size: {}, because {}",
-                    PrettyPrinter::print(size, TUnit::BYTES),
-                    GlobalMemoryArbitrator::process_mem_log_str());
-        } else {
-            
doris::GlobalMemoryArbitrator::release_process_reserved_memory(size);
-            return Status::OK();
-        }
-    }
     if (!_limiter_tracker->try_reserve(size)) {
         auto err_msg = fmt::format(
                 "reserve memory failed, size: {}, because query memory 
exceeded, memory tracker "
@@ -313,7 +298,8 @@ inline doris::Status 
ThreadMemTrackerMgr::try_reserve(int64_t size) {
     if (wg_ptr) {
         if (!wg_ptr->add_wg_refresh_interval_memory_growth(size)) {
             auto err_msg = fmt::format(
-                    "reserve memory failed, size: {}, because wg memory 
exceeded, wg info: {}",
+                    "reserve memory failed, size: {}, because workload group 
memory exceeded, "
+                    "workload group: {}",
                     PrettyPrinter::print(size, TUnit::BYTES), 
wg_ptr->memory_debug_string());
             _limiter_tracker->release(size);          // rollback
             _limiter_tracker->release_reserved(size); // rollback
diff --git a/be/src/runtime/query_context.cpp b/be/src/runtime/query_context.cpp
index b097ba91ee4..84b4cbc182d 100644
--- a/be/src/runtime/query_context.cpp
+++ b/be/src/runtime/query_context.cpp
@@ -146,6 +146,7 @@ void QueryContext::_init_query_mem_tracker() {
     if (_query_options.__isset.is_report_success && 
_query_options.is_report_success) {
         query_mem_tracker->enable_print_log_usage();
     }
+    query_mem_tracker->set_overcommit(enable_mem_overcommit());
     _user_set_mem_limit = bytes_limit;
 }
 
diff --git a/be/src/runtime/query_context.h b/be/src/runtime/query_context.h
index 746df124b07..04c32535100 100644
--- a/be/src/runtime/query_context.h
+++ b/be/src/runtime/query_context.h
@@ -253,10 +253,9 @@ public:
         return _query_options.__isset.query_slot_count ? 
_query_options.query_slot_count : 1;
     }
 
-    bool enable_query_slot_hard_limit() const {
-        return _query_options.__isset.enable_query_slot_hard_limit
-                       ? _query_options.enable_query_slot_hard_limit
-                       : false;
+    bool enable_mem_overcommit() const {
+        return _query_options.__isset.enable_mem_overcommit ? 
_query_options.enable_mem_overcommit
+                                                            : false;
     }
     DescriptorTbl* desc_tbl = nullptr;
     bool set_rsc_info = false;
diff --git a/be/src/runtime/runtime_state.cpp b/be/src/runtime/runtime_state.cpp
index e3f9d075c8f..4277173f1b7 100644
--- a/be/src/runtime/runtime_state.cpp
+++ b/be/src/runtime/runtime_state.cpp
@@ -304,6 +304,10 @@ std::shared_ptr<MemTrackerLimiter> 
RuntimeState::query_mem_tracker() const {
     return _query_mem_tracker;
 }
 
+WorkloadGroupPtr RuntimeState::workload_group() {
+    return _query_ctx->workload_group();
+}
+
 bool RuntimeState::log_error(const std::string& error) {
     std::lock_guard<std::mutex> l(_error_log_lock);
 
diff --git a/be/src/runtime/runtime_state.h b/be/src/runtime/runtime_state.h
index bdc9a102fd6..31e1d378526 100644
--- a/be/src/runtime/runtime_state.h
+++ b/be/src/runtime/runtime_state.h
@@ -45,6 +45,7 @@
 #include "io/fs/file_system.h"
 #include "io/fs/s3_file_system.h"
 #include "runtime/task_execution_context.h"
+#include "runtime/workload_group/workload_group.h"
 #include "util/debug_util.h"
 #include "util/runtime_profile.h"
 #include "vec/columns/columns_number.h"
@@ -450,6 +451,8 @@ public:
 
     QueryContext* get_query_ctx() { return _query_ctx; }
 
+    WorkloadGroupPtr workload_group();
+
     void set_query_mem_tracker(const std::shared_ptr<MemTrackerLimiter>& 
tracker) {
         _query_mem_tracker = tracker;
     }
diff --git a/be/src/runtime/workload_group/workload_group.cpp 
b/be/src/runtime/workload_group/workload_group.cpp
index ba6681eb41a..2cc232e97df 100644
--- a/be/src/runtime/workload_group/workload_group.cpp
+++ b/be/src/runtime/workload_group/workload_group.cpp
@@ -51,12 +51,14 @@ const static int SPILL_LOW_WATERMARK_DEFAULT_VALUE = 50;
 const static int SPILL_HIGH_WATERMARK_DEFAULT_VALUE = 80;
 // This is a invalid value, and should ignore this value during usage
 const static int TOTAL_QUERY_SLOT_COUNT_DEFAULT_VALUE = 0;
+const static int LOAD_BUFFER_RATIO_DEFAULT_VALUE = 20;
 
 WorkloadGroup::WorkloadGroup(const WorkloadGroupInfo& tg_info)
         : _id(tg_info.id),
           _name(tg_info.name),
           _version(tg_info.version),
           _memory_limit(tg_info.memory_limit),
+          _load_buffer_ratio(tg_info.load_buffer_ratio),
           _enable_memory_overcommit(tg_info.enable_memory_overcommit),
           _cpu_share(tg_info.cpu_share),
           _mem_tracker_limiter_pool(MEM_TRACKER_GROUP_NUM),
@@ -83,29 +85,25 @@ WorkloadGroup::WorkloadGroup(const WorkloadGroupInfo& 
tg_info)
             std::make_unique<bvar::Adder<size_t>>(_name, 
"total_local_read_bytes");
     _total_local_scan_io_per_second = 
std::make_unique<bvar::PerSecond<bvar::Adder<size_t>>>(
             _name, "total_local_read_bytes_per_second", 
_total_local_scan_io_adder.get(), 1);
-    _load_buffer_limit = (int64_t)(_memory_limit * 0.2);
-    // Its initial value should equal to memory limit, or it will be 0 and all 
reserve memory request will failed.
-    _weighted_memory_limit = _memory_limit;
 }
 
 std::string WorkloadGroup::debug_string() const {
     std::shared_lock<std::shared_mutex> rl {_mutex};
     auto realtime_total_mem_used = _total_mem_used + 
_wg_refresh_interval_memory_growth.load();
-    auto mem_used_ratio = realtime_total_mem_used / 
((double)_weighted_memory_limit + 1);
+    auto mem_used_ratio = realtime_total_mem_used / ((double)_memory_limit + 
1);
     return fmt::format(
             "WorkloadGroup[id = {}, name = {}, version = {}, cpu_share = {}, "
             "total_query_slot_count={}, "
-            "memory_limit = {}, "
-            "enable_memory_overcommit = {},  weighted_memory_limit = {}, 
total_mem_used = {},"
+            "memory_limit = {}, load_buffer_ratio= {}%"
+            "enable_memory_overcommit = {}, total_mem_used = {},"
             "wg_refresh_interval_memory_growth = {},  mem_used_ratio = {}, 
spill_low_watermark = "
             "{}, spill_high_watermark = {},cpu_hard_limit = {}, 
scan_thread_num = "
             "{}, max_remote_scan_thread_num = {}, min_remote_scan_thread_num = 
{}, "
             "is_shutdown={}, query_num={}, "
             "read_bytes_per_second={}, remote_read_bytes_per_second={}]",
             _id, _name, _version, cpu_share(), _total_query_slot_count,
-            PrettyPrinter::print(_memory_limit, TUnit::BYTES),
+            PrettyPrinter::print(_memory_limit, TUnit::BYTES), 
_load_buffer_ratio,
             _enable_memory_overcommit ? "true" : "false",
-            PrettyPrinter::print(_weighted_memory_limit.load(), TUnit::BYTES),
             PrettyPrinter::print(_total_mem_used.load(), TUnit::BYTES),
             PrettyPrinter::print(_wg_refresh_interval_memory_growth.load(), 
TUnit::BYTES),
             mem_used_ratio, _spill_low_watermark, _spill_high_watermark, 
cpu_hard_limit(),
@@ -115,21 +113,18 @@ std::string WorkloadGroup::debug_string() const {
 }
 
 bool WorkloadGroup::add_wg_refresh_interval_memory_growth(int64_t size) {
-    // If a group is enable memory overcommit, then not need check the limit
-    // It is always true, and it will only fail when process memory is not
-    // enough.
-    if (_enable_memory_overcommit) {
-        if (doris::GlobalMemoryArbitrator::is_exceed_soft_mem_limit(size)) {
-            return false;
-        } else {
-            return true;
-        }
-    }
     auto realtime_total_mem_used =
             _total_mem_used + _wg_refresh_interval_memory_growth.load() + size;
     if ((realtime_total_mem_used >
          ((double)_memory_limit * 
_spill_high_watermark.load(std::memory_order_relaxed) / 100))) {
-        return false;
+        // If the group enables memory overcommit, the limit does not need to be 
checked:
+        // the request is always accepted here, and it will only fail when process memory is not
+        // enough.
+        if (_enable_memory_overcommit) {
+            return true;
+        } else {
+            return false;
+        }
     } else {
         _wg_refresh_interval_memory_growth.fetch_add(size);
         return true;
@@ -138,16 +133,15 @@ bool 
WorkloadGroup::add_wg_refresh_interval_memory_growth(int64_t size) {
 
 std::string WorkloadGroup::memory_debug_string() const {
     auto realtime_total_mem_used = _total_mem_used + 
_wg_refresh_interval_memory_growth.load();
-    auto mem_used_ratio = realtime_total_mem_used / 
((double)_weighted_memory_limit + 1);
+    auto mem_used_ratio = realtime_total_mem_used / ((double)_memory_limit + 
1);
     return fmt::format(
             "WorkloadGroup[id = {}, name = {}, memory_limit = {}, 
enable_memory_overcommit = "
-            "{}, weighted_memory_limit = {}, total_mem_used = {},"
+            "{}, total_mem_used = {},"
             "wg_refresh_interval_memory_growth = {},  mem_used_ratio = {}, 
spill_low_watermark = "
             "{}, "
             "spill_high_watermark = {}, version = {}, is_shutdown = {}, 
query_num = {}]",
             _id, _name, PrettyPrinter::print(_memory_limit, TUnit::BYTES),
             _enable_memory_overcommit ? "true" : "false",
-            PrettyPrinter::print(_weighted_memory_limit.load(), TUnit::BYTES),
             PrettyPrinter::print(_total_mem_used.load(), TUnit::BYTES),
             PrettyPrinter::print(_wg_refresh_interval_memory_growth.load(), 
TUnit::BYTES),
             mem_used_ratio, _spill_low_watermark, _spill_high_watermark, 
_version, _is_shutdown,
@@ -181,6 +175,7 @@ void WorkloadGroup::check_and_update(const 
WorkloadGroupInfo& tg_info) {
             _scan_bytes_per_second = tg_info.read_bytes_per_second;
             _remote_scan_bytes_per_second = 
tg_info.remote_read_bytes_per_second;
             _total_query_slot_count = tg_info.total_query_slot_count;
+            _load_buffer_ratio = tg_info.load_buffer_ratio;
         } else {
             return;
         }
@@ -188,9 +183,9 @@ void WorkloadGroup::check_and_update(const 
WorkloadGroupInfo& tg_info) {
 }
 
 // MemtrackerLimiter is not removed during query context release, so that 
should remove it here.
-int64_t WorkloadGroup::make_memory_tracker_snapshots(
-        std::list<std::shared_ptr<MemTrackerLimiter>>* tracker_snapshots) {
+int64_t WorkloadGroup::refresh_memory_usage() {
     int64_t used_memory = 0;
+    int64_t load_buffer_size = 0;
     for (auto& mem_tracker_group : _mem_tracker_limiter_pool) {
         std::lock_guard<std::mutex> l(mem_tracker_group.group_lock);
         for (auto trackerWptr = mem_tracker_group.trackers.begin();
@@ -199,16 +194,16 @@ int64_t WorkloadGroup::make_memory_tracker_snapshots(
             if (tracker == nullptr) {
                 trackerWptr = mem_tracker_group.trackers.erase(trackerWptr);
             } else {
-                if (tracker_snapshots != nullptr) {
-                    tracker_snapshots->insert(tracker_snapshots->end(), 
tracker);
-                }
+                tracker->update_load_buffer_size();
                 used_memory += tracker->consumption();
+                load_buffer_size += tracker->load_buffer_size();
                 ++trackerWptr;
             }
         }
     }
     // refresh total memory used.
     _total_mem_used = used_memory;
+    _load_buffer_size = load_buffer_size;
     // reserve memory is recorded in the query mem tracker
     // and _total_mem_used already contains all the current reserve memory.
     // so after refreshing _total_mem_used, reset 
_wg_refresh_interval_memory_growth.
@@ -218,7 +213,7 @@ int64_t WorkloadGroup::make_memory_tracker_snapshots(
 }
 
 int64_t WorkloadGroup::memory_used() {
-    return make_memory_tracker_snapshots(nullptr);
+    return refresh_memory_usage();
 }
 
 void WorkloadGroup::do_sweep() {
@@ -255,6 +250,91 @@ void 
WorkloadGroup::add_mem_tracker_limiter(std::shared_ptr<MemTrackerLimiter> m
             _mem_tracker_limiter_pool[group_num].trackers.end(), 
mem_tracker_ptr);
 }
 
+int64_t WorkloadGroup::free_overcommited_memory(int64_t need_free_mem, 
RuntimeProfile* profile) {
+    if (need_free_mem <= 0) {
+        return 0;
+    }
+    int64_t used_memory = memory_used();
+    // Never free more than the amount by which usage exceeds the memory limit.
+    need_free_mem = std::min<int64_t>(used_memory - _memory_limit, 
need_free_mem);
+    if (need_free_mem <= 0) {
+        return 0;
+    }
+
+    int64_t freed_mem = 0;
+
+    std::string cancel_str =
+            fmt::format("Kill overcommit query, wg id:{}, name:{}, used:{}, 
limit:{}, backend:{}.",
+                        _id, _name, MemCounter::print_bytes(used_memory),
+                        MemCounter::print_bytes(_memory_limit), 
BackendOptions::get_localhost());
+
+    auto cancel_top_overcommit_str = [cancel_str](int64_t mem_consumption,
+                                                  const std::string& label) {
+        return fmt::format(
+                "{} cancel top memory overcommit tracker <{}> consumption {}. 
details:{}, "
+                "Execute again after enough memory, details see be.INFO.",
+                cancel_str, label, MemCounter::print_bytes(mem_consumption),
+                GlobalMemoryArbitrator::process_limit_exceeded_errmsg_str());
+    };
+
+    LOG(INFO) << fmt::format(
+            "Workload group start gc, id:{} name:{}, memory limit: {}, used: 
{}, "
+            "need_free_mem: {}.",
+            _id, _name, _memory_limit, used_memory, need_free_mem);
+    Defer defer {[&]() {
+        LOG(INFO) << fmt::format(
+                "Workload group finished gc, id:{} name:{}, memory limit: {}, 
used: "
+                "{}, need_free_mem: {}, freed memory: {}.",
+                _id, _name, _memory_limit, used_memory, need_free_mem, 
freed_mem);
+    }};
+
+    // 1. free top overcommit query
+    RuntimeProfile* tmq_profile = profile->create_child(
+            fmt::format("FreeGroupTopOvercommitQuery:Name {}", _name), true, 
true);
+    freed_mem += MemTrackerLimiter::free_top_overcommit_query(
+            need_free_mem - freed_mem, MemTrackerLimiter::Type::QUERY, 
_mem_tracker_limiter_pool,
+            cancel_top_overcommit_str, tmq_profile, 
MemTrackerLimiter::GCType::WORK_LOAD_GROUP);
+    // To be compatible with the non-group's gc logic, minorGC just gc 
overcommit query
+    if (freed_mem >= need_free_mem) {
+        return freed_mem;
+    }
+    auto cancel_top_usage_str = [cancel_str](int64_t mem_consumption, const 
std::string& label) {
+        return fmt::format(
+                "{} cancel top memory used tracker <{}> consumption {}. 
details:{}, Execute "
+                "again "
+                "after enough memory, details see be.INFO.",
+                cancel_str, label, MemCounter::print_bytes(mem_consumption),
+                
GlobalMemoryArbitrator::process_soft_limit_exceeded_errmsg_str());
+    };
+    // 2. free top usage query
+    tmq_profile =
+            profile->create_child(fmt::format("FreeGroupTopUsageQuery:Name 
{}", _name), true, true);
+    freed_mem += MemTrackerLimiter::free_top_memory_query(
+            need_free_mem - freed_mem, MemTrackerLimiter::Type::QUERY, 
_mem_tracker_limiter_pool,
+            cancel_top_usage_str, tmq_profile, 
MemTrackerLimiter::GCType::WORK_LOAD_GROUP);
+    if (freed_mem >= need_free_mem) {
+        return freed_mem;
+    }
+
+    // 3. free top overcommit load
+    tmq_profile = 
profile->create_child(fmt::format("FreeGroupTopOvercommitLoad:Name {}", _name),
+                                        true, true);
+    freed_mem += MemTrackerLimiter::free_top_overcommit_query(
+            need_free_mem - freed_mem, MemTrackerLimiter::Type::LOAD, 
_mem_tracker_limiter_pool,
+            cancel_top_overcommit_str, tmq_profile, 
MemTrackerLimiter::GCType::WORK_LOAD_GROUP);
+    if (freed_mem >= need_free_mem) {
+        return freed_mem;
+    }
+
+    // 4. free top usage load
+    tmq_profile =
+            profile->create_child(fmt::format("FreeGroupTopUsageLoad:Name {}", 
_name), true, true);
+    freed_mem += MemTrackerLimiter::free_top_memory_query(
+            need_free_mem - freed_mem, MemTrackerLimiter::Type::LOAD, 
_mem_tracker_limiter_pool,
+            cancel_top_usage_str, tmq_profile, 
MemTrackerLimiter::GCType::WORK_LOAD_GROUP);
+    return freed_mem;
+}
+
 int64_t WorkloadGroup::gc_memory(int64_t need_free_mem, RuntimeProfile* 
profile, bool is_minor_gc) {
     if (need_free_mem <= 0) {
         return 0;
@@ -466,6 +546,12 @@ WorkloadGroupInfo WorkloadGroupInfo::parse_topic_info(
         total_query_slot_count = tworkload_group_info.total_query_slot_count;
     }
 
+    // 17 load buffer memory limit
+    int load_buffer_ratio = LOAD_BUFFER_RATIO_DEFAULT_VALUE;
+    if (tworkload_group_info.__isset.load_buffer_ratio) {
+        load_buffer_ratio = tworkload_group_info.load_buffer_ratio;
+    }
+
     return {.id = tg_id,
             .name = name,
             .cpu_share = cpu_share,
@@ -481,7 +567,8 @@ WorkloadGroupInfo WorkloadGroupInfo::parse_topic_info(
             .spill_high_watermark = spill_high_watermark,
             .read_bytes_per_second = read_bytes_per_second,
             .remote_read_bytes_per_second = remote_read_bytes_per_second,
-            .total_query_slot_count = total_query_slot_count};
+            .total_query_slot_count = total_query_slot_count,
+            .load_buffer_ratio = load_buffer_ratio};
 }
 
 void WorkloadGroup::upsert_task_scheduler(WorkloadGroupInfo* tg_info, ExecEnv* 
exec_env) {
diff --git a/be/src/runtime/workload_group/workload_group.h 
b/be/src/runtime/workload_group/workload_group.h
index dc857a9f40d..c88f1e3604b 100644
--- a/be/src/runtime/workload_group/workload_group.h
+++ b/be/src/runtime/workload_group/workload_group.h
@@ -80,14 +80,14 @@ public:
 
     int64_t total_mem_used() const { return _total_mem_used; }
 
-    void set_weighted_memory_limit(int64_t weighted_memory_limit) {
-        _weighted_memory_limit = weighted_memory_limit;
-    }
+    int64_t load_mem_used() const { return _load_buffer_size; }
+
+    void enable_load_buffer_limit(bool enable_limit) { 
_enable_load_buffer_limit = enable_limit; }
+
+    bool enable_load_buffer_limit() const { return _enable_load_buffer_limit; }
 
     // make memory snapshots and refresh total memory used at the same time.
-    int64_t make_memory_tracker_snapshots(
-            std::list<std::shared_ptr<MemTrackerLimiter>>* tracker_snapshots);
-    // call make_memory_tracker_snapshots, so also refresh total memory used.
+    int64_t refresh_memory_usage();
     int64_t memory_used();
 
     void do_sweep();
@@ -120,20 +120,6 @@ public:
                                 
_spill_high_watermark.load(std::memory_order_relaxed) / 100));
     }
 
-    void update_load_mem_usage(int64_t active_bytes, int64_t queue_bytes, 
int64_t flush_bytes) {
-        std::unique_lock<std::shared_mutex> wlock(_mutex);
-        _active_mem_usage = active_bytes;
-        _queue_mem_usage = queue_bytes;
-        _flush_mem_usage = flush_bytes;
-    }
-
-    void get_load_mem_usage(int64_t* active_bytes, int64_t* queue_bytes, 
int64_t* flush_bytes) {
-        std::shared_lock<std::shared_mutex> r_lock(_mutex);
-        *active_bytes += _active_mem_usage;
-        *queue_bytes += _queue_mem_usage;
-        *flush_bytes += _flush_mem_usage;
-    }
-
     std::string debug_string() const;
     std::string memory_debug_string() const;
 
@@ -222,11 +208,9 @@ public:
         return _memtable_flush_pool.get();
     }
 
-    int64_t load_buffer_limit() { return _load_buffer_limit; }
+    int64_t load_buffer_limit() const { return _memory_limit * 
_load_buffer_ratio / 100; }
 
-    bool has_changed_to_hard_limit() const { return _has_changed_hard_limit; }
-
-    void change_to_hard_limit(bool to_hard_limit) { _has_changed_hard_limit = 
to_hard_limit; }
+    int64_t free_overcommited_memory(int64_t need_free_mem, RuntimeProfile* 
profile);
 
 private:
     mutable std::shared_mutex _mutex; // lock _name, _version, _cpu_share, 
_memory_limit
@@ -237,19 +221,11 @@ private:
     // For example, load memtable, write to parquet.
     // If the wg's memory reached high water mark, then the load buffer
     // will be restricted to this limit.
-    int64_t _load_buffer_limit;
-    std::atomic<bool> _has_changed_hard_limit = false;
-
-    // memory used by load memtable
-    int64_t _active_mem_usage = 0;
-    int64_t _queue_mem_usage = 0;
-    int64_t _flush_mem_usage = 0;
-
-    // `weighted_memory_limit` less than or equal to _memory_limit, calculate 
after exclude public memory.
-    // more detailed description in `refresh_wg_weighted_memory_limit`.
-    std::atomic<int64_t> _weighted_memory_limit {0}; //
-    // last value of make_memory_tracker_snapshots, refresh every time 
make_memory_tracker_snapshots is called.
+    int64_t _load_buffer_ratio = 0;
+    std::atomic<bool> _enable_load_buffer_limit = false;
+
     std::atomic_int64_t _total_mem_used = 0; // bytes
+    std::atomic_int64_t _load_buffer_size = 0;
     std::atomic_int64_t _wg_refresh_interval_memory_growth;
     bool _enable_memory_overcommit;
     std::atomic<uint64_t> _cpu_share;
@@ -307,6 +283,7 @@ struct WorkloadGroupInfo {
     const int read_bytes_per_second = -1;
     const int remote_read_bytes_per_second = -1;
     const int total_query_slot_count = 0;
+    const int load_buffer_ratio = 0;
     // log cgroup cpu info
     uint64_t cgroup_cpu_shares = 0;
     int cgroup_cpu_hard_limit = 0;
diff --git a/be/src/runtime/workload_group/workload_group_manager.cpp 
b/be/src/runtime/workload_group/workload_group_manager.cpp
index b42aeeb1b43..784c09555c4 100644
--- a/be/src/runtime/workload_group/workload_group_manager.cpp
+++ b/be/src/runtime/workload_group/workload_group_manager.cpp
@@ -162,18 +162,16 @@ struct WorkloadGroupMemInfo {
     std::list<std::shared_ptr<MemTrackerLimiter>> tracker_snapshots =
             std::list<std::shared_ptr<MemTrackerLimiter>>();
 };
+
 void WorkloadGroupMgr::refresh_wg_weighted_memory_limit() {
     std::shared_lock<std::shared_mutex> r_lock(_group_mutex);
 
     // 1. make all workload groups memory snapshots(refresh workload groups 
total memory used at the same time)
     // and calculate total memory used of all queries.
     int64_t all_workload_groups_mem_usage = 0;
-    std::unordered_map<uint64_t, WorkloadGroupMemInfo> wgs_mem_info;
     bool has_wg_exceed_limit = false;
     for (auto& [wg_id, wg] : _workload_groups) {
-        wgs_mem_info[wg_id].total_mem_used =
-                
wg->make_memory_tracker_snapshots(&wgs_mem_info[wg_id].tracker_snapshots);
-        all_workload_groups_mem_usage += wgs_mem_info[wg_id].total_mem_used;
+        all_workload_groups_mem_usage += wg->refresh_memory_usage();
         if (wg->exceed_limit()) {
             has_wg_exceed_limit = true;
         }
@@ -219,7 +217,7 @@ void WorkloadGroupMgr::refresh_wg_weighted_memory_limit() {
             weighted_memory_limit_ratio);
     LOG_EVERY_T(INFO, 60) << debug_msg;
     for (auto& wg : _workload_groups) {
-        update_queries_limit(wg.second, false);
+        update_queries_limit_(wg.second, false);
     }
 }
 
@@ -264,43 +262,27 @@ void WorkloadGroupMgr::add_paused_query(const 
std::shared_ptr<QueryContext>& que
     }
 }
 
-/**
- * 1. When Process's memory is lower than soft limit, then all workload group 
will be converted to hard limit (Exception: there is only one workload group).
- * 2. Reserve logic for workload group that is soft limit take no effect, it 
will always return success.
- * 3. QueryLimit for streamload,routineload,group commit, take no affect, it 
will always return success, but workload group's hard limit will take affect.
- * 4. See handle_non_overcommit_wg_paused_queries for hard limit logic.
- */
-void WorkloadGroupMgr::handle_paused_queries() {
-    handle_non_overcommit_wg_paused_queries();
-    handle_overcommit_wg_paused_queries();
-}
-
 /**
  * Strategy 1: A revocable query should not have any running 
task(PipelineTask).
  * strategy 2: If the workload group has any task exceed workload group 
memlimit, then set all queryctx's memlimit
  * strategy 3: If any query exceed process memlimit, then should clear all 
caches.
  * strategy 4: If any query exceed query's memlimit, then do spill disk or 
cancel it.
  * strategy 5: If any query exceed process's memlimit and cache is zero, then 
do following:
- * 1. cancel other wg's(soft limit) query that exceed limit
- * 2. spill disk
- * 3. cancel it self.
  */
-void WorkloadGroupMgr::handle_non_overcommit_wg_paused_queries() {
+void WorkloadGroupMgr::handle_paused_queries() {
     const int64_t TIMEOUT_IN_QUEUE = 1000L * 10;
     std::unique_lock<std::mutex> lock(_paused_queries_lock);
-    std::vector<std::weak_ptr<QueryContext>> resume_after_gc;
+    bool has_revoked_from_other_group = false;
     for (auto it = _paused_queries_list.begin(); it != 
_paused_queries_list.end();) {
         auto& queries_list = it->second;
         const auto& wg = it->first;
-        if (queries_list.empty()) {
-            it = _paused_queries_list.erase(it);
-            continue;
-        }
+
         bool is_low_wartermark = false;
         bool is_high_wartermark = false;
-
         wg->check_mem_used(&is_low_wartermark, &is_high_wartermark);
+
         bool has_changed_hard_limit = false;
+        int64_t flushed_memtable_bytes = -1; // -1: flush not tried yet
         // If the query is paused because its limit exceed the query itself's 
memlimit, then just spill disk.
         // The query's memlimit is set using slot mechanism and its value is 
set using the user settings, not
         // by weighted value. So if reserve failed, then it is actually exceed 
limit.
@@ -317,29 +299,12 @@ void 
WorkloadGroupMgr::handle_non_overcommit_wg_paused_queries() {
                 query_it = queries_list.erase(query_it);
                 continue;
             }
-            bool wg_changed_to_hard_limit = wg->has_changed_to_hard_limit();
-            // Only deal with non overcommit workload group.
-            if (wg->enable_memory_overcommit() && !wg_changed_to_hard_limit &&
-                
!query_ctx->paused_reason().is<ErrorCode::PROCESS_MEMORY_EXCEEDED>()) {
-                // Soft limit wg will only reserve failed when process limit 
exceed. But in some corner case,
-                // when reserve, the wg is hard limit, the query reserve 
failed, but when this loop run
-                // the wg is converted to soft limit.
-                // So that should resume the query.
-                LOG(WARNING) << "query: " << print_id(query_ctx->query_id())
-                             << " reserve memory failed, but workload group 
not converted to hard "
-                                "limit, it should not happen, resume it again. 
paused reason: "
-                             << query_ctx->paused_reason();
-                query_ctx->set_memory_sufficient(true);
-                query_it = queries_list.erase(query_it);
-                continue;
-            }
 
             if 
(query_ctx->paused_reason().is<ErrorCode::QUERY_MEMORY_EXCEEDED>()) {
-                CHECK(!wg->enable_memory_overcommit() || 
wg_changed_to_hard_limit);
                 // Streamload, kafka load, group commit will never have query 
memory exceeded error because
                 // their  query limit is very large.
-                bool spill_res = handle_single_query(query_ctx, 
query_it->reserve_size_,
-                                                     
query_ctx->paused_reason());
+                bool spill_res = handle_single_query_(query_ctx, 
query_it->reserve_size_,
+                                                      
query_ctx->paused_reason());
                 if (!spill_res) {
                     ++query_it;
                     continue;
@@ -348,7 +313,21 @@ void 
WorkloadGroupMgr::handle_non_overcommit_wg_paused_queries() {
                     continue;
                 }
             } else if 
(query_ctx->paused_reason().is<ErrorCode::WORKLOAD_GROUP_MEMORY_EXCEEDED>()) {
-                CHECK(!wg->enable_memory_overcommit() || 
wg_changed_to_hard_limit);
+                // Only deal with non overcommit workload group.
+                if (wg->enable_memory_overcommit()) {
+                    // Soft limit wg will only reserve failed when process 
limit exceed. But in some corner case,
+                    // when reserve, the wg is hard limit, the query reserve 
failed, but when this loop run
+                    // the wg is converted to soft limit.
+                    // So that should resume the query.
+                    LOG(WARNING)
+                            << "query: " << print_id(query_ctx->query_id())
+                            << " reserve memory failed because exceed workload 
group memlimit, it "
+                               "should not happen, resume it again. paused 
reason: "
+                            << query_ctx->paused_reason();
+                    query_ctx->set_memory_sufficient(true);
+                    query_it = queries_list.erase(query_it);
+                    continue;
+                }
                 // check if the reserve is too large, if it is too large,
                 // should set the query's limit only.
                 // Check the query's reserve with expected limit.
@@ -358,13 +337,28 @@ void 
WorkloadGroupMgr::handle_non_overcommit_wg_paused_queries() {
                     query_ctx->set_memory_sufficient(true);
                     LOG(INFO) << "workload group memory reserve failed because 
"
                               << query_ctx->debug_string() << " reserve size "
-                              << query_it->reserve_size_ << " is too large, 
set hard limit to "
-                              << query_ctx->expected_mem_limit() << " and 
resume running.";
+                              << 
PrettyPrinter::print_bytes(query_it->reserve_size_)
+                              << " is too large, set hard limit to "
+                              << 
PrettyPrinter::print_bytes(query_ctx->expected_mem_limit())
+                              << " and resume running.";
                     query_it = queries_list.erase(query_it);
                     continue;
                 }
+                if (flushed_memtable_bytes < 0) {
+                    flushed_memtable_bytes = 
flush_memtable_from_current_group_(
+                            query_ctx, wg, query_it->reserve_size_);
+                }
+                if (flushed_memtable_bytes > 0) {
+                    // Flushed some memtable, just wait flush finished and not 
do anything more.
+                    wg->enable_load_buffer_limit(true);
+                    ++query_it;
+                    continue;
+                } else {
+                    // No memory could be revoked by flushing memtables, so 
disable the load buffer limit.
+                    wg->enable_load_buffer_limit(false);
+                }
                 if (!has_changed_hard_limit) {
-                    update_queries_limit(wg, true);
+                    update_queries_limit_(wg, true);
                     has_changed_hard_limit = true;
                     LOG(INFO) << "query: " << print_id(query_ctx->query_id())
                               << " reserve memory failed due to workload group 
memory exceed, "
@@ -372,42 +366,6 @@ void 
WorkloadGroupMgr::handle_non_overcommit_wg_paused_queries() {
                                  "so that other query will reduce their 
memory. wg: "
                               << wg->debug_string();
                 }
-                // If there are a lot of memtable memory, then wait them flush 
finished.
-                MemTableMemoryLimiter* memtable_limiter =
-                        
doris::ExecEnv::GetInstance()->memtable_memory_limiter();
-                // Not use memlimit, should use high water mark.
-                int64_t memtable_active_bytes = 0;
-                int64_t memtable_queue_bytes = 0;
-                int64_t memtable_flush_bytes = 0;
-                wg->get_load_mem_usage(&memtable_active_bytes, 
&memtable_queue_bytes,
-                                       &memtable_flush_bytes);
-                // TODO: should add a signal in memtable limiter to prevent 
new batch
-                // For example, streamload, it will not reserve many memory, 
but it will occupy many memtable memory.
-                // TODO: 0.2 should be a workload group properties. For 
example, the group is optimized for load,then the value
-                // should be larged, if the group is optimized for query, then 
the value should be smaller.
-                int64_t max_wg_memtable_bytes = wg->load_buffer_limit();
-                if (memtable_active_bytes + memtable_queue_bytes + 
memtable_flush_bytes >
-                    max_wg_memtable_bytes) {
-                    // There are many table in flush queue, just waiting them 
flush finished.
-                    if (memtable_active_bytes < 
(int64_t)(max_wg_memtable_bytes * 0.6)) {
-                        LOG_EVERY_T(INFO, 60)
-                                << wg->name() << " load memtable size is: " << 
memtable_active_bytes
-                                << ", " << memtable_queue_bytes << ", " << 
memtable_flush_bytes
-                                << ", load buffer limit is: " << 
max_wg_memtable_bytes
-                                << " wait for flush finished to release more 
memory";
-                        continue;
-                    } else {
-                        // Flush some memtables(currently written) to flush 
queue.
-                        memtable_limiter->flush_workload_group_memtables(
-                                wg->id(),
-                                memtable_active_bytes - 
(int64_t)(max_wg_memtable_bytes * 0.6));
-                        LOG_EVERY_T(INFO, 60)
-                                << wg->name() << " load memtable size is: " << 
memtable_active_bytes
-                                << ", " << memtable_queue_bytes << ", " << 
memtable_flush_bytes
-                                << ", flush some active memtable to revoke 
memory";
-                        continue;
-                    }
-                }
                 // Should not put the query back to task scheduler 
immediately, because when wg's memory not sufficient,
                 // and then set wg's flag, other query may not free memory 
very quickly.
                 if (query_it->elapsed_time() > TIMEOUT_IN_QUEUE) {
@@ -415,13 +373,12 @@ void 
WorkloadGroupMgr::handle_non_overcommit_wg_paused_queries() {
                     LOG(INFO) << "query: " << print_id(query_ctx->query_id()) 
<< " will be resume.";
                     query_ctx->set_memory_sufficient(true);
                     query_it = queries_list.erase(query_it);
+                    continue;
                 } else {
                     ++query_it;
+                    continue;
                 }
-                continue;
             } else {
-                // PROCESS Reserve logic using hard limit, if reached here, 
should try to spill or cancel.
-                // GC Logic also work at hard limit, so GC may cancel some 
query and could not spill here.
                 // If wg's memlimit not exceed, but process memory exceed, it 
means cache or other metadata
                 // used too much memory. Should clean all cache here.
                 // 1. Check cache used, if cache is larger than > 0, then just 
return and wait for it to 0 to release some memory.
@@ -437,34 +394,43 @@ void 
WorkloadGroupMgr::handle_non_overcommit_wg_paused_queries() {
                                  "to 0 now";
                 }
                 if (query_it->cache_ratio_ < 0.001) {
-                    if (query_it->any_wg_exceed_limit_) {
-                        if (wg->enable_memory_overcommit()) {
-                            if (query_it->elapsed_time() > TIMEOUT_IN_QUEUE) {
-                                resume_after_gc.push_back(query_ctx);
+                    // 1. Check if could revoke some memory from memtable
+                    if (flushed_memtable_bytes < 0) {
+                        flushed_memtable_bytes = 
flush_memtable_from_current_group_(
+                                query_ctx, wg, query_it->reserve_size_);
+                    }
+                    if (flushed_memtable_bytes > 0) {
+                        // Flushed some memtable, just wait flush finished and 
not do anything more.
+                        ++query_it;
+                        continue;
+                    }
+                    // TODO: should wait here until the process has actually 
released revoked_size bytes of memory, and only then continue.
+                    if (!has_revoked_from_other_group) {
+                        int64_t revoked_size = revoke_memory_from_other_group_(
+                                query_ctx, wg->enable_memory_overcommit(), 
query_it->reserve_size_);
+                        if (revoked_size > 0) {
+                            has_revoked_from_other_group = true;
+                            query_ctx->set_memory_sufficient(true);
+                            query_it = queries_list.erase(query_it);
+                            // Do not care if the revoked_size > reserve size, 
and try to run again.
+                            continue;
+                        } else {
+                            bool spill_res = handle_single_query_(
+                                    query_ctx, query_it->reserve_size_, 
query_ctx->paused_reason());
+                            if (spill_res) {
                                 query_it = queries_list.erase(query_it);
                                 continue;
                             } else {
                                 ++query_it;
                                 continue;
                             }
-                        } else {
-                            // current workload group is hard limit, should 
not wait other wg with
-                            // soft limit, just cancel
-                            resume_after_gc.push_back(query_ctx);
-                            query_it = queries_list.erase(query_it);
-                            continue;
                         }
                     } else {
-                        // TODO: Find other exceed limit workload group and 
cancel query.
-                        bool spill_res = handle_single_query(query_ctx, 
query_it->reserve_size_,
-                                                             
query_ctx->paused_reason());
-                        if (!spill_res) {
-                            ++query_it;
-                            continue;
-                        } else {
-                            query_it = queries_list.erase(query_it);
-                            continue;
-                        }
+                        // If a query was already cancelled during the process 
limit stage, resume the other queries and
+                        // skip any further checks for now.
+                        query_ctx->set_memory_sufficient(true);
+                        query_it = queries_list.erase(query_it);
+                        continue;
                     }
                 }
                 if 
(doris::GlobalMemoryArbitrator::last_affected_cache_capacity_adjust_weighted <
@@ -479,68 +445,148 @@ void 
WorkloadGroupMgr::handle_non_overcommit_wg_paused_queries() {
                 ++query_it;
             }
         }
+        // Not need waiting flush memtable and below low watermark disable 
load buffer limit
+        if (flushed_memtable_bytes <= 0 && !is_low_wartermark) {
+            wg->enable_load_buffer_limit(false);
+        }
 
-        // Finished deal with one workload group, and should deal with next 
one.
-        ++it;
-    }
-    // TODO minor GC to release some query
-    if (!resume_after_gc.empty()) {
+        if (queries_list.empty()) {
+            it = _paused_queries_list.erase(it);
+            continue;
+        } else {
+            // Finished deal with one workload group, and should deal with 
next one.
+            ++it;
+        }
     }
-    for (auto resume_it = resume_after_gc.begin(); resume_it != 
resume_after_gc.end();
-         ++resume_it) {
-        auto query_ctx = resume_it->lock();
-        if (query_ctx != nullptr) {
-            query_ctx->set_memory_sufficient(true);
+}
+
+// Return the expected free bytes if memtable could flush
+int64_t WorkloadGroupMgr::flush_memtable_from_current_group_(
+        std::shared_ptr<QueryContext> requestor, WorkloadGroupPtr wg, int64_t 
need_free_mem) {
+    // If there are a lot of memtable memory, then wait them flush finished.
+    MemTableMemoryLimiter* memtable_limiter =
+            doris::ExecEnv::GetInstance()->memtable_memory_limiter();
+    int64_t memtable_active_bytes = 0;
+    int64_t memtable_queue_bytes = 0;
+    int64_t memtable_flush_bytes = 0;
+    memtable_limiter->get_workload_group_memtable_usage(
+            wg->id(), &memtable_active_bytes, &memtable_queue_bytes, 
&memtable_flush_bytes);
+    // TODO: should add a signal in memtable limiter to prevent new batch
+    // For example, streamload, it will not reserve many memory, but it will 
occupy many memtable memory.
+    // TODO: 0.2 should be a workload group properties. For example, the group 
is optimized for load,then the value
+    // should be larged, if the group is optimized for query, then the value 
should be smaller.
+    int64_t max_wg_memtable_bytes = wg->load_buffer_limit();
+    if (memtable_active_bytes + memtable_queue_bytes + memtable_flush_bytes >
+        max_wg_memtable_bytes) {
+        // There are many table in flush queue, just waiting them flush 
finished.
+        if (memtable_active_bytes < (int64_t)(max_wg_memtable_bytes * 0.6)) {
+            LOG_EVERY_T(INFO, 60) << wg->name()
+                                  << " load memtable size is: " << 
memtable_active_bytes << ", "
+                                  << memtable_queue_bytes << ", " << 
memtable_flush_bytes
+                                  << ", load buffer limit is: " << 
max_wg_memtable_bytes
+                                  << " wait for flush finished to release more 
memory";
+            return memtable_queue_bytes + memtable_flush_bytes;
+        } else {
+            // Flush some memtables(currently written) to flush queue.
+            memtable_limiter->flush_workload_group_memtables(
+                    wg->id(), memtable_active_bytes - 
(int64_t)(max_wg_memtable_bytes * 0.6));
+            LOG_EVERY_T(INFO, 60) << wg->name()
+                                  << " load memtable size is: " << 
memtable_active_bytes << ", "
+                                  << memtable_queue_bytes << ", " << 
memtable_flush_bytes
+                                  << ", flush some active memtable to revoke 
memory";
+            return memtable_queue_bytes + memtable_flush_bytes + 
memtable_active_bytes -
+                   (int64_t)(max_wg_memtable_bytes * 0.6);
         }
     }
+    return 0;
 }
 
-// streamload, kafka routine load, group commit
-// insert into select
-// select
+int64_t 
WorkloadGroupMgr::revoke_memory_from_other_group_(std::shared_ptr<QueryContext> 
requestor,
+                                                          bool hard_limit, 
int64_t need_free_mem) {
+    int64_t total_freed_mem = 0;
+    std::unique_ptr<RuntimeProfile> profile = 
std::make_unique<RuntimeProfile>("RevokeMemory");
+    // 1. memtable like memory
+    // 2. query exceed workload group limit
+    int64_t freed_mem = revoke_overcommited_memory_(requestor, need_free_mem, 
profile.get());
+    total_freed_mem += freed_mem;
+    // The revoke process may kill current requestor, so should return now.
+    if (need_free_mem - total_freed_mem < 0 || requestor->is_cancelled()) {
+        return total_freed_mem;
+    }
+    if (hard_limit) {
+        freed_mem = cancel_top_query_in_overcommit_group_(need_free_mem - 
total_freed_mem,
+                                                          
doris::QUERY_MIN_MEMORY, profile.get());
+    } else {
+        freed_mem = cancel_top_query_in_overcommit_group_(
+                need_free_mem - total_freed_mem, 
requestor->get_mem_tracker()->consumption(),
+                profile.get());
+    }
+    total_freed_mem += freed_mem;
+    // The revoke process may kill current requestor, so should return now.
+    if (need_free_mem - total_freed_mem < 0 || requestor->is_cancelled()) {
+        return total_freed_mem;
+    }
+    return total_freed_mem;
+}
 
-void WorkloadGroupMgr::handle_overcommit_wg_paused_queries() {
-    std::shared_lock<std::shared_mutex> r_lock(_group_mutex);
-    // If there is only one workload group and it is overcommit, then do 
nothing.
-    // And should also start MinorGC logic.
-    if (_workload_groups.size() == 1) {
-        return;
+// Revoke memory from workload group that exceed it's limit. For example, if 
the wg's limit is 10g, but used 12g
+// then should revoke 2g from the group.
+int64_t 
WorkloadGroupMgr::revoke_overcommited_memory_(std::shared_ptr<QueryContext> 
requestor,
+                                                      int64_t need_free_mem,
+                                                      RuntimeProfile* profile) 
{
+    int64_t total_freed_mem = 0;
+    // 1. check memtable usage, and try to free them.
+    int64_t freed_mem = 
revoke_memtable_from_overcommited_groups_(need_free_mem, profile);
+    total_freed_mem += freed_mem;
+    // The revoke process may kill current requestor, so should return now.
+    if (need_free_mem - total_freed_mem < 0 || requestor->is_cancelled()) {
+        return total_freed_mem;
     }
-    // soft_limit - 10%, will change workload group to hard limit.
-    // soft limit, process memory reserve failed.
-    // hard limit, FullGC will kill query randomly.
-    if (doris::GlobalMemoryArbitrator::is_exceed_soft_mem_limit(
-                (int64_t)(MemInfo::mem_limit() * 0.1))) {
-        for (auto& [wg_id, wg] : _workload_groups) {
-            if (wg->enable_memory_overcommit() && 
!wg->has_changed_to_hard_limit()) {
-                wg->change_to_hard_limit(true);
-                LOG(INFO) << "Process memory usage + 10% will exceed soft 
limit, change all "
-                             "workload "
-                             "group with overcommit to hard limit now. "
-                          << wg->debug_string();
-            }
+    // 2. Cancel top usage query, one by one
+    std::map<WorkloadGroupPtr, int64_t> wg_mem_usage;
+    using WorkloadGroupMem = std::pair<WorkloadGroupPtr, int64_t>;
+    auto cmp = [](WorkloadGroupMem left, WorkloadGroupMem right) {
+        return left.second < right.second;
+    };
+    std::priority_queue<WorkloadGroupMem, std::vector<WorkloadGroupMem>, 
decltype(cmp)> heap(cmp);
+    {
+        std::shared_lock<std::shared_mutex> r_lock(_group_mutex);
+        for (auto iter = _workload_groups.begin(); iter != 
_workload_groups.end(); iter++) {
+            heap.emplace(iter->second, iter->second->memory_used());
         }
     }
-    // If current memory usage is below soft memlimit - 15%, then enable wg's 
overcommit
-    if (!doris::GlobalMemoryArbitrator::is_exceed_soft_mem_limit(
-                (int64_t)(MemInfo::mem_limit() * 0.15))) {
-        for (auto& [wg_id, wg] : _workload_groups) {
-            if (wg->enable_memory_overcommit() && 
wg->has_changed_to_hard_limit()) {
-                wg->change_to_hard_limit(false);
-                LOG(INFO) << "Process memory usage is lower than soft limit, 
enable all workload "
-                             "group overcommit now. "
-                          << wg->debug_string();
-            }
-        }
+    while (!heap.empty() && need_free_mem - total_freed_mem > 0 && 
!requestor->is_cancelled()) {
+        auto [wg, sort_mem] = heap.top();
+        heap.pop();
+        freed_mem = wg->free_overcommited_memory(need_free_mem - 
total_freed_mem, profile);
+        total_freed_mem += freed_mem;
     }
+    return total_freed_mem;
 }
 
-// If the query could release some memory, for example, spill disk, flush 
memtable then the return value is true.
+// If the memtable is too large, then flush them and wait for finished.
+int64_t WorkloadGroupMgr::revoke_memtable_from_overcommited_groups_(int64_t 
need_free_mem,
+                                                                    
RuntimeProfile* profile) {
+    return 0;
+}
+
+// 1. Sort all memory limiter in all overcommit wg, and cancel the top usage 
task that with most memory.
+// 2. Maybe not valid because it's memory not exceed limit.
+int64_t WorkloadGroupMgr::cancel_top_query_in_overcommit_group_(int64_t 
need_free_mem,
+                                                                int64_t 
lower_bound,
+                                                                
RuntimeProfile* profile) {
+    return 0;
+}
+
+// streamload, kafka routine load, group commit
+// insert into select
+// select
+
+// If the query could release some memory, for example, spill disk, then the 
return value is true.
 // If the query could not release memory, then cancel the query, the return 
value is true.
-// If the query is not ready to do these tasks, it means just wait.
-bool WorkloadGroupMgr::handle_single_query(std::shared_ptr<QueryContext> 
query_ctx,
-                                           size_t size_to_reserve, Status 
paused_reason) {
-    // TODO: If the query is an insert into select query, should consider 
memtable as revoke memory.
+// If the query is not ready to do these tasks, it means just wait, then 
return value is false.
+bool WorkloadGroupMgr::handle_single_query_(std::shared_ptr<QueryContext> 
query_ctx,
+                                            size_t size_to_reserve, Status 
paused_reason) {
     size_t revocable_size = 0;
     size_t memory_usage = 0;
     bool has_running_task = false;
@@ -556,6 +602,8 @@ bool 
WorkloadGroupMgr::handle_single_query(std::shared_ptr<QueryContext> query_c
     if (revocable_tasks.empty()) {
         if (paused_reason.is<ErrorCode::QUERY_MEMORY_EXCEEDED>()) {
             const auto limit = query_ctx->get_mem_limit();
+            // During waiting time, another operator in the query may finished 
and release
+            // many memory and we could run.
             if ((memory_usage + size_to_reserve) < limit) {
                 LOG(INFO) << "query: " << query_id << ", usage(" << 
memory_usage << " + "
                           << size_to_reserve << ") less than limit(" << limit 
<< "), resume it.";
@@ -564,7 +612,8 @@ bool 
WorkloadGroupMgr::handle_single_query(std::shared_ptr<QueryContext> query_c
             } else {
                 // Use MEM_LIMIT_EXCEEDED so that FE could parse the error 
code and do try logic
                 
query_ctx->cancel(doris::Status::Error<ErrorCode::MEM_LIMIT_EXCEEDED>(
-                        "query({}) reserve memory failed, but could not find  
memory that could "
+                        "query({}) reserve memory failed, but could not find  
memory that "
+                        "could "
                         "release or spill to disk(usage:{}, limit: {})",
                         query_id, memory_usage, query_ctx->get_mem_limit()));
             }
@@ -584,7 +633,8 @@ bool 
WorkloadGroupMgr::handle_single_query(std::shared_ptr<QueryContext> query_c
                       << ", wg info: " << 
query_ctx->workload_group()->memory_debug_string();
             
query_ctx->cancel(doris::Status::Error<ErrorCode::MEM_LIMIT_EXCEEDED>(
                     "The query({}) reserved memory failed because process 
limit exceeded, and "
-                    "there is no cache now. And could not find task to spill. 
Maybe you should set "
+                    "there is no cache now. And could not find task to spill. 
Maybe you should "
+                    "set "
                     "the workload group's limit to a lower value.",
                     query_id));
         }
@@ -595,21 +645,15 @@ bool 
WorkloadGroupMgr::handle_single_query(std::shared_ptr<QueryContext> query_c
     return true;
 }
 
-void WorkloadGroupMgr::update_queries_limit(WorkloadGroupPtr wg, bool 
enable_hard_limit) {
+void WorkloadGroupMgr::update_queries_limit_(WorkloadGroupPtr wg, bool 
enable_hard_limit) {
     auto wg_mem_limit = wg->memory_limit();
-    auto wg_weighted_mem_limit = int64_t(wg_mem_limit * 1);
-    wg->set_weighted_memory_limit(wg_weighted_mem_limit);
     auto all_query_ctxs = wg->queries();
     bool is_low_wartermark = false;
     bool is_high_wartermark = false;
     wg->check_mem_used(&is_low_wartermark, &is_high_wartermark);
     int64_t wg_high_water_mark_limit =
             (int64_t)(wg_mem_limit * wg->spill_threshold_high_water_mark() * 
1.0 / 100);
-    int64_t memtable_active_bytes = 0;
-    int64_t memtable_queue_bytes = 0;
-    int64_t memtable_flush_bytes = 0;
-    wg->get_load_mem_usage(&memtable_active_bytes, &memtable_queue_bytes, 
&memtable_flush_bytes);
-    int64_t memtable_usage = memtable_active_bytes + memtable_queue_bytes + 
memtable_flush_bytes;
+    int64_t memtable_usage = wg->load_mem_used();
     int64_t wg_high_water_mark_except_load = wg_high_water_mark_limit;
     if (memtable_usage > wg->load_buffer_limit()) {
         wg_high_water_mark_except_load = wg_high_water_mark_limit - 
wg->load_buffer_limit();
@@ -620,20 +664,23 @@ void 
WorkloadGroupMgr::update_queries_limit(WorkloadGroupPtr wg, bool enable_har
     std::string debug_msg;
     if (is_high_wartermark || is_low_wartermark) {
         debug_msg = fmt::format(
-                "\nWorkload Group {}: mem limit: {}, mem used: {}, weighted 
mem limit: {}, "
+                "\nWorkload Group {}: mem limit: {}, mem used: {}, "
                 "high water mark mem limit: {}, load memtable usage: {}, used 
ratio: {}",
                 wg->name(), PrettyPrinter::print(wg->memory_limit(), 
TUnit::BYTES),
                 PrettyPrinter::print(wg->total_mem_used(), TUnit::BYTES),
-                PrettyPrinter::print(wg_weighted_mem_limit, TUnit::BYTES),
                 PrettyPrinter::print(wg_high_water_mark_limit, TUnit::BYTES),
                 PrettyPrinter::print(memtable_usage, TUnit::BYTES),
-                (double)(wg->total_mem_used()) / wg_weighted_mem_limit);
+                (double)(wg->total_mem_used()) / wg_mem_limit);
     }
 
     // If the wg enable over commit memory, then it is no need to update query 
memlimit
-    if (wg->enable_memory_overcommit() && !wg->has_changed_to_hard_limit()) {
+    if (wg->enable_memory_overcommit()) {
         return;
     }
+    // If reached low watermark then enable load buffer limit
+    if (is_low_wartermark) {
+        wg->enable_load_buffer_limit(true);
+    }
     int32_t total_used_slot_count = 0;
     int32_t total_slot_count = wg->total_query_slot_count();
     // calculate total used slot count
@@ -657,7 +704,7 @@ void 
WorkloadGroupMgr::update_queries_limit(WorkloadGroupPtr wg, bool enable_har
         int64_t query_weighted_mem_limit = 0;
         int64_t expected_query_weighted_mem_limit = 0;
         // If the query enable hard limit, then it should not use the soft 
limit
-        if (query_ctx->enable_query_slot_hard_limit()) {
+        if (!query_ctx->enable_mem_overcommit()) {
             if (total_slot_count < 1) {
                 LOG(WARNING)
                         << "query " << print_id(query_ctx->query_id())
@@ -701,21 +748,4 @@ void WorkloadGroupMgr::stop() {
     }
 }
 
-void WorkloadGroupMgr::update_load_memtable_usage(
-        const std::map<uint64_t, MemtableUsage>& wg_memtable_usages) {
-    // Use readlock here, because it will not modify workload_groups
-    std::shared_lock<std::shared_mutex> r_lock(_group_mutex);
-    for (auto it = _workload_groups.begin(); it != _workload_groups.end(); 
++it) {
-        auto wg_usage = wg_memtable_usages.find(it->first);
-        if (wg_usage != wg_memtable_usages.end()) {
-            
it->second->update_load_mem_usage(wg_usage->second.active_mem_usage,
-                                              wg_usage->second.queue_mem_usage,
-                                              
wg_usage->second.flush_mem_usage);
-        } else {
-            // Not anything in memtable limiter, then set to 0
-            it->second->update_load_mem_usage(0, 0, 0);
-        }
-    }
-}
-
 } // namespace doris
diff --git a/be/src/runtime/workload_group/workload_group_manager.h 
b/be/src/runtime/workload_group/workload_group_manager.h
index c8e1ee7adf7..065528c66ec 100644
--- a/be/src/runtime/workload_group/workload_group_manager.h
+++ b/be/src/runtime/workload_group/workload_group_manager.h
@@ -105,14 +105,20 @@ public:
 
     void handle_paused_queries();
 
-    void update_load_memtable_usage(const std::map<uint64_t, MemtableUsage>& 
wg_memtable_usages);
-
 private:
-    bool handle_single_query(std::shared_ptr<QueryContext> query_ctx, size_t 
size_to_reserve,
-                             Status paused_reason);
-    void handle_non_overcommit_wg_paused_queries();
-    void handle_overcommit_wg_paused_queries();
-    void update_queries_limit(WorkloadGroupPtr wg, bool enable_hard_limit);
+    int64_t cancel_top_query_in_overcommit_group_(int64_t need_free_mem, 
int64_t lower_bound,
+                                                  RuntimeProfile* profile);
+    int64_t flush_memtable_from_current_group_(std::shared_ptr<QueryContext> 
requestor,
+                                               WorkloadGroupPtr wg, int64_t 
need_free_mem);
+    bool handle_single_query_(std::shared_ptr<QueryContext> query_ctx, size_t 
size_to_reserve,
+                              Status paused_reason);
+    int64_t revoke_memory_from_other_group_(std::shared_ptr<QueryContext> 
requestor,
+                                            bool hard_limit, int64_t 
need_free_mem);
+    int64_t revoke_overcommited_memory_(std::shared_ptr<QueryContext> 
requestor,
+                                        int64_t need_free_mem, RuntimeProfile* 
profile);
+    int64_t revoke_memtable_from_overcommited_groups_(int64_t need_free_mem,
+                                                      RuntimeProfile* profile);
+    void update_queries_limit_(WorkloadGroupPtr wg, bool enable_hard_limit);
 
 private:
     std::shared_mutex _group_mutex;
diff --git a/be/src/vec/sink/writer/vtablet_writer_v2.cpp 
b/be/src/vec/sink/writer/vtablet_writer_v2.cpp
index 96dfd85d297..df30e6777d9 100644
--- a/be/src/vec/sink/writer/vtablet_writer_v2.cpp
+++ b/be/src/vec/sink/writer/vtablet_writer_v2.cpp
@@ -503,7 +503,8 @@ Status 
VTabletWriterV2::_write_memtable(std::shared_ptr<vectorized::Block> block
     }
     {
         SCOPED_TIMER(_wait_mem_limit_timer);
-        
ExecEnv::GetInstance()->memtable_memory_limiter()->handle_memtable_flush();
+        
ExecEnv::GetInstance()->memtable_memory_limiter()->handle_workload_group_memtable_flush(
+                _state->workload_group());
     }
     SCOPED_TIMER(_write_memtable_timer);
     st = delta_writer->write(block.get(), rows.row_idxes);
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java 
b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
index 97cedeafd03..52e78618c05 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
@@ -633,7 +633,7 @@ public class SessionVariable implements Serializable, 
Writable {
 
     public static final String QUERY_SLOT_COUNT = "query_slot_count";
 
-    public static final String ENABLE_QUERY_SLOT_HARD_LIMIT = 
"enable_query_slot_hard_limit";
+    public static final String ENABLE_MEM_OVERCOMMIT = "enable_mem_overcommit";
 
     public static final String MAX_COLUMN_READER_NUM = "max_column_reader_num";
 
@@ -727,7 +727,7 @@ public class SessionVariable implements Serializable, 
Writable {
     public long insertVisibleTimeoutMs = DEFAULT_INSERT_VISIBLE_TIMEOUT_MS;
 
     // max memory used on every backend.
-    @VariableMgr.VarAttr(name = EXEC_MEM_LIMIT)
+    @VariableMgr.VarAttr(name = EXEC_MEM_LIMIT, needForward = true)
     public long maxExecMemByte = 2147483648L;
 
     @VariableMgr.VarAttr(name = SCAN_QUEUE_MEM_LIMIT,
@@ -829,10 +829,10 @@ public class SessionVariable implements Serializable, 
Writable {
         }
     }
 
-    @VariableMgr.VarAttr(name = ENABLE_QUERY_SLOT_HARD_LIMIT, needForward = 
true, description = {
-            "是否通过硬限的方式来计算每个Slot的内存资源",
-            "Whether to calculate the memory resources of each Slot by hard 
limit"})
-    public boolean enableQuerySlotHardLimit = false;
+    @VariableMgr.VarAttr(name = ENABLE_MEM_OVERCOMMIT, needForward = true, 
description = {
+            "是否通过硬限的方式来计算每个Query的内存资源",
+            "Whether to calculate the memory resources of each query by hard 
limit"})
+    public boolean enableMemOvercommit = true;
 
     @VariableMgr.VarAttr(name = MAX_COLUMN_READER_NUM)
     public int maxColumnReaderNum = 20000;
@@ -3883,7 +3883,7 @@ public class SessionVariable implements Serializable, 
Writable {
         tResult.setHiveOrcUseColumnNames(hiveOrcUseColumnNames);
         tResult.setHiveParquetUseColumnNames(hiveParquetUseColumnNames);
         tResult.setQuerySlotCount(wgQuerySlotCount);
-        tResult.setEnableQuerySlotHardLimit(enableQuerySlotHardLimit);
+        tResult.setEnableMemOvercommit(enableMemOvercommit);
 
         tResult.setKeepCarriageReturn(keepCarriageReturn);
 
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/resource/workloadgroup/WorkloadGroup.java
 
b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadgroup/WorkloadGroup.java
index 42929be609b..b428a058277 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/resource/workloadgroup/WorkloadGroup.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadgroup/WorkloadGroup.java
@@ -57,6 +57,8 @@ public class WorkloadGroup implements Writable, 
GsonPostProcessable {
 
     public static final String ENABLE_MEMORY_OVERCOMMIT = 
"enable_memory_overcommit";
 
+    public static final String LOAD_BUFFER_RATIO = "load_buffer_ratio";
+
     public static final String MAX_CONCURRENCY = "max_concurrency";
 
     public static final String MAX_QUEUE_SIZE = "max_queue_size";
@@ -87,10 +89,12 @@ public class WorkloadGroup implements Writable, 
GsonPostProcessable {
             
.add(MAX_QUEUE_SIZE).add(QUEUE_TIMEOUT).add(CPU_HARD_LIMIT).add(SCAN_THREAD_NUM)
             .add(MAX_REMOTE_SCAN_THREAD_NUM).add(MIN_REMOTE_SCAN_THREAD_NUM)
             
.add(SPILL_THRESHOLD_LOW_WATERMARK).add(SPILL_THRESHOLD_HIGH_WATERMARK)
-            
.add(TAG).add(READ_BYTES_PER_SECOND).add(REMOTE_READ_BYTES_PER_SECOND).build();
+            
.add(TAG).add(READ_BYTES_PER_SECOND).add(REMOTE_READ_BYTES_PER_SECOND)
+            .add(LOAD_BUFFER_RATIO).build();
 
-    public static final int SPILL_LOW_WATERMARK_DEFAULT_VALUE = 50;
-    public static final int SPILL_HIGH_WATERMARK_DEFAULT_VALUE = 80;
+    public static final int SPILL_LOW_WATERMARK_DEFAULT_VALUE = 75;
+    public static final int SPILL_HIGH_WATERMARK_DEFAULT_VALUE = 90;
+    public static final int LOAD_BUFFER_RATIO_DEFAULT_VALUE = 20;
 
     @SerializedName(value = "id")
     private long id;
@@ -126,6 +130,17 @@ public class WorkloadGroup implements Writable, 
GsonPostProcessable {
             this.memoryLimitPercent = Double.parseDouble(
                     memoryLimitString.substring(0, memoryLimitString.length() 
- 1));
         }
+
+        if (properties.containsKey(LOAD_BUFFER_RATIO)) {
+            String loadBufLimitStr = properties.get(LOAD_BUFFER_RATIO);
+            if (loadBufLimitStr.endsWith("%")) {
+                loadBufLimitStr = loadBufLimitStr.substring(0, 
loadBufLimitStr.length() - 1);
+            }
+            this.properties.put(LOAD_BUFFER_RATIO, loadBufLimitStr);
+        } else {
+            this.properties.put(LOAD_BUFFER_RATIO, 
LOAD_BUFFER_RATIO_DEFAULT_VALUE + "");
+        }
+
         if (properties.containsKey(ENABLE_MEMORY_OVERCOMMIT)) {
             properties.put(ENABLE_MEMORY_OVERCOMMIT, 
properties.get(ENABLE_MEMORY_OVERCOMMIT).toLowerCase());
         }
@@ -256,6 +271,26 @@ public class WorkloadGroup implements Writable, 
GsonPostProcessable {
             }
         }
 
+        if (properties.containsKey(LOAD_BUFFER_RATIO)) {
+            String memoryLimit = properties.get(LOAD_BUFFER_RATIO);
+            if (!memoryLimit.endsWith("%")) {
+                throw new DdlException(LOAD_BUFFER_RATIO + " " + memoryLimit
+                        + " requires a percentage and ends with a '%'");
+            }
+            String memLimitErr = LOAD_BUFFER_RATIO + " " + memoryLimit
+                    + " requires a positive int number.";
+            try {
+                if (Integer.parseInt(memoryLimit.substring(0, 
memoryLimit.length() - 1)) < 0) {
+                    throw new DdlException(memLimitErr);
+                }
+            } catch (NumberFormatException e) {
+                if (LOG.isDebugEnabled()) {
+                    LOG.debug(memLimitErr, e);
+                }
+                throw new DdlException(memLimitErr);
+            }
+        }
+
         if (properties.containsKey(ENABLE_MEMORY_OVERCOMMIT)) {
             String value = 
properties.get(ENABLE_MEMORY_OVERCOMMIT).toLowerCase();
             if (!("true".equals(value) || "false".equals(value))) {
@@ -482,6 +517,12 @@ public class WorkloadGroup implements Writable, 
GsonPostProcessable {
                 row.add("-1");
             } else if (MEMORY_LIMIT.equals(key) && 
!properties.containsKey(key)) {
                 row.add("0%");
+            } else if (LOAD_BUFFER_RATIO.equals(key)) {
+                if (properties.containsKey(key)) {
+                    row.add(properties.get(key) + "%");
+                } else {
+                    row.add(LOAD_BUFFER_RATIO_DEFAULT_VALUE + "%");
+                }
             } else if (ENABLE_MEMORY_OVERCOMMIT.equals(key) && 
!properties.containsKey(key)) {
                 row.add("true");
             } else if (SCAN_THREAD_NUM.equals(key) && 
!properties.containsKey(key)) {
@@ -568,6 +609,10 @@ public class WorkloadGroup implements Writable, 
GsonPostProcessable {
         if (memLimitStr != null) {
             tWorkloadGroupInfo.setMemLimit(memLimitStr);
         }
+        String loadBufferRatioStr = properties.get(LOAD_BUFFER_RATIO);
+        if (loadBufferRatioStr != null) {
+            
tWorkloadGroupInfo.setLoadBufferRatio(Integer.parseInt(loadBufferRatioStr));
+        }
         String memOvercommitStr = properties.get(ENABLE_MEMORY_OVERCOMMIT);
         if (memOvercommitStr != null) {
             
tWorkloadGroupInfo.setEnableMemoryOvercommit(Boolean.valueOf(memOvercommitStr));
diff --git a/gensrc/thrift/BackendService.thrift 
b/gensrc/thrift/BackendService.thrift
index 4acd4602432..fa4fedcc9bb 100644
--- a/gensrc/thrift/BackendService.thrift
+++ b/gensrc/thrift/BackendService.thrift
@@ -271,6 +271,7 @@ struct TWorkloadGroupInfo {
   15: optional i64 remote_read_bytes_per_second
   16: optional string tag
   17: optional i32 total_query_slot_count
+  18: optional i32 load_buffer_ratio
 }
 
 enum TWorkloadMetricType {
diff --git a/gensrc/thrift/PaloInternalService.thrift 
b/gensrc/thrift/PaloInternalService.thrift
index d514c473d88..37c7da3d702 100644
--- a/gensrc/thrift/PaloInternalService.thrift
+++ b/gensrc/thrift/PaloInternalService.thrift
@@ -352,7 +352,7 @@ struct TQueryOptions {
   // The minimum memory that an operator required to run.
   137: optional i32 minimum_operator_memory_required_kb = 1024;
 
-  138: optional bool enable_query_slot_hard_limit = false;
+  138: optional bool enable_mem_overcommit = true;
   139: optional i32 query_slot_count = 0;
 
   140: optional bool enable_auto_create_when_overwrite = false;


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to