This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-2.1 by this push:
     new 95b1f76664b [Feature](executor)broker load support workload group (#30866) (#31580)
95b1f76664b is described below

commit 95b1f76664b9c7ffd7788782d05dfa423d124d3c
Author: wangbo <wan...@apache.org>
AuthorDate: Thu Feb 29 15:09:10 2024 +0800

    [Feature](executor)broker load support workload group (#30866) (#31580)
---
 .../java/org/apache/doris/load/loadv2/BrokerLoadJob.java     |  1 +
 .../src/main/java/org/apache/doris/load/loadv2/LoadJob.java  |  7 ++++++-
 .../java/org/apache/doris/load/loadv2/LoadLoadingTask.java   | 12 ++++++++++++
 .../main/java/org/apache/doris/load/loadv2/LoadManager.java  |  8 +++++++-
 .../src/main/java/org/apache/doris/qe/MultiLoadMgr.java      |  2 +-
 .../suites/load_p0/broker_load/test_s3_load.groovy           |  4 ++++
 6 files changed, 31 insertions(+), 3 deletions(-)

diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/BrokerLoadJob.java b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/BrokerLoadJob.java
index 0cbb4e0cfb5..50e46fc383f 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/BrokerLoadJob.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/BrokerLoadJob.java
@@ -223,6 +223,7 @@ public class BrokerLoadJob extends BulkLoadJob {
                 TUniqueId loadId = new TUniqueId(uuid.getMostSignificantBits(), uuid.getLeastSignificantBits());
                 task.init(loadId, attachment.getFileStatusByTable(aggKey),
                         attachment.getFileNumByTable(aggKey), getUserInfo());
+                task.settWorkloadGroups(tWorkloadGroups);
                 idToTasks.put(task.getSignature(), task);
                 // idToTasks contains previous LoadPendingTasks, so idToTasks is just used to save all tasks.
                 // use newLoadingTasks to save new created loading tasks and submit them later.
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadJob.java b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadJob.java
index c83ce68338f..4eb6be72795 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadJob.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadJob.java
@@ -52,6 +52,7 @@ import org.apache.doris.qe.ConnectContext;
 import org.apache.doris.qe.Coordinator;
 import org.apache.doris.qe.QeProcessorImpl;
 import org.apache.doris.thrift.TEtlState;
+import org.apache.doris.thrift.TPipelineWorkloadGroup;
 import org.apache.doris.thrift.TUniqueId;
 import org.apache.doris.transaction.AbstractTxnStateChangeCallback;
 import org.apache.doris.transaction.BeginTransactionException;
@@ -134,7 +135,7 @@ public abstract class LoadJob extends AbstractTxnStateChangeCallback implements
 
     protected String comment = "";
 
-
+    protected List<TPipelineWorkloadGroup> tWorkloadGroups = null;
 
     public LoadJob(EtlJobType jobType) {
         this.jobType = jobType;
@@ -1166,4 +1167,8 @@ public abstract class LoadJob extends AbstractTxnStateChangeCallback implements
     public LoadStatistic getLoadStatistic() {
         return loadStatistic;
     }
+
+    public void settWorkloadGroups(List<TPipelineWorkloadGroup> tWorkloadGroups) {
+        this.tWorkloadGroups = tWorkloadGroups;
+    }
 }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadLoadingTask.java b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadLoadingTask.java
index a56ba1acd14..8fbabea8629 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadLoadingTask.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadLoadingTask.java
@@ -34,6 +34,7 @@ import org.apache.doris.load.FailMsg;
 import org.apache.doris.qe.Coordinator;
 import org.apache.doris.qe.QeProcessorImpl;
 import org.apache.doris.thrift.TBrokerFileStatus;
+import org.apache.doris.thrift.TPipelineWorkloadGroup;
 import org.apache.doris.thrift.TQueryType;
 import org.apache.doris.thrift.TUniqueId;
 import org.apache.doris.transaction.ErrorTabletInfo;
@@ -79,6 +80,8 @@ public class LoadLoadingTask extends LoadTask {
     private Profile jobProfile;
     private long beginTime;
 
+    private List<TPipelineWorkloadGroup> tWorkloadGroups = null;
+
     public LoadLoadingTask(Database db, OlapTable table,
             BrokerDesc brokerDesc, List<BrokerFileGroup> fileGroups,
             long jobDeadlineMs, long execMemLimit, boolean strictMode, boolean isPartialUpdate,
@@ -164,6 +167,10 @@ public class LoadLoadingTask extends LoadTask {
         int timeoutS = Math.max((int) (leftTimeMs / 1000), 1);
         curCoordinator.setTimeout(timeoutS);
 
+        if (tWorkloadGroups != null) {
+            curCoordinator.setTWorkloadGroups(tWorkloadGroups);
+        }
+
         try {
             QeProcessorImpl.INSTANCE.registerQuery(loadId, curCoordinator);
             actualExecute(curCoordinator, timeoutS);
@@ -221,4 +228,9 @@ public class LoadLoadingTask extends LoadTask {
         this.loadId = new TUniqueId(uuid.getMostSignificantBits(), uuid.getLeastSignificantBits());
         planner.updateLoadId(this.loadId);
     }
+
+    void settWorkloadGroups(List<TPipelineWorkloadGroup> tWorkloadGroups) {
+        this.tWorkloadGroups = tWorkloadGroups;
+    }
+
 }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadManager.java b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadManager.java
index 1840494dcb2..a1de8e4405a 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadManager.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadManager.java
@@ -115,7 +115,7 @@ public class LoadManager implements Writable {
     /**
      * This method will be invoked by the broker load(v2) now.
      */
-    public long createLoadJobFromStmt(LoadStmt stmt) throws DdlException {
+    public long createLoadJobFromStmt(LoadStmt stmt) throws DdlException, UserException {
         Database database = checkDb(stmt.getLabel().getDbName());
         long dbId = database.getId();
         LoadJob loadJob;
@@ -144,6 +144,12 @@ public class LoadManager implements Writable {
         } finally {
             writeUnlock();
         }
+
+        if (Config.enable_workload_group) {
+            loadJob.settWorkloadGroups(
+                    Env.getCurrentEnv().getWorkloadGroupMgr().getWorkloadGroup(ConnectContext.get()));
+        }
+
         Env.getCurrentEnv().getEditLog().logCreateLoadJob(loadJob);
 
         // The job must be submitted after edit log.
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/MultiLoadMgr.java b/fe/fe-core/src/main/java/org/apache/doris/qe/MultiLoadMgr.java
index 2d1f512e29e..63f6de760cb 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/MultiLoadMgr.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/MultiLoadMgr.java
@@ -140,7 +140,7 @@ public class MultiLoadMgr {
 
     // 'db' and 'label' form a multiLabel used to
     // user can pass commitLabel which use this string commit to jobmgr
-    public void commit(String fullDbName, String label) throws DdlException {
+    public void commit(String fullDbName, String label) throws DdlException, UserException {
         LabelName multiLabel = new LabelName(fullDbName, label);
         List<Long> jobIds = Lists.newArrayList();
         lock.writeLock().lock();
diff --git a/regression-test/suites/load_p0/broker_load/test_s3_load.groovy b/regression-test/suites/load_p0/broker_load/test_s3_load.groovy
index 6312cc3feb7..3ebf4348dab 100644
--- a/regression-test/suites/load_p0/broker_load/test_s3_load.groovy
+++ b/regression-test/suites/load_p0/broker_load/test_s3_load.groovy
@@ -17,6 +17,10 @@
 
 suite("test_s3_load", "load_p0") {
 
+    sql "create workload group if not exists broker_load_test properties ( 'cpu_share'='1024'); "
+
+    sql "set workload_group=broker_load_test;"
+
     def tables = [
             "agg_tbl_basic",
             "dup_tbl_array",


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to