This is an automated email from the ASF dual-hosted git repository. wangbo pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push: new df144a36aa7 [feature](workloadgroup)Add workload condition query used memory (#35662) df144a36aa7 is described below commit df144a36aa7bf5b6f8aad9fcfdf56692c9d868f6 Author: wangbo <wan...@apache.org> AuthorDate: Tue Jun 4 10:39:10 2024 +0800 [feature](workloadgroup)Add workload condition query used memory (#35662) ## Proposed changes Add workload condition query used memory, we can kill queries based on memory usage. 1 create a policy which can kill query used memory exceeds 100M. ``` create workload policy memory_used_policy conditions(query_be_memory_bytes > 104857600) actions(cancel_query); ``` 2 submit a query. ``` mysql [hits]>insert into hits2 select * from hits; ERROR 1105 (HY000): errCode = 2, detailMessage = (10.16.10.8)[INTERNAL_ERROR]query 81c39e2b3ecf461c-bc78ad6d9b6173d2 cancelled by workload policy memory_used_policy, id:29028 ``` --- be/src/runtime/runtime_query_statistics_mgr.cpp | 2 + .../workload_management/workload_action.cpp | 11 ++++-- .../workload_management/workload_condition.cpp | 13 ++++++ .../workload_management/workload_condition.h | 17 +++++++- .../workload_management/workload_query_info.h | 2 + .../workload_management/workload_sched_policy.cpp | 2 + .../workloadschedpolicy/WorkloadActionMeta.java | 12 +++--- .../workloadschedpolicy/WorkloadCondition.java | 2 + .../WorkloadConditionBeScanBytes.java | 11 ++++-- .../WorkloadConditionBeScanRows.java | 11 ++++-- .../workloadschedpolicy/WorkloadConditionMeta.java | 14 +++---- ...ws.java => WorkloadConditionQueryBeMemory.java} | 32 ++++++++------- .../WorkloadConditionQueryTime.java | 11 ++++-- .../workloadschedpolicy/WorkloadMetricType.java | 2 +- .../workloadschedpolicy/WorkloadSchedPolicy.java | 27 ++----------- .../WorkloadSchedPolicyMgr.java | 46 +++++++++++++++++++++- gensrc/thrift/BackendService.thrift | 1 + .../test_workload_sched_policy.groovy | 33 ++++++++++++++-- 18 files changed, 177 insertions(+), 72 deletions(-) diff --git a/be/src/runtime/runtime_query_statistics_mgr.cpp b/be/src/runtime/runtime_query_statistics_mgr.cpp index 55051eff686..3e3dd3de2dd 100644 --- a/be/src/runtime/runtime_query_statistics_mgr.cpp +++ b/be/src/runtime/runtime_query_statistics_mgr.cpp @@ -621,6 +621,8 @@ void RuntimeQueryStatiticsMgr::get_metric_map( metric_map.emplace(WorkloadMetricType::QUERY_TIME, std::to_string(query_time_ms)); metric_map.emplace(WorkloadMetricType::SCAN_ROWS, std::to_string(ret_qs.get_scan_rows())); metric_map.emplace(WorkloadMetricType::SCAN_BYTES, std::to_string(ret_qs.get_scan_bytes())); + metric_map.emplace(WorkloadMetricType::QUERY_MEMORY_BYTES, + std::to_string(ret_qs.get_current_used_memory_bytes())); } void RuntimeQueryStatiticsMgr::set_workload_group_id(std::string query_id, int64_t wg_id) { diff --git a/be/src/runtime/workload_management/workload_action.cpp b/be/src/runtime/workload_management/workload_action.cpp index 2a9ff8b03a7..8e6e3b19e2c 100644 --- a/be/src/runtime/workload_management/workload_action.cpp +++ b/be/src/runtime/workload_management/workload_action.cpp @@ -22,9 +22,14 @@ namespace doris { void WorkloadActionCancelQuery::exec(WorkloadQueryInfo* query_info) { - LOG(INFO) << "[workload_schedule]workload scheduler cancel query " << query_info->query_id; - ExecEnv::GetInstance()->fragment_mgr()->cancel_query( - query_info->tquery_id, Status::InternalError("query canceled by workload scheduler")); + std::stringstream msg; + msg << "query " << query_info->query_id + << " cancelled by workload policy: " << query_info->policy_name + << ", id:" << query_info->policy_id; + std::string msg_str = msg.str(); + LOG(INFO) << "[workload_schedule]" << msg_str; + ExecEnv::GetInstance()->fragment_mgr()->cancel_query(query_info->tquery_id, + Status::InternalError<false>(msg_str)); } void WorkloadActionMoveQuery::exec(WorkloadQueryInfo* query_info) { diff --git a/be/src/runtime/workload_management/workload_condition.cpp b/be/src/runtime/workload_management/workload_condition.cpp index dff6f2adc24..62c6072a60c 100644 --- a/be/src/runtime/workload_management/workload_condition.cpp +++ b/be/src/runtime/workload_management/workload_condition.cpp @@ -56,4 +56,17 @@ bool WorkloadConditionScanBytes::eval(std::string str_val) { return WorkloadCompareUtils::compare_signed_integer(_op, scan_bytes_args, _scan_bytes); } +// query memory +WorkloadConditionQueryMemory::WorkloadConditionQueryMemory(WorkloadCompareOperator op, + std::string str_val) { + _op = op; + _query_memory_bytes = std::stol(str_val); +} + +bool WorkloadConditionQueryMemory::eval(std::string str_val) { + int64_t query_memory_bytes = std::stol(str_val); + return WorkloadCompareUtils::compare_signed_integer(_op, query_memory_bytes, + _query_memory_bytes); +} + } // namespace doris \ No newline at end of file diff --git a/be/src/runtime/workload_management/workload_condition.h b/be/src/runtime/workload_management/workload_condition.h index 96387a2af41..a85268a8dc3 100644 --- a/be/src/runtime/workload_management/workload_condition.h +++ b/be/src/runtime/workload_management/workload_condition.h @@ -23,7 +23,7 @@ namespace doris { -enum WorkloadMetricType { QUERY_TIME, SCAN_ROWS, SCAN_BYTES }; +enum WorkloadMetricType { QUERY_TIME, SCAN_ROWS, SCAN_BYTES, QUERY_MEMORY_BYTES }; class WorkloadCondition { public: @@ -74,6 +74,19 @@ private: WorkloadCompareOperator _op; }; +class WorkloadConditionQueryMemory : public WorkloadCondition { +public: + WorkloadConditionQueryMemory(WorkloadCompareOperator op, std::string str_val); + bool eval(std::string str_val) override; + WorkloadMetricType get_workload_metric_type() override { + return WorkloadMetricType::QUERY_MEMORY_BYTES; + } + +private: + int64_t _query_memory_bytes; + WorkloadCompareOperator _op; +}; + class WorkloadConditionFactory { public: static std::unique_ptr<WorkloadCondition> create_workload_condition( @@ -88,6 +101,8 @@ public: return std::make_unique<WorkloadConditionScanRows>(op, str_val); } else if (TWorkloadMetricType::type::BE_SCAN_BYTES == metric_name) { return std::make_unique<WorkloadConditionScanBytes>(op, str_val); + } else if (TWorkloadMetricType::type::QUERY_BE_MEMORY_BYTES == metric_name) { + return std::make_unique<WorkloadConditionQueryMemory>(op, str_val); } LOG(ERROR) << "not find a metric name " << metric_name; return nullptr; diff --git a/be/src/runtime/workload_management/workload_query_info.h b/be/src/runtime/workload_management/workload_query_info.h index f2da31b6196..e544668e103 100644 --- a/be/src/runtime/workload_management/workload_query_info.h +++ b/be/src/runtime/workload_management/workload_query_info.h @@ -29,6 +29,8 @@ public: TUniqueId tquery_id; std::string query_id; int64_t wg_id; + int64_t policy_id; + std::string policy_name; }; } // namespace doris \ No newline at end of file diff --git a/be/src/runtime/workload_management/workload_sched_policy.cpp b/be/src/runtime/workload_management/workload_sched_policy.cpp index b97eb85c068..efa8965dd77 100644 --- a/be/src/runtime/workload_management/workload_sched_policy.cpp +++ b/be/src/runtime/workload_management/workload_sched_policy.cpp @@ -75,6 +75,8 @@ bool WorkloadSchedPolicy::is_match(WorkloadQueryInfo* query_info_ptr) { void WorkloadSchedPolicy::exec_action(WorkloadQueryInfo* query_info) { for (int i = 0; i < _action_list.size(); i++) { + query_info->policy_id = this->_id; + query_info->policy_name = this->_name; _action_list[i]->exec(query_info); } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadActionMeta.java b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadActionMeta.java index 57f6ba37993..2ce05412844 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadActionMeta.java +++ b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadActionMeta.java @@ -37,14 +37,12 @@ public class WorkloadActionMeta { } static WorkloadActionType getWorkloadActionType(String strType) throws UserException { - if (WorkloadActionType.CANCEL_QUERY.toString().equalsIgnoreCase(strType)) { - return WorkloadActionType.CANCEL_QUERY; - } else if (WorkloadActionType.MOVE_QUERY_TO_GROUP.toString().equalsIgnoreCase(strType)) { - return WorkloadActionType.MOVE_QUERY_TO_GROUP; - } else if (WorkloadActionType.SET_SESSION_VARIABLE.toString().equalsIgnoreCase(strType)) { - return WorkloadActionType.SET_SESSION_VARIABLE; + WorkloadActionType workloadActionType = WorkloadSchedPolicyMgr.STRING_ACTION_MAP.get(strType.toUpperCase()); + if (workloadActionType == null) { + throw new UserException("invalid action type " + strType); + } else { + return workloadActionType; } - throw new UserException("invalid action type " + strType); } public String toString() { diff --git a/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadCondition.java b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadCondition.java index 5d89d2afae9..c790a401308 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadCondition.java +++ b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadCondition.java @@ -37,6 +37,8 @@ public interface WorkloadCondition { return WorkloadConditionBeScanRows.createWorkloadCondition(cm.op, cm.value); } else if (WorkloadMetricType.BE_SCAN_BYTES.equals(cm.metricName)) { return WorkloadConditionBeScanBytes.createWorkloadCondition(cm.op, cm.value); + } else if (WorkloadMetricType.QUERY_BE_MEMORY_BYTES.equals(cm.metricName)) { + return WorkloadConditionQueryBeMemory.createWorkloadCondition(cm.op, cm.value); } throw new UserException("invalid metric name:" + cm.metricName); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadConditionBeScanBytes.java b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadConditionBeScanBytes.java index 7431f2e0c4f..bd914baf54e 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadConditionBeScanBytes.java +++ b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadConditionBeScanBytes.java @@ -38,9 +38,14 @@ public class WorkloadConditionBeScanBytes implements WorkloadCondition { public static WorkloadConditionBeScanBytes createWorkloadCondition(WorkloadConditionOperator op, String value) throws UserException { - long longValue = Long.parseLong(value); - if (longValue < 0) { - throw new UserException("invalid scan bytes value, " + longValue + ", it requires >= 0"); + long longValue = -1; + try { + longValue = Long.parseLong(value); + if (longValue < 0) { + throw new NumberFormatException(); + } + } catch (NumberFormatException e) { + throw new UserException("invalid scan bytes value: " + value + ", it requires >= 0"); } return new WorkloadConditionBeScanBytes(op, longValue); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadConditionBeScanRows.java b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadConditionBeScanRows.java index c2fb638e082..8b99e40d04d 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadConditionBeScanRows.java +++ b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadConditionBeScanRows.java @@ -38,9 +38,14 @@ public class WorkloadConditionBeScanRows implements WorkloadCondition { public static WorkloadConditionBeScanRows createWorkloadCondition(WorkloadConditionOperator op, String value) throws UserException { - long longValue = Long.parseLong(value); - if (longValue < 0) { - throw new UserException("invalid scan rows value, " + longValue + ", it requires >= 0"); + long longValue = -1; + try { + longValue = Long.parseLong(value); + if (longValue < 0) { + throw new NumberFormatException(); + } + } catch (NumberFormatException e) { + throw new UserException("invalid scan rows value: " + value + ", it requires >= 0"); } return new WorkloadConditionBeScanRows(op, longValue); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadConditionMeta.java b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadConditionMeta.java index 52f50f924fc..81e0f6c2188 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadConditionMeta.java +++ b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadConditionMeta.java @@ -40,16 +40,12 @@ public class WorkloadConditionMeta { } private static WorkloadMetricType getMetricType(String metricStr) throws UserException { - if (WorkloadMetricType.USERNAME.toString().equalsIgnoreCase(metricStr)) { - return WorkloadMetricType.USERNAME; - } else if (WorkloadMetricType.QUERY_TIME.toString().equalsIgnoreCase(metricStr)) { - return WorkloadMetricType.QUERY_TIME; - } else if (WorkloadMetricType.BE_SCAN_ROWS.toString().equalsIgnoreCase(metricStr)) { - return WorkloadMetricType.BE_SCAN_ROWS; - } else if (WorkloadMetricType.BE_SCAN_BYTES.toString().equalsIgnoreCase(metricStr)) { - return WorkloadMetricType.BE_SCAN_BYTES; + WorkloadMetricType metricType = WorkloadSchedPolicyMgr.STRING_METRIC_MAP.get(metricStr.toUpperCase()); + if (metricType == null) { + throw new UserException("invalid metric name:" + metricStr); + } else { + return metricType; } - throw new UserException("invalid metric name:" + metricStr); } public String toString() { diff --git a/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadConditionBeScanRows.java b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadConditionQueryBeMemory.java similarity index 60% copy from fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadConditionBeScanRows.java copy to fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadConditionQueryBeMemory.java index c2fb638e082..2274b35ca51 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadConditionBeScanRows.java +++ b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadConditionQueryBeMemory.java @@ -19,34 +19,38 @@ package org.apache.doris.resource.workloadschedpolicy; import org.apache.doris.common.UserException; -public class WorkloadConditionBeScanRows implements WorkloadCondition { +public class WorkloadConditionQueryBeMemory implements WorkloadCondition { private long value; private WorkloadConditionOperator op; - public WorkloadConditionBeScanRows(WorkloadConditionOperator op, long value) { - this.op = op; + public WorkloadConditionQueryBeMemory(WorkloadConditionOperator op, long value) { this.value = value; + this.op = op; } @Override public boolean eval(String strValue) { - // currently not support run in fe, so this condition never match return false; } - public static WorkloadConditionBeScanRows createWorkloadCondition(WorkloadConditionOperator op, String value) - throws UserException { - long longValue = Long.parseLong(value); - if (longValue < 0) { - throw new UserException("invalid scan rows value, " + longValue + ", it requires >= 0"); - } - return new WorkloadConditionBeScanRows(op, longValue); - } - @Override public WorkloadMetricType getMetricType() { - return WorkloadMetricType.BE_SCAN_ROWS; + return WorkloadMetricType.QUERY_BE_MEMORY_BYTES; + } + + public static WorkloadConditionQueryBeMemory createWorkloadCondition(WorkloadConditionOperator op, + String value) throws UserException { + long longValue = -1; + try { + longValue = Long.parseLong(value); + if (longValue < 0) { + throw new NumberFormatException(); + } + } catch (NumberFormatException e) { + throw new UserException("invalid query be memory value: " + value + ", it requires >= 0"); + } + return new WorkloadConditionQueryBeMemory(op, longValue); } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadConditionQueryTime.java b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadConditionQueryTime.java index e61484508df..6c3a5c653aa 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadConditionQueryTime.java +++ b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadConditionQueryTime.java @@ -37,9 +37,14 @@ public class WorkloadConditionQueryTime implements WorkloadCondition { public static WorkloadConditionQueryTime createWorkloadCondition(WorkloadConditionOperator op, String value) throws UserException { - long longValue = Long.parseLong(value); - if (longValue < 0) { - throw new UserException("invalid query time value, " + longValue + ", it requires >= 0"); + long longValue = -1; + try { + longValue = Long.parseLong(value); + if (longValue < 0) { + throw new NumberFormatException(); + } + } catch (NumberFormatException e) { + throw new UserException("invalid query time value: " + value + ", it requires >= 0"); } return new WorkloadConditionQueryTime(op, longValue); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadMetricType.java b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadMetricType.java index ed17414ec45..93e612a85c2 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadMetricType.java +++ b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadMetricType.java @@ -18,5 +18,5 @@ package org.apache.doris.resource.workloadschedpolicy; public enum WorkloadMetricType { - USERNAME, QUERY_TIME, BE_SCAN_ROWS, BE_SCAN_BYTES + USERNAME, QUERY_TIME, BE_SCAN_ROWS, BE_SCAN_BYTES, QUERY_BE_MEMORY_BYTES } diff --git a/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadSchedPolicy.java b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadSchedPolicy.java index 55759e90972..ff27a08706b 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadSchedPolicy.java +++ b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadSchedPolicy.java @@ -22,7 +22,6 @@ import org.apache.doris.common.io.Text; import org.apache.doris.common.io.Writable; import org.apache.doris.persist.gson.GsonPostProcessable; import org.apache.doris.persist.gson.GsonUtils; -import org.apache.doris.thrift.TCompareOperator; import org.apache.doris.thrift.TWorkloadAction; import org.apache.doris.thrift.TWorkloadActionType; import org.apache.doris.thrift.TWorkloadCondition; @@ -31,7 +30,6 @@ import org.apache.doris.thrift.TWorkloadSchedPolicy; import org.apache.doris.thrift.TopicInfo; import com.esotericsoftware.minlog.Log; -import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableSet; import com.google.gson.annotations.SerializedName; @@ -51,25 +49,6 @@ public class WorkloadSchedPolicy implements Writable, GsonPostProcessable { public static final ImmutableSet<String> POLICY_PROPERTIES = new ImmutableSet.Builder<String>() .add(ENABLED).add(PRIORITY).add(WORKLOAD_GROUP).build(); - // used for convert fe type to thrift type - private static ImmutableMap<WorkloadMetricType, TWorkloadMetricType> METRIC_MAP - = new ImmutableMap.Builder<WorkloadMetricType, TWorkloadMetricType>() - .put(WorkloadMetricType.QUERY_TIME, TWorkloadMetricType.QUERY_TIME) - .put(WorkloadMetricType.BE_SCAN_ROWS, TWorkloadMetricType.BE_SCAN_ROWS) - .put(WorkloadMetricType.BE_SCAN_BYTES, TWorkloadMetricType.BE_SCAN_BYTES).build(); - private static ImmutableMap<WorkloadActionType, TWorkloadActionType> ACTION_MAP - = new ImmutableMap.Builder<WorkloadActionType, TWorkloadActionType>() - .put(WorkloadActionType.MOVE_QUERY_TO_GROUP, TWorkloadActionType.MOVE_QUERY_TO_GROUP) - .put(WorkloadActionType.CANCEL_QUERY, TWorkloadActionType.CANCEL_QUERY).build(); - - private static ImmutableMap<WorkloadConditionOperator, TCompareOperator> OP_MAP - = new ImmutableMap.Builder<WorkloadConditionOperator, TCompareOperator>() - .put(WorkloadConditionOperator.EQUAL, TCompareOperator.EQUAL) - .put(WorkloadConditionOperator.GREATER, TCompareOperator.GREATER) - .put(WorkloadConditionOperator.GREATER_EQUAL, TCompareOperator.GREATER_EQUAL) - .put(WorkloadConditionOperator.LESS, TCompareOperator.LESS) - .put(WorkloadConditionOperator.LESS_EQUAl, TCompareOperator.LESS_EQUAL).build(); - @SerializedName(value = "id") long id; @SerializedName(value = "name") @@ -255,12 +234,12 @@ public class WorkloadSchedPolicy implements Writable, GsonPostProcessable { List<TWorkloadCondition> condList = new ArrayList(); for (WorkloadConditionMeta cond : conditionMetaList) { TWorkloadCondition tCond = new TWorkloadCondition(); - TWorkloadMetricType metricType = METRIC_MAP.get(cond.metricName); + TWorkloadMetricType metricType = WorkloadSchedPolicyMgr.METRIC_MAP.get(cond.metricName); if (metricType == null) { return null; } tCond.setMetricName(metricType); - tCond.setOp(OP_MAP.get(cond.op)); + tCond.setOp(WorkloadSchedPolicyMgr.OP_MAP.get(cond.op)); tCond.setValue(cond.value); condList.add(tCond); } @@ -268,7 +247,7 @@ public class WorkloadSchedPolicy implements Writable, GsonPostProcessable { List<TWorkloadAction> actionList = new ArrayList(); for (WorkloadActionMeta action : actionMetaList) { TWorkloadAction tAction = new TWorkloadAction(); - TWorkloadActionType tActionType = ACTION_MAP.get(action.action); + TWorkloadActionType tActionType = WorkloadSchedPolicyMgr.ACTION_MAP.get(action.action); if (tActionType == null) { return null; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadSchedPolicyMgr.java b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadSchedPolicyMgr.java index 4aa7563f8d7..3879dd83b9a 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadSchedPolicyMgr.java +++ b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadSchedPolicyMgr.java @@ -35,11 +35,15 @@ import org.apache.doris.persist.gson.GsonPostProcessable; import org.apache.doris.persist.gson.GsonUtils; import org.apache.doris.qe.ConnectContext; import org.apache.doris.service.ExecuteEnv; +import org.apache.doris.thrift.TCompareOperator; import org.apache.doris.thrift.TUserIdentity; +import org.apache.doris.thrift.TWorkloadActionType; +import org.apache.doris.thrift.TWorkloadMetricType; import org.apache.doris.thrift.TopicInfo; import com.google.common.base.Preconditions; import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableSet; import com.google.common.collect.Maps; import com.google.gson.annotations.SerializedName; @@ -80,6 +84,14 @@ public class WorkloadSchedPolicyMgr extends MasterDaemon implements Writable, Gs .add("WorkloadGroup") .build(); + public static final ImmutableMap<WorkloadConditionOperator, TCompareOperator> OP_MAP + = new ImmutableMap.Builder<WorkloadConditionOperator, TCompareOperator>() + .put(WorkloadConditionOperator.EQUAL, TCompareOperator.EQUAL) + .put(WorkloadConditionOperator.GREATER, TCompareOperator.GREATER) + .put(WorkloadConditionOperator.GREATER_EQUAL, TCompareOperator.GREATER_EQUAL) + .put(WorkloadConditionOperator.LESS, TCompareOperator.LESS) + .put(WorkloadConditionOperator.LESS_EQUAl, TCompareOperator.LESS_EQUAL).build(); + public static final ImmutableSet<WorkloadActionType> FE_ACTION_SET = new ImmutableSet.Builder<WorkloadActionType>().add(WorkloadActionType.SET_SESSION_VARIABLE).build(); @@ -93,7 +105,39 @@ public class WorkloadSchedPolicyMgr extends MasterDaemon implements Writable, Gs public static final ImmutableSet<WorkloadMetricType> BE_METRIC_SET = new ImmutableSet.Builder<WorkloadMetricType>().add(WorkloadMetricType.BE_SCAN_ROWS) - .add(WorkloadMetricType.BE_SCAN_BYTES).add(WorkloadMetricType.QUERY_TIME).build(); + .add(WorkloadMetricType.BE_SCAN_BYTES).add(WorkloadMetricType.QUERY_TIME) + .add(WorkloadMetricType.QUERY_BE_MEMORY_BYTES).build(); + + // used for convert fe type to thrift type + public static final ImmutableMap<WorkloadMetricType, TWorkloadMetricType> METRIC_MAP + = new ImmutableMap.Builder<WorkloadMetricType, TWorkloadMetricType>() + .put(WorkloadMetricType.QUERY_TIME, TWorkloadMetricType.QUERY_TIME) + .put(WorkloadMetricType.BE_SCAN_ROWS, TWorkloadMetricType.BE_SCAN_ROWS) + .put(WorkloadMetricType.BE_SCAN_BYTES, TWorkloadMetricType.BE_SCAN_BYTES) + .put(WorkloadMetricType.QUERY_BE_MEMORY_BYTES, TWorkloadMetricType.QUERY_BE_MEMORY_BYTES).build(); + public static final ImmutableMap<WorkloadActionType, TWorkloadActionType> ACTION_MAP + = new ImmutableMap.Builder<WorkloadActionType, TWorkloadActionType>() + .put(WorkloadActionType.MOVE_QUERY_TO_GROUP, TWorkloadActionType.MOVE_QUERY_TO_GROUP) + .put(WorkloadActionType.CANCEL_QUERY, TWorkloadActionType.CANCEL_QUERY).build(); + + public static final Map<String, WorkloadMetricType> STRING_METRIC_MAP = new HashMap<>(); + public static final Map<String, WorkloadActionType> STRING_ACTION_MAP = new HashMap<>(); + + static { + for (WorkloadMetricType metricType : FE_METRIC_SET) { + STRING_METRIC_MAP.put(metricType.toString(), metricType); + } + for (WorkloadMetricType metricType : BE_METRIC_SET) { + STRING_METRIC_MAP.put(metricType.toString(), metricType); + } + + for (WorkloadActionType actionType : FE_ACTION_SET) { + STRING_ACTION_MAP.put(actionType.toString(), actionType); + } + for (WorkloadActionType actionType : BE_ACTION_SET) { + STRING_ACTION_MAP.put(actionType.toString(), actionType); + } + } private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock(); diff --git a/gensrc/thrift/BackendService.thrift b/gensrc/thrift/BackendService.thrift index 46018fc947d..26cf411f739 100644 --- a/gensrc/thrift/BackendService.thrift +++ b/gensrc/thrift/BackendService.thrift @@ -272,6 +272,7 @@ enum TWorkloadMetricType { QUERY_TIME BE_SCAN_ROWS BE_SCAN_BYTES + QUERY_BE_MEMORY_BYTES } enum TCompareOperator { diff --git a/regression-test/suites/workload_manager_p0/test_workload_sched_policy.groovy b/regression-test/suites/workload_manager_p0/test_workload_sched_policy.groovy index c51a10f89b5..ca0c00fc895 100644 --- a/regression-test/suites/workload_manager_p0/test_workload_sched_policy.groovy +++ b/regression-test/suites/workload_manager_p0/test_workload_sched_policy.groovy @@ -23,6 +23,9 @@ suite("test_workload_sched_policy") { sql "drop workload policy if exists set_action_policy;" sql "drop workload policy if exists fe_policy;" sql "drop workload policy if exists be_policy;" + sql "drop workload policy if exists be_scan_row_policy;" + sql "drop workload policy if exists be_scan_bytes_policy;" + sql "drop workload policy if exists query_be_memory_used;" // 1 create cancel policy sql "create workload policy test_cancel_policy " + @@ -106,16 +109,42 @@ suite("test_workload_sched_policy") { exception "duplicate set_session_variable action args one policy" } + test { + sql "create workload policy invalid_metric_value_policy conditions(query_be_memory_bytes > '-1') actions(cancel_query);" + exception "invalid" + } + + test { + sql "create workload policy invalid_metric_value_policy conditions(query_time > '-1') actions(cancel_query);" + exception "invalid" + } + + test { + sql "create workload policy invalid_metric_value_policy conditions(be_scan_rows > '-1') actions(cancel_query);" + exception "invalid" + } + + test { + sql "create workload policy invalid_metric_value_policy conditions(be_scan_bytes > '-1') actions(cancel_query);" + exception "invalid" + } + + sql "create workload policy be_scan_row_policy conditions(be_scan_rows > 1) actions(cancel_query) properties('enabled'='false');" + sql "create workload policy be_scan_bytes_policy conditions(be_scan_bytes > 1) actions(cancel_query) properties('enabled'='false');" + sql "create workload policy query_be_memory_used conditions(query_be_memory_bytes > 1) actions(cancel_query) properties('enabled'='false');" + // drop sql "drop workload policy test_cancel_policy;" sql "drop workload policy set_action_policy;" sql "drop workload policy fe_policy;" sql "drop workload policy be_policy;" + sql "drop workload policy be_scan_row_policy;" + sql "drop workload policy be_scan_bytes_policy;" + sql "drop workload policy query_be_memory_used;" qt_select_policy_tvf_after_drop "select name,condition,action,priority,enabled,version from information_schema.workload_policy where name in('be_policy','fe_policy','set_action_policy','test_cancel_policy') order by name;" // test workload policy - sql "ADMIN SET FRONTEND CONFIG ('workload_sched_policy_interval_ms' = '500');" sql """drop user if exists test_workload_sched_user""" sql """create user test_workload_sched_user identified by '12345'""" sql """grant ADMIN_PRIV on *.*.* to test_workload_sched_user""" @@ -149,8 +178,6 @@ suite("test_workload_sched_policy") { } assertEquals("parallel_pipeline_task_num", result3[0][0]) assertEquals("33", result3[0][1]) - - sql "ADMIN SET FRONTEND CONFIG ('workload_sched_policy_interval_ms' = '10000');" sql "drop workload policy if exists test_set_var_policy;" sql "drop workload policy if exists test_set_var_policy2;" --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org