This is an automated email from the ASF dual-hosted git repository.

wangbo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 012e66729a5 [improvement](executor) Add tvf and regression test for Workload Scheduler (#28733)
012e66729a5 is described below

commit 012e66729a5cd465c6c4b2cb34ed00e5c86aebf0
Author: wangbo <wan...@apache.org>
AuthorDate: Fri Dec 22 12:09:51 2023 +0800

    [improvement](executor) Add tvf and regression test for Workload Scheduler (#28733)
    
    1 Add select workload schedule policy tvf
    2 Add reg test
---
 be/src/vec/exec/scan/vmeta_scanner.cpp             |  20 +++
 be/src/vec/exec/scan/vmeta_scanner.h               |   2 +
 .../analysis/CreateWorkloadSchedPolicyStmt.java    |  34 ++++
 .../resource/workloadgroup/WorkloadGroupMgr.java   |  13 ++
 .../workloadschedpolicy/WorkloadActionMeta.java    |  16 ++
 .../WorkloadConditionCompareUtils.java             |  18 ++
 .../workloadschedpolicy/WorkloadConditionMeta.java |   2 +-
 .../workloadschedpolicy/WorkloadQueryInfo.java     |   2 +-
 .../workloadschedpolicy/WorkloadSchedPolicy.java   |  11 +-
 .../WorkloadSchedPolicyMgr.java                    |  67 +++----
 .../doris/tablefunction/MetadataGenerator.java     |  30 ++++
 .../tablefunction/MetadataTableValuedFunction.java |   2 +
 .../doris/tablefunction/TableValuedFunctionIf.java |   2 +
 .../WorkloadSchedPolicyTableValuedFunction.java    |  88 +++++++++
 .../apache/doris/resource/WorkloadSchedTest.java   | 197 +++++++++++++++++++++
 gensrc/thrift/Types.thrift                         |   1 +
 .../test_workload_sched_policy.out                 |   9 +
 .../test_workload_sched_policy.groovy              | 168 ++++++++++++++++++
 18 files changed, 637 insertions(+), 45 deletions(-)

diff --git a/be/src/vec/exec/scan/vmeta_scanner.cpp 
b/be/src/vec/exec/scan/vmeta_scanner.cpp
index 1f4dcd8593d..22545fa4dce 100644
--- a/be/src/vec/exec/scan/vmeta_scanner.cpp
+++ b/be/src/vec/exec/scan/vmeta_scanner.cpp
@@ -238,6 +238,9 @@ Status VMetaScanner::_fetch_metadata(const TMetaScanRange& 
meta_scan_range) {
     case TMetadataType::WORKLOAD_GROUPS:
         
RETURN_IF_ERROR(_build_workload_groups_metadata_request(meta_scan_range, 
&request));
         break;
+    case TMetadataType::WORKLOAD_SCHED_POLICY:
+        
RETURN_IF_ERROR(_build_workload_sched_policy_metadata_request(meta_scan_range, 
&request));
+        break;
     case TMetadataType::CATALOGS:
         RETURN_IF_ERROR(_build_catalogs_metadata_request(meta_scan_range, 
&request));
         break;
@@ -379,6 +382,23 @@ Status 
VMetaScanner::_build_workload_groups_metadata_request(
     return Status::OK();
 }
 
+Status VMetaScanner::_build_workload_sched_policy_metadata_request(
+        const TMetaScanRange& meta_scan_range, TFetchSchemaTableDataRequest* 
request) { // meta_scan_range is not read in this body; only *request is filled in
+    VLOG_CRITICAL << 
"VMetaScanner::_build_workload_sched_policy_metadata_request";
+
+    // create request: empty cluster name, schema table set to METADATA_TABLE
+    request->__set_cluster_name("");
+    request->__set_schema_table_name(TSchemaTableName::METADATA_TABLE);
+
+    // create TMetadataTableRequestParams: metadata type plus _user_identity
+    TMetadataTableRequestParams metadata_table_params;
+    
metadata_table_params.__set_metadata_type(TMetadataType::WORKLOAD_SCHED_POLICY);
+    metadata_table_params.__set_current_user_ident(_user_identity);
+
+    request->__set_metada_table_params(metadata_table_params); // "metada" is the setter's spelling at this call site
+    return Status::OK();
+}
+
 Status VMetaScanner::_build_catalogs_metadata_request(const TMetaScanRange& 
meta_scan_range,
                                                       
TFetchSchemaTableDataRequest* request) {
     VLOG_CRITICAL << "VMetaScanner::_build_catalogs_metadata_request";
diff --git a/be/src/vec/exec/scan/vmeta_scanner.h 
b/be/src/vec/exec/scan/vmeta_scanner.h
index 7c4a1f2b2de..59bd55dc2d8 100644
--- a/be/src/vec/exec/scan/vmeta_scanner.h
+++ b/be/src/vec/exec/scan/vmeta_scanner.h
@@ -79,6 +79,8 @@ private:
                                                    
TFetchSchemaTableDataRequest* request);
     Status _build_workload_groups_metadata_request(const TMetaScanRange& 
meta_scan_range,
                                                    
TFetchSchemaTableDataRequest* request);
+    Status _build_workload_sched_policy_metadata_request(const TMetaScanRange& 
meta_scan_range,
+                                                         
TFetchSchemaTableDataRequest* request);
     Status _build_catalogs_metadata_request(const TMetaScanRange& 
meta_scan_range,
                                             TFetchSchemaTableDataRequest* 
request);
     Status _build_materialized_views_metadata_request(const TMetaScanRange& 
meta_scan_range,
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/analysis/CreateWorkloadSchedPolicyStmt.java
 
b/fe/fe-core/src/main/java/org/apache/doris/analysis/CreateWorkloadSchedPolicyStmt.java
index ee82b57822f..001068476d4 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/analysis/CreateWorkloadSchedPolicyStmt.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/analysis/CreateWorkloadSchedPolicyStmt.java
@@ -88,4 +88,38 @@ public class CreateWorkloadSchedPolicyStmt extends DdlStmt {
     public Map<String, String> getProperties() {
         return properties;
     }
+
+    // Renders this statement as SQL text. A separator is written before every
+    // element except the first, so empty or null lists keep their parentheses
+    // (stripping a trailing character used to turn "PROPERTIES(" into "PROPERTIES").
+    @Override
+    public String toSql() {
+        StringBuilder sql = new StringBuilder("CREATE WORKLOAD SCHEDULE POLICY ");
+        sql.append(policyName).append("  CONDITIONS( ");
+        String sep = "";
+        if (conditions != null) {
+            for (WorkloadConditionMeta wcm : conditions) {
+                sql.append(sep).append(wcm);
+                sep = ",";
+            }
+        }
+        sql.append(") ACTIONS( ");
+        sep = "";
+        if (actions != null) {
+            for (WorkloadActionMeta wam : actions) {
+                sql.append(sep).append(wam);
+                sep = ",";
+            }
+        }
+        sql.append(") PROPERTIES(");
+        sep = "";
+        if (properties != null) {
+            for (Map.Entry<String, String> entry : properties.entrySet()) {
+                sql.append(sep).append('"').append(entry.getKey()).append("\"=\"")
+                        .append(entry.getValue()).append('"');
+                sep = ",";
+            }
+        }
+        return sql.append(")").toString();
+    }
 }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/resource/workloadgroup/WorkloadGroupMgr.java
 
b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadgroup/WorkloadGroupMgr.java
index 2285db603b0..c8b49a5ebaa 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/resource/workloadgroup/WorkloadGroupMgr.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadgroup/WorkloadGroupMgr.java
@@ -445,6 +445,19 @@ public class WorkloadGroupMgr implements Writable, 
GsonPostProcessable {
         }
     }
 
+    public String getWorkloadGroupNameById(Long id) { // looks up a group's name by its id
+        readLock();
+        try {
+            WorkloadGroup wg = idToWorkloadGroup.get(id);
+            if (wg == null) {
+                return null; // no group registered under this id; callers must handle null
+            }
+            return wg.getName();
+        } finally {
+            readUnlock(); // runs on both return paths
+        }
+    }
+
     // for ut
     public Map<String, WorkloadGroup> getNameToWorkloadGroup() {
         return nameToWorkloadGroup;
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadActionMeta.java
 
b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadActionMeta.java
index 776c0bccfdc..57f6ba37993 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadActionMeta.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadActionMeta.java
@@ -17,9 +17,11 @@
 
 package org.apache.doris.resource.workloadschedpolicy;
 
+import org.apache.doris.catalog.Env;
 import org.apache.doris.common.UserException;
 
 import com.google.gson.annotations.SerializedName;
+import org.apache.commons.lang3.StringUtils;
 
 public class WorkloadActionMeta {
 
@@ -44,4 +46,18 @@ public class WorkloadActionMeta {
         }
         throw new UserException("invalid action type " + strType);
     }
+
+    @Override
+    public String toString() {
+        if (StringUtils.isEmpty(actionArgs)) {
+            return action.toString();
+        }
+        String shown = actionArgs; // shown as written unless it is a numeric group id
+        if (WorkloadActionType.MOVE_QUERY_TO_GROUP.equals(action) && StringUtils.isNumeric(actionArgs)) {
+            // ids are resolved to group names for display; an unknown id renders as -1
+            shown = Env.getCurrentEnv().getWorkloadGroupMgr()
+                    .getWorkloadGroupNameById(Long.valueOf(actionArgs));
+        }
+        return action + " \"" + (shown == null ? "-1" : shown) + "\"";
+    }
 }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadConditionCompareUtils.java
 
b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadConditionCompareUtils.java
index 8aa53a6f340..ac4c51acdff 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadConditionCompareUtils.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadConditionCompareUtils.java
@@ -39,6 +39,24 @@ public class WorkloadConditionCompareUtils {
         }
     }
 
+    // used for select tvf: maps a condition operator to its comparison symbol
+    static String getOperatorStr(WorkloadConditionOperator op) {
+        switch (op) {
+            case EQUAL:
+                return "=";
+            case GREATER:
+                return ">";
+            case GREATER_EQUAL:
+                return ">=";
+            case LESS:
+                return "<";
+            case LESS_EQUAl: // constant name as spelled here (trailing lowercase "l")
+                return "<=";
+            default: // any other operator value is rejected
+                throw new RuntimeException("unexpected compare operator " + 
op);
+        }
+    }
+
     static boolean compareInteger(WorkloadConditionOperator operator, long 
firstArgs, long secondArgs) {
         switch (operator) {
             case EQUAL:
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadConditionMeta.java
 
b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadConditionMeta.java
index c6bfb526b9b..d5d2f922f3f 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadConditionMeta.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadConditionMeta.java
@@ -49,6 +49,6 @@ public class WorkloadConditionMeta {
     }
 
     public String toString() {
-        return metricName + " " + op + " " + value;
+        return metricName + " " + 
WorkloadConditionCompareUtils.getOperatorStr(op) + " " + value;
     }
 }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadQueryInfo.java
 
b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadQueryInfo.java
index 27d821c32c0..b6a98633c58 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadQueryInfo.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadQueryInfo.java
@@ -26,5 +26,5 @@ public class WorkloadQueryInfo {
     String queryId = null;
     TUniqueId tUniqueId = null;
     ConnectContext context = null;
-    Map<WorkloadMetricType, String> metricMap;
+    public Map<WorkloadMetricType, String> metricMap;
 }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadSchedPolicy.java
 
b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadSchedPolicy.java
index 7186d4409a5..827c2367133 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadSchedPolicy.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadSchedPolicy.java
@@ -62,6 +62,15 @@ public class WorkloadSchedPolicy implements Writable, 
GsonPostProcessable {
     private List<WorkloadCondition> workloadConditionList;
     private List<WorkloadAction> workloadActionList;
 
+    // for ut: no-arg constructor with an empty body
+    public WorkloadSchedPolicy() {
+    }
+
+    // for ut: replaces the condition list that isMatch iterates over
+    public void setWorkloadConditionList(List<WorkloadCondition> 
workloadConditionList) {
+        this.workloadConditionList = workloadConditionList;
+    }
+
     public WorkloadSchedPolicy(long id, String name, List<WorkloadCondition> 
workloadConditionList,
             List<WorkloadAction> workloadActionList, Map<String, String> 
properties) throws UserException {
         this.id = id;
@@ -77,7 +86,7 @@ public class WorkloadSchedPolicy implements Writable, 
GsonPostProcessable {
     // return false,
     //    1 metric not match
     //    2 condition value not match query info's value
-    boolean isMatch(WorkloadQueryInfo queryInfo) {
+    public boolean isMatch(WorkloadQueryInfo queryInfo) {
         for (WorkloadCondition condition : workloadConditionList) {
             WorkloadMetricType metricType = condition.getMetricType();
             String value = queryInfo.metricMap.get(metricType);
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadSchedPolicyMgr.java
 
b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadSchedPolicyMgr.java
index 9e2e4cd91af..346e34796c7 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadSchedPolicyMgr.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadSchedPolicyMgr.java
@@ -34,12 +34,12 @@ import org.apache.doris.persist.gson.GsonPostProcessable;
 import org.apache.doris.persist.gson.GsonUtils;
 import org.apache.doris.qe.ConnectContext;
 import org.apache.doris.service.ExecuteEnv;
+import org.apache.doris.thrift.TUserIdentity;
 
 import com.google.common.base.Preconditions;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Maps;
 import com.google.gson.annotations.SerializedName;
-import org.apache.commons.lang3.StringUtils;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 
@@ -69,7 +69,7 @@ public class WorkloadSchedPolicyMgr implements Writable, 
GsonPostProcessable {
 
     public static final ImmutableList<String> 
WORKLOAD_SCHED_POLICY_NODE_TITLE_NAMES
             = new ImmutableList.Builder<String>()
-            .add("Id").add("Name").add("ItemName").add("ItemValue")
+            
.add("Id").add("Name").add("Condition").add("Action").add("Priority").add("Enabled").add("Version")
             .build();
 
     private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
@@ -349,7 +349,8 @@ public class WorkloadSchedPolicyMgr implements Writable, 
GsonPostProcessable {
                     throw new UserException("policy's priority can only 
between 0 ~ 100");
                 }
             } catch (NumberFormatException e) {
-                throw new UserException("policy's priority must be a number, 
input value=" + priorityStr);
+                throw new UserException(
+                        "invalid priority property value, it must be a number, 
input value=" + priorityStr);
             }
         }
     }
@@ -448,6 +449,11 @@ public class WorkloadSchedPolicyMgr implements Writable, 
GsonPostProcessable {
         return policyProcNode.fetchResult(currentUserIdentity).getRows();
     }
 
+    public List<List<String>> getWorkloadSchedPolicyTvfInfo(TUserIdentity 
tcurrentUserIdentity) { // takes the thrift form of the user identity
+        UserIdentity currentUserIdentity = 
UserIdentity.fromThrift(tcurrentUserIdentity);
+        return policyProcNode.fetchResult(currentUserIdentity).getRows(); // rows come from the proc node
+    }
+
     public class PolicyProcNode {
         public ProcResult fetchResult(UserIdentity currentUserIdentity) {
             BaseProcResult result = new BaseProcResult();
@@ -460,54 +466,31 @@ public class WorkloadSchedPolicyMgr implements Writable, 
GsonPostProcessable {
                         continue;
                     }
 
-                    String pId = String.valueOf(policy.getId());
+                    List<String> row = new ArrayList<>();
                     String pName = policy.getName();
+                    row.add(String.valueOf(policy.getId()));
+                    row.add(pName);
 
                     List<WorkloadConditionMeta> conditionList = 
policy.getConditionMetaList();
+                    StringBuilder cmStr = new StringBuilder();
                     for (WorkloadConditionMeta cm : conditionList) {
-                        List<String> condRow = new ArrayList<>();
-                        condRow.add(pId);
-                        condRow.add(pName);
-                        condRow.add("condition");
-                        condRow.add(cm.toString());
-                        result.addRow(condRow);
+                        cmStr.append(cm.toString()).append(";");
                     }
+                    String retStr = cmStr.toString().toLowerCase();
+                    row.add(retStr.substring(0, retStr.length() - 1));
 
                     List<WorkloadActionMeta> actionList = 
policy.getActionMetaList();
-                    for (WorkloadActionMeta workloadActionMeta : actionList) {
-                        List<String> actionRow = new ArrayList<>();
-                        actionRow.add(pId);
-                        actionRow.add(pName);
-                        actionRow.add("action");
-                        if 
(StringUtils.isEmpty(workloadActionMeta.actionArgs)) {
-                            
actionRow.add(workloadActionMeta.action.toString());
-                        } else {
-                            actionRow.add(workloadActionMeta.action + " " + 
workloadActionMeta.actionArgs);
-                        }
-                        result.addRow(actionRow);
+                    StringBuilder actionStr = new StringBuilder();
+                    for (WorkloadActionMeta am : actionList) {
+                        actionStr.append(am.toString()).append(";");
                     }
+                    String retStr2 = actionStr.toString().toLowerCase();
+                    row.add(retStr2.substring(0, retStr2.length() - 1));
 
-                    List<String> prioRow = new ArrayList<>();
-                    prioRow.add(pId);
-                    prioRow.add(pName);
-                    prioRow.add("priority");
-                    prioRow.add(String.valueOf(policy.getPriority()));
-                    result.addRow(prioRow);
-
-                    List<String> enabledRow = new ArrayList<>();
-                    enabledRow.add(pId);
-                    enabledRow.add(pName);
-                    enabledRow.add("enabled");
-                    enabledRow.add(String.valueOf(policy.isEnabled()));
-                    result.addRow(enabledRow);
-
-
-                    List<String> versionRow = new ArrayList<>();
-                    versionRow.add(pId);
-                    versionRow.add(pName);
-                    versionRow.add("version");
-                    versionRow.add(String.valueOf(policy.getVersion()));
-                    result.addRow(versionRow);
+                    row.add(String.valueOf(policy.getPriority()));
+                    row.add(String.valueOf(policy.isEnabled()));
+                    row.add(String.valueOf(policy.getVersion()));
+                    result.addRow(row);
                 }
             } finally {
                 readUnlock();
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/MetadataGenerator.java
 
b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/MetadataGenerator.java
index e8620b105b6..9c773b37dca 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/MetadataGenerator.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/MetadataGenerator.java
@@ -116,6 +116,9 @@ public class MetadataGenerator {
             case QUERIES:
                 result = queriesMetadataResult(params, request);
                 break;
+            case WORKLOAD_SCHED_POLICY:
+                result = workloadSchedPolicyMetadataResult(params);
+                break;
             default:
                 return errorResult("Metadata table params is not set.");
         }
@@ -383,6 +386,33 @@ public class MetadataGenerator {
         return result;
     }
 
+    // Converts the manager's string rows into typed cells; parse* avoids boxing each value.
+    private static TFetchSchemaTableDataResult workloadSchedPolicyMetadataResult(TMetadataTableRequestParams params) {
+        if (!params.isSetCurrentUserIdent()) {
+            return errorResult("current user ident is not set.");
+        }
+
+        TUserIdentity tcurrentUserIdentity = params.getCurrentUserIdent();
+        List<List<String>> workloadPolicyList = Env.getCurrentEnv().getWorkloadSchedPolicyMgr()
+                .getWorkloadSchedPolicyTvfInfo(tcurrentUserIdentity);
+        List<TRow> dataBatch = Lists.newArrayList();
+        for (List<String> policyRow : workloadPolicyList) {
+            TRow trow = new TRow();
+            trow.addToColumnValue(new TCell().setLongVal(Long.parseLong(policyRow.get(0))));       // id
+            trow.addToColumnValue(new TCell().setStringVal(policyRow.get(1)));                     // name
+            trow.addToColumnValue(new TCell().setStringVal(policyRow.get(2)));                     // condition
+            trow.addToColumnValue(new TCell().setStringVal(policyRow.get(3)));                     // action
+            trow.addToColumnValue(new TCell().setIntVal(Integer.parseInt(policyRow.get(4))));      // priority
+            trow.addToColumnValue(new TCell().setBoolVal(Boolean.parseBoolean(policyRow.get(5)))); // enabled
+            trow.addToColumnValue(new TCell().setIntVal(Integer.parseInt(policyRow.get(6))));      // version
+            dataBatch.add(trow);
+        }
+        TFetchSchemaTableDataResult result = new TFetchSchemaTableDataResult();
+        result.setDataBatch(dataBatch);
+        result.setStatus(new TStatus(TStatusCode.OK));
+        return result;
+    }
+
     private static TFetchSchemaTableDataResult 
queriesMetadataResult(TMetadataTableRequestParams params,
                                                                      
TFetchSchemaTableDataRequest parentRequest) {
         if (!params.isSetQueriesMetadataParams()) {
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/MetadataTableValuedFunction.java
 
b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/MetadataTableValuedFunction.java
index d2c3278314e..53a0b7ee5b8 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/MetadataTableValuedFunction.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/MetadataTableValuedFunction.java
@@ -51,6 +51,8 @@ public abstract class MetadataTableValuedFunction extends 
TableValuedFunctionIf
                 return 
TasksTableValuedFunction.getColumnIndexFromColumnName(columnName, params);
             case QUERIES:
                 return 
QueriesTableValuedFunction.getColumnIndexFromColumnName(columnName);
+            case WORKLOAD_SCHED_POLICY:
+                return 
WorkloadSchedPolicyTableValuedFunction.getColumnIndexFromColumnName(columnName);
             default:
                 throw new AnalysisException("Unknown Metadata 
TableValuedFunction type");
         }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/TableValuedFunctionIf.java
 
b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/TableValuedFunctionIf.java
index b14a09769cb..c9547c91bd2 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/TableValuedFunctionIf.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/TableValuedFunctionIf.java
@@ -76,6 +76,8 @@ public abstract class TableValuedFunctionIf {
                 return new GroupCommitTableValuedFunction(params);
             case QueriesTableValuedFunction.NAME:
                 return new QueriesTableValuedFunction(params);
+            case WorkloadSchedPolicyTableValuedFunction.NAME:
+                return new WorkloadSchedPolicyTableValuedFunction(params);
             default:
                 throw new AnalysisException("Could not find table function " + 
funcName);
         }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/WorkloadSchedPolicyTableValuedFunction.java
 
b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/WorkloadSchedPolicyTableValuedFunction.java
new file mode 100644
index 00000000000..b4795b21058
--- /dev/null
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/WorkloadSchedPolicyTableValuedFunction.java
@@ -0,0 +1,88 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.tablefunction;
+
+import org.apache.doris.catalog.Column;
+import org.apache.doris.catalog.PrimitiveType;
+import org.apache.doris.catalog.ScalarType;
+import org.apache.doris.common.AnalysisException;
+import org.apache.doris.thrift.TMetaScanRange;
+import org.apache.doris.thrift.TMetadataType;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+
+import java.util.List;
+import java.util.Map;
+
+public class WorkloadSchedPolicyTableValuedFunction extends MetadataTableValuedFunction {
+    // Function name that TableValuedFunctionIf maps to this class.
+    public static final String NAME = "workload_schedule_policy";
+
+    private static final ImmutableList<Column> SCHEMA = ImmutableList.of(
+            new Column("Id", ScalarType.createType(PrimitiveType.BIGINT)),
+            new Column("Name", ScalarType.createStringType()),
+            new Column("Condition", ScalarType.createType(PrimitiveType.STRING)),
+            new Column("Action", ScalarType.createType(PrimitiveType.STRING)),
+            new Column("Priority", ScalarType.createType(PrimitiveType.INT)),
+            new Column("Enabled", ScalarType.createType(PrimitiveType.BOOLEAN)),
+            new Column("Version", ScalarType.createType(PrimitiveType.INT)));
+
+    private static final ImmutableMap<String, Integer> COLUMN_TO_INDEX; // lower-cased name -> SCHEMA position
+
+    static {
+        ImmutableMap.Builder<String, Integer> builder = new ImmutableMap.Builder<>();
+        for (int i = 0; i < SCHEMA.size(); i++) {
+            builder.put(SCHEMA.get(i).getName().toLowerCase(), i);
+        }
+        COLUMN_TO_INDEX = builder.build();
+    }
+
+    public static Integer getColumnIndexFromColumnName(String columnName) { // null if no such column
+        return COLUMN_TO_INDEX.get(columnName.toLowerCase());
+    }
+
+    public WorkloadSchedPolicyTableValuedFunction(Map<String, String> params) {
+        if (!params.isEmpty()) { // any parameter is rejected
+            throw new org.apache.doris.nereids.exceptions.AnalysisException(
+                    "workload schedule policy table-valued-function does not support any params");
+        }
+    }
+
+    @Override
+    public TMetadataType getMetadataType() {
+        return TMetadataType.WORKLOAD_SCHED_POLICY;
+    }
+
+    @Override
+    public TMetaScanRange getMetaScanRange() {
+        TMetaScanRange metaScanRange = new TMetaScanRange();
+        metaScanRange.setMetadataType(TMetadataType.WORKLOAD_SCHED_POLICY);
+        return metaScanRange;
+    }
+
+    @Override
+    public String getTableName() {
+        return "WorkloadSchedPolicyTableValuedFunction";
+    }
+
+    @Override
+    public List<Column> getTableColumns() throws AnalysisException {
+        return SCHEMA;
+    }
+}
diff --git 
a/fe/fe-core/src/test/java/org/apache/doris/resource/WorkloadSchedTest.java 
b/fe/fe-core/src/test/java/org/apache/doris/resource/WorkloadSchedTest.java
new file mode 100644
index 00000000000..11c00eca234
--- /dev/null
+++ b/fe/fe-core/src/test/java/org/apache/doris/resource/WorkloadSchedTest.java
@@ -0,0 +1,197 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.resource;
+
+import org.apache.doris.resource.workloadschedpolicy.WorkloadCondition;
+import org.apache.doris.resource.workloadschedpolicy.WorkloadConditionOperator;
+import 
org.apache.doris.resource.workloadschedpolicy.WorkloadConditionQueryTime;
+import org.apache.doris.resource.workloadschedpolicy.WorkloadConditionUsername;
+import org.apache.doris.resource.workloadschedpolicy.WorkloadMetricType;
+import org.apache.doris.resource.workloadschedpolicy.WorkloadQueryInfo;
+import org.apache.doris.resource.workloadschedpolicy.WorkloadSchedPolicy;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+
+/**
+ * Unit tests for condition matching in WorkloadSchedPolicy: numeric comparison
+ * operators on query time, string equality on username, and a policy that
+ * combines both conditions.
+ */
+public class WorkloadSchedTest {
+
+    /** Builds a policy whose condition list holds the given conditions, in order. */
+    private static WorkloadSchedPolicy newPolicy(WorkloadCondition... conditions) {
+        List<WorkloadCondition> conditionList = new ArrayList<>();
+        for (WorkloadCondition condition : conditions) {
+            conditionList.add(condition);
+        }
+        WorkloadSchedPolicy policy = new WorkloadSchedPolicy();
+        policy.setWorkloadConditionList(conditionList);
+        return policy;
+    }
+
+    /** Creates a query info whose metric map is empty and mutable. */
+    private static WorkloadQueryInfo newQueryInfo() {
+        WorkloadQueryInfo queryInfo = new WorkloadQueryInfo();
+        queryInfo.metricMap = new HashMap<>();
+        return queryInfo;
+    }
+
+    /**
+     * Checks a policy with the single condition "query_time OPERATOR 100": it must
+     * match when the query time is matchingValue and must not match when the
+     * query time is nonMatchingValue.
+     */
+    private static void checkQueryTimeOperator(WorkloadConditionOperator operator,
+            String matchingValue, String nonMatchingValue) {
+        WorkloadSchedPolicy policy = newPolicy(new WorkloadConditionQueryTime(operator, 100));
+        WorkloadQueryInfo queryInfo = newQueryInfo();
+
+        // match
+        queryInfo.metricMap.put(WorkloadMetricType.QUERY_TIME, matchingValue);
+        Assert.assertTrue(policy.isMatch(queryInfo));
+
+        // not match
+        queryInfo.metricMap.put(WorkloadMetricType.QUERY_TIME, nonMatchingValue);
+        Assert.assertFalse(policy.isMatch(queryInfo));
+    }
+
+    @Test
+    public void testPolicyCondition() {
+        // 1 test compare operator
+        // 1.1 >
+        checkQueryTimeOperator(WorkloadConditionOperator.GREATER, "101", "100");
+        // 1.2 >=
+        checkQueryTimeOperator(WorkloadConditionOperator.GREATER_EQUAL, "100", "10");
+        // 1.3 =
+        checkQueryTimeOperator(WorkloadConditionOperator.EQUAL, "100", "10");
+        // 1.4 <
+        checkQueryTimeOperator(WorkloadConditionOperator.LESS, "99", "100");
+        // 1.5 <= (the constant is spelled LESS_EQUAl in the enum)
+        checkQueryTimeOperator(WorkloadConditionOperator.LESS_EQUAl, "100", "101");
+
+        // 2 string compare
+        WorkloadSchedPolicy usernamePolicy = newPolicy(
+                new WorkloadConditionUsername(WorkloadConditionOperator.EQUAL, "root"));
+        WorkloadQueryInfo usernameQuery = newQueryInfo();
+
+        // match
+        usernameQuery.metricMap.put(WorkloadMetricType.USERNAME, "root");
+        Assert.assertTrue(usernamePolicy.isMatch(usernameQuery));
+
+        // not match
+        usernameQuery.metricMap.put(WorkloadMetricType.USERNAME, "abc");
+        Assert.assertFalse(usernamePolicy.isMatch(usernameQuery));
+
+        // 3 mixed condition
+        WorkloadSchedPolicy mixedPolicy = newPolicy(
+                new WorkloadConditionUsername(WorkloadConditionOperator.EQUAL, "root"),
+                new WorkloadConditionQueryTime(WorkloadConditionOperator.EQUAL, 100));
+        WorkloadQueryInfo mixedQuery = newQueryInfo();
+        mixedQuery.metricMap.put(WorkloadMetricType.USERNAME, "root");
+        mixedQuery.metricMap.put(WorkloadMetricType.QUERY_TIME, "100");
+
+        // match
+        Assert.assertTrue(mixedPolicy.isMatch(mixedQuery));
+
+        // not match 1: the username metric is missing
+        mixedQuery.metricMap.remove(WorkloadMetricType.USERNAME);
+        Assert.assertFalse(mixedPolicy.isMatch(mixedQuery));
+
+        // not match 2: the username metric differs
+        mixedQuery.metricMap.put(WorkloadMetricType.USERNAME, "abc");
+        Assert.assertFalse(mixedPolicy.isMatch(mixedQuery));
+    }
+
+}
diff --git a/gensrc/thrift/Types.thrift b/gensrc/thrift/Types.thrift
index 4f101f1177e..2d0f380dbce 100644
--- a/gensrc/thrift/Types.thrift
+++ b/gensrc/thrift/Types.thrift
@@ -702,6 +702,7 @@ enum TMetadataType {
   JOBS,
   TASKS,
   QUERIES,
+  WORKLOAD_SCHED_POLICY
 }
 
 enum TIcebergQueryType {
diff --git 
a/regression-test/data/workload_manager_p0/test_workload_sched_policy.out 
b/regression-test/data/workload_manager_p0/test_workload_sched_policy.out
new file mode 100644
index 00000000000..4e8482384c4
--- /dev/null
+++ b/regression-test/data/workload_manager_p0/test_workload_sched_policy.out
@@ -0,0 +1,9 @@
+-- This file is automatically generated. You should know what you did if you 
want to edit this
+-- !select_policy_tvf --
+full_policy_policy     query_time > 10;username = root set_session_variable 
"workload_group=normal"    10      false   0
+move_action_policy     username = root move_query_to_group "normal"    0       
true    0
+set_action_policy      query_time > 10;username = root set_session_variable 
"workload_group=normal"    0       true    0
+test_cancel_policy     query_time > 10 cancel_query    0       false   0
+
+-- !select_policy_tvf_after_drop --
+
diff --git 
a/regression-test/suites/workload_manager_p0/test_workload_sched_policy.groovy 
b/regression-test/suites/workload_manager_p0/test_workload_sched_policy.groovy
new file mode 100644
index 00000000000..be4d7411e3d
--- /dev/null
+++ 
b/regression-test/suites/workload_manager_p0/test_workload_sched_policy.groovy
@@ -0,0 +1,168 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+suite("test_workload_sched_policy") {
+
+    // Runs a statement that must be rejected and checks the error message.
+    // A bare try/catch passes silently when the statement unexpectedly succeeds,
+    // so a missing exception is turned into a failure here.
+    def expectSqlFailure = { String stmt, String expectedMsg ->
+        boolean rejected = false
+        try {
+            sql(stmt)
+        } catch (Exception e) {
+            rejected = true
+            assertTrue(e.getMessage().contains(expectedMsg))
+        }
+        if (!rejected) {
+            logger.info("statement was expected to fail but succeeded: " + stmt)
+        }
+        assertTrue(rejected)
+    }
+
+    // Drops every policy this suite may create (including the ones whose creation
+    // is expected to fail), so that a rerun always starts from a clean state.
+    def dropAllPolicies = {
+        sql "drop workload schedule policy if exists full_policy_policy;"
+        sql "drop workload schedule policy if exists set_action_policy;"
+        sql "drop workload schedule policy if exists move_action_policy;"
+        sql "drop workload schedule policy if exists test_cancel_policy;"
+        sql "drop workload schedule policy if exists test_set_var_policy;"
+        sql "drop workload schedule policy if exists test_set_var_policy2;"
+        sql "drop workload schedule policy if exists failed_policy;"
+        sql "drop workload schedule policy if exists conflict_policy;"
+    }
+
+    sql "set experimental_enable_nereids_planner = false;"
+
+    dropAllPolicies()
+
+    // 1 create cancel policy
+    sql "create workload schedule policy test_cancel_policy " +
+            " conditions(query_time > 10) " +
+            " actions(cancel_query) properties('enabled'='false'); "
+
+    // 2 create move policy
+    sql "create workload schedule policy move_action_policy " +
+            "conditions(username='root') " +
+            "actions(move_query_to_group 'normal');"
+
+    // 3 create set policy
+    sql "create workload schedule policy set_action_policy " +
+            "conditions(query_time > 10, username='root') " +
+            "actions(set_session_variable 'workload_group=normal');"
+
+    // 4 create policy with property
+    sql "create workload schedule policy full_policy_policy " +
+            "conditions(query_time > 10, username='root') " +
+            "actions(set_session_variable 'workload_group=normal') " +
+            "properties( " +
+            "'enabled' = 'false', " +
+            "'priority'='10' " +
+            ");"
+
+    qt_select_policy_tvf "select name,condition,action,priority,enabled,version from workload_schedule_policy() order by name;"
+
+    // test_alter
+    sql "alter workload schedule policy full_policy_policy properties('priority'='2', 'enabled'='false');"
+
+    // create failed check
+    expectSqlFailure("create workload schedule policy failed_policy " +
+            "conditions(abc > 123) actions(cancel_query);",
+            "invalid metric name")
+
+    expectSqlFailure("create workload schedule policy failed_policy " +
+            "conditions(query_time > 123) actions(abc);",
+            "invalid action type")
+
+    expectSqlFailure("alter workload schedule policy full_policy_policy properties('priority'='abc');",
+            "invalid priority property value")
+
+    expectSqlFailure("alter workload schedule policy full_policy_policy properties('enabled'='abc');",
+            "invalid enabled property value")
+
+    expectSqlFailure("alter workload schedule policy full_policy_policy properties('priority'='10000');",
+            "priority can only between")
+
+    expectSqlFailure("create workload schedule policy conflict_policy " +
+            "conditions (username = 'root')" +
+            "actions(cancel_query, move_query_to_group 'normal');",
+            "can not exist in one policy at same time")
+
+    expectSqlFailure("create workload schedule policy conflict_policy " +
+            "conditions (username = 'root') " +
+            "actions(cancel_query, cancel_query);",
+            "duplicate action in one policy")
+
+    expectSqlFailure("create workload schedule policy conflict_policy " +
+            "conditions (username = 'root') " +
+            "actions(set_session_variable 'workload_group=normal', set_session_variable 'workload_group=abc');",
+            "duplicate set_session_variable action args one policy")
+
+    // drop
+    sql "drop workload schedule policy full_policy_policy;"
+    sql "drop workload schedule policy set_action_policy;"
+    sql "drop workload schedule policy move_action_policy;"
+    sql "drop workload schedule policy test_cancel_policy;"
+
+    qt_select_policy_tvf_after_drop "select name,condition,action,priority,enabled,version from workload_schedule_policy() order by name;"
+
+    // test workload schedule policy
+    // The interval is shortened for this part only; the finally block restores it
+    // even when an assertion fails, so later suites do not run with the test value.
+    sql "ADMIN SET FRONTEND CONFIG ('workload_sched_policy_interval_ms' = '500');"
+    try {
+        sql """drop user if exists test_workload_sched_user"""
+        sql """create user test_workload_sched_user identified by '12345'"""
+        sql """grant ADMIN_PRIV on *.*.* to test_workload_sched_user"""
+
+        // 1 create test_set_var_policy
+        sql "create workload schedule policy test_set_var_policy conditions(username='test_workload_sched_user')" +
+                "actions(set_session_variable 'parallel_pipeline_task_num=33');"
+        def result1 = connect(user = 'test_workload_sched_user', password = '12345', url = context.config.jdbcUrl) {
+            logger.info("begin sleep 15s to wait")
+            Thread.sleep(15000)
+            sql "show variables like '%parallel_pipeline_task_num%';"
+        }
+        assertEquals("parallel_pipeline_task_num", result1[0][0])
+        assertEquals("33", result1[0][1])
+
+        // 2 create test_set_var_policy2 with higher priority
+        sql "create workload schedule policy test_set_var_policy2 conditions(username='test_workload_sched_user') " +
+                "actions(set_session_variable 'parallel_pipeline_task_num=22') properties('priority'='10');"
+        def result2 = connect(user = 'test_workload_sched_user', password = '12345', url = context.config.jdbcUrl) {
+            Thread.sleep(3000)
+            sql "show variables like '%parallel_pipeline_task_num%';"
+        }
+        assertEquals("parallel_pipeline_task_num", result2[0][0])
+        assertEquals("22", result2[0][1])
+
+        // 3 disable test_set_var_policy2
+        sql "alter workload schedule policy test_set_var_policy2 properties('enabled'='false');"
+        def result3 = connect(user = 'test_workload_sched_user', password = '12345', url = context.config.jdbcUrl) {
+            Thread.sleep(3000)
+            sql "show variables like '%parallel_pipeline_task_num%';"
+        }
+        assertEquals("parallel_pipeline_task_num", result3[0][0])
+        assertEquals("33", result3[0][1])
+    } finally {
+        sql "ADMIN SET FRONTEND CONFIG ('workload_sched_policy_interval_ms' = '10000');"
+        dropAllPolicies()
+    }
+
+}
\ No newline at end of file


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org


Reply via email to