This is an automated email from the ASF dual-hosted git repository. yiguolei pushed a commit to branch branch-2.1 in repository https://gitbox.apache.org/repos/asf/doris.git
commit 194f3432ab86a42b80825a17258b08ebf7e47067 Author: wangbo <wan...@apache.org> AuthorDate: Mon Mar 11 22:51:15 2024 +0800 [Improvement](executor)Routine load support workload group #31671 --- .../doris/analysis/AlterRoutineLoadStmt.java | 8 +++++ .../doris/analysis/CreateRoutineLoadStmt.java | 14 ++++++++ .../doris/load/routineload/KafkaTaskInfo.java | 27 +++++++++++++++ .../doris/load/routineload/RoutineLoadJob.java | 14 ++++++++ .../resource/workloadgroup/WorkloadGroupMgr.java | 39 ++++++++++++++++++++-- regression-test/pipeline/p0/conf/fe.conf | 1 + .../load_p0/routine_load/test_routine_load.groovy | 8 ++++- .../workload_manager_p0/test_curd_wlg.groovy | 12 +++---- 8 files changed, 114 insertions(+), 9 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/AlterRoutineLoadStmt.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/AlterRoutineLoadStmt.java index 2df891fbb3c..5a1f1ba56aa 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/analysis/AlterRoutineLoadStmt.java +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/AlterRoutineLoadStmt.java @@ -29,6 +29,7 @@ import org.apache.doris.common.util.Util; import org.apache.doris.load.routineload.AbstractDataSourceProperties; import org.apache.doris.load.routineload.RoutineLoadDataSourcePropertyFactory; import org.apache.doris.load.routineload.RoutineLoadJob; +import org.apache.doris.qe.ConnectContext; import com.google.common.collect.ImmutableSet; import com.google.common.collect.Maps; @@ -66,6 +67,7 @@ public class AlterRoutineLoadStmt extends DdlStmt { .add(CreateRoutineLoadStmt.PARTIAL_COLUMNS) .add(LoadStmt.STRICT_MODE) .add(LoadStmt.TIMEZONE) + .add(CreateRoutineLoadStmt.WORKLOAD_GROUP) .build(); private final LabelName labelName; @@ -242,6 +244,12 @@ public class AlterRoutineLoadStmt extends DdlStmt { analyzedJobProperties.put(CreateRoutineLoadStmt.PARTIAL_COLUMNS, String.valueOf(isPartialUpdate)); } + if 
(jobProperties.containsKey(CreateRoutineLoadStmt.WORKLOAD_GROUP)) { + String workloadGroup = jobProperties.get(CreateRoutineLoadStmt.WORKLOAD_GROUP); + long wgId = Env.getCurrentEnv().getWorkloadGroupMgr() + .getWorkloadGroup(ConnectContext.get().getCurrentUserIdentity(), workloadGroup); + analyzedJobProperties.put(CreateRoutineLoadStmt.WORKLOAD_GROUP, String.valueOf(wgId)); + } } private void checkDataSourceProperties() throws UserException { diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/CreateRoutineLoadStmt.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/CreateRoutineLoadStmt.java index f859d7d8f05..d58b25195c7 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/analysis/CreateRoutineLoadStmt.java +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/CreateRoutineLoadStmt.java @@ -111,6 +111,8 @@ public class CreateRoutineLoadStmt extends DdlStmt { public static final String PARTIAL_COLUMNS = "partial_columns"; + public static final String WORKLOAD_GROUP = "workload_group"; + private static final String NAME_TYPE = "ROUTINE LOAD NAME"; public static final String ENDPOINT_REGEX = "[-A-Za-z0-9+&@#/%?=~_|!:,.;]+[-A-Za-z0-9+&@#/%=~_|]"; public static final String SEND_BATCH_PARALLELISM = "send_batch_parallelism"; @@ -138,6 +140,7 @@ public class CreateRoutineLoadStmt extends DdlStmt { .add(SEND_BATCH_PARALLELISM) .add(LOAD_TO_SINGLE_TABLET) .add(PARTIAL_COLUMNS) + .add(WORKLOAD_GROUP) .build(); private final LabelName labelName; @@ -179,6 +182,8 @@ public class CreateRoutineLoadStmt extends DdlStmt { private String escape; + private long workloadGroupId = -1; + /** * support partial columns load(Only Unique Key Columns) */ @@ -330,6 +335,10 @@ public class CreateRoutineLoadStmt extends DdlStmt { return comment; } + public long getWorkloadGroupId() { + return workloadGroupId; + } + @Override public void analyze(Analyzer analyzer) throws UserException { super.analyze(analyzer); @@ -506,6 +515,11 @@ public class CreateRoutineLoadStmt 
extends DdlStmt { if (escape != null && escape.length() != 1) { throw new AnalysisException("escape must be single-char"); } + String inputWorkloadGroupStr = jobProperties.get(WORKLOAD_GROUP); + if (!StringUtils.isEmpty(inputWorkloadGroupStr)) { + this.workloadGroupId = Env.getCurrentEnv().getWorkloadGroupMgr() + .getWorkloadGroup(ConnectContext.get().getCurrentUserIdentity(), inputWorkloadGroupStr); + } if (ConnectContext.get() != null) { timezone = ConnectContext.get().getSessionVariable().getTimeZone(); diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaTaskInfo.java b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaTaskInfo.java index a8d387a2f6d..d8b79d9bdce 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaTaskInfo.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaTaskInfo.java @@ -28,6 +28,7 @@ import org.apache.doris.thrift.TFileFormatType; import org.apache.doris.thrift.TKafkaLoadInfo; import org.apache.doris.thrift.TLoadSourceType; import org.apache.doris.thrift.TPipelineFragmentParams; +import org.apache.doris.thrift.TPipelineWorkloadGroup; import org.apache.doris.thrift.TPlanFragment; import org.apache.doris.thrift.TRoutineLoadTask; import org.apache.doris.thrift.TUniqueId; @@ -130,6 +131,19 @@ public class KafkaTaskInfo extends RoutineLoadTaskInfo { TExecPlanFragmentParams tExecPlanFragmentParams = routineLoadJob.plan(loadId, txnId); TPlanFragment tPlanFragment = tExecPlanFragmentParams.getFragment(); tPlanFragment.getOutputSink().getOlapTableSink().setTxnId(txnId); + + long wgId = routineLoadJob.getWorkloadId(); + List<TPipelineWorkloadGroup> tWgList = new ArrayList<>(); + if (wgId > 0) { + tWgList = Env.getCurrentEnv().getWorkloadGroupMgr() + .getTWorkloadGroupById(wgId); + } + if (tWgList.size() == 0) { + tWgList = Env.getCurrentEnv().getWorkloadGroupMgr() + .getTWorkloadGroupByUserIdentity(routineLoadJob.getUserIdentity()); + } + 
tExecPlanFragmentParams.setWorkloadGroups(tWgList); + return tExecPlanFragmentParams; } @@ -139,6 +153,19 @@ public class KafkaTaskInfo extends RoutineLoadTaskInfo { TPipelineFragmentParams tExecPlanFragmentParams = routineLoadJob.planForPipeline(loadId, txnId); TPlanFragment tPlanFragment = tExecPlanFragmentParams.getFragment(); tPlanFragment.getOutputSink().getOlapTableSink().setTxnId(txnId); + + long wgId = routineLoadJob.getWorkloadId(); + List<TPipelineWorkloadGroup> tWgList = new ArrayList<>(); + if (wgId > 0) { + tWgList = Env.getCurrentEnv().getWorkloadGroupMgr() + .getTWorkloadGroupById(wgId); + } + if (tWgList.size() == 0) { + tWgList = Env.getCurrentEnv().getWorkloadGroupMgr() + .getTWorkloadGroupByUserIdentity(routineLoadJob.getUserIdentity()); + } + tExecPlanFragmentParams.setWorkloadGroups(tWgList); + return tExecPlanFragmentParams; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java index 8760dc4b71c..f9be2014e30 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java @@ -69,6 +69,7 @@ import org.apache.doris.transaction.TransactionException; import org.apache.doris.transaction.TransactionState; import org.apache.doris.transaction.TransactionStatus; +import com.aliyuncs.utils.StringUtils; import com.google.common.base.Joiner; import com.google.common.base.Preconditions; import com.google.common.base.Strings; @@ -117,6 +118,8 @@ public abstract class RoutineLoadJob extends AbstractTxnStateChangeCallback impl protected static final String STAR_STRING = "*"; + public static final String WORKLOAD_GROUP = "workload_group"; + @Getter @Setter private boolean isMultiTable = false; @@ -394,6 +397,9 @@ public abstract class RoutineLoadJob extends AbstractTxnStateChangeCallback impl if (stmt.getEscape() != null) { 
jobProperties.put(LoadStmt.KEY_ESCAPE, stmt.getEscape()); } + if (stmt.getWorkloadGroupId() > 0) { + jobProperties.put(WORKLOAD_GROUP, String.valueOf(stmt.getWorkloadGroupId())); + } } private void setRoutineLoadDesc(RoutineLoadDesc routineLoadDesc) { @@ -479,6 +485,14 @@ public abstract class RoutineLoadJob extends AbstractTxnStateChangeCallback impl return database.getTableOrMetaException(tableId).getName(); } + public long getWorkloadId() { + String workloadIdStr = jobProperties.get(WORKLOAD_GROUP); + if (!StringUtils.isEmpty(workloadIdStr)) { + return Long.parseLong(workloadIdStr); + } + return -1; + } + public JobState getState() { return state; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/resource/workloadgroup/WorkloadGroupMgr.java b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadgroup/WorkloadGroupMgr.java index a7c26f7cec5..1bd1a357127 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/resource/workloadgroup/WorkloadGroupMgr.java +++ b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadgroup/WorkloadGroupMgr.java @@ -183,13 +183,48 @@ public class WorkloadGroupMgr implements Writable, GsonPostProcessable { return workloadGroups; } - public WorkloadGroup getWorkloadGroupById(long wgId) { + public long getWorkloadGroup(UserIdentity currentUser, String groupName) throws UserException { + Long workloadId = getWorkloadGroupIdByName(groupName); + if (workloadId == null) { + throw new UserException("Workload group " + groupName + " does not exist"); + } + if (!Env.getCurrentEnv().getAccessManager() + .checkWorkloadGroupPriv(currentUser, groupName, PrivPredicate.USAGE)) { + ErrorReport.reportAnalysisException( + "Access denied; you need (at least one of) the %s privilege(s) to use workload group '%s'.", + ErrorCode.ERR_SPECIFIC_ACCESS_DENIED_ERROR, "USAGE/ADMIN", groupName); + } + return workloadId.longValue(); + } + + public List<TPipelineWorkloadGroup> getTWorkloadGroupById(long wgId) { + List<TPipelineWorkloadGroup> 
tWorkloadGroups = Lists.newArrayList(); + readLock(); + try { + WorkloadGroup wg = idToWorkloadGroup.get(wgId); + if (wg != null) { + tWorkloadGroups.add(wg.toThrift()); + } + } finally { + readUnlock(); + } + return tWorkloadGroups; + } + + public List<TPipelineWorkloadGroup> getTWorkloadGroupByUserIdentity(UserIdentity user) throws UserException { + String groupName = Env.getCurrentEnv().getAuth().getWorkloadGroup(user.getQualifiedUser()); + List<TPipelineWorkloadGroup> ret = new ArrayList<>(); readLock(); try { - return idToWorkloadGroup.get(wgId); + WorkloadGroup wg = nameToWorkloadGroup.get(groupName); + if (wg == null) { + throw new UserException("can not find workload group " + groupName); + } + ret.add(wg.toThrift()); } finally { readUnlock(); } + return ret; } public List<TopicInfo> getPublishTopicInfo() { diff --git a/regression-test/pipeline/p0/conf/fe.conf b/regression-test/pipeline/p0/conf/fe.conf index 22f58cb22f5..28f1972c701 100644 --- a/regression-test/pipeline/p0/conf/fe.conf +++ b/regression-test/pipeline/p0/conf/fe.conf @@ -109,6 +109,7 @@ label_keep_max_second = 300 enable_job_schedule_second_for_test = true enable_workload_group = true +publish_topic_info_interval_ms = 1000 master_sync_policy = WRITE_NO_SYNC replica_sync_policy = WRITE_NO_SYNC diff --git a/regression-test/suites/load_p0/routine_load/test_routine_load.groovy b/regression-test/suites/load_p0/routine_load/test_routine_load.groovy index 963d6ebc110..87eae26a48a 100644 --- a/regression-test/suites/load_p0/routine_load/test_routine_load.groovy +++ b/regression-test/suites/load_p0/routine_load/test_routine_load.groovy @@ -22,6 +22,10 @@ import org.apache.kafka.clients.producer.ProducerConfig suite("test_routine_load","p0") { + sql "create workload group if not exists create_routine_load_group properties ( 'cpu_share'='123');" + sql "create workload group if not exists alter_routine_load_group properties ( 'cpu_share'='123');" + Thread.sleep(5000) // wait for the workload group to be published to the backends
+ def tables = [ "dup_tbl_basic", "uniq_tbl_basic", @@ -226,7 +230,8 @@ suite("test_routine_load","p0") { "send_batch_parallelism" = "2", "max_batch_interval" = "5", "max_batch_rows" = "300000", - "max_batch_size" = "209715200" + "max_batch_size" = "209715200", + "workload_group" = "create_routine_load_group" ) FROM KAFKA ( @@ -1932,6 +1937,7 @@ suite("test_routine_load","p0") { sql "ALTER ROUTINE LOAD FOR ${jobs[i]} PROPERTIES(\"timezone\" = \"Asia/Shanghai\");" sql "ALTER ROUTINE LOAD FOR ${jobs[i]} PROPERTIES(\"num_as_string\" = \"true\");" sql "ALTER ROUTINE LOAD FOR ${jobs[i]} PROPERTIES(\"fuzzy_parse\" = \"true\");" + sql "ALTER ROUTINE LOAD FOR ${jobs[i]} PROPERTIES(\"workload_group\" = \"alter_routine_load_group\");" res = sql "show routine load for ${jobs[i]}" log.info("routine load job properties: ${res[0][11].toString()}".toString()) diff --git a/regression-test/suites/workload_manager_p0/test_curd_wlg.groovy b/regression-test/suites/workload_manager_p0/test_curd_wlg.groovy index 4b4fa51486e..864d1ab5b21 100644 --- a/regression-test/suites/workload_manager_p0/test_curd_wlg.groovy +++ b/regression-test/suites/workload_manager_p0/test_curd_wlg.groovy @@ -117,7 +117,7 @@ suite("test_crud_wlg") { ");" sql "set workload_group=test_group;" - qt_show_1 "select name,cpu_share,memory_limit,enable_memory_overcommit,max_concurrency,max_queue_size,queue_timeout,cpu_hard_limit,scan_thread_num from workload_groups() order by name;" + qt_show_1 "select name,cpu_share,memory_limit,enable_memory_overcommit,max_concurrency,max_queue_size,queue_timeout,cpu_hard_limit,scan_thread_num from workload_groups() where name in ('normal','test_group') order by name;" // test memory_limit test { @@ -128,7 +128,7 @@ suite("test_crud_wlg") { sql "alter workload group test_group properties ( 'memory_limit'='11%' );" qt_mem_limit_1 """ select count(1) from ${table_name} """ - qt_mem_limit_2 "select 
name,cpu_share,memory_limit,enable_memory_overcommit,max_concurrency,max_queue_size,queue_timeout,cpu_hard_limit,scan_thread_num from workload_groups() order by name;" + qt_mem_limit_2 "select name,cpu_share,memory_limit,enable_memory_overcommit,max_concurrency,max_queue_size,queue_timeout,cpu_hard_limit,scan_thread_num from workload_groups() where name in ('normal','test_group') order by name;" // test enable_memory_overcommit test { @@ -141,7 +141,7 @@ suite("test_crud_wlg") { qt_mem_overcommit_1 """ select count(1) from ${table_name} """ sql "alter workload group test_group properties ( 'enable_memory_overcommit'='false' );" qt_mem_overcommit_2 """ select count(1) from ${table_name} """ - qt_mem_overcommit_3 "select name,cpu_share,memory_limit,enable_memory_overcommit,max_concurrency,max_queue_size,queue_timeout,cpu_hard_limit,scan_thread_num from workload_groups() order by name;" + qt_mem_overcommit_3 "select name,cpu_share,memory_limit,enable_memory_overcommit,max_concurrency,max_queue_size,queue_timeout,cpu_hard_limit,scan_thread_num from workload_groups() where name in ('normal','test_group') order by name;" // test cpu_hard_limit test { @@ -160,7 +160,7 @@ suite("test_crud_wlg") { sql "alter workload group test_group properties ( 'cpu_hard_limit'='20%' );" qt_cpu_hard_limit_1 """ select count(1) from ${table_name} """ - qt_cpu_hard_limit_2 "select name,cpu_share,memory_limit,enable_memory_overcommit,max_concurrency,max_queue_size,queue_timeout,cpu_hard_limit,scan_thread_num from workload_groups() order by name;" + qt_cpu_hard_limit_2 "select name,cpu_share,memory_limit,enable_memory_overcommit,max_concurrency,max_queue_size,queue_timeout,cpu_hard_limit,scan_thread_num from workload_groups() where name in ('normal','test_group') order by name;" // test query queue test { @@ -183,7 +183,7 @@ suite("test_crud_wlg") { sql "alter workload group test_group properties ( 'max_concurrency'='100' );" qt_queue_1 """ select count(1) from ${table_name} """ - 
qt_show_queue "select name,cpu_share,memory_limit,enable_memory_overcommit,max_concurrency,max_queue_size,queue_timeout,cpu_hard_limit,scan_thread_num from workload_groups() order by name;" + qt_show_queue "select name,cpu_share,memory_limit,enable_memory_overcommit,max_concurrency,max_queue_size,queue_timeout,cpu_hard_limit,scan_thread_num from workload_groups() where name in ('normal','test_group') order by name;" // test create group failed // failed for cpu_share @@ -261,7 +261,7 @@ suite("test_crud_wlg") { } // test show workload groups - qt_select_tvf_1 "select name,cpu_share,memory_limit,enable_memory_overcommit,max_concurrency,max_queue_size,queue_timeout,cpu_hard_limit,scan_thread_num from workload_groups() order by name;" + qt_select_tvf_1 "select name,cpu_share,memory_limit,enable_memory_overcommit,max_concurrency,max_queue_size,queue_timeout,cpu_hard_limit,scan_thread_num from workload_groups() where name in ('normal','test_group') order by name;" // test auth sql """drop user if exists test_wlg_user""" --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org