This is an automated email from the ASF dual-hosted git repository.

mrhhsg pushed a commit to branch spill_and_reserve
in repository https://gitbox.apache.org/repos/asf/doris.git

commit 4b7eafb007102aa8cab1b66a61c590c5d36c77f7
Author: yiguolei <676222...@qq.com>
AuthorDate: Wed Oct 30 19:23:44 2024 +0800

    Should block memtable only when buffer limit reached (#42927)
    
    ## Proposed changes
    
    Issue Number: close #xxx
    
    <!--Describe your changes.-->
    
    Co-authored-by: yiguolei <yiguo...@gmail.com>
---
 be/src/runtime/query_context.cpp                   |  3 +-
 be/src/runtime/query_context.h                     |  5 +-
 be/src/runtime/workload_group/workload_group.cpp   | 23 +++++---
 be/src/runtime/workload_group/workload_group.h     |  7 +++
 .../workload_group/workload_group_manager.cpp      | 63 +++++++++++++++-------
 .../resource/workloadgroup/WorkloadGroup.java      | 43 ++++++++++++++-
 .../resource/workloadgroup/WorkloadGroupTest.java  | 20 +++++++
 gensrc/thrift/BackendService.thrift                |  7 +++
 8 files changed, 140 insertions(+), 31 deletions(-)

diff --git a/be/src/runtime/query_context.cpp b/be/src/runtime/query_context.cpp
index 84b4cbc182d..753a5fc5ef0 100644
--- a/be/src/runtime/query_context.cpp
+++ b/be/src/runtime/query_context.cpp
@@ -148,6 +148,7 @@ void QueryContext::_init_query_mem_tracker() {
     }
     query_mem_tracker->set_overcommit(enable_mem_overcommit());
     _user_set_mem_limit = bytes_limit;
+    _expected_mem_limit = _user_set_mem_limit;
 }
 
 QueryContext::~QueryContext() {
@@ -386,7 +387,7 @@ void QueryContext::add_fragment_profile(
 #endif
 
     std::lock_guard<std::mutex> l(_profile_mutex);
-    LOG_INFO("Query X add fragment profile, query {}, fragment {}, pipeline 
profile count {} ",
+    LOG_INFO("Add fragment profile, query {}, fragment {}, pipeline profile 
count {} ",
              print_id(this->_query_id), fragment_id, pipeline_profiles.size());
 
     _profile_map.insert(std::make_pair(fragment_id, pipeline_profiles));
diff --git a/be/src/runtime/query_context.h b/be/src/runtime/query_context.h
index 04c32535100..c7143c339eb 100644
--- a/be/src/runtime/query_context.h
+++ b/be/src/runtime/query_context.h
@@ -235,9 +235,7 @@ public:
 
     // This method is called by workload group manager to set query's memlimit 
using slot
     // If user set query limit explicitly, then should use less one
-    void set_mem_limit(int64_t new_mem_limit) {
-        query_mem_tracker->set_limit(std::min<int64_t>(new_mem_limit, 
_user_set_mem_limit));
-    }
+    void set_mem_limit(int64_t new_mem_limit) { 
query_mem_tracker->set_limit(new_mem_limit); }
 
     int64_t get_mem_limit() const { return query_mem_tracker->limit(); }
 
@@ -245,6 +243,7 @@ public:
         _expected_mem_limit = std::min<int64_t>(new_mem_limit, 
_user_set_mem_limit);
     }
 
+    // Expected mem limit is the limit that applies once the workload group reaches its limit.
     int64_t expected_mem_limit() { return _expected_mem_limit; }
 
     std::shared_ptr<MemTrackerLimiter>& get_mem_tracker() { return 
query_mem_tracker; }
diff --git a/be/src/runtime/workload_group/workload_group.cpp 
b/be/src/runtime/workload_group/workload_group.cpp
index 1ba4a1c1c35..ca34c6bf1b6 100644
--- a/be/src/runtime/workload_group/workload_group.cpp
+++ b/be/src/runtime/workload_group/workload_group.cpp
@@ -47,8 +47,8 @@ namespace doris {
 const static std::string MEMORY_LIMIT_DEFAULT_VALUE = "0%";
 const static bool ENABLE_MEMORY_OVERCOMMIT_DEFAULT_VALUE = true;
 const static int CPU_HARD_LIMIT_DEFAULT_VALUE = -1;
-const static int SPILL_LOW_WATERMARK_DEFAULT_VALUE = 50;
-const static int SPILL_HIGH_WATERMARK_DEFAULT_VALUE = 80;
+const static int SPILL_LOW_WATERMARK_DEFAULT_VALUE = 75;
+const static int SPILL_HIGH_WATERMARK_DEFAULT_VALUE = 90;
 // This is a invalid value, and should ignore this value during usage
 const static int TOTAL_QUERY_SLOT_COUNT_DEFAULT_VALUE = 0;
 const static int LOAD_BUFFER_RATIO_DEFAULT_VALUE = 20;
@@ -70,7 +70,8 @@ WorkloadGroup::WorkloadGroup(const WorkloadGroupInfo& tg_info)
           _spill_high_watermark(tg_info.spill_high_watermark),
           _scan_bytes_per_second(tg_info.read_bytes_per_second),
           _remote_scan_bytes_per_second(tg_info.remote_read_bytes_per_second),
-          _total_query_slot_count(tg_info.total_query_slot_count) {
+          _total_query_slot_count(tg_info.total_query_slot_count),
+          _slot_mem_policy(tg_info.slot_mem_policy) {
     std::vector<DataDirInfo>& data_dir_list = 
io::BeConfDataDirReader::be_config_data_dir_list;
     for (const auto& data_dir : data_dir_list) {
         _scan_io_throttle_map[data_dir.path] =
@@ -93,8 +94,8 @@ std::string WorkloadGroup::debug_string() const {
     auto mem_used_ratio = realtime_total_mem_used / ((double)_memory_limit + 
1);
     return fmt::format(
             "WorkloadGroup[id = {}, name = {}, version = {}, cpu_share = {}, "
-            "total_query_slot_count={}, "
-            "memory_limit = {}, write_buffer_ratio= {}%, "
+            "total_query_slot_count = {}, "
+            "memory_limit = {}, slot_memory_policy = {}, write_buffer_ratio= 
{}%, "
             "enable_memory_overcommit = {}, total_mem_used = {},"
             "wg_refresh_interval_memory_growth = {},  mem_used_ratio = {}, 
spill_low_watermark = "
             "{}, spill_high_watermark = {},cpu_hard_limit = {}, 
scan_thread_num = "
@@ -102,8 +103,8 @@ std::string WorkloadGroup::debug_string() const {
             "is_shutdown={}, query_num={}, "
             "read_bytes_per_second={}, remote_read_bytes_per_second={}]",
             _id, _name, _version, cpu_share(), _total_query_slot_count,
-            PrettyPrinter::print(_memory_limit, TUnit::BYTES), 
_load_buffer_ratio,
-            _enable_memory_overcommit ? "true" : "false",
+            PrettyPrinter::print(_memory_limit, TUnit::BYTES), 
to_string(_slot_mem_policy),
+            _load_buffer_ratio, _enable_memory_overcommit ? "true" : "false",
             PrettyPrinter::print(_total_mem_used.load(), TUnit::BYTES),
             PrettyPrinter::print(_wg_refresh_interval_memory_growth.load(), 
TUnit::BYTES),
             mem_used_ratio, _spill_low_watermark, _spill_high_watermark, 
cpu_hard_limit(),
@@ -176,6 +177,7 @@ void WorkloadGroup::check_and_update(const 
WorkloadGroupInfo& tg_info) {
             _remote_scan_bytes_per_second = 
tg_info.remote_read_bytes_per_second;
             _total_query_slot_count = tg_info.total_query_slot_count;
             _load_buffer_ratio = tg_info.write_buffer_ratio;
+            _slot_mem_policy = tg_info.slot_mem_policy;
         } else {
             return;
         }
@@ -551,6 +553,12 @@ WorkloadGroupInfo WorkloadGroupInfo::parse_topic_info(
         write_buffer_ratio = tworkload_group_info.write_buffer_ratio;
     }
 
+    // 18 slot memory policy
+    TWgSlotMemoryPolicy::type slot_mem_policy = TWgSlotMemoryPolicy::DISABLED;
+    if (tworkload_group_info.__isset.slot_memory_policy) {
+        slot_mem_policy = tworkload_group_info.slot_memory_policy;
+    }
+
     return {.id = tg_id,
             .name = name,
             .cpu_share = cpu_share,
@@ -567,6 +575,7 @@ WorkloadGroupInfo WorkloadGroupInfo::parse_topic_info(
             .read_bytes_per_second = read_bytes_per_second,
             .remote_read_bytes_per_second = remote_read_bytes_per_second,
             .total_query_slot_count = total_query_slot_count,
+            .slot_mem_policy = slot_mem_policy,
             .write_buffer_ratio = write_buffer_ratio};
 }
 
diff --git a/be/src/runtime/workload_group/workload_group.h 
b/be/src/runtime/workload_group/workload_group.h
index 1d1a65547e8..6e2fa426cf6 100644
--- a/be/src/runtime/workload_group/workload_group.h
+++ b/be/src/runtime/workload_group/workload_group.h
@@ -29,6 +29,7 @@
 #include <string>
 #include <unordered_set>
 
+#include "common/factory_creator.h"
 #include "common/status.h"
 #include "service/backend_options.h"
 #include "util/hash_util.hpp"
@@ -55,6 +56,8 @@ class WorkloadGroup;
 struct WorkloadGroupInfo;
 struct TrackerLimiterGroup;
 class WorkloadGroup : public std::enable_shared_from_this<WorkloadGroup> {
+    ENABLE_FACTORY_CREATOR(WorkloadGroup);
+
 public:
     explicit WorkloadGroup(const WorkloadGroupInfo& tg_info);
 
@@ -136,6 +139,8 @@ public:
         return _memory_limit > 0;
     }
 
+    TWgSlotMemoryPolicy::type slot_memory_policy() const { return 
_slot_mem_policy; }
+
     bool exceed_limit() {
         std::shared_lock<std::shared_mutex> r_lock(_mutex);
         return _memory_limit > 0 ? _total_mem_used > _memory_limit : false;
@@ -240,6 +245,7 @@ private:
     std::atomic<int64_t> _scan_bytes_per_second {-1};
     std::atomic<int64_t> _remote_scan_bytes_per_second {-1};
     std::atomic<int> _total_query_slot_count = 0;
+    std::atomic<TWgSlotMemoryPolicy::type> _slot_mem_policy 
{TWgSlotMemoryPolicy::DISABLED};
 
     // means workload group is mark dropped
     // new query can not submit
@@ -284,6 +290,7 @@ struct WorkloadGroupInfo {
     const int read_bytes_per_second = -1;
     const int remote_read_bytes_per_second = -1;
     const int total_query_slot_count = 0;
+    const TWgSlotMemoryPolicy::type slot_mem_policy = 
TWgSlotMemoryPolicy::DISABLED;
     const int write_buffer_ratio = 0;
     // log cgroup cpu info
     uint64_t cgroup_cpu_shares = 0;
diff --git a/be/src/runtime/workload_group/workload_group_manager.cpp 
b/be/src/runtime/workload_group/workload_group_manager.cpp
index 3d820293694..853f3740551 100644
--- a/be/src/runtime/workload_group/workload_group_manager.cpp
+++ b/be/src/runtime/workload_group/workload_group_manager.cpp
@@ -279,7 +279,7 @@ void WorkloadGroupMgr::handle_paused_queries() {
             }
         }
     }
-    const int64_t TIMEOUT_IN_QUEUE = 1000L * 10;
+    const int64_t TIMEOUT_IN_QUEUE = 1000L * 3;
     std::unique_lock<std::mutex> lock(_paused_queries_lock);
     bool has_revoked_from_other_group = false;
     for (auto it = _paused_queries_list.begin(); it != 
_paused_queries_list.end();) {
@@ -372,17 +372,34 @@ void WorkloadGroupMgr::handle_paused_queries() {
                                  "so that other query will reduce their 
memory. wg: "
                               << wg->debug_string();
                 }
-                // Should not put the query back to task scheduler 
immediately, because when wg's memory not sufficient,
-                // and then set wg's flag, other query may not free memory 
very quickly.
-                if (query_it->elapsed_time() > TIMEOUT_IN_QUEUE) {
-                    // set wg's memory to insufficent, then add it back to 
task scheduler to run.
-                    LOG(INFO) << "query: " << print_id(query_ctx->query_id()) 
<< " will be resume.";
-                    query_ctx->set_memory_sufficient(true);
-                    query_it = queries_list.erase(query_it);
-                    continue;
+                if (wg->slot_memory_policy() == TWgSlotMemoryPolicy::DISABLED) 
{
+                    // If the slot memory policy is not enabled, then spill directly.
+                    // Another query may be the one using too much memory, but we
+                    // do not encourage running with the slot memory policy disabled.
+                    // TODO: should kill the query that exceeds the limit.
+                    bool spill_res = handle_single_query_(query_ctx, 
query_it->reserve_size_,
+                                                          
query_ctx->paused_reason());
+                    if (!spill_res) {
+                        ++query_it;
+                        continue;
+                    } else {
+                        query_it = queries_list.erase(query_it);
+                        continue;
+                    }
                 } else {
-                    ++query_it;
-                    continue;
+                    // Should not put the query back to task scheduler 
immediately, because when wg's memory not sufficient,
+                    // and then set wg's flag, other query may not free memory 
very quickly.
+                    if (query_it->elapsed_time() > TIMEOUT_IN_QUEUE) {
+                        // set wg's memory to insufficient, then add it back to 
task scheduler to run.
+                        LOG(INFO) << "query: " << 
print_id(query_ctx->query_id())
+                                  << " will be resume.";
+                        query_ctx->set_memory_sufficient(true);
+                        query_it = queries_list.erase(query_it);
+                        continue;
+                    } else {
+                        ++query_it;
+                        continue;
+                    }
                 }
             } else {
                 // If wg's memlimit not exceed, but process memory exceed, it 
means cache or other metadata
@@ -557,6 +574,10 @@ int64_t 
WorkloadGroupMgr::revoke_overcommited_memory_(std::shared_ptr<QueryConte
     {
         std::shared_lock<std::shared_mutex> r_lock(_group_mutex);
         for (auto iter = _workload_groups.begin(); iter != 
_workload_groups.end(); iter++) {
+            if (requestor->workload_group() != nullptr &&
+                iter->second->id() == requestor->workload_group()->id()) {
+                continue;
+            }
             heap.emplace(iter->second, iter->second->memory_used());
         }
     }
@@ -620,7 +641,8 @@ bool 
WorkloadGroupMgr::handle_single_query_(std::shared_ptr<QueryContext> query_
                         "query({}) reserve memory failed, but could not find 
memory that "
                         "could "
                         "release or spill to disk(memory usage:{}, limit: {})",
-                        query_id, PrettyPrinter::print_bytes(memory_usage), 
PrettyPrinter::print_bytes(query_ctx->get_mem_limit())));
+                        query_id, PrettyPrinter::print_bytes(memory_usage),
+                        
PrettyPrinter::print_bytes(query_ctx->get_mem_limit())));
             }
         } else {
             if (!GlobalMemoryArbitrator::is_exceed_hard_mem_limit()) {
@@ -678,14 +700,15 @@ void 
WorkloadGroupMgr::update_queries_limit_(WorkloadGroupPtr wg, bool enable_ha
                 (double)(wg->total_mem_used()) / wg_mem_limit);
     }
 
-    // If the wg enable over commit memory, then it is no need to update query 
memlimit
-    if (wg->enable_memory_overcommit()) {
-        return;
-    }
-    // If reached low watermark then enable load buffer limit
-    if (is_low_wartermark) {
+    // If the low watermark is reached and the wg does not enable memory overcommit,
+    // then enable the load buffer limit.
+    if (is_low_wartermark && !wg->enable_memory_overcommit()) {
         wg->enable_write_buffer_limit(true);
     }
+    // Whether or not memory overcommit is enabled, if the user set a slot memory policy
+    // then we replace each query memtracker's memlimit with the weighted limit computed below.
+    if (wg->slot_memory_policy() == TWgSlotMemoryPolicy::DISABLED) {
+        return;
+    }
     int32_t total_used_slot_count = 0;
     int32_t total_slot_count = wg->total_query_slot_count();
     // calculate total used slot count
@@ -709,7 +732,7 @@ void 
WorkloadGroupMgr::update_queries_limit_(WorkloadGroupPtr wg, bool enable_ha
         int64_t query_weighted_mem_limit = 0;
         int64_t expected_query_weighted_mem_limit = 0;
         // If the query enable hard limit, then it should not use the soft 
limit
-        if (!query_ctx->enable_mem_overcommit()) {
+        if (wg->slot_memory_policy() == TWgSlotMemoryPolicy::FIXED) {
             if (total_slot_count < 1) {
                 LOG(WARNING)
                         << "query " << print_id(query_ctx->query_id())
@@ -740,6 +763,8 @@ void 
WorkloadGroupMgr::update_queries_limit_(WorkloadGroupPtr wg, bool enable_ha
         // If the query is a pure load task, then should not modify its limit. 
Or it will reserve
         // memory failed and we did not hanle it.
         if (!query_ctx->is_pure_load_task()) {
+            // If slot memory policy is enabled, then overcommit is disabled.
+            query_ctx->get_mem_tracker()->set_overcommit(false);
             query_ctx->set_mem_limit(query_weighted_mem_limit);
             
query_ctx->set_expected_mem_limit(expected_query_weighted_mem_limit);
         }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/resource/workloadgroup/WorkloadGroup.java
 
b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadgroup/WorkloadGroup.java
index 63e454e6dbd..c2a4e97e9c4 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/resource/workloadgroup/WorkloadGroup.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadgroup/WorkloadGroup.java
@@ -29,6 +29,7 @@ import org.apache.doris.common.proc.BaseProcResult;
 import org.apache.doris.persist.gson.GsonPostProcessable;
 import org.apache.doris.persist.gson.GsonUtils;
 import org.apache.doris.thrift.TPipelineWorkloadGroup;
+import org.apache.doris.thrift.TWgSlotMemoryPolicy;
 import org.apache.doris.thrift.TWorkloadGroupInfo;
 import org.apache.doris.thrift.TopicInfo;
 
@@ -43,6 +44,7 @@ import java.io.DataOutput;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 
@@ -75,6 +77,8 @@ public class WorkloadGroup implements Writable, 
GsonPostProcessable {
 
     public static final String SPILL_THRESHOLD_HIGH_WATERMARK = 
"spill_threshold_high_watermark";
 
+    public static final String SLOT_MEMORY_POLICY = "slot_memory_policy";
+
     public static final String TAG = "tag";
 
     public static final String READ_BYTES_PER_SECOND = "read_bytes_per_second";
@@ -90,11 +94,17 @@ public class WorkloadGroup implements Writable, 
GsonPostProcessable {
             .add(MAX_REMOTE_SCAN_THREAD_NUM).add(MIN_REMOTE_SCAN_THREAD_NUM)
             
.add(SPILL_THRESHOLD_LOW_WATERMARK).add(SPILL_THRESHOLD_HIGH_WATERMARK)
             
.add(TAG).add(READ_BYTES_PER_SECOND).add(REMOTE_READ_BYTES_PER_SECOND)
-            .add(WRITE_BUFFER_RATIO).build();
+            .add(WRITE_BUFFER_RATIO).add(SLOT_MEMORY_POLICY).build();
 
     public static final int SPILL_LOW_WATERMARK_DEFAULT_VALUE = 75;
     public static final int SPILL_HIGH_WATERMARK_DEFAULT_VALUE = 90;
     public static final int WRITE_BUFFER_RATIO_DEFAULT_VALUE = 20;
+    public static final String SLOT_MEMORY_POLICY_DEFAULT_VALUE = "disabled";
+    public static final HashSet<String> AVAILABLE_SLOT_MEMORY_POLICY_VALUES = 
new HashSet<String>() {{
+            add("disabled");
+            add("fixed");
+            add("dynamic");
+        }};
 
     @SerializedName(value = "id")
     private long id;
@@ -141,6 +151,13 @@ public class WorkloadGroup implements Writable, 
GsonPostProcessable {
             this.properties.put(WRITE_BUFFER_RATIO, 
WRITE_BUFFER_RATIO_DEFAULT_VALUE + "");
         }
 
+        if (properties.containsKey(SLOT_MEMORY_POLICY)) {
+            String slotPolicy = properties.get(SLOT_MEMORY_POLICY);
+            this.properties.put(SLOT_MEMORY_POLICY, slotPolicy);
+        } else {
+            this.properties.put(SLOT_MEMORY_POLICY, 
SLOT_MEMORY_POLICY_DEFAULT_VALUE);
+        }
+
         if (properties.containsKey(ENABLE_MEMORY_OVERCOMMIT)) {
             properties.put(ENABLE_MEMORY_OVERCOMMIT, 
properties.get(ENABLE_MEMORY_OVERCOMMIT).toLowerCase());
         }
@@ -298,6 +315,14 @@ public class WorkloadGroup implements Writable, 
GsonPostProcessable {
             }
         }
 
+        if (properties.containsKey(SLOT_MEMORY_POLICY)) {
+            String value = properties.get(SLOT_MEMORY_POLICY).toLowerCase();
+            if (!AVAILABLE_SLOT_MEMORY_POLICY_VALUES.contains(value)) {
+                throw new DdlException("The value of '" + SLOT_MEMORY_POLICY
+                        + "' must be one of disabled, fixed, dynamic.");
+            }
+        }
+
         if (properties.containsKey(SCAN_THREAD_NUM)) {
             String value = properties.get(SCAN_THREAD_NUM);
             try {
@@ -589,6 +614,18 @@ public class WorkloadGroup implements Writable, 
GsonPostProcessable {
         return new TPipelineWorkloadGroup().setId(id);
     }
 
+    public static TWgSlotMemoryPolicy findSlotPolicyValueByString(String 
slotPolicy) {
+        if (slotPolicy.equalsIgnoreCase("disabled")) {
+            return TWgSlotMemoryPolicy.DISABLED;
+        } else if (slotPolicy.equalsIgnoreCase("fixed")) {
+            return TWgSlotMemoryPolicy.FIXED;
+        } else if (slotPolicy.equalsIgnoreCase("dynamic")) {
+            return TWgSlotMemoryPolicy.DYNAMIC;
+        } else {
+            throw new RuntimeException("Could not find policy using " + 
slotPolicy);
+        }
+    }
+
     public TopicInfo toTopicInfo() {
         TWorkloadGroupInfo tWorkloadGroupInfo = new TWorkloadGroupInfo();
         tWorkloadGroupInfo.setId(id);
@@ -613,6 +650,10 @@ public class WorkloadGroup implements Writable, 
GsonPostProcessable {
         if (writeBufferRatioStr != null) {
             
tWorkloadGroupInfo.setWriteBufferRatio(Integer.parseInt(writeBufferRatioStr));
         }
+        String slotMemoryPolicyStr = properties.get(SLOT_MEMORY_POLICY);
+        if (slotMemoryPolicyStr != null) {
+            
tWorkloadGroupInfo.setSlotMemoryPolicy(findSlotPolicyValueByString(slotMemoryPolicyStr));
+        }
         String memOvercommitStr = properties.get(ENABLE_MEMORY_OVERCOMMIT);
         if (memOvercommitStr != null) {
             
tWorkloadGroupInfo.setEnableMemoryOvercommit(Boolean.valueOf(memOvercommitStr));
diff --git 
a/fe/fe-core/src/test/java/org/apache/doris/resource/workloadgroup/WorkloadGroupTest.java
 
b/fe/fe-core/src/test/java/org/apache/doris/resource/workloadgroup/WorkloadGroupTest.java
index f99fd4f3526..872a3d17b41 100644
--- 
a/fe/fe-core/src/test/java/org/apache/doris/resource/workloadgroup/WorkloadGroupTest.java
+++ 
b/fe/fe-core/src/test/java/org/apache/doris/resource/workloadgroup/WorkloadGroupTest.java
@@ -19,6 +19,7 @@ package org.apache.doris.resource.workloadgroup;
 
 import org.apache.doris.common.DdlException;
 import org.apache.doris.common.proc.BaseProcResult;
+import org.apache.doris.thrift.TWgSlotMemoryPolicy;
 
 import com.google.common.collect.Maps;
 import org.junit.Assert;
@@ -87,4 +88,23 @@ public class WorkloadGroupTest {
         List<List<String>> rows = result.getRows();
         Assert.assertEquals(1, rows.size());
     }
+
+    @Test
+    public void testPolicyToString() {
+        TWgSlotMemoryPolicy p1 = 
WorkloadGroup.findSlotPolicyValueByString("fixed");
+        Assert.assertEquals(p1, TWgSlotMemoryPolicy.FIXED);
+        TWgSlotMemoryPolicy p2 = 
WorkloadGroup.findSlotPolicyValueByString("dynamic");
+        Assert.assertEquals(p2, TWgSlotMemoryPolicy.DYNAMIC);
+        TWgSlotMemoryPolicy p3 = 
WorkloadGroup.findSlotPolicyValueByString("disabled");
+        Assert.assertEquals(p3, TWgSlotMemoryPolicy.DISABLED);
+        TWgSlotMemoryPolicy p4 = 
WorkloadGroup.findSlotPolicyValueByString("disableD");
+        Assert.assertEquals(p4, TWgSlotMemoryPolicy.DISABLED);
+        boolean hasException = false;
+        try {
+            WorkloadGroup.findSlotPolicyValueByString("disableDa");
+        } catch (RuntimeException e) {
+            hasException = true;
+        }
+        Assert.assertEquals(hasException, true);
+    }
 }
diff --git a/gensrc/thrift/BackendService.thrift 
b/gensrc/thrift/BackendService.thrift
index 6b41cbc14b7..c016d9ba5c8 100644
--- a/gensrc/thrift/BackendService.thrift
+++ b/gensrc/thrift/BackendService.thrift
@@ -253,6 +253,12 @@ enum TTopicInfoType {
     WORKLOAD_SCHED_POLICY = 2
 }
 
+enum TWgSlotMemoryPolicy {
+    DISABLED = 0,
+    FIXED = 1,
+    DYNAMIC = 2
+}
+
 struct TWorkloadGroupInfo {
   1: optional i64 id
   2: optional string name
@@ -272,6 +278,7 @@ struct TWorkloadGroupInfo {
   16: optional string tag
   17: optional i32 total_query_slot_count
   18: optional i32 write_buffer_ratio
+  19: optional TWgSlotMemoryPolicy slot_memory_policy
 }
 
 enum TWorkloadMetricType {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to