This is an automated email from the ASF dual-hosted git repository.

wangbo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new df144a36aa7 [feature](workloadgroup)Add workload condition query used memory (#35662)
df144a36aa7 is described below

commit df144a36aa7bf5b6f8aad9fcfdf56692c9d868f6
Author: wangbo <wan...@apache.org>
AuthorDate: Tue Jun 4 10:39:10 2024 +0800

    [feature](workloadgroup)Add workload condition query used memory (#35662)
    
    ## Proposed changes
    Add a workload policy condition on the memory used by a query, so that
    queries can be killed based on their memory usage.
    
    1. Create a policy that cancels any query whose used memory exceeds 100 MB (104857600 bytes).
    ```
    create workload policy memory_used_policy conditions(query_be_memory_bytes > 104857600) actions(cancel_query);
    ```
    
    2. Submit a query.
    ```
    mysql [hits]>insert into hits2 select * from hits;
    ERROR 1105 (HY000): errCode = 2, detailMessage = (10.16.10.8)[INTERNAL_ERROR]query 81c39e2b3ecf461c-bc78ad6d9b6173d2 cancelled by workload policy memory_used_policy, id:29028
    ```
---
 be/src/runtime/runtime_query_statistics_mgr.cpp    |  2 +
 .../workload_management/workload_action.cpp        | 11 ++++--
 .../workload_management/workload_condition.cpp     | 13 ++++++
 .../workload_management/workload_condition.h       | 17 +++++++-
 .../workload_management/workload_query_info.h      |  2 +
 .../workload_management/workload_sched_policy.cpp  |  2 +
 .../workloadschedpolicy/WorkloadActionMeta.java    | 12 +++---
 .../workloadschedpolicy/WorkloadCondition.java     |  2 +
 .../WorkloadConditionBeScanBytes.java              | 11 ++++--
 .../WorkloadConditionBeScanRows.java               | 11 ++++--
 .../workloadschedpolicy/WorkloadConditionMeta.java | 14 +++----
 ...ws.java => WorkloadConditionQueryBeMemory.java} | 32 ++++++++-------
 .../WorkloadConditionQueryTime.java                | 11 ++++--
 .../workloadschedpolicy/WorkloadMetricType.java    |  2 +-
 .../workloadschedpolicy/WorkloadSchedPolicy.java   | 27 ++-----------
 .../WorkloadSchedPolicyMgr.java                    | 46 +++++++++++++++++++++-
 gensrc/thrift/BackendService.thrift                |  1 +
 .../test_workload_sched_policy.groovy              | 33 ++++++++++++++--
 18 files changed, 177 insertions(+), 72 deletions(-)

diff --git a/be/src/runtime/runtime_query_statistics_mgr.cpp 
b/be/src/runtime/runtime_query_statistics_mgr.cpp
index 55051eff686..3e3dd3de2dd 100644
--- a/be/src/runtime/runtime_query_statistics_mgr.cpp
+++ b/be/src/runtime/runtime_query_statistics_mgr.cpp
@@ -621,6 +621,8 @@ void RuntimeQueryStatiticsMgr::get_metric_map(
     metric_map.emplace(WorkloadMetricType::QUERY_TIME, 
std::to_string(query_time_ms));
     metric_map.emplace(WorkloadMetricType::SCAN_ROWS, 
std::to_string(ret_qs.get_scan_rows()));
     metric_map.emplace(WorkloadMetricType::SCAN_BYTES, 
std::to_string(ret_qs.get_scan_bytes()));
+    metric_map.emplace(WorkloadMetricType::QUERY_MEMORY_BYTES,
+                       std::to_string(ret_qs.get_current_used_memory_bytes()));
 }
 
 void RuntimeQueryStatiticsMgr::set_workload_group_id(std::string query_id, 
int64_t wg_id) {
diff --git a/be/src/runtime/workload_management/workload_action.cpp 
b/be/src/runtime/workload_management/workload_action.cpp
index 2a9ff8b03a7..8e6e3b19e2c 100644
--- a/be/src/runtime/workload_management/workload_action.cpp
+++ b/be/src/runtime/workload_management/workload_action.cpp
@@ -22,9 +22,14 @@
 namespace doris {
 
 void WorkloadActionCancelQuery::exec(WorkloadQueryInfo* query_info) {
-    LOG(INFO) << "[workload_schedule]workload scheduler cancel query " << 
query_info->query_id;
-    ExecEnv::GetInstance()->fragment_mgr()->cancel_query(
-            query_info->tquery_id, Status::InternalError("query canceled by 
workload scheduler"));
+    std::stringstream msg;
+    msg << "query " << query_info->query_id
+        << " cancelled by workload policy: " << query_info->policy_name
+        << ", id:" << query_info->policy_id;
+    std::string msg_str = msg.str();
+    LOG(INFO) << "[workload_schedule]" << msg_str;
+    ExecEnv::GetInstance()->fragment_mgr()->cancel_query(query_info->tquery_id,
+                                                         
Status::InternalError<false>(msg_str));
 }
 
 void WorkloadActionMoveQuery::exec(WorkloadQueryInfo* query_info) {
diff --git a/be/src/runtime/workload_management/workload_condition.cpp 
b/be/src/runtime/workload_management/workload_condition.cpp
index dff6f2adc24..62c6072a60c 100644
--- a/be/src/runtime/workload_management/workload_condition.cpp
+++ b/be/src/runtime/workload_management/workload_condition.cpp
@@ -56,4 +56,17 @@ bool WorkloadConditionScanBytes::eval(std::string str_val) {
     return WorkloadCompareUtils::compare_signed_integer(_op, scan_bytes_args, 
_scan_bytes);
 }
 
+// query memory
+WorkloadConditionQueryMemory::WorkloadConditionQueryMemory(WorkloadCompareOperator
 op,
+                                                           std::string 
str_val) {
+    _op = op;
+    _query_memory_bytes = std::stol(str_val);
+}
+
+bool WorkloadConditionQueryMemory::eval(std::string str_val) {
+    int64_t query_memory_bytes = std::stol(str_val);
+    return WorkloadCompareUtils::compare_signed_integer(_op, 
query_memory_bytes,
+                                                        _query_memory_bytes);
+}
+
 } // namespace doris
\ No newline at end of file
diff --git a/be/src/runtime/workload_management/workload_condition.h 
b/be/src/runtime/workload_management/workload_condition.h
index 96387a2af41..a85268a8dc3 100644
--- a/be/src/runtime/workload_management/workload_condition.h
+++ b/be/src/runtime/workload_management/workload_condition.h
@@ -23,7 +23,7 @@
 
 namespace doris {
 
-enum WorkloadMetricType { QUERY_TIME, SCAN_ROWS, SCAN_BYTES };
+enum WorkloadMetricType { QUERY_TIME, SCAN_ROWS, SCAN_BYTES, 
QUERY_MEMORY_BYTES };
 
 class WorkloadCondition {
 public:
@@ -74,6 +74,19 @@ private:
     WorkloadCompareOperator _op;
 };
 
+class WorkloadConditionQueryMemory : public WorkloadCondition {
+public:
+    WorkloadConditionQueryMemory(WorkloadCompareOperator op, std::string 
str_val);
+    bool eval(std::string str_val) override;
+    WorkloadMetricType get_workload_metric_type() override {
+        return WorkloadMetricType::QUERY_MEMORY_BYTES;
+    }
+
+private:
+    int64_t _query_memory_bytes;
+    WorkloadCompareOperator _op;
+};
+
 class WorkloadConditionFactory {
 public:
     static std::unique_ptr<WorkloadCondition> create_workload_condition(
@@ -88,6 +101,8 @@ public:
             return std::make_unique<WorkloadConditionScanRows>(op, str_val);
         } else if (TWorkloadMetricType::type::BE_SCAN_BYTES == metric_name) {
             return std::make_unique<WorkloadConditionScanBytes>(op, str_val);
+        } else if (TWorkloadMetricType::type::QUERY_BE_MEMORY_BYTES == 
metric_name) {
+            return std::make_unique<WorkloadConditionQueryMemory>(op, str_val);
         }
         LOG(ERROR) << "not find a metric name " << metric_name;
         return nullptr;
diff --git a/be/src/runtime/workload_management/workload_query_info.h 
b/be/src/runtime/workload_management/workload_query_info.h
index f2da31b6196..e544668e103 100644
--- a/be/src/runtime/workload_management/workload_query_info.h
+++ b/be/src/runtime/workload_management/workload_query_info.h
@@ -29,6 +29,8 @@ public:
     TUniqueId tquery_id;
     std::string query_id;
     int64_t wg_id;
+    int64_t policy_id;
+    std::string policy_name;
 };
 
 } // namespace doris
\ No newline at end of file
diff --git a/be/src/runtime/workload_management/workload_sched_policy.cpp 
b/be/src/runtime/workload_management/workload_sched_policy.cpp
index b97eb85c068..efa8965dd77 100644
--- a/be/src/runtime/workload_management/workload_sched_policy.cpp
+++ b/be/src/runtime/workload_management/workload_sched_policy.cpp
@@ -75,6 +75,8 @@ bool WorkloadSchedPolicy::is_match(WorkloadQueryInfo* 
query_info_ptr) {
 
 void WorkloadSchedPolicy::exec_action(WorkloadQueryInfo* query_info) {
     for (int i = 0; i < _action_list.size(); i++) {
+        query_info->policy_id = this->_id;
+        query_info->policy_name = this->_name;
         _action_list[i]->exec(query_info);
     }
 }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadActionMeta.java
 
b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadActionMeta.java
index 57f6ba37993..2ce05412844 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadActionMeta.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadActionMeta.java
@@ -37,14 +37,12 @@ public class WorkloadActionMeta {
     }
 
     static WorkloadActionType getWorkloadActionType(String strType) throws 
UserException {
-        if 
(WorkloadActionType.CANCEL_QUERY.toString().equalsIgnoreCase(strType)) {
-            return WorkloadActionType.CANCEL_QUERY;
-        } else if 
(WorkloadActionType.MOVE_QUERY_TO_GROUP.toString().equalsIgnoreCase(strType)) {
-            return WorkloadActionType.MOVE_QUERY_TO_GROUP;
-        } else if 
(WorkloadActionType.SET_SESSION_VARIABLE.toString().equalsIgnoreCase(strType)) {
-            return WorkloadActionType.SET_SESSION_VARIABLE;
+        WorkloadActionType workloadActionType = 
WorkloadSchedPolicyMgr.STRING_ACTION_MAP.get(strType.toUpperCase());
+        if (workloadActionType == null) {
+            throw new UserException("invalid action type " + strType);
+        } else {
+            return workloadActionType;
         }
-        throw new UserException("invalid action type " + strType);
     }
 
     public String toString() {
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadCondition.java
 
b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadCondition.java
index 5d89d2afae9..c790a401308 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadCondition.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadCondition.java
@@ -37,6 +37,8 @@ public interface WorkloadCondition {
             return WorkloadConditionBeScanRows.createWorkloadCondition(cm.op, 
cm.value);
         } else if (WorkloadMetricType.BE_SCAN_BYTES.equals(cm.metricName)) {
             return WorkloadConditionBeScanBytes.createWorkloadCondition(cm.op, 
cm.value);
+        } else if 
(WorkloadMetricType.QUERY_BE_MEMORY_BYTES.equals(cm.metricName)) {
+            return 
WorkloadConditionQueryBeMemory.createWorkloadCondition(cm.op, cm.value);
         }
         throw new UserException("invalid metric name:" + cm.metricName);
     }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadConditionBeScanBytes.java
 
b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadConditionBeScanBytes.java
index 7431f2e0c4f..bd914baf54e 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadConditionBeScanBytes.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadConditionBeScanBytes.java
@@ -38,9 +38,14 @@ public class WorkloadConditionBeScanBytes implements 
WorkloadCondition {
 
     public static WorkloadConditionBeScanBytes 
createWorkloadCondition(WorkloadConditionOperator op, String value)
             throws UserException {
-        long longValue = Long.parseLong(value);
-        if (longValue < 0) {
-            throw new UserException("invalid scan bytes value, " + longValue + 
", it requires >= 0");
+        long longValue = -1;
+        try {
+            longValue = Long.parseLong(value);
+            if (longValue < 0) {
+                throw new NumberFormatException();
+            }
+        } catch (NumberFormatException e) {
+            throw new UserException("invalid scan bytes value: " + value + ", 
it requires >= 0");
         }
         return new WorkloadConditionBeScanBytes(op, longValue);
     }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadConditionBeScanRows.java
 
b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadConditionBeScanRows.java
index c2fb638e082..8b99e40d04d 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadConditionBeScanRows.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadConditionBeScanRows.java
@@ -38,9 +38,14 @@ public class WorkloadConditionBeScanRows implements 
WorkloadCondition {
 
     public static WorkloadConditionBeScanRows 
createWorkloadCondition(WorkloadConditionOperator op, String value)
             throws UserException {
-        long longValue = Long.parseLong(value);
-        if (longValue < 0) {
-            throw new UserException("invalid scan rows value, " + longValue + 
", it requires >= 0");
+        long longValue = -1;
+        try {
+            longValue = Long.parseLong(value);
+            if (longValue < 0) {
+                throw new NumberFormatException();
+            }
+        } catch (NumberFormatException e) {
+            throw new UserException("invalid scan rows value: " + value + ", 
it requires >= 0");
         }
         return new WorkloadConditionBeScanRows(op, longValue);
     }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadConditionMeta.java
 
b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadConditionMeta.java
index 52f50f924fc..81e0f6c2188 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadConditionMeta.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadConditionMeta.java
@@ -40,16 +40,12 @@ public class WorkloadConditionMeta {
     }
 
     private static WorkloadMetricType getMetricType(String metricStr) throws 
UserException {
-        if 
(WorkloadMetricType.USERNAME.toString().equalsIgnoreCase(metricStr)) {
-            return WorkloadMetricType.USERNAME;
-        } else if 
(WorkloadMetricType.QUERY_TIME.toString().equalsIgnoreCase(metricStr)) {
-            return WorkloadMetricType.QUERY_TIME;
-        } else if 
(WorkloadMetricType.BE_SCAN_ROWS.toString().equalsIgnoreCase(metricStr)) {
-            return WorkloadMetricType.BE_SCAN_ROWS;
-        } else if 
(WorkloadMetricType.BE_SCAN_BYTES.toString().equalsIgnoreCase(metricStr)) {
-            return WorkloadMetricType.BE_SCAN_BYTES;
+        WorkloadMetricType metricType = 
WorkloadSchedPolicyMgr.STRING_METRIC_MAP.get(metricStr.toUpperCase());
+        if (metricType == null) {
+            throw new UserException("invalid metric name:" + metricStr);
+        } else {
+            return metricType;
         }
-        throw new UserException("invalid metric name:" + metricStr);
     }
 
     public String toString() {
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadConditionBeScanRows.java
 
b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadConditionQueryBeMemory.java
similarity index 60%
copy from 
fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadConditionBeScanRows.java
copy to 
fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadConditionQueryBeMemory.java
index c2fb638e082..2274b35ca51 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadConditionBeScanRows.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadConditionQueryBeMemory.java
@@ -19,34 +19,38 @@ package org.apache.doris.resource.workloadschedpolicy;
 
 import org.apache.doris.common.UserException;
 
-public class WorkloadConditionBeScanRows implements WorkloadCondition {
+public class WorkloadConditionQueryBeMemory implements WorkloadCondition {
 
     private long value;
 
     private WorkloadConditionOperator op;
 
-    public WorkloadConditionBeScanRows(WorkloadConditionOperator op, long 
value) {
-        this.op = op;
+    public WorkloadConditionQueryBeMemory(WorkloadConditionOperator op, long 
value) {
         this.value = value;
+        this.op = op;
     }
 
     @Override
     public boolean eval(String strValue) {
-        // currently not support run in fe, so this condition never match
         return false;
     }
 
-    public static WorkloadConditionBeScanRows 
createWorkloadCondition(WorkloadConditionOperator op, String value)
-            throws UserException {
-        long longValue = Long.parseLong(value);
-        if (longValue < 0) {
-            throw new UserException("invalid scan rows value, " + longValue + 
", it requires >= 0");
-        }
-        return new WorkloadConditionBeScanRows(op, longValue);
-    }
-
     @Override
     public WorkloadMetricType getMetricType() {
-        return WorkloadMetricType.BE_SCAN_ROWS;
+        return WorkloadMetricType.QUERY_BE_MEMORY_BYTES;
+    }
+
+    public static WorkloadConditionQueryBeMemory 
createWorkloadCondition(WorkloadConditionOperator op,
+            String value) throws UserException {
+        long longValue = -1;
+        try {
+            longValue = Long.parseLong(value);
+            if (longValue < 0) {
+                throw new NumberFormatException();
+            }
+        } catch (NumberFormatException e) {
+            throw new UserException("invalid query be memory value: " + value 
+ ", it requires >= 0");
+        }
+        return new WorkloadConditionQueryBeMemory(op, longValue);
     }
 }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadConditionQueryTime.java
 
b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadConditionQueryTime.java
index e61484508df..6c3a5c653aa 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadConditionQueryTime.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadConditionQueryTime.java
@@ -37,9 +37,14 @@ public class WorkloadConditionQueryTime implements 
WorkloadCondition {
 
     public static WorkloadConditionQueryTime 
createWorkloadCondition(WorkloadConditionOperator op, String value)
             throws UserException {
-        long longValue = Long.parseLong(value);
-        if (longValue < 0) {
-            throw new UserException("invalid query time value, " + longValue + 
", it requires >= 0");
+        long longValue = -1;
+        try {
+            longValue = Long.parseLong(value);
+            if (longValue < 0) {
+                throw new NumberFormatException();
+            }
+        } catch (NumberFormatException e) {
+            throw new UserException("invalid query time value: " + value + ", 
it requires >= 0");
         }
         return new WorkloadConditionQueryTime(op, longValue);
     }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadMetricType.java
 
b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadMetricType.java
index ed17414ec45..93e612a85c2 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadMetricType.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadMetricType.java
@@ -18,5 +18,5 @@
 package org.apache.doris.resource.workloadschedpolicy;
 
 public enum WorkloadMetricType {
-    USERNAME, QUERY_TIME, BE_SCAN_ROWS, BE_SCAN_BYTES
+    USERNAME, QUERY_TIME, BE_SCAN_ROWS, BE_SCAN_BYTES, QUERY_BE_MEMORY_BYTES
 }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadSchedPolicy.java
 
b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadSchedPolicy.java
index 55759e90972..ff27a08706b 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadSchedPolicy.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadSchedPolicy.java
@@ -22,7 +22,6 @@ import org.apache.doris.common.io.Text;
 import org.apache.doris.common.io.Writable;
 import org.apache.doris.persist.gson.GsonPostProcessable;
 import org.apache.doris.persist.gson.GsonUtils;
-import org.apache.doris.thrift.TCompareOperator;
 import org.apache.doris.thrift.TWorkloadAction;
 import org.apache.doris.thrift.TWorkloadActionType;
 import org.apache.doris.thrift.TWorkloadCondition;
@@ -31,7 +30,6 @@ import org.apache.doris.thrift.TWorkloadSchedPolicy;
 import org.apache.doris.thrift.TopicInfo;
 
 import com.esotericsoftware.minlog.Log;
-import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.ImmutableSet;
 import com.google.gson.annotations.SerializedName;
 
@@ -51,25 +49,6 @@ public class WorkloadSchedPolicy implements Writable, 
GsonPostProcessable {
     public static final ImmutableSet<String> POLICY_PROPERTIES = new 
ImmutableSet.Builder<String>()
             .add(ENABLED).add(PRIORITY).add(WORKLOAD_GROUP).build();
 
-    // used for convert fe type to thrift type
-    private static ImmutableMap<WorkloadMetricType, TWorkloadMetricType> 
METRIC_MAP
-            = new ImmutableMap.Builder<WorkloadMetricType, 
TWorkloadMetricType>()
-            .put(WorkloadMetricType.QUERY_TIME, TWorkloadMetricType.QUERY_TIME)
-            .put(WorkloadMetricType.BE_SCAN_ROWS, 
TWorkloadMetricType.BE_SCAN_ROWS)
-            .put(WorkloadMetricType.BE_SCAN_BYTES, 
TWorkloadMetricType.BE_SCAN_BYTES).build();
-    private static ImmutableMap<WorkloadActionType, TWorkloadActionType> 
ACTION_MAP
-            = new ImmutableMap.Builder<WorkloadActionType, 
TWorkloadActionType>()
-            .put(WorkloadActionType.MOVE_QUERY_TO_GROUP, 
TWorkloadActionType.MOVE_QUERY_TO_GROUP)
-            .put(WorkloadActionType.CANCEL_QUERY, 
TWorkloadActionType.CANCEL_QUERY).build();
-
-    private static ImmutableMap<WorkloadConditionOperator, TCompareOperator> 
OP_MAP
-            = new ImmutableMap.Builder<WorkloadConditionOperator, 
TCompareOperator>()
-            .put(WorkloadConditionOperator.EQUAL, TCompareOperator.EQUAL)
-            .put(WorkloadConditionOperator.GREATER, TCompareOperator.GREATER)
-            .put(WorkloadConditionOperator.GREATER_EQUAL, 
TCompareOperator.GREATER_EQUAL)
-            .put(WorkloadConditionOperator.LESS, TCompareOperator.LESS)
-            .put(WorkloadConditionOperator.LESS_EQUAl, 
TCompareOperator.LESS_EQUAL).build();
-
     @SerializedName(value = "id")
     long id;
     @SerializedName(value = "name")
@@ -255,12 +234,12 @@ public class WorkloadSchedPolicy implements Writable, 
GsonPostProcessable {
         List<TWorkloadCondition> condList = new ArrayList();
         for (WorkloadConditionMeta cond : conditionMetaList) {
             TWorkloadCondition tCond = new TWorkloadCondition();
-            TWorkloadMetricType metricType = METRIC_MAP.get(cond.metricName);
+            TWorkloadMetricType metricType = 
WorkloadSchedPolicyMgr.METRIC_MAP.get(cond.metricName);
             if (metricType == null) {
                 return null;
             }
             tCond.setMetricName(metricType);
-            tCond.setOp(OP_MAP.get(cond.op));
+            tCond.setOp(WorkloadSchedPolicyMgr.OP_MAP.get(cond.op));
             tCond.setValue(cond.value);
             condList.add(tCond);
         }
@@ -268,7 +247,7 @@ public class WorkloadSchedPolicy implements Writable, 
GsonPostProcessable {
         List<TWorkloadAction> actionList = new ArrayList();
         for (WorkloadActionMeta action : actionMetaList) {
             TWorkloadAction tAction = new TWorkloadAction();
-            TWorkloadActionType tActionType = ACTION_MAP.get(action.action);
+            TWorkloadActionType tActionType = 
WorkloadSchedPolicyMgr.ACTION_MAP.get(action.action);
             if (tActionType == null) {
                 return null;
             }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadSchedPolicyMgr.java
 
b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadSchedPolicyMgr.java
index 4aa7563f8d7..3879dd83b9a 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadSchedPolicyMgr.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadSchedPolicyMgr.java
@@ -35,11 +35,15 @@ import org.apache.doris.persist.gson.GsonPostProcessable;
 import org.apache.doris.persist.gson.GsonUtils;
 import org.apache.doris.qe.ConnectContext;
 import org.apache.doris.service.ExecuteEnv;
+import org.apache.doris.thrift.TCompareOperator;
 import org.apache.doris.thrift.TUserIdentity;
+import org.apache.doris.thrift.TWorkloadActionType;
+import org.apache.doris.thrift.TWorkloadMetricType;
 import org.apache.doris.thrift.TopicInfo;
 
 import com.google.common.base.Preconditions;
 import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Maps;
 import com.google.gson.annotations.SerializedName;
@@ -80,6 +84,14 @@ public class WorkloadSchedPolicyMgr extends MasterDaemon 
implements Writable, Gs
             .add("WorkloadGroup")
             .build();
 
+    public static final ImmutableMap<WorkloadConditionOperator, 
TCompareOperator> OP_MAP
+            = new ImmutableMap.Builder<WorkloadConditionOperator, 
TCompareOperator>()
+            .put(WorkloadConditionOperator.EQUAL, TCompareOperator.EQUAL)
+            .put(WorkloadConditionOperator.GREATER, TCompareOperator.GREATER)
+            .put(WorkloadConditionOperator.GREATER_EQUAL, 
TCompareOperator.GREATER_EQUAL)
+            .put(WorkloadConditionOperator.LESS, TCompareOperator.LESS)
+            .put(WorkloadConditionOperator.LESS_EQUAl, 
TCompareOperator.LESS_EQUAL).build();
+
     public static final ImmutableSet<WorkloadActionType> FE_ACTION_SET
             = new 
ImmutableSet.Builder<WorkloadActionType>().add(WorkloadActionType.SET_SESSION_VARIABLE).build();
 
@@ -93,7 +105,39 @@ public class WorkloadSchedPolicyMgr extends MasterDaemon 
implements Writable, Gs
 
     public static final ImmutableSet<WorkloadMetricType> BE_METRIC_SET
             = new 
ImmutableSet.Builder<WorkloadMetricType>().add(WorkloadMetricType.BE_SCAN_ROWS)
-            
.add(WorkloadMetricType.BE_SCAN_BYTES).add(WorkloadMetricType.QUERY_TIME).build();
+            
.add(WorkloadMetricType.BE_SCAN_BYTES).add(WorkloadMetricType.QUERY_TIME)
+            .add(WorkloadMetricType.QUERY_BE_MEMORY_BYTES).build();
+
+    // used for convert fe type to thrift type
+    public static final ImmutableMap<WorkloadMetricType, TWorkloadMetricType> 
METRIC_MAP
+            = new ImmutableMap.Builder<WorkloadMetricType, 
TWorkloadMetricType>()
+            .put(WorkloadMetricType.QUERY_TIME, TWorkloadMetricType.QUERY_TIME)
+            .put(WorkloadMetricType.BE_SCAN_ROWS, 
TWorkloadMetricType.BE_SCAN_ROWS)
+            .put(WorkloadMetricType.BE_SCAN_BYTES, 
TWorkloadMetricType.BE_SCAN_BYTES)
+            .put(WorkloadMetricType.QUERY_BE_MEMORY_BYTES, 
TWorkloadMetricType.QUERY_BE_MEMORY_BYTES).build();
+    public static final ImmutableMap<WorkloadActionType, TWorkloadActionType> 
ACTION_MAP
+            = new ImmutableMap.Builder<WorkloadActionType, 
TWorkloadActionType>()
+            .put(WorkloadActionType.MOVE_QUERY_TO_GROUP, 
TWorkloadActionType.MOVE_QUERY_TO_GROUP)
+            .put(WorkloadActionType.CANCEL_QUERY, 
TWorkloadActionType.CANCEL_QUERY).build();
+
+    public static final Map<String, WorkloadMetricType> STRING_METRIC_MAP = 
new HashMap<>();
+    public static final Map<String, WorkloadActionType> STRING_ACTION_MAP = 
new HashMap<>();
+
+    static {
+        for (WorkloadMetricType metricType : FE_METRIC_SET) {
+            STRING_METRIC_MAP.put(metricType.toString(), metricType);
+        }
+        for (WorkloadMetricType metricType : BE_METRIC_SET) {
+            STRING_METRIC_MAP.put(metricType.toString(), metricType);
+        }
+
+        for (WorkloadActionType actionType : FE_ACTION_SET) {
+            STRING_ACTION_MAP.put(actionType.toString(), actionType);
+        }
+        for (WorkloadActionType actionType : BE_ACTION_SET) {
+            STRING_ACTION_MAP.put(actionType.toString(), actionType);
+        }
+    }
 
     private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
 
diff --git a/gensrc/thrift/BackendService.thrift 
b/gensrc/thrift/BackendService.thrift
index 46018fc947d..26cf411f739 100644
--- a/gensrc/thrift/BackendService.thrift
+++ b/gensrc/thrift/BackendService.thrift
@@ -272,6 +272,7 @@ enum TWorkloadMetricType {
     QUERY_TIME
     BE_SCAN_ROWS
     BE_SCAN_BYTES
+    QUERY_BE_MEMORY_BYTES
 }
 
 enum TCompareOperator {
diff --git 
a/regression-test/suites/workload_manager_p0/test_workload_sched_policy.groovy 
b/regression-test/suites/workload_manager_p0/test_workload_sched_policy.groovy
index c51a10f89b5..ca0c00fc895 100644
--- 
a/regression-test/suites/workload_manager_p0/test_workload_sched_policy.groovy
+++ 
b/regression-test/suites/workload_manager_p0/test_workload_sched_policy.groovy
@@ -23,6 +23,9 @@ suite("test_workload_sched_policy") {
     sql "drop workload policy if exists set_action_policy;"
     sql "drop workload policy if exists fe_policy;"
     sql "drop workload policy if exists be_policy;"
+    sql "drop workload policy if exists be_scan_row_policy;"
+    sql "drop workload policy if exists be_scan_bytes_policy;"
+    sql "drop workload policy if exists query_be_memory_used;"
 
     // 1 create cancel policy
     sql "create workload policy test_cancel_policy " +
@@ -106,16 +109,42 @@ suite("test_workload_sched_policy") {
         exception "duplicate set_session_variable action args one policy"
     }
 
+    test {
+        sql "create workload policy invalid_metric_value_policy 
conditions(query_be_memory_bytes > '-1') actions(cancel_query);"
+        exception "invalid"
+    }
+
+    test {
+        sql "create workload policy invalid_metric_value_policy 
conditions(query_time > '-1') actions(cancel_query);"
+        exception "invalid"
+    }
+
+    test {
+        sql "create workload policy invalid_metric_value_policy 
conditions(be_scan_rows > '-1') actions(cancel_query);"
+        exception "invalid"
+    }
+
+    test {
+        sql "create workload policy invalid_metric_value_policy 
conditions(be_scan_bytes > '-1') actions(cancel_query);"
+        exception "invalid"
+    }
+
+    sql "create workload policy be_scan_row_policy conditions(be_scan_rows > 
1) actions(cancel_query) properties('enabled'='false');"
+    sql "create workload policy be_scan_bytes_policy conditions(be_scan_bytes 
> 1) actions(cancel_query) properties('enabled'='false');"
+    sql "create workload policy query_be_memory_used 
conditions(query_be_memory_bytes > 1) actions(cancel_query) 
properties('enabled'='false');"
+
     // drop
     sql "drop workload policy test_cancel_policy;"
     sql "drop workload policy set_action_policy;"
     sql "drop workload policy fe_policy;"
     sql "drop workload policy be_policy;"
+    sql "drop workload policy be_scan_row_policy;"
+    sql "drop workload policy be_scan_bytes_policy;"
+    sql "drop workload policy query_be_memory_used;"
 
     qt_select_policy_tvf_after_drop "select 
name,condition,action,priority,enabled,version from 
information_schema.workload_policy where name 
in('be_policy','fe_policy','set_action_policy','test_cancel_policy') order by 
name;"
 
     // test workload policy
-    sql "ADMIN SET FRONTEND CONFIG ('workload_sched_policy_interval_ms' = 
'500');"
     sql """drop user if exists test_workload_sched_user"""
     sql """create user test_workload_sched_user identified by '12345'"""
     sql """grant ADMIN_PRIV on *.*.* to test_workload_sched_user"""
@@ -149,8 +178,6 @@ suite("test_workload_sched_policy") {
     }
     assertEquals("parallel_pipeline_task_num", result3[0][0])
     assertEquals("33", result3[0][1])
-    
-    sql "ADMIN SET FRONTEND CONFIG ('workload_sched_policy_interval_ms' = 
'10000');"
 
     sql "drop workload policy if exists test_set_var_policy;"
     sql "drop workload policy if exists test_set_var_policy2;"


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to