This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git

commit 194f3432ab86a42b80825a17258b08ebf7e47067
Author: wangbo <wan...@apache.org>
AuthorDate: Mon Mar 11 22:51:15 2024 +0800

    [Improvement](executor) Routine load supports workload group #31671
---
 .../doris/analysis/AlterRoutineLoadStmt.java       |  8 +++++
 .../doris/analysis/CreateRoutineLoadStmt.java      | 14 ++++++++
 .../doris/load/routineload/KafkaTaskInfo.java      | 27 +++++++++++++++
 .../doris/load/routineload/RoutineLoadJob.java     | 14 ++++++++
 .../resource/workloadgroup/WorkloadGroupMgr.java   | 39 ++++++++++++++++++++--
 regression-test/pipeline/p0/conf/fe.conf           |  1 +
 .../load_p0/routine_load/test_routine_load.groovy  |  8 ++++-
 .../workload_manager_p0/test_curd_wlg.groovy       | 12 +++----
 8 files changed, 114 insertions(+), 9 deletions(-)

diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/analysis/AlterRoutineLoadStmt.java 
b/fe/fe-core/src/main/java/org/apache/doris/analysis/AlterRoutineLoadStmt.java
index 2df891fbb3c..5a1f1ba56aa 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/analysis/AlterRoutineLoadStmt.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/analysis/AlterRoutineLoadStmt.java
@@ -29,6 +29,7 @@ import org.apache.doris.common.util.Util;
 import org.apache.doris.load.routineload.AbstractDataSourceProperties;
 import org.apache.doris.load.routineload.RoutineLoadDataSourcePropertyFactory;
 import org.apache.doris.load.routineload.RoutineLoadJob;
+import org.apache.doris.qe.ConnectContext;
 
 import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Maps;
