This is an automated email from the ASF dual-hosted git repository. yiguolei pushed a commit to branch branch-2.1 in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.1 by this push: new 95b1f76664b [Feature](executor)broker load support workload group (#30866) (#31580) 95b1f76664b is described below commit 95b1f76664b9c7ffd7788782d05dfa423d124d3c Author: wangbo <wan...@apache.org> AuthorDate: Thu Feb 29 15:09:10 2024 +0800 [Feature](executor)broker load support workload group (#30866) (#31580) --- .../java/org/apache/doris/load/loadv2/BrokerLoadJob.java | 1 + .../src/main/java/org/apache/doris/load/loadv2/LoadJob.java | 7 ++++++- .../java/org/apache/doris/load/loadv2/LoadLoadingTask.java | 12 ++++++++++++ .../main/java/org/apache/doris/load/loadv2/LoadManager.java | 8 +++++++- .../src/main/java/org/apache/doris/qe/MultiLoadMgr.java | 2 +- .../suites/load_p0/broker_load/test_s3_load.groovy | 4 ++++ 6 files changed, 31 insertions(+), 3 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/BrokerLoadJob.java b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/BrokerLoadJob.java index 0cbb4e0cfb5..50e46fc383f 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/BrokerLoadJob.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/BrokerLoadJob.java @@ -223,6 +223,7 @@ public class BrokerLoadJob extends BulkLoadJob { TUniqueId loadId = new TUniqueId(uuid.getMostSignificantBits(), uuid.getLeastSignificantBits()); task.init(loadId, attachment.getFileStatusByTable(aggKey), attachment.getFileNumByTable(aggKey), getUserInfo()); + task.settWorkloadGroups(tWorkloadGroups); idToTasks.put(task.getSignature(), task); // idToTasks contains previous LoadPendingTasks, so idToTasks is just used to save all tasks. // use newLoadingTasks to save new created loading tasks and submit them later. diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadJob.java b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadJob.java index c83ce68338f..4eb6be72795 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadJob.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadJob.java @@ -52,6 +52,7 @@ import org.apache.doris.qe.ConnectContext; import org.apache.doris.qe.Coordinator; import org.apache.doris.qe.QeProcessorImpl; import org.apache.doris.thrift.TEtlState; +import org.apache.doris.thrift.TPipelineWorkloadGroup; import org.apache.doris.thrift.TUniqueId; import org.apache.doris.transaction.AbstractTxnStateChangeCallback; import org.apache.doris.transaction.BeginTransactionException; @@ -134,7 +135,7 @@ public abstract class LoadJob extends AbstractTxnStateChangeCallback implements protected String comment = ""; - + protected List<TPipelineWorkloadGroup> tWorkloadGroups = null; public LoadJob(EtlJobType jobType) { this.jobType = jobType; @@ -1166,4 +1167,8 @@ public abstract class LoadJob extends AbstractTxnStateChangeCallback implements public LoadStatistic getLoadStatistic() { return loadStatistic; } + + public void settWorkloadGroups(List<TPipelineWorkloadGroup> tWorkloadGroups) { + this.tWorkloadGroups = tWorkloadGroups; + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadLoadingTask.java b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadLoadingTask.java index a56ba1acd14..8fbabea8629 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadLoadingTask.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadLoadingTask.java @@ -34,6 +34,7 @@ import org.apache.doris.load.FailMsg; import org.apache.doris.qe.Coordinator; import org.apache.doris.qe.QeProcessorImpl; import org.apache.doris.thrift.TBrokerFileStatus; +import org.apache.doris.thrift.TPipelineWorkloadGroup; import org.apache.doris.thrift.TQueryType; import org.apache.doris.thrift.TUniqueId; import org.apache.doris.transaction.ErrorTabletInfo; @@ -79,6 +80,8 @@ public class LoadLoadingTask extends LoadTask { private Profile jobProfile; private long beginTime; + private List<TPipelineWorkloadGroup> tWorkloadGroups = null; + public LoadLoadingTask(Database db, OlapTable table, BrokerDesc brokerDesc, List<BrokerFileGroup> fileGroups, long jobDeadlineMs, long execMemLimit, boolean strictMode, boolean isPartialUpdate, @@ -164,6 +167,10 @@ public class LoadLoadingTask extends LoadTask { int timeoutS = Math.max((int) (leftTimeMs / 1000), 1); curCoordinator.setTimeout(timeoutS); + if (tWorkloadGroups != null) { + curCoordinator.setTWorkloadGroups(tWorkloadGroups); + } + try { QeProcessorImpl.INSTANCE.registerQuery(loadId, curCoordinator); actualExecute(curCoordinator, timeoutS); @@ -221,4 +228,9 @@ public class LoadLoadingTask extends LoadTask { this.loadId = new TUniqueId(uuid.getMostSignificantBits(), uuid.getLeastSignificantBits()); planner.updateLoadId(this.loadId); } + + void settWorkloadGroups(List<TPipelineWorkloadGroup> tWorkloadGroups) { + this.tWorkloadGroups = tWorkloadGroups; + } + } diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadManager.java b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadManager.java index 1840494dcb2..a1de8e4405a 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadManager.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadManager.java @@ -115,7 +115,7 @@ public class LoadManager implements Writable { /** * This method will be invoked by the broker load(v2) now. */ - public long createLoadJobFromStmt(LoadStmt stmt) throws DdlException { + public long createLoadJobFromStmt(LoadStmt stmt) throws DdlException, UserException { Database database = checkDb(stmt.getLabel().getDbName()); long dbId = database.getId(); LoadJob loadJob; @@ -144,6 +144,12 @@ public class LoadManager implements Writable { } finally { writeUnlock(); } + + if (Config.enable_workload_group) { + loadJob.settWorkloadGroups( + Env.getCurrentEnv().getWorkloadGroupMgr().getWorkloadGroup(ConnectContext.get())); + } + Env.getCurrentEnv().getEditLog().logCreateLoadJob(loadJob); // The job must be submitted after edit log. diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/MultiLoadMgr.java b/fe/fe-core/src/main/java/org/apache/doris/qe/MultiLoadMgr.java index 2d1f512e29e..63f6de760cb 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/MultiLoadMgr.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/MultiLoadMgr.java @@ -140,7 +140,7 @@ public class MultiLoadMgr { // 'db' and 'label' form a multiLabel used to // user can pass commitLabel which use this string commit to jobmgr - public void commit(String fullDbName, String label) throws DdlException { + public void commit(String fullDbName, String label) throws DdlException, UserException { LabelName multiLabel = new LabelName(fullDbName, label); List<Long> jobIds = Lists.newArrayList(); lock.writeLock().lock(); diff --git a/regression-test/suites/load_p0/broker_load/test_s3_load.groovy b/regression-test/suites/load_p0/broker_load/test_s3_load.groovy index 6312cc3feb7..3ebf4348dab 100644 --- a/regression-test/suites/load_p0/broker_load/test_s3_load.groovy +++ b/regression-test/suites/load_p0/broker_load/test_s3_load.groovy @@ -17,6 +17,10 @@ suite("test_s3_load", "load_p0") { + sql "create workload group if not exists broker_load_test properties ( 'cpu_share'='1024'); " + + sql "set workload_group=broker_load_test;" + def tables = [ "agg_tbl_basic", "dup_tbl_array", --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org