This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-2.1 by this push:
     new ed9794a0fe6 [Pick][Improment]publish workload to BE by tag (#38486) 
(#39730)
ed9794a0fe6 is described below

commit ed9794a0fe66f7e0458c251d367a376c54f1de27
Author: wangbo <wan...@apache.org>
AuthorDate: Thu Aug 22 00:48:16 2024 +0800

    [Pick][Improment]publish workload to BE by tag (#38486) (#39730)
    
    A workload group's tag property falls into one of three cases:
    1. An empty string (null or ''): the workload group is published to
    all BEs.
    2. A value that matches some BE's location: the workload group is
    published only to the BEs with the same tag.
    3. A non-empty but invalid string that cannot match any BE's location:
    the workload group is not published to any BE.
    
    ## Proposed changes
    
    Issue Number: close #xxx
    
    <!--Describe your changes.-->
---
 .../doris/common/publish/TopicPublisherThread.java | 28 +++++++++++++++-
 .../main/java/org/apache/doris/resource/Tag.java   |  2 ++
 .../resource/workloadgroup/WorkloadGroup.java      | 24 +++++++++++---
 .../resource/workloadgroup/WorkloadGroupMgr.java   |  3 ++
 .../main/java/org/apache/doris/system/Backend.java | 37 ++++++++++++++++++++++
 .../workloadgroup/WorkloadGroupMgrTest.java        | 13 ++++++--
 gensrc/thrift/BackendService.thrift                |  3 ++
 7 files changed, 103 insertions(+), 7 deletions(-)

diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/common/publish/TopicPublisherThread.java
 
b/fe/fe-core/src/main/java/org/apache/doris/common/publish/TopicPublisherThread.java
index dde45e44e29..2407e3a2516 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/common/publish/TopicPublisherThread.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/common/publish/TopicPublisherThread.java
@@ -27,6 +27,7 @@ import org.apache.doris.thrift.BackendService;
 import org.apache.doris.thrift.TNetworkAddress;
 import org.apache.doris.thrift.TPublishTopicRequest;
 import org.apache.doris.thrift.TTopicInfoType;
+import org.apache.doris.thrift.TWorkloadGroupInfo;
 import org.apache.doris.thrift.TopicInfo;
 
 import org.apache.logging.log4j.LogManager;
@@ -35,8 +36,10 @@ import org.apache.logging.log4j.Logger;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.concurrent.ExecutorService;
 
 public class TopicPublisherThread extends MasterDaemon {
@@ -120,7 +123,30 @@ public class TopicPublisherThread extends MasterDaemon {
             try {
                 address = new TNetworkAddress(be.getHost(), be.getBePort());
                 client = ClientPool.backendPool.borrowObject(address);
-                client.publishTopicInfo(request);
+                // check whether workload group tag math current be
+                TPublishTopicRequest copiedRequest = request.deepCopy();
+                if (copiedRequest.isSetTopicMap()) {
+                    Map<TTopicInfoType, List<TopicInfo>> topicMap = 
copiedRequest.getTopicMap();
+                    List<TopicInfo> topicInfoList = 
topicMap.get(TTopicInfoType.WORKLOAD_GROUP);
+                    if (topicInfoList != null) {
+                        Set<String> beTagSet = be.getBeWorkloadGroupTagSet();
+                        Iterator<TopicInfo> topicIter = 
topicInfoList.iterator();
+                        while (topicIter.hasNext()) {
+                            TopicInfo topicInfo = topicIter.next();
+                            if (topicInfo.isSetWorkloadGroupInfo()) {
+                                TWorkloadGroupInfo tWgInfo = 
topicInfo.getWorkloadGroupInfo();
+                                if (tWgInfo.isSetTag() && 
!Backend.isMatchWorkloadGroupTag(
+                                        tWgInfo.getTag(), beTagSet)) {
+                                    // currently TopicInfo could not contain 
both policy and workload group,
+                                    // so we can remove TopicInfo directly.
+                                    topicIter.remove();
+                                }
+                            }
+                        }
+                    }
+                }
+
+                client.publishTopicInfo(copiedRequest);
                 ok = true;
                 LOG.info("[topic_publish]publish topic info to be {} success, 
time cost={} ms, details:{}",
                         be.getHost(), (System.currentTimeMillis() - 
beginTime), logStr);
diff --git a/fe/fe-core/src/main/java/org/apache/doris/resource/Tag.java 
b/fe/fe-core/src/main/java/org/apache/doris/resource/Tag.java
index b9ba7af89ba..777f20a8ea1 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/resource/Tag.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/resource/Tag.java
@@ -66,6 +66,8 @@ public class Tag implements Writable {
     public static final String VALUE_DEFAULT_TAG = "default";
     public static final String VALUE_INVALID_TAG = "invalid";
 
+    public static final String WORKLOAD_GROUP = "workload_group";
+
     public static final ImmutableSet<String> RESERVED_TAG_TYPE = 
ImmutableSet.of(
             TYPE_ROLE, TYPE_FUNCTION, TYPE_LOCATION);
     public static final ImmutableSet<String> RESERVED_TAG_VALUES = 
ImmutableSet.of(
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/resource/workloadgroup/WorkloadGroup.java
 
b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadgroup/WorkloadGroup.java
index e5ec2c619b6..8588e10cf34 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/resource/workloadgroup/WorkloadGroup.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadgroup/WorkloadGroup.java
@@ -18,8 +18,10 @@
 package org.apache.doris.resource.workloadgroup;
 
 import org.apache.doris.catalog.Env;
+import org.apache.doris.common.AnalysisException;
 import org.apache.doris.common.Config;
 import org.apache.doris.common.DdlException;
+import org.apache.doris.common.FeNameFormat;
 import org.apache.doris.common.io.Text;
 import org.apache.doris.common.io.Writable;
 import org.apache.doris.common.proc.BaseProcResult;
@@ -29,7 +31,6 @@ import org.apache.doris.thrift.TPipelineWorkloadGroup;
 import org.apache.doris.thrift.TWorkloadGroupInfo;
 import org.apache.doris.thrift.TopicInfo;
 
-import com.google.common.base.Strings;
 import com.google.common.collect.ImmutableSet;
 import com.google.gson.annotations.SerializedName;
 import org.apache.commons.lang3.StringUtils;
@@ -184,9 +185,7 @@ public class WorkloadGroup implements Writable, 
GsonPostProcessable {
             throws DdlException {
         Map<String, String> newProperties = new 
HashMap<>(currentWorkloadGroup.getProperties());
         for (Map.Entry<String, String> kv : updateProperties.entrySet()) {
-            if (!Strings.isNullOrEmpty(kv.getValue())) {
-                newProperties.put(kv.getKey(), kv.getValue());
-            }
+            newProperties.put(kv.getKey(), kv.getValue());
         }
 
         checkProperties(newProperties);
@@ -382,6 +381,18 @@ public class WorkloadGroup implements Writable, 
GsonPostProcessable {
                     + SPILL_THRESHOLD_LOW_WATERMARK + "(" + lowWaterMark + 
")");
         }
 
+        String tagStr = properties.get(TAG);
+        if (!StringUtils.isEmpty(tagStr)) {
+            String[] tagArr = tagStr.split(",");
+            for (String tag : tagArr) {
+                try {
+                    FeNameFormat.checkCommonName("workload group tag name", 
tag);
+                } catch (AnalysisException e) {
+                    throw new DdlException("workload group tag name format is 
illegal, " + tagStr);
+                }
+            }
+        }
+
     }
 
     public long getId() {
@@ -553,6 +564,11 @@ public class WorkloadGroup implements Writable, 
GsonPostProcessable {
             
tWorkloadGroupInfo.setSpillThresholdHighWatermark(Integer.parseInt(spillHighWatermarkStr));
         }
 
+        String tagStr = properties.get(TAG);
+        if (!StringUtils.isEmpty(tagStr)) {
+            tWorkloadGroupInfo.setTag(tagStr);
+        }
+
         TopicInfo topicInfo = new TopicInfo();
         topicInfo.setWorkloadGroupInfo(tWorkloadGroupInfo);
         return topicInfo;
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/resource/workloadgroup/WorkloadGroupMgr.java
 
b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadgroup/WorkloadGroupMgr.java
index 2cfc59dfd2f..b54114c6bf1 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/resource/workloadgroup/WorkloadGroupMgr.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadgroup/WorkloadGroupMgr.java
@@ -385,6 +385,9 @@ public class WorkloadGroupMgr extends MasterDaemon 
implements Writable, GsonPost
     public void alterWorkloadGroup(AlterWorkloadGroupStmt stmt) throws 
DdlException {
         String workloadGroupName = stmt.getWorkloadGroupName();
         Map<String, String> properties = stmt.getProperties();
+        if (properties.size() == 0) {
+            throw new DdlException("alter workload group should contain at 
least one property");
+        }
         WorkloadGroup newWorkloadGroup;
         writeLock();
         try {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/system/Backend.java 
b/fe/fe-core/src/main/java/org/apache/doris/system/Backend.java
index 1e84273678b..91f1624d56d 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/system/Backend.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/system/Backend.java
@@ -37,7 +37,9 @@ import com.google.common.base.Preconditions;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
 import com.google.gson.annotations.SerializedName;
+import org.apache.commons.lang3.StringUtils;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 
@@ -45,9 +47,12 @@ import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
+import java.util.Set;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 /**
@@ -834,4 +839,36 @@ public class Backend implements Writable {
         return "{" + new PrintableMap<>(tagMap, ":", true, false).toString() + 
"}";
     }
 
+    public Set<String> getBeWorkloadGroupTagSet() {
+        Set<String> beTagSet = Sets.newHashSet();
+        String beTagStr = this.tagMap.get(Tag.WORKLOAD_GROUP);
+        if (StringUtils.isEmpty(beTagStr)) {
+            return beTagSet;
+        }
+
+        String[] beTagArr = beTagStr.split(",");
+        for (String beTag : beTagArr) {
+            beTagSet.add(beTag.trim());
+        }
+
+        return beTagSet;
+    }
+
+    public static boolean isMatchWorkloadGroupTag(String wgTagStr, Set<String> 
beTagSet) {
+        if (StringUtils.isEmpty(wgTagStr)) {
+            return true;
+        }
+        if (beTagSet.isEmpty()) {
+            return false;
+        }
+
+        String[] wgTagArr = wgTagStr.split(",");
+        Set<String> wgTagSet = new HashSet<>();
+        for (String wgTag : wgTagArr) {
+            wgTagSet.add(wgTag.trim());
+        }
+
+        return !Collections.disjoint(wgTagSet, beTagSet);
+    }
+
 }
diff --git 
a/fe/fe-core/src/test/java/org/apache/doris/resource/workloadgroup/WorkloadGroupMgrTest.java
 
b/fe/fe-core/src/test/java/org/apache/doris/resource/workloadgroup/WorkloadGroupMgrTest.java
index 1e73dc79510..5f1e3565966 100644
--- 
a/fe/fe-core/src/test/java/org/apache/doris/resource/workloadgroup/WorkloadGroupMgrTest.java
+++ 
b/fe/fe-core/src/test/java/org/apache/doris/resource/workloadgroup/WorkloadGroupMgrTest.java
@@ -194,15 +194,24 @@ public class WorkloadGroupMgrTest {
         Config.enable_workload_group = true;
         ConnectContext context = new ConnectContext();
         WorkloadGroupMgr workloadGroupMgr = new WorkloadGroupMgr();
-        Map<String, String> properties = Maps.newHashMap();
+        Map<String, String> p0 = Maps.newHashMap();
         String name = "g1";
         try {
-            AlterWorkloadGroupStmt stmt1 = new AlterWorkloadGroupStmt(name, 
properties);
+            AlterWorkloadGroupStmt stmt1 = new AlterWorkloadGroupStmt(name, 
p0);
+            workloadGroupMgr.alterWorkloadGroup(stmt1);
+        } catch (DdlException e) {
+            Assert.assertTrue(e.getMessage().contains("alter workload group 
should contain at least one property"));
+        }
+
+        p0.put(WorkloadGroup.CPU_SHARE, "10");
+        try {
+            AlterWorkloadGroupStmt stmt1 = new AlterWorkloadGroupStmt(name, 
p0);
             workloadGroupMgr.alterWorkloadGroup(stmt1);
         } catch (DdlException e) {
             Assert.assertTrue(e.getMessage().contains("does not exist"));
         }
 
+        Map<String, String> properties = Maps.newHashMap();
         properties.put(WorkloadGroup.CPU_SHARE, "10");
         properties.put(WorkloadGroup.MEMORY_LIMIT, "30%");
         CreateWorkloadGroupStmt createStmt = new 
CreateWorkloadGroupStmt(false, name, properties);
diff --git a/gensrc/thrift/BackendService.thrift 
b/gensrc/thrift/BackendService.thrift
index 12036ef93ce..f80c66dd827 100644
--- a/gensrc/thrift/BackendService.thrift
+++ b/gensrc/thrift/BackendService.thrift
@@ -182,6 +182,9 @@ struct TWorkloadGroupInfo {
   11: optional i32 min_remote_scan_thread_num
   12: optional i32 spill_threshold_low_watermark
   13: optional i32 spill_threshold_high_watermark
+  14: optional i64 read_bytes_per_second
+  15: optional i64 remote_read_bytes_per_second
+  16: optional string tag
 }
 
 enum TWorkloadMetricType {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to