@@ -66,6 +67,7 @@ public class AlterRoutineLoadStmt extends DdlStmt {
             .add(CreateRoutineLoadStmt.PARTIAL_COLUMNS)
             .add(LoadStmt.STRICT_MODE)
             .add(LoadStmt.TIMEZONE)
+            .add(CreateRoutineLoadStmt.WORKLOAD_GROUP)
             .build();
 
     private final LabelName labelName;
@@ -242,6 +244,12 @@ public class AlterRoutineLoadStmt extends DdlStmt {
             analyzedJobProperties.put(CreateRoutineLoadStmt.PARTIAL_COLUMNS,
                     String.valueOf(isPartialUpdate));
         }
+        if (jobProperties.containsKey(CreateRoutineLoadStmt.WORKLOAD_GROUP)) {
+            String workloadGroup = 
jobProperties.get(CreateRoutineLoadStmt.WORKLOAD_GROUP);
+            long wgId = Env.getCurrentEnv().getWorkloadGroupMgr()
+                    
.getWorkloadGroup(ConnectContext.get().getCurrentUserIdentity(), workloadGroup);
+            analyzedJobProperties.put(CreateRoutineLoadStmt.WORKLOAD_GROUP, 
String.valueOf(wgId));
+        }
     }
 
     private void checkDataSourceProperties() throws UserException {
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/analysis/CreateRoutineLoadStmt.java 
b/fe/fe-core/src/main/java/org/apache/doris/analysis/CreateRoutineLoadStmt.java
index f859d7d8f05..d58b25195c7 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/analysis/CreateRoutineLoadStmt.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/analysis/CreateRoutineLoadStmt.java
@@ -111,6 +111,8 @@ public class CreateRoutineLoadStmt extends DdlStmt {
 
     public static final String PARTIAL_COLUMNS = "partial_columns";
 
+    public static final String WORKLOAD_GROUP = "workload_group";
+
     private static final String NAME_TYPE = "ROUTINE LOAD NAME";
     public static final String ENDPOINT_REGEX = 
"[-A-Za-z0-9+&@#/%?=~_|!:,.;]+[-A-Za-z0-9+&@#/%=~_|]";
     public static final String SEND_BATCH_PARALLELISM = 
"send_batch_parallelism";
@@ -138,6 +140,7 @@ public class CreateRoutineLoadStmt extends DdlStmt {
             .add(SEND_BATCH_PARALLELISM)
             .add(LOAD_TO_SINGLE_TABLET)
             .add(PARTIAL_COLUMNS)
+            .add(WORKLOAD_GROUP)
             .build();
 
     private final LabelName labelName;
@@ -179,6 +182,8 @@ public class CreateRoutineLoadStmt extends DdlStmt {
 
     private String escape;
 
+    private long workloadGroupId = -1;
+
     /**
      * support partial columns load(Only Unique Key Columns)
      */
@@ -330,6 +335,10 @@ public class CreateRoutineLoadStmt extends DdlStmt {
         return comment;
     }
 
+    public long getWorkloadGroupId() {
+        return this.workloadGroupId; // stays -1 unless analyze() resolved a workload_group property
+    }
+
     @Override
     public void analyze(Analyzer analyzer) throws UserException {
         super.analyze(analyzer);
@@ -506,6 +515,11 @@ public class CreateRoutineLoadStmt extends DdlStmt {
         if (escape != null && escape.length() != 1) {
             throw new AnalysisException("escape must be single-char");
         }
+        String inputWorkloadGroupStr = jobProperties.get(WORKLOAD_GROUP);
+        if (!StringUtils.isEmpty(inputWorkloadGroupStr)) {
+            this.workloadGroupId = Env.getCurrentEnv().getWorkloadGroupMgr()
+                    
.getWorkloadGroup(ConnectContext.get().getCurrentUserIdentity(), 
inputWorkloadGroupStr);
+        }
 
         if (ConnectContext.get() != null) {
             timezone = ConnectContext.get().getSessionVariable().getTimeZone();
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaTaskInfo.java 
b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaTaskInfo.java
index a8d387a2f6d..d8b79d9bdce 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaTaskInfo.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaTaskInfo.java
@@ -28,6 +28,7 @@ import org.apache.doris.thrift.TFileFormatType;
 import org.apache.doris.thrift.TKafkaLoadInfo;
 import org.apache.doris.thrift.TLoadSourceType;
 import org.apache.doris.thrift.TPipelineFragmentParams;
+import org.apache.doris.thrift.TPipelineWorkloadGroup;
 import org.apache.doris.thrift.TPlanFragment;
 import org.apache.doris.thrift.TRoutineLoadTask;
 import org.apache.doris.thrift.TUniqueId;
@@ -130,6 +131,19 @@ public class KafkaTaskInfo extends RoutineLoadTaskInfo {
         TExecPlanFragmentParams tExecPlanFragmentParams = 
routineLoadJob.plan(loadId, txnId);
         TPlanFragment tPlanFragment = tExecPlanFragmentParams.getFragment();
         tPlanFragment.getOutputSink().getOlapTableSink().setTxnId(txnId);
+
+        long wgId = routineLoadJob.getWorkloadId();
+        List<TPipelineWorkloadGroup> tWgList = new ArrayList<>();
+        if (wgId > 0) {
+            tWgList = Env.getCurrentEnv().getWorkloadGroupMgr()
+                    .getTWorkloadGroupById(wgId);
+        }
+        if (tWgList.size() == 0) {
+            tWgList = Env.getCurrentEnv().getWorkloadGroupMgr()
+                    
.getTWorkloadGroupByUserIdentity(routineLoadJob.getUserIdentity());
+        }
+        tExecPlanFragmentParams.setWorkloadGroups(tWgList);
+
         return tExecPlanFragmentParams;
     }
 
@@ -139,6 +153,19 @@ public class KafkaTaskInfo extends RoutineLoadTaskInfo {
         TPipelineFragmentParams tExecPlanFragmentParams = 
routineLoadJob.planForPipeline(loadId, txnId);
         TPlanFragment tPlanFragment = tExecPlanFragmentParams.getFragment();
         tPlanFragment.getOutputSink().getOlapTableSink().setTxnId(txnId);
+
+        long wgId = routineLoadJob.getWorkloadId();
+        List<TPipelineWorkloadGroup> tWgList = new ArrayList<>();
+        if (wgId > 0) {
+            tWgList = Env.getCurrentEnv().getWorkloadGroupMgr()
+                    .getTWorkloadGroupById(wgId);
+        }
+        if (tWgList.size() == 0) {
+            tWgList = Env.getCurrentEnv().getWorkloadGroupMgr()
+                    
.getTWorkloadGroupByUserIdentity(routineLoadJob.getUserIdentity());
+        }
+        tExecPlanFragmentParams.setWorkloadGroups(tWgList);
+
         return tExecPlanFragmentParams;
     }
 
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java
 
b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java
index 8760dc4b71c..f9be2014e30 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java
@@ -69,6 +69,7 @@ import org.apache.doris.transaction.TransactionException;
 import org.apache.doris.transaction.TransactionState;
 import org.apache.doris.transaction.TransactionStatus;
 
+import com.aliyuncs.utils.StringUtils;
 import com.google.common.base.Joiner;
 import com.google.common.base.Preconditions;
 import com.google.common.base.Strings;
@@ -117,6 +118,8 @@ public abstract class RoutineLoadJob extends 
AbstractTxnStateChangeCallback impl
 
     protected static final String STAR_STRING = "*";
 
+    public static final String WORKLOAD_GROUP = "workload_group";
+
     @Getter
     @Setter
     private boolean isMultiTable = false;
@@ -394,6 +397,9 @@ public abstract class RoutineLoadJob extends 
AbstractTxnStateChangeCallback impl
         if (stmt.getEscape() != null) {
             jobProperties.put(LoadStmt.KEY_ESCAPE, stmt.getEscape());
         }
+        if (stmt.getWorkloadGroupId() > 0) {
+            jobProperties.put(WORKLOAD_GROUP, 
String.valueOf(stmt.getWorkloadGroupId()));
+        }
     }
 
     private void setRoutineLoadDesc(RoutineLoadDesc routineLoadDesc) {
@@ -479,6 +485,14 @@ public abstract class RoutineLoadJob extends 
AbstractTxnStateChangeCallback impl
         return database.getTableOrMetaException(tableId).getName();
     }
 
+    public long getWorkloadId() {
+        String wgIdStr = jobProperties.get(WORKLOAD_GROUP); // persisted as String.valueOf(id)
+        if (StringUtils.isEmpty(wgIdStr)) {
+            return -1; // no workload group bound to this job
+        }
+        return Long.parseLong(wgIdStr);
+    }
+
     public JobState getState() {
         return state;
     }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/resource/workloadgroup/WorkloadGroupMgr.java
 
b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadgroup/WorkloadGroupMgr.java
index a7c26f7cec5..1bd1a357127 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/resource/workloadgroup/WorkloadGroupMgr.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadgroup/WorkloadGroupMgr.java
@@ -183,13 +183,48 @@ public class WorkloadGroupMgr implements Writable, 
GsonPostProcessable {
         return workloadGroups;
     }
 
-    public WorkloadGroup getWorkloadGroupById(long wgId) {
+    public long getWorkloadGroup(UserIdentity currentUser, String groupName) throws UserException {
+        Long workloadId = getWorkloadGroupIdByName(groupName); // null means the group does not exist
+        if (workloadId == null) {
+            throw new UserException("Workload group " + groupName + " does not exist");
+        }
+        if (!Env.getCurrentEnv().getAccessManager() // caller must hold USAGE on the group
+                .checkWorkloadGroupPriv(currentUser, groupName, PrivPredicate.USAGE)) {
+            ErrorReport.reportAnalysisException(
+                    "Access denied; you need (at least one of) the %s privilege(s) to use workload group '%s'.",
+                    ErrorCode.ERR_SPECIFIC_ACCESS_DENIED_ERROR, "USAGE/ADMIN", groupName);
+        }
+        return workloadId.longValue(); // the group id, not a WorkloadGroup object
+    }
+
+    public List<TPipelineWorkloadGroup> getTWorkloadGroupById(long wgId) {
+        List<TPipelineWorkloadGroup> result = new ArrayList<>(); // stays empty when wgId is unknown
+        readLock();
+        try {
+            WorkloadGroup group = idToWorkloadGroup.get(wgId);
+            if (group != null) {
+                result.add(group.toThrift()); // converted while the read lock is held
+            }
+        } finally {
+            readUnlock();
+        }
+        return result;
+    }
+
+    public List<TPipelineWorkloadGroup> getTWorkloadGroupByUserIdentity(UserIdentity user) throws UserException {
+        String groupName = Env.getCurrentEnv().getAuth().getWorkloadGroup(user.getQualifiedUser());
+        List<TPipelineWorkloadGroup> ret = new ArrayList<>(); // exactly one entry on success
         readLock();
         try {
-            return idToWorkloadGroup.get(wgId);
+            WorkloadGroup wg = nameToWorkloadGroup.get(groupName);
+            if (wg == null) { // group reported by Auth for this user is not registered here
+                throw new UserException("can not find workload group " + groupName);
+            }
+            ret.add(wg.toThrift());
         } finally {
             readUnlock();
         }
+        return ret;
     }
 
     public List<TopicInfo> getPublishTopicInfo() {
diff --git a/regression-test/pipeline/p0/conf/fe.conf 
b/regression-test/pipeline/p0/conf/fe.conf
index 22f58cb22f5..28f1972c701 100644
--- a/regression-test/pipeline/p0/conf/fe.conf
+++ b/regression-test/pipeline/p0/conf/fe.conf
@@ -109,6 +109,7 @@ label_keep_max_second = 300
 enable_job_schedule_second_for_test = true
 
 enable_workload_group = true
+publish_topic_info_interval_ms = 1000
 
 master_sync_policy = WRITE_NO_SYNC
 replica_sync_policy = WRITE_NO_SYNC
diff --git 
a/regression-test/suites/load_p0/routine_load/test_routine_load.groovy 
b/regression-test/suites/load_p0/routine_load/test_routine_load.groovy
index 963d6ebc110..87eae26a48a 100644
--- a/regression-test/suites/load_p0/routine_load/test_routine_load.groovy
+++ b/regression-test/suites/load_p0/routine_load/test_routine_load.groovy
@@ -22,6 +22,10 @@ import org.apache.kafka.clients.producer.ProducerConfig
 
 suite("test_routine_load","p0") {
 
+    sql "create workload group if not exists create_routine_load_group 
properties ( 'cpu_share'='123');"
+    sql "create workload group if not exists alter_routine_load_group 
properties ( 'cpu_share'='123');"
+    Thread.sleep(5000) // wait publish workload group to be
+
     def tables = [
                   "dup_tbl_basic",
                   "uniq_tbl_basic",
@@ -226,7 +230,8 @@ suite("test_routine_load","p0") {
                         "send_batch_parallelism" = "2",
                         "max_batch_interval" = "5",
                         "max_batch_rows" = "300000",
-                        "max_batch_size" = "209715200"
+                        "max_batch_size" = "209715200",
+                        "workload_group" = "create_routine_load_group"
                     )
                     FROM KAFKA
                     (
@@ -1932,6 +1937,7 @@ suite("test_routine_load","p0") {
                 sql "ALTER ROUTINE LOAD FOR ${jobs[i]} PROPERTIES(\"timezone\" 
= \"Asia/Shanghai\");"
                 sql "ALTER ROUTINE LOAD FOR ${jobs[i]} 
PROPERTIES(\"num_as_string\" = \"true\");"
                 sql "ALTER ROUTINE LOAD FOR ${jobs[i]} 
PROPERTIES(\"fuzzy_parse\" = \"true\");"
+                sql "ALTER ROUTINE LOAD FOR ${jobs[i]} 
PROPERTIES(\"workload_group\" = \"alter_routine_load_group\");"
                 res = sql "show routine load for ${jobs[i]}"
                 log.info("routine load job properties: 
${res[0][11].toString()}".toString())
 
diff --git a/regression-test/suites/workload_manager_p0/test_curd_wlg.groovy 
b/regression-test/suites/workload_manager_p0/test_curd_wlg.groovy
index 4b4fa51486e..864d1ab5b21 100644
--- a/regression-test/suites/workload_manager_p0/test_curd_wlg.groovy
+++ b/regression-test/suites/workload_manager_p0/test_curd_wlg.groovy
@@ -117,7 +117,7 @@ suite("test_crud_wlg") {
             ");"
     sql "set workload_group=test_group;"
 
-    qt_show_1 "select 
name,cpu_share,memory_limit,enable_memory_overcommit,max_concurrency,max_queue_size,queue_timeout,cpu_hard_limit,scan_thread_num
 from workload_groups() order by name;"
+    qt_show_1 "select 
name,cpu_share,memory_limit,enable_memory_overcommit,max_concurrency,max_queue_size,queue_timeout,cpu_hard_limit,scan_thread_num
 from workload_groups() where name in ('normal','test_group') order by name;"
 
     // test memory_limit
     test {
@@ -128,7 +128,7 @@ suite("test_crud_wlg") {
 
     sql "alter workload group test_group properties ( 'memory_limit'='11%' );"
     qt_mem_limit_1 """ select count(1) from ${table_name} """
-    qt_mem_limit_2 "select 
name,cpu_share,memory_limit,enable_memory_overcommit,max_concurrency,max_queue_size,queue_timeout,cpu_hard_limit,scan_thread_num
 from workload_groups() order by name;"
+    qt_mem_limit_2 "select 
name,cpu_share,memory_limit,enable_memory_overcommit,max_concurrency,max_queue_size,queue_timeout,cpu_hard_limit,scan_thread_num
 from workload_groups() where name in ('normal','test_group') order by name;"
 
     // test enable_memory_overcommit
     test {
@@ -141,7 +141,7 @@ suite("test_crud_wlg") {
     qt_mem_overcommit_1 """ select count(1) from ${table_name} """
     sql "alter workload group test_group properties ( 
'enable_memory_overcommit'='false' );"
     qt_mem_overcommit_2 """ select count(1) from ${table_name} """
-    qt_mem_overcommit_3 "select 
name,cpu_share,memory_limit,enable_memory_overcommit,max_concurrency,max_queue_size,queue_timeout,cpu_hard_limit,scan_thread_num
 from workload_groups() order by name;"
+    qt_mem_overcommit_3 "select 
name,cpu_share,memory_limit,enable_memory_overcommit,max_concurrency,max_queue_size,queue_timeout,cpu_hard_limit,scan_thread_num
 from workload_groups() where name in ('normal','test_group') order by name;"
 
     // test cpu_hard_limit
     test {
@@ -160,7 +160,7 @@ suite("test_crud_wlg") {
 
     sql "alter workload group test_group properties ( 'cpu_hard_limit'='20%' 
);"
     qt_cpu_hard_limit_1 """ select count(1) from ${table_name} """
-    qt_cpu_hard_limit_2 "select 
name,cpu_share,memory_limit,enable_memory_overcommit,max_concurrency,max_queue_size,queue_timeout,cpu_hard_limit,scan_thread_num
 from workload_groups() order by name;"
+    qt_cpu_hard_limit_2 "select 
name,cpu_share,memory_limit,enable_memory_overcommit,max_concurrency,max_queue_size,queue_timeout,cpu_hard_limit,scan_thread_num
 from workload_groups() where name in ('normal','test_group') order by name;"
 
     // test query queue
     test {
@@ -183,7 +183,7 @@ suite("test_crud_wlg") {
 
     sql "alter workload group test_group properties ( 'max_concurrency'='100' 
);"
     qt_queue_1 """ select count(1) from ${table_name} """
-    qt_show_queue "select 
name,cpu_share,memory_limit,enable_memory_overcommit,max_concurrency,max_queue_size,queue_timeout,cpu_hard_limit,scan_thread_num
 from workload_groups() order by name;"
+    qt_show_queue "select 
name,cpu_share,memory_limit,enable_memory_overcommit,max_concurrency,max_queue_size,queue_timeout,cpu_hard_limit,scan_thread_num
 from workload_groups() where name in ('normal','test_group') order by name;"
 
     // test create group failed
     // failed for cpu_share
@@ -261,7 +261,7 @@ suite("test_crud_wlg") {
     }
 
     // test show workload groups
-    qt_select_tvf_1 "select 
name,cpu_share,memory_limit,enable_memory_overcommit,max_concurrency,max_queue_size,queue_timeout,cpu_hard_limit,scan_thread_num
 from workload_groups() order by name;"
+    qt_select_tvf_1 "select 
name,cpu_share,memory_limit,enable_memory_overcommit,max_concurrency,max_queue_size,queue_timeout,cpu_hard_limit,scan_thread_num
 from workload_groups() where name in ('normal','test_group') order by name;"
 
     // test auth
     sql """drop user if exists test_wlg_user"""


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to