This is an automated email from the ASF dual-hosted git repository.

wangbo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 6b6acf53a54 [Improment]Add ComputeGroupMgr (#48187)
6b6acf53a54 is described below

commit 6b6acf53a5483b293dd90e2fdab0b1a4d670fb18
Author: wangbo <wan...@selectdb.com>
AuthorDate: Mon Mar 17 11:23:34 2025 +0800

    [Improment]Add ComputeGroupMgr (#48187)
---
 .../main/java/org/apache/doris/catalog/Env.java    |   8 +
 .../java/org/apache/doris/catalog/OlapTable.java   |  18 +-
 .../doris/datasource/FederationBackendPolicy.java  |  54 +-
 .../org/apache/doris/httpv2/rest/LoadAction.java   |  21 +-
 .../apache/doris/load/loadv2/BrokerLoadJob.java    |  36 ++
 .../doris/load/routineload/RoutineLoadJob.java     |  34 +-
 .../doris/load/routineload/RoutineLoadManager.java |  36 +-
 .../java/org/apache/doris/mysql/MysqlProto.java    |   2 +-
 .../org/apache/doris/mysql/privilege/Auth.java     |   6 +-
 .../apache/doris/mysql/privilege/UserProperty.java |   2 +-
 .../doris/mysql/privilege/UserPropertyMgr.java     |  11 +-
 .../org/apache/doris/planner/OlapScanNode.java     |  16 +-
 .../java/org/apache/doris/qe/ConnectContext.java   |  52 +-
 .../java/org/apache/doris/qe/ConnectProcessor.java |   4 +-
 .../main/java/org/apache/doris/qe/Coordinator.java |   4 +-
 .../computegroup/AllBackendComputeGroup.java       |  63 ++
 .../resource/computegroup/CloudComputeGroup.java   |  36 ++
 .../doris/resource/computegroup/ComputeGroup.java  | 102 ++++
 .../resource/computegroup/ComputeGroupMgr.java     |  59 ++
 .../resource/computegroup/MergedComputeGroup.java  |  66 +++
 .../org/apache/doris/system/BeSelectionPolicy.java |   3 +-
 .../org/apache/doris/system/SystemInfoService.java |  34 +-
 .../org/apache/doris/catalog/OlapTableTest.java    |  10 +-
 .../doris/planner/FederationBackendPolicyTest.java |  43 ++
 .../apache/doris/planner/ResourceTagQueryTest.java |  31 +-
 .../apache/doris/resource/ComputeGroupTest.java    | 636 +++++++++++++++++++++
 26 files changed, 1264 insertions(+), 123 deletions(-)

diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java
index f88653ade36..404590a786d 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java
@@ -259,6 +259,7 @@ import org.apache.doris.qe.StmtExecutor;
 import org.apache.doris.qe.VariableMgr;
 import org.apache.doris.resource.AdmissionControl;
 import org.apache.doris.resource.Tag;
+import org.apache.doris.resource.computegroup.ComputeGroupMgr;
 import org.apache.doris.resource.workloadgroup.WorkloadGroupMgr;
 import org.apache.doris.resource.workloadschedpolicy.WorkloadRuntimeStatusMgr;
 import org.apache.doris.resource.workloadschedpolicy.WorkloadSchedPolicyMgr;
@@ -537,6 +538,8 @@ public class Env {
 
     private WorkloadGroupMgr workloadGroupMgr;
 
+    private ComputeGroupMgr computeGroupMgr;
+
     private WorkloadSchedPolicyMgr workloadSchedPolicyMgr;
 
     private WorkloadRuntimeStatusMgr workloadRuntimeStatusMgr;
@@ -821,6 +824,7 @@ public class Env {
         this.statisticsJobAppender = new StatisticsJobAppender();
         this.globalFunctionMgr = new GlobalFunctionMgr();
         this.workloadGroupMgr = new WorkloadGroupMgr();
+        this.computeGroupMgr = new ComputeGroupMgr(systemInfo);
         this.workloadSchedPolicyMgr = new WorkloadSchedPolicyMgr();
         this.workloadRuntimeStatusMgr = new WorkloadRuntimeStatusMgr();
         this.admissionControl = new AdmissionControl(systemInfo);
@@ -964,6 +968,10 @@ public class Env {
         return auditEventProcessor;
     }
 
+    public ComputeGroupMgr getComputeGroupMgr() {
+        return computeGroupMgr;
+    }
+
     public WorkloadGroupMgr getWorkloadGroupMgr() {
         return workloadGroupMgr;
     }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java
index 868909dc9b8..060f77aca84 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java
@@ -66,6 +66,7 @@ import org.apache.doris.persist.gson.GsonUtils;
 import org.apache.doris.qe.ConnectContext;
 import org.apache.doris.qe.OriginStatement;
 import org.apache.doris.resource.Tag;
+import org.apache.doris.resource.computegroup.ComputeGroup;
 import org.apache.doris.rpc.RpcException;
 import org.apache.doris.statistics.AnalysisInfo;
 import org.apache.doris.statistics.AnalysisInfo.AnalysisType;
@@ -114,7 +115,6 @@ import java.util.Collection;
 import java.util.Collections;
 import java.util.Comparator;
 import java.util.HashMap;
-import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
@@ -3105,19 +3105,23 @@ public class OlapTable extends Table implements MTMVRelatedTableIf, GsonPostProc
         fetchOption.setFetchRowStore(useStoreRow);
         fetchOption.setUseTwoPhaseFetch(true);
 
-        // get backend by tag
-        Set<Tag> tagSet = new HashSet<>();
         ConnectContext context = ConnectContext.get();
-        if (context != null) {
-            tagSet = context.getResourceTags();
+        if (context == null) {
+            context = new ConnectContext();
         }
         BeSelectionPolicy policy = new BeSelectionPolicy.Builder()
                 .needQueryAvailable()
                 .setRequireAliveBe()
-                .addTags(tagSet)
                 .build();
+
         TPaloNodesInfo nodesInfo = new TPaloNodesInfo();
-        for (Backend backend : Env.getCurrentSystemInfo().getBackendsByPolicy(policy)) {
+        ComputeGroup computeGroup = context.getComputeGroupSafely();
+
+        if (ComputeGroup.INVALID_COMPUTE_GROUP.equals(computeGroup)) {
+            throw new RuntimeException(ComputeGroup.INVALID_COMPUTE_GROUP_ERR_MSG);
+        }
+
+        for (Backend backend : policy.getCandidateBackends(computeGroup.getBackendList())) {
             nodesInfo.addToNodes(new TNodeInfo(backend.getId(), 0, backend.getHost(), backend.getBrpcPort()));
         }
 
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/FederationBackendPolicy.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/FederationBackendPolicy.java
index 1e1787c1f64..288b55448af 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/FederationBackendPolicy.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/FederationBackendPolicy.java
@@ -20,15 +20,15 @@
 
 package org.apache.doris.datasource;
 
-import org.apache.doris.catalog.Env;
+import org.apache.doris.common.AnalysisException;
 import org.apache.doris.common.Config;
 import org.apache.doris.common.IndexedPriorityQueue;
+import org.apache.doris.common.LoadException;
 import org.apache.doris.common.ResettableRandomizedIterator;
 import org.apache.doris.common.UserException;
 import org.apache.doris.common.util.ConsistentHash;
-import org.apache.doris.mysql.privilege.UserProperty;
 import org.apache.doris.qe.ConnectContext;
-import org.apache.doris.resource.Tag;
+import org.apache.doris.resource.computegroup.ComputeGroup;
 import org.apache.doris.spi.Split;
 import org.apache.doris.system.Backend;
 import org.apache.doris.system.BeSelectionPolicy;
@@ -36,7 +36,6 @@ import org.apache.doris.system.SystemInfoService;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
-import com.google.common.base.Strings;
 import com.google.common.cache.CacheBuilder;
 import com.google.common.cache.CacheLoader;
 import com.google.common.cache.LoadingCache;
@@ -46,7 +45,6 @@ import com.google.common.collect.ListMultimap;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import com.google.common.collect.Multimap;
-import com.google.common.collect.Sets;
 import com.google.common.hash.Funnel;
 import com.google.common.hash.Hashing;
 import com.google.common.hash.PrimitiveSink;
@@ -154,41 +152,35 @@ public class FederationBackendPolicy {
     }
 
     public void init(List<String> preLocations) throws UserException {
-        Set<Tag> tags = Sets.newHashSet();
-        if (ConnectContext.get() != null && ConnectContext.get().getCurrentUserIdentity() != null) {
-            String qualifiedUser = ConnectContext.get().getCurrentUserIdentity().getQualifiedUser();
-            // Some request from stream load(eg, mysql load) may not set user info in ConnectContext
-            // just ignore it.
-            if (!Strings.isNullOrEmpty(qualifiedUser)) {
-                tags = Env.getCurrentEnv().getAuth().getResourceTags(qualifiedUser);
-                if (tags == UserProperty.INVALID_RESOURCE_TAGS) {
-                    throw new UserException("No valid resource tag for user: " + qualifiedUser);
-                }
-            }
-        } else {
-            if (LOG.isDebugEnabled()) {
-                LOG.debug("user info in ExternalFileScanNode should not be null, add log to observer");
-            }
-        }
-
         // scan node is used for query
-        BeSelectionPolicy policy = new BeSelectionPolicy.Builder()
-                .needQueryAvailable()
+        BeSelectionPolicy.Builder builder = new BeSelectionPolicy.Builder();
+        builder.needQueryAvailable()
                 .needLoadAvailable()
-                .addTags(tags)
                 .preferComputeNode(Config.prefer_compute_node_for_external_table)
                 .assignExpectBeNum(Config.min_backend_num_for_external_table)
-                .addPreLocations(preLocations)
-                .build();
-        init(policy);
+                .addPreLocations(preLocations);
+        init(builder.build());
     }
 
     public void init(BeSelectionPolicy policy) throws UserException {
-        backends.addAll(policy.getCandidateBackends(Env.getCurrentSystemInfo()
-                .getBackendsByCurrentCluster().values().asList()));
+        ConnectContext ctx = ConnectContext.get();
+        if (ctx == null) {
+            if (Config.isCloudMode()) {
+                throw new AnalysisException("ConnectContext is null");
+            } else {
+                ctx = new ConnectContext();
+            }
+
+        }
+        ComputeGroup computeGroup = ctx.getComputeGroup();
+        if (Config.isNotCloudMode() && computeGroup.equals(ComputeGroup.INVALID_COMPUTE_GROUP)) {
+            throw new LoadException(ComputeGroup.INVALID_COMPUTE_GROUP_ERR_MSG);
+        }
+
+        backends.addAll(policy.getCandidateBackends(computeGroup.getBackendList()));
         if (backends.isEmpty()) {
             throw new UserException("No available backends, "
-                + "in cloud maybe this cluster has been dropped, please `use @otherClusterName` switch it");
+                    + "in cloud maybe this cluster has been dropped, please `use @otherClusterName` switch it");
         }
         for (Backend backend : backends) {
             assignedWeightPerBackend.put(backend, 0L);
diff --git a/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/LoadAction.java b/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/LoadAction.java
index 48776da1569..8401eb32ab5 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/LoadAction.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/LoadAction.java
@@ -47,7 +47,7 @@ import org.apache.doris.mysql.privilege.PrivPredicate;
 import org.apache.doris.planner.GroupCommitPlanner;
 import org.apache.doris.qe.ConnectContext;
 import org.apache.doris.qe.SessionVariable;
-import org.apache.doris.resource.Tag;
+import org.apache.doris.resource.computegroup.ComputeGroup;
 import org.apache.doris.service.ExecuteEnv;
 import org.apache.doris.system.Backend;
 import org.apache.doris.system.BeSelectionPolicy;
@@ -439,19 +439,16 @@ public class LoadAction extends RestBaseController {
             throws LoadException {
         Backend backend = null;
         BeSelectionPolicy policy = null;
-        String qualifiedUser = ConnectContext.get().getQualifiedUser();
-        Set<Tag> userTags = Env.getCurrentEnv().getAuth().getResourceTags(qualifiedUser);
-        policy = new BeSelectionPolicy.Builder()
-                .addTags(userTags)
-                .setEnableRoundRobin(true)
-                .needLoadAvailable().build();
+        ConnectContext ctx = ConnectContext.get();
+        if (ctx == null) {
+            throw new LoadException("ConnectContext should not be null");
+        }
+        ComputeGroup computeGroup = ctx.getComputeGroupSafely();
+        policy = new BeSelectionPolicy.Builder().setEnableRoundRobin(true).needLoadAvailable().build();
         policy.nextRoundRobinIndex = getLastSelectedBackendIndexAndUpdate();
         List<Long> backendIds;
-        if (groupCommit) {
-            backendIds = Env.getCurrentSystemInfo().selectBackendIdsByPolicy(policy, -1);
-        } else {
-            backendIds = Env.getCurrentSystemInfo().selectBackendIdsByPolicy(policy, 1);
-        }
+        int number = groupCommit ? -1 : 1;
+        backendIds = Env.getCurrentSystemInfo().selectBackendIdsByPolicy(policy, number, computeGroup.getBackendList());
         if (backendIds.isEmpty()) {
             throw new LoadException(SystemInfoService.NO_BACKEND_LOAD_AVAILABLE_MSG + ", policy: " + policy);
         }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/BrokerLoadJob.java b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/BrokerLoadJob.java
index 45ad61a1e30..dab9ee32624 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/BrokerLoadJob.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/BrokerLoadJob.java
@@ -52,6 +52,7 @@ import org.apache.doris.load.FailMsg;
 import org.apache.doris.qe.ConnectContext;
 import org.apache.doris.qe.OriginStatement;
 import org.apache.doris.qe.SessionVariable;
+import org.apache.doris.resource.computegroup.ComputeGroup;
 import org.apache.doris.service.ExecuteEnv;
 import org.apache.doris.service.FrontendOptions;
 import org.apache.doris.thrift.TStatusCode;
@@ -63,6 +64,7 @@ import org.apache.doris.transaction.TransactionState.TxnSourceType;
 
 import com.google.common.base.Joiner;
 import com.google.common.collect.Lists;
+import org.apache.commons.lang3.StringUtils;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 
@@ -219,6 +221,37 @@ public class BrokerLoadJob extends BulkLoadJob {
         loadStartTimestamp = System.currentTimeMillis();
     }
 
+    // make public for UT
+    public void setComputeGroup() {
+        ComputeGroup computeGroup = null;
+        ConnectContext context = ConnectContext.get();
+        try {
+            if (context == null) {
+                context = new ConnectContext();
+                context.setThreadLocalInfo();
+            }
+
+            String currentUser = getUserInfo().getQualifiedUser();
+            // user is null or get an invalid compute group should not be normal case,
+            // broker load job can get all backends when meets it.
+            if (StringUtils.isEmpty(currentUser)) {
+                computeGroup = Env.getCurrentEnv().getComputeGroupMgr().getAllBackendComputeGroup();
+                LOG.warn("can not find user in broker load, then skip compute group.");
+            } else {
+                computeGroup = Env.getCurrentEnv().getAuth().getComputeGroup(currentUser);
+                if (ComputeGroup.INVALID_COMPUTE_GROUP.equals(computeGroup)) {
+                    LOG.warn("get an invalid compute group in broker load job.");
+                    computeGroup = Env.getCurrentEnv().getComputeGroupMgr().getAllBackendComputeGroup();
+                }
+            }
+        } catch (Throwable t) {
+            LOG.warn("error happens when set compute group for broker load", t);
+            computeGroup = Env.getCurrentEnv().getComputeGroupMgr().getAllBackendComputeGroup();
+        }
+
+        context.setComputeGroup(computeGroup);
+    }
+
     protected LoadLoadingTask createTask(Database db, OlapTable table, List<BrokerFileGroup> brokerFileGroups,
             boolean isEnableMemtableOnSinkNode, int batchSize, FileGroupAggKey aggKey,
             BrokerPendingTaskAttachment attachment) throws UserException {
@@ -231,6 +264,9 @@ public class BrokerLoadJob extends BulkLoadJob {
 
         UUID uuid = UUID.randomUUID();
         TUniqueId loadId = new TUniqueId(uuid.getMostSignificantBits(), uuid.getLeastSignificantBits());
+
+        setComputeGroup();
+
         task.init(loadId, attachment.getFileStatusByTable(aggKey),
                 attachment.getFileNumByTable(aggKey), getUserInfo());
         task.settWorkloadGroups(tWorkloadGroups);
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java
index 859d5df43e6..b30e100a38f 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java
@@ -61,6 +61,7 @@ import org.apache.doris.qe.ConnectContext;
 import org.apache.doris.qe.OriginStatement;
 import org.apache.doris.qe.SessionVariable;
 import org.apache.doris.qe.SqlModeHelper;
+import org.apache.doris.resource.computegroup.ComputeGroup;
 import org.apache.doris.task.LoadTaskInfo;
 import org.apache.doris.thrift.TFileFormatType;
 import org.apache.doris.thrift.TFileType;
@@ -71,7 +72,6 @@ import org.apache.doris.transaction.TransactionException;
 import org.apache.doris.transaction.TransactionState;
 import org.apache.doris.transaction.TransactionStatus;
 
-import com.aliyuncs.utils.StringUtils;
 import com.google.common.base.Joiner;
 import com.google.common.base.Preconditions;
 import com.google.common.base.Strings;
@@ -83,6 +83,7 @@ import com.google.gson.GsonBuilder;
 import com.google.gson.annotations.SerializedName;
 import lombok.Getter;
 import lombok.Setter;
+import org.apache.commons.lang3.StringUtils;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 
@@ -1013,6 +1014,35 @@ public abstract class RoutineLoadJob
     // derived class can override this.
     public abstract void prepare() throws UserException;
 
+    // make this public here just for UT.
+    public void setComputeGroup() {
+        ComputeGroup computeGroup = null;
+        try {
+            if (ConnectContext.get() == null) {
+                ConnectContext ctx = new ConnectContext();
+                ctx.setThreadLocalInfo();
+            }
+            String currentUser = ConnectContext.get().getQualifiedUser();
+            if (StringUtils.isEmpty(currentUser)) {
+                currentUser = getUserIdentity().getQualifiedUser();
+            }
+            if (StringUtils.isEmpty(currentUser)) {
+                LOG.warn("can not find user in routine load");
+                computeGroup = Env.getCurrentEnv().getComputeGroupMgr().getAllBackendComputeGroup();
+            } else {
+                computeGroup = Env.getCurrentEnv().getAuth().getComputeGroup(currentUser);
+            }
+            if (ComputeGroup.INVALID_COMPUTE_GROUP.equals(computeGroup)) {
+                LOG.warn("get an invalid compute group in routine load");
+                computeGroup = Env.getCurrentEnv().getComputeGroupMgr().getAllBackendComputeGroup();
+            }
+        } catch (Throwable t) {
+            LOG.warn("error happens when set compute group for routine load", t);
+            computeGroup = Env.getCurrentEnv().getComputeGroupMgr().getAllBackendComputeGroup();
+        }
+        ConnectContext.get().setComputeGroup(computeGroup);
+    }
+
     public TPipelineFragmentParams plan(StreamLoadPlanner planner, TUniqueId loadId, long txnId) throws UserException {
         Preconditions.checkNotNull(planner);
         Database db = Env.getCurrentInternalCatalog().getDbOrMetaException(dbId);
@@ -1037,6 +1067,8 @@ public abstract class RoutineLoadJob
                 } else {
                     ConnectContext.get().setCloudCluster(clusterName);
                 }
+            } else {
+                setComputeGroup();
             }
 
             TPipelineFragmentParams planParams = planner.plan(loadId);
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadManager.java b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadManager.java
index bfe42ad7695..3bb25e85619 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadManager.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadManager.java
@@ -44,11 +44,11 @@ import org.apache.doris.common.util.LogBuilder;
 import org.apache.doris.common.util.LogKey;
 import org.apache.doris.datasource.InternalCatalog;
 import org.apache.doris.mysql.privilege.PrivPredicate;
-import org.apache.doris.mysql.privilege.UserProperty;
 import org.apache.doris.persist.AlterRoutineLoadJobOperationLog;
 import org.apache.doris.persist.RoutineLoadOperation;
 import org.apache.doris.qe.ConnectContext;
 import org.apache.doris.resource.Tag;
+import org.apache.doris.resource.computegroup.ComputeGroup;
 import org.apache.doris.system.Backend;
 import org.apache.doris.system.BeSelectionPolicy;
 
@@ -553,6 +553,11 @@ public class RoutineLoadManager implements Writable {
         }
     }
 
+    // just for UT
+    public List<Long> getAvailableBackendIdsForUt(long jobId) throws LoadException {
+        return getAvailableBackendIds(jobId);
+    }
+
     /**
      * The routine load task can only be scheduled on backends which has proper resource tags.
      * The tags should be got from user property.
@@ -565,24 +570,43 @@ public class RoutineLoadManager implements Writable {
      * @throws LoadException
      */
     protected List<Long> getAvailableBackendIds(long jobId) throws LoadException {
+        // Usually Cloud node could not reach here(refer CloudRoutineLoadManager.getAvailableBackendIds),
+        // check cloud mode here is just to be on the safe side.
+        if (Config.isCloudMode()) {
+            throw new LoadException("cloud mode should not reach here");
+        }
+
         RoutineLoadJob job = getJob(jobId);
         if (job == null) {
             throw new LoadException("job " + jobId + " does not exist");
         }
-        Set<Tag> tags;
+        Set<Tag> tags = null;
+        ComputeGroup computeGroup = null;
         if (job.getUserIdentity() == null) {
             // For old job, there may be no user info. So we have to use tags from replica allocation
             tags = getTagsFromReplicaAllocation(job.getDbId(), job.getTableId());
+            BeSelectionPolicy policy = new BeSelectionPolicy.Builder().addTags(tags).needLoadAvailable().build();
+            return Env.getCurrentSystemInfo()
+                    .selectBackendIdsByPolicy(policy, -1 /* as many as possible */);
         } else {
-            tags = Env.getCurrentEnv().getAuth().getResourceTags(job.getUserIdentity().getQualifiedUser());
-            if (tags == UserProperty.INVALID_RESOURCE_TAGS) {
+            computeGroup = Env.getCurrentEnv().getAuth().getComputeGroup(job.getUserIdentity().getQualifiedUser());
+            if (ComputeGroup.INVALID_COMPUTE_GROUP.equals(computeGroup)) {
                 // user may be dropped, or may not set resource tag property.
                 // Here we fall back to use replica tag
                 tags = getTagsFromReplicaAllocation(job.getDbId(), job.getTableId());
             }
+
+            if (computeGroup != null && !ComputeGroup.INVALID_COMPUTE_GROUP.equals(computeGroup)) {
+                BeSelectionPolicy policy = new BeSelectionPolicy.Builder().needLoadAvailable().build();
+                return Env.getCurrentSystemInfo()
+                        .selectBackendIdsByPolicy(policy, -1 /* as many as possible */,
+                                computeGroup.getBackendList());
+            } else {
+                BeSelectionPolicy policy = new BeSelectionPolicy.Builder().addTags(tags).needLoadAvailable().build();
+                return Env.getCurrentSystemInfo()
+                        .selectBackendIdsByPolicy(policy, -1 /* as many as possible */);
+            }
         }
-        BeSelectionPolicy policy = new BeSelectionPolicy.Builder().needLoadAvailable().addTags(tags).build();
-        return Env.getCurrentSystemInfo().selectBackendIdsByPolicy(policy, -1 /* as many as possible */);
     }
 
     private Set<Tag> getTagsFromReplicaAllocation(long dbId, long tblId) throws LoadException {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/mysql/MysqlProto.java b/fe/fe-core/src/main/java/org/apache/doris/mysql/MysqlProto.java
index a672a217a33..92154779661 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/mysql/MysqlProto.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/mysql/MysqlProto.java
@@ -266,7 +266,7 @@ public class MysqlProto {
         }
 
         // set resource tag if has
-        context.setResourceTags(Env.getCurrentEnv().getAuth().getResourceTags(qualifiedUser));
+        context.setComputeGroup(Env.getCurrentEnv().getAuth().getComputeGroup(qualifiedUser));
         return true;
     }
 
diff --git a/fe/fe-core/src/main/java/org/apache/doris/mysql/privilege/Auth.java b/fe/fe-core/src/main/java/org/apache/doris/mysql/privilege/Auth.java
index 5d3324bca2b..590b4970be7 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/mysql/privilege/Auth.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/mysql/privilege/Auth.java
@@ -66,7 +66,7 @@ import org.apache.doris.persist.AlterUserOperationLog;
 import org.apache.doris.persist.LdapInfo;
 import org.apache.doris.persist.PrivInfo;
 import org.apache.doris.qe.ConnectContext;
-import org.apache.doris.resource.Tag;
+import org.apache.doris.resource.computegroup.ComputeGroup;
 import org.apache.doris.resource.workloadgroup.WorkloadGroupMgr;
 import org.apache.doris.thrift.TPrivilegeStatus;
 
@@ -1229,10 +1229,10 @@ public class Auth implements Writable {
         }
     }
 
-    public Set<Tag> getResourceTags(String qualifiedUser) {
+    public ComputeGroup getComputeGroup(String qualifiedUser) {
         readLock();
         try {
-            return propertyMgr.getResourceTags(qualifiedUser);
+            return propertyMgr.getComputeGroup(qualifiedUser);
         } finally {
             readUnlock();
         }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/mysql/privilege/UserProperty.java b/fe/fe-core/src/main/java/org/apache/doris/mysql/privilege/UserProperty.java
index a637fb6c182..01f8fac9dec 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/mysql/privilege/UserProperty.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/mysql/privilege/UserProperty.java
@@ -259,7 +259,7 @@ public class UserProperty implements Writable {
                 }
 
                 newDefaultLoadCluster = value;
-            }  else if (keyArr[0].equalsIgnoreCase(DEFAULT_CLOUD_CLUSTER)) {
+            } else if (keyArr[0].equalsIgnoreCase(DEFAULT_CLOUD_CLUSTER)) {
                 newDefaultCloudCluster = checkCloudDefaultCluster(keyArr, value, DEFAULT_CLOUD_CLUSTER, isReplay);
             } else if (keyArr[0].equalsIgnoreCase(DEFAULT_COMPUTE_GROUP)) {
                 newDefaultCloudCluster = checkCloudDefaultCluster(keyArr, value, DEFAULT_COMPUTE_GROUP, isReplay);
diff --git a/fe/fe-core/src/main/java/org/apache/doris/mysql/privilege/UserPropertyMgr.java b/fe/fe-core/src/main/java/org/apache/doris/mysql/privilege/UserPropertyMgr.java
index 29ae1f438a1..67415abb7a5 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/mysql/privilege/UserPropertyMgr.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/mysql/privilege/UserPropertyMgr.java
@@ -32,6 +32,7 @@ import org.apache.doris.load.DppConfig;
 import org.apache.doris.mysql.authenticate.AuthenticateType;
 import org.apache.doris.persist.gson.GsonUtils;
 import org.apache.doris.resource.Tag;
+import org.apache.doris.resource.computegroup.ComputeGroup;
 
 import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
@@ -161,11 +162,11 @@ public class UserPropertyMgr implements Writable {
         return existProperty.getParallelFragmentExecInstanceNum();
     }
 
-    public Set<Tag> getResourceTags(String qualifiedUser) {
+    public ComputeGroup getComputeGroup(String qualifiedUser) {
         UserProperty existProperty = propertyMap.get(qualifiedUser);
         existProperty = getPropertyIfNull(qualifiedUser, existProperty);
         if (existProperty == null) {
-            return UserProperty.INVALID_RESOURCE_TAGS;
+            return ComputeGroup.INVALID_COMPUTE_GROUP;
         }
         Set<Tag> tags = existProperty.getCopiedResourceTags();
         // only root and admin can return empty tag.
@@ -176,7 +177,11 @@ public class UserPropertyMgr implements Writable {
                 && Config.force_olap_table_replication_allocation.isEmpty()) {
             tags = Sets.newHashSet(Tag.DEFAULT_BACKEND_TAG);
         }
-        return tags;
+        if (!tags.isEmpty()) {
+            return Env.getCurrentEnv().getComputeGroupMgr().getComputeGroup(tags);
+        } else {
+            return Env.getCurrentEnv().getComputeGroupMgr().getAllBackendComputeGroup();
+        }
     }
 
     public Pair<String, DppConfig> getLoadClusterInfo(String qualifiedUser, String cluster) throws DdlException {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java
index 5a1aaec5085..5e8c8e5df67 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java
@@ -70,7 +70,7 @@ import org.apache.doris.nereids.glue.translator.PlanTranslatorContext;
 import org.apache.doris.planner.normalize.Normalizer;
 import org.apache.doris.planner.normalize.PartitionRangePredicateNormalizer;
 import org.apache.doris.qe.ConnectContext;
-import org.apache.doris.resource.Tag;
+import org.apache.doris.resource.computegroup.ComputeGroup;
 import org.apache.doris.statistics.StatisticalType;
 import org.apache.doris.statistics.StatsDeriveResult;
 import org.apache.doris.statistics.StatsRecursiveDerive;
@@ -757,14 +757,12 @@ public class OlapScanNode extends ScanNode {
         }
         String visibleVersionStr = String.valueOf(visibleVersion);
 
-        Set<Tag> allowedTags = Sets.newHashSet();
         int useFixReplica = -1;
-        boolean needCheckTags = false;
         boolean skipMissingVersion = false;
         ConnectContext context = ConnectContext.get();
+        ComputeGroup computeGroup = null;
         if (context != null) {
-            allowedTags = context.getResourceTags();
-            needCheckTags = context.isResourceTagsSet();
+            computeGroup = context.getComputeGroupSafely();
             useFixReplica = context.getSessionVariable().useFixReplica;
             if (useFixReplica == -1
                     && context.getState().isNereids() && context.getSessionVariable().getEnableQueryCache()) {
@@ -915,10 +913,12 @@ public class OlapScanNode extends ScanNode {
                 if (!backend.isMixNode()) {
                     continue;
                 }
-                if (needCheckTags && !allowedTags.isEmpty() && 
!allowedTags.contains(backend.getLocationTag())) {
+                String beTagName = backend.getLocationTag().value;
+                if ((ComputeGroup.INVALID_COMPUTE_GROUP.equals(computeGroup)) 
|| (computeGroup != null
+                        && !Config.isCloudMode() && 
!computeGroup.containsBackend(beTagName))) {
                     String err = String.format(
-                            "Replica on backend %d with tag %s," + " which is 
not in user's resource tags: %s",
-                            backend.getId(), backend.getLocationTag(), 
allowedTags);
+                            "Replica on backend %d with tag %s," + " which is 
not in user's resource tag: %s",
+                            backend.getId(), beTagName, 
computeGroup.toString());
                     if (LOG.isDebugEnabled()) {
                         LOG.debug(err);
                     }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectContext.java 
b/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectContext.java
index 14d7d03b0f5..dc3e5841ef2 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectContext.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectContext.java
@@ -64,6 +64,8 @@ import org.apache.doris.plsql.Exec;
 import org.apache.doris.plsql.executor.PlSqlOperation;
 import org.apache.doris.plugin.AuditEvent.AuditEventBuilder;
 import org.apache.doris.resource.Tag;
+import org.apache.doris.resource.computegroup.ComputeGroup;
+import org.apache.doris.resource.computegroup.ComputeGroupMgr;
 import org.apache.doris.service.arrowflight.results.FlightSqlChannel;
 import org.apache.doris.service.arrowflight.results.FlightSqlEndpointsLocation;
 import org.apache.doris.statistics.ColumnStatistic;
@@ -79,7 +81,6 @@ import org.apache.doris.transaction.TransactionStatus;
 import com.google.common.base.Strings;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
-import com.google.common.collect.Sets;
 import io.netty.util.concurrent.FastThreadLocal;
 import lombok.Getter;
 import lombok.Setter;
@@ -211,14 +212,11 @@ public class ConnectContext {
 
     // If set to true, the nondeterministic function will not be rewrote to 
constant.
     private boolean notEvalNondeterministicFunction = false;
-    // The resource tag is used to limit the node resources that the user can 
use for query.
+    // The compute group tag is used to limit the node resources that the user 
can use for query.
     // The default is empty, that is, unlimited.
     // This property is obtained from UserProperty when the client connection 
is created.
     // Only when the connection is created again, the new resource tags will 
be retrieved from the UserProperty
-    private Set<Tag> resourceTags = Sets.newHashSet();
-    // If set to true, the resource tags set in resourceTags will be used to 
limit the query resources.
-    // If set to false, the system will not restrict query resources.
-    private boolean isResourceTagsSet = false;
+    private ComputeGroup computeGroup = null;
 
     private PlSqlOperation plSqlOperation = null;
 
@@ -1086,17 +1084,12 @@ public class ConnectContext {
         return threadInfo;
     }
 
-    public boolean isResourceTagsSet() {
-        return isResourceTagsSet;
+    public boolean isSetComputeGroup() {
+        return computeGroup != null;
     }
 
-    public Set<Tag> getResourceTags() {
-        return resourceTags;
-    }
-
-    public void setResourceTags(Set<Tag> resourceTags) {
-        this.resourceTags = resourceTags;
-        this.isResourceTagsSet = !this.resourceTags.isEmpty();
+    public void setComputeGroup(ComputeGroup computeGroup) {
+        this.computeGroup = computeGroup;
     }
 
     public void setCurrentConnectedFEIp(String ip) {
@@ -1360,6 +1353,35 @@ public class ConnectContext {
             : new CloudClusterResult(hasAuthCluster.get(0), 
CloudClusterResult.Comment.FOUND_BY_FRIST_CLUSTER_HAS_AUTH);
     }
 
+    public ComputeGroup getComputeGroupSafely() {
+        try {
+            return getComputeGroup();
+        } catch (UserException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    public ComputeGroup getComputeGroup() throws UserException {
+        ComputeGroupMgr cgMgr = Env.getCurrentEnv().getComputeGroupMgr();
+        if (Config.isCloudMode()) {
+            return cgMgr.getComputeGroupByName(getCurrentCloudCluster());
+        } else {
+            // In order to be compatible with resource tag's old logic,
+            // when a user logs in to FE via a mysql client, its tags are set 
in ConnectContext, which
+            // means isSetComputeGroup = true
+            if (this.isSetComputeGroup()) {
+                return computeGroup;
+            } else {
+                String currentUser = getQualifiedUser();
+                if (!StringUtils.isEmpty(currentUser)) {
+                    return 
Env.getCurrentEnv().getAuth().getComputeGroup(currentUser);
+                } else {
+                    return 
Env.getCurrentEnv().getComputeGroupMgr().getComputeGroupByName(Tag.VALUE_DEFAULT_TAG);
+                }
+            }
+        }
+    }
+
     /**
      * Tries to choose an available cluster in the following order
      * 1. Do nothing if a cluster has been chosen for current session. It may 
be
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectProcessor.java 
b/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectProcessor.java
index a415f2dccc3..2cbdd92f7e4 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectProcessor.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectProcessor.java
@@ -622,8 +622,8 @@ public abstract class ConnectProcessor {
             
ctx.setUserVars(userVariableFromThrift(request.getUserVariables()));
         }
 
-        // set resource tag
-        
ctx.setResourceTags(Env.getCurrentEnv().getAuth().getResourceTags(ctx.qualifiedUser));
+        // set compute group
+        
ctx.setComputeGroup(Env.getCurrentEnv().getAuth().getComputeGroup(ctx.qualifiedUser));
 
         ctx.setThreadLocalInfo();
         StmtExecutor executor = null;
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java 
b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
index 795e24b4704..110385ed382 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
@@ -1740,7 +1740,7 @@ public class Coordinator implements CoordInterface {
                 TNetworkAddress execHostport;
                 if (groupCommitBackend != null) {
                     execHostport = getGroupCommitBackend(addressToBackendID);
-                } else if (((ConnectContext.get() != null && 
ConnectContext.get().isResourceTagsSet()) || (
+                } else if (((ConnectContext.get() != null && 
ConnectContext.get().isSetComputeGroup()) || (
                         isAllExternalScan
                                 && 
Config.prefer_compute_node_for_external_table)) && 
!addressToBackendID.isEmpty()) {
                     // 2 cases:
@@ -1932,7 +1932,7 @@ public class Coordinator implements CoordInterface {
                 TNetworkAddress execHostport;
                 if (groupCommitBackend != null) {
                     execHostport = getGroupCommitBackend(addressToBackendID);
-                } else if (ConnectContext.get() != null && 
ConnectContext.get().isResourceTagsSet()
+                } else if (ConnectContext.get() != null && 
ConnectContext.get().isSetComputeGroup()
                         && !addressToBackendID.isEmpty()) {
                     // In this case, we only use the BE where the replica 
selected by the tag is located to
                     // execute this query. Otherwise, except for the scan 
node, the rest of the execution nodes
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/resource/computegroup/AllBackendComputeGroup.java
 
b/fe/fe-core/src/main/java/org/apache/doris/resource/computegroup/AllBackendComputeGroup.java
new file mode 100644
index 00000000000..665fd9601b0
--- /dev/null
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/resource/computegroup/AllBackendComputeGroup.java
@@ -0,0 +1,63 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.resource.computegroup;
+
+import org.apache.doris.common.Config;
+import org.apache.doris.system.Backend;
+import org.apache.doris.system.SystemInfoService;
+
+import com.google.common.collect.ImmutableList;
+
+public class AllBackendComputeGroup extends ComputeGroup {
+
+    public AllBackendComputeGroup(SystemInfoService systemInfoService) {
+        super(AllBackendComputeGroup.class.getSimpleName(), 
AllBackendComputeGroup.class.getSimpleName(),
+                systemInfoService);
+    }
+
+    @Override
+    public boolean containsBackend(String beTag) {
+        if (Config.isCloudMode()) {
+            throw new RuntimeException("AllBackendComputeGroup not implements 
containsBackend in cloud mode.");
+        }
+        // currently AllBackendComputeGroup is used when the admin/root user does not 
specify a resource tag,
+        // so they can get all backends
+        return true;
+    }
+
+    @Override
+    public String getId() {
+        throw new RuntimeException("AllBackendComputeGroup not implements 
getId.");
+    }
+
+    @Override
+    public String getName() {
+        throw new RuntimeException("AllBackendComputeGroup not implements 
getName.");
+    }
+
+    @Override
+    public ImmutableList<Backend> getBackendList() {
+        return 
systemInfoService.getAllClusterBackendsNoException().values().asList();
+    }
+
+    @Override
+    public String toString() {
+        return AllBackendComputeGroup.class.getSimpleName();
+    }
+
+}
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/resource/computegroup/CloudComputeGroup.java
 
b/fe/fe-core/src/main/java/org/apache/doris/resource/computegroup/CloudComputeGroup.java
new file mode 100644
index 00000000000..8d2a670d170
--- /dev/null
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/resource/computegroup/CloudComputeGroup.java
@@ -0,0 +1,36 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.resource.computegroup;
+
+import org.apache.doris.cloud.system.CloudSystemInfoService;
+import org.apache.doris.system.Backend;
+
+import java.util.List;
+
+public class CloudComputeGroup extends ComputeGroup {
+
+    public CloudComputeGroup(String id, String name, CloudSystemInfoService 
systemInfoService) {
+        super(id, name, systemInfoService);
+    }
+
+    @Override
+    public List<Backend> getBackendList() {
+        return ((CloudSystemInfoService) 
(super.systemInfoService)).getBackendsByClusterName(name);
+    }
+
+}
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/resource/computegroup/ComputeGroup.java
 
b/fe/fe-core/src/main/java/org/apache/doris/resource/computegroup/ComputeGroup.java
new file mode 100644
index 00000000000..8a9fd7665a8
--- /dev/null
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/resource/computegroup/ComputeGroup.java
@@ -0,0 +1,102 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.resource.computegroup;
+
+import org.apache.doris.system.Backend;
+import org.apache.doris.system.SystemInfoService;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+public class ComputeGroup {
+
+    private static final Logger LOG = LogManager.getLogger(ComputeGroup.class);
+
+    // NOTE: Why an invalid compute group is needed:
+    // the invalid compute group is mainly used in local mode (not cloud mode).
+    // It comes from an invalid resource tag (UserPropertyMgr.getComputeGroup): 
when trying to find a resource tag
+    // by user, the user may have no property (maybe the user is invalid),
+    // so an invalid compute group is introduced to cover this case.
+    // Callers should handle the invalid compute group case by case;
+    // e.g., if routine load cannot find a valid compute group when finding a 
slot BE, it could find a BE by the replica's tag.
+    // Some other cases may throw an Exception.
+    public static final String INVALID_COMPUTE_GROUP_NAME = 
"_invalid_compute_group_name";
+
+    public static final ComputeGroup INVALID_COMPUTE_GROUP = new 
ComputeGroup(INVALID_COMPUTE_GROUP_NAME,
+            INVALID_COMPUTE_GROUP_NAME, null);
+
+    public static final String INVALID_COMPUTE_GROUP_ERR_MSG
+            = "can not find a valid compute group, please check whether 
current user is valid.";
+
+    protected SystemInfoService systemInfoService;
+
+    protected String id;
+
+    protected String name;
+
+    public ComputeGroup(String id, String name, SystemInfoService 
systemInfoService) {
+        this.id = id;
+        this.name = name;
+        this.systemInfoService = systemInfoService;
+    }
+
+    public String getId() {
+        return id;
+    }
+
+    public String getName() {
+        return name;
+    }
+
+    public boolean containsBackend(String backendTag) {
+        checkInvalidComputeGroup();
+        return StringUtils.equals(this.name, backendTag);
+    }
+
+    public List<Backend> getBackendList() {
+        checkInvalidComputeGroup();
+        Set<String> cgSet = new HashSet<>();
+        cgSet.add(name);
+        return systemInfoService.getBackendListByComputeGroup(cgSet);
+    }
+
+    @Override
+    public String toString() {
+        return String.format("%s id=%s, name=%s", 
this.getClass().getSimpleName(), id, name);
+    }
+
+    @Override
+    public boolean equals(Object obj) {
+        // NOTE: currently equals is used to compare INVALID_COMPUTE_GROUP, 
just using ```==``` is enough.
+        return (this == obj);
+    }
+
+    private void checkInvalidComputeGroup() {
+        if (this.getName() == INVALID_COMPUTE_GROUP_NAME) {
+            throw new RuntimeException(
+                    "invalid compute group can not be used, please check 
whether current user is valid.");
+        }
+    }
+
+}
+
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/resource/computegroup/ComputeGroupMgr.java
 
b/fe/fe-core/src/main/java/org/apache/doris/resource/computegroup/ComputeGroupMgr.java
new file mode 100644
index 00000000000..167b6cc5040
--- /dev/null
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/resource/computegroup/ComputeGroupMgr.java
@@ -0,0 +1,59 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.resource.computegroup;
+
+import org.apache.doris.cloud.system.CloudSystemInfoService;
+import org.apache.doris.common.Config;
+import org.apache.doris.resource.Tag;
+import org.apache.doris.system.SystemInfoService;
+
+import com.google.common.collect.Sets;
+
+import java.util.Set;
+
+public class ComputeGroupMgr {
+
+    private SystemInfoService systemInfoService;
+
+    public ComputeGroupMgr(SystemInfoService systemInfoService) {
+        this.systemInfoService = systemInfoService;
+    }
+
+    public ComputeGroup getComputeGroupByName(String name) {
+        if (Config.isCloudMode()) {
+            return new CloudComputeGroup("", name, (CloudSystemInfoService) 
systemInfoService);
+        } else {
+            return new ComputeGroup("", name, systemInfoService);
+        }
+    }
+
+    public ComputeGroup getComputeGroup(Set<Tag> rgTags) {
+        Set<String> tagStrSet = Sets.newHashSet();
+        for (Tag tag : rgTags) {
+            tagStrSet.add(tag.value);
+        }
+        return new MergedComputeGroup(tagStrSet, systemInfoService);
+    }
+
+    // To be compatible with the resource tag's logic, if the root/admin user does not 
specify a resource tag,
+    // all backends are returned.
+    public ComputeGroup getAllBackendComputeGroup() {
+        return new AllBackendComputeGroup(systemInfoService);
+    }
+
+}
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/resource/computegroup/MergedComputeGroup.java
 
b/fe/fe-core/src/main/java/org/apache/doris/resource/computegroup/MergedComputeGroup.java
new file mode 100644
index 00000000000..bebc8a37bdd
--- /dev/null
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/resource/computegroup/MergedComputeGroup.java
@@ -0,0 +1,66 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.resource.computegroup;
+
+import org.apache.doris.system.Backend;
+import org.apache.doris.system.SystemInfoService;
+
+import com.google.common.collect.ImmutableList;
+
+import java.util.Set;
+
+public class MergedComputeGroup extends ComputeGroup {
+
+    private Set<String> computeGroupSet;
+
+    public MergedComputeGroup(Set<String> computeGroupSet, SystemInfoService 
systemInfoService) {
+        super(MergedComputeGroup.class.getSimpleName(), 
MergedComputeGroup.class.getSimpleName(), systemInfoService);
+        this.computeGroupSet = computeGroupSet;
+    }
+
+    @Override
+    public boolean containsBackend(String computeGroupName) {
+        return computeGroupSet.contains(computeGroupName);
+    }
+
+    @Override
+    public ImmutableList<Backend> getBackendList() {
+        return systemInfoService.getBackendListByComputeGroup(computeGroupSet);
+    }
+
+    @Override
+    public String getId() {
+        throw new RuntimeException("MergedComputeGroup not implements getId.");
+    }
+
+    @Override
+    public String getName() {
+        throw new RuntimeException("MergedComputeGroup not implements 
getName.");
+    }
+
+    // currently mainly for UT
+    public Set<String> getComputeGroupNameSet() {
+        return computeGroupSet;
+    }
+
+    @Override
+    public String toString() {
+        return String.format("%s %s", 
MergedComputeGroup.class.getSimpleName(), String.join(",", computeGroupSet));
+    }
+
+}
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/system/BeSelectionPolicy.java 
b/fe/fe-core/src/main/java/org/apache/doris/system/BeSelectionPolicy.java
index 0d72c40cf3f..384b126aba0 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/system/BeSelectionPolicy.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/system/BeSelectionPolicy.java
@@ -185,7 +185,8 @@ public class BeSelectionPolicy {
     }
 
     public List<Backend> getCandidateBackends(Collection<Backend> backends) {
-        List<Backend> filterBackends = 
backends.stream().filter(this::isMatch).collect(Collectors.toList());
+        List<Backend> filterBackends = backends.stream().filter(this::isMatch)
+                .collect(Collectors.toList());
         List<Backend> preLocationFilterBackends = filterBackends.stream()
                 .filter(iterm -> 
preferredLocations.contains(iterm.getHost())).collect(Collectors.toList());
         // If preLocations were chosen, use the preLocation backends. 
Otherwise we just ignore this filter.
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/system/SystemInfoService.java 
b/fe/fe-core/src/main/java/org/apache/doris/system/SystemInfoService.java
index 3d0df46ca69..20e625ad239 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/system/SystemInfoService.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/system/SystemInfoService.java
@@ -41,6 +41,7 @@ import org.apache.doris.thrift.TStorageMedium;
 import com.google.common.base.Joiner;
 import com.google.common.base.Preconditions;
 import com.google.common.base.Strings;
+import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
@@ -566,17 +567,24 @@ public class SystemInfoService {
         return sb.toString();
     }
 
+    public List<Long> selectBackendIdsByPolicy(BeSelectionPolicy policy, int 
number) {
+        return selectBackendIdsByPolicy(policy, number, 
getAllClusterBackendsNoException().values().asList());
+    }
+
     /**
      * Select a set of backends by the given policy.
      *
      * @param policy if policy is enableRoundRobin, will update its 
nextRoundRobinIndex
      * @param number number of backends which need to be selected. -1 means 
return as many as possible.
      * @return return #number of backend ids,
-     * or empty set if no backends match the policy, or the number of matched 
backends is less than "number";
+     *         or empty set if no backends match the policy, or the number of 
matched backends is less than "number";
      */
-    public List<Long> selectBackendIdsByPolicy(BeSelectionPolicy policy, int 
number) {
+    public List<Long> selectBackendIdsByPolicy(BeSelectionPolicy policy, int 
number,
+            List<Backend> backendList) {
         Preconditions.checkArgument(number >= -1);
-        List<Backend> candidates = 
policy.getCandidateBackends(getAllClusterBackendsNoException().values());
+
+        List<Backend> candidates = policy.getCandidateBackends(backendList);
+
         if (candidates.size() < number || candidates.isEmpty()) {
             if (LOG.isDebugEnabled()) {
                 LOG.debug("Not match policy: {}. candidates num: {}, expected: 
{}", policy, candidates.size(), number);
@@ -1085,6 +1093,16 @@ public class SystemInfoService {
         return idToBackendRef;
     }
 
+    public ImmutableList<Backend> getBackendListByComputeGroup(Set<String> 
cgSet) {
+        List<Backend> result = new ArrayList<>();
+        for (Backend be : idToBackendRef.values()) {
+            if (cgSet.contains(be.getLocationTag().value)) {
+                result.add(be);
+            }
+        }
+        return ImmutableList.copyOf(result);
+    }
+
     // Cloud and NonCloud get all bes
     public ImmutableMap<Long, Backend> getAllBackendsByAllCluster() throws 
AnalysisException {
         return idToBackendRef;
@@ -1118,14 +1136,4 @@ public class SystemInfoService {
         return Env.getCurrentInvertedIndex().getTabletNumByBackendId(beId);
     }
 
-    public List<Backend> getBackendsByPolicy(BeSelectionPolicy 
beSelectionPolicy) {
-        try {
-            return 
beSelectionPolicy.getCandidateBackends(Env.getCurrentSystemInfo()
-                    .getBackendsByCurrentCluster().values().asList());
-        } catch (Throwable t) {
-            LOG.warn("get backends by policy failed, policy: {}", 
beSelectionPolicy.toString());
-        }
-        return Lists.newArrayList();
-    }
-
 }
diff --git 
a/fe/fe-core/src/test/java/org/apache/doris/catalog/OlapTableTest.java 
b/fe/fe-core/src/test/java/org/apache/doris/catalog/OlapTableTest.java
index 3f6230b99cf..b190229bf71 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/catalog/OlapTableTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/catalog/OlapTableTest.java
@@ -24,6 +24,7 @@ import org.apache.doris.common.io.FastByteArrayOutputStream;
 import org.apache.doris.common.util.UnitTestUtil;
 import org.apache.doris.qe.ConnectContext;
 import org.apache.doris.resource.Tag;
+import org.apache.doris.resource.computegroup.ComputeGroup;
 import org.apache.doris.system.Backend;
 import org.apache.doris.thrift.TFetchOption;
 import org.apache.doris.thrift.TStorageType;
@@ -39,7 +40,6 @@ import org.junit.Test;
 import java.io.DataInputStream;
 import java.io.DataOutputStream;
 import java.io.IOException;
-import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -269,15 +269,15 @@ public class OlapTableTest {
         Env.getCurrentSystemInfo().addBackend(be1);
         Env.getCurrentSystemInfo().addBackend(be2);
 
+        ConnectContext connectContext = UtFrameUtils.createDefaultCtx();
+        connectContext.setQualifiedUser("root");
+
         OlapTable tab = new OlapTable();
         TFetchOption tfetchOption = tab.generateTwoPhaseReadOption(-1);
         Assert.assertTrue(tfetchOption.nodes_info.nodes.size() == 2);
 
-        ConnectContext connectContext = UtFrameUtils.createDefaultCtx();
-        Set<Tag> tagSet = new HashSet<>();
-        tagSet.add(taga);
+        connectContext.setComputeGroup(new ComputeGroup("taga", "taga", 
Env.getCurrentSystemInfo()));
 
-        connectContext.setResourceTags(tagSet);
         TFetchOption tfetchOption2 = tab.generateTwoPhaseReadOption(-1);
         Assert.assertTrue(tfetchOption2.nodes_info.nodes.size() == 1);
         ConnectContext.remove();
diff --git 
a/fe/fe-core/src/test/java/org/apache/doris/planner/FederationBackendPolicyTest.java
 
b/fe/fe-core/src/test/java/org/apache/doris/planner/FederationBackendPolicyTest.java
index 3b3e2eeedf7..04e097211c9 100644
--- 
a/fe/fe-core/src/test/java/org/apache/doris/planner/FederationBackendPolicyTest.java
+++ 
b/fe/fe-core/src/test/java/org/apache/doris/planner/FederationBackendPolicyTest.java
@@ -24,6 +24,7 @@ import org.apache.doris.common.util.LocationPath;
 import org.apache.doris.datasource.FederationBackendPolicy;
 import org.apache.doris.datasource.FileSplit;
 import org.apache.doris.datasource.NodeSelectionStrategy;
+import org.apache.doris.resource.computegroup.ComputeGroupMgr;
 import org.apache.doris.spi.Split;
 import org.apache.doris.system.Backend;
 import org.apache.doris.system.SystemInfoService;
@@ -69,11 +70,17 @@ public class FederationBackendPolicyTest {
         backend3.setAlive(true);
         service.addBackend(backend3);
 
+        ComputeGroupMgr cgmgr = new ComputeGroupMgr(service);
         new MockUp<Env>() {
             @Mock
             public SystemInfoService getCurrentSystemInfo() {
                 return service;
             }
+
+            @Mock
+            public ComputeGroupMgr getComputeGroupMgr() {
+                return cgmgr;
+            }
         };
 
         List<Split> splits = new ArrayList<>();
@@ -134,11 +141,17 @@ public class FederationBackendPolicyTest {
         backend3.setAlive(true);
         service.addBackend(backend3);
 
+        ComputeGroupMgr cgmgr = new ComputeGroupMgr(service);
         new MockUp<Env>() {
             @Mock
             public SystemInfoService getCurrentSystemInfo() {
                 return service;
             }
+
+            @Mock
+            public ComputeGroupMgr getComputeGroupMgr() {
+                return cgmgr;
+            }
         };
 
         List<Split> splits = new ArrayList<>();
@@ -228,11 +241,17 @@ public class FederationBackendPolicyTest {
         backend3.setAlive(true);
         service.addBackend(backend3);
 
+        ComputeGroupMgr cgmgr = new ComputeGroupMgr(service);
         new MockUp<Env>() {
             @Mock
             public SystemInfoService getCurrentSystemInfo() {
                 return service;
             }
+
+            @Mock
+            public ComputeGroupMgr getComputeGroupMgr() {
+                return cgmgr;
+            }
         };
 
         List<Split> splits = new ArrayList<>();
@@ -307,11 +326,17 @@ public class FederationBackendPolicyTest {
     @Test
     public void testGenerateRandomly() throws UserException {
         SystemInfoService service = new SystemInfoService();
+        ComputeGroupMgr cgmgr = new ComputeGroupMgr(service);
         new MockUp<Env>() {
             @Mock
             public SystemInfoService getCurrentSystemInfo() {
                 return service;
             }
+
+            @Mock
+            public ComputeGroupMgr getComputeGroupMgr() {
+                return cgmgr;
+            }
         };
 
         Random random = new Random();
@@ -425,11 +450,17 @@ public class FederationBackendPolicyTest {
     @Test
     public void testNonAliveNodes() throws UserException {
         SystemInfoService service = new SystemInfoService();
+        ComputeGroupMgr cgmgr = new ComputeGroupMgr(service);
         new MockUp<Env>() {
             @Mock
             public SystemInfoService getCurrentSystemInfo() {
                 return service;
             }
+
+            @Mock
+            public ComputeGroupMgr getComputeGroupMgr() {
+                return cgmgr;
+            }
         };
 
         Random random = new Random();
@@ -599,11 +630,17 @@ public class FederationBackendPolicyTest {
         backend3.setAlive(true);
         service.addBackend(backend3);
 
+        ComputeGroupMgr cgmgr = new ComputeGroupMgr(service);
         new MockUp<Env>() {
             @Mock
             public SystemInfoService getCurrentSystemInfo() {
                 return service;
             }
+
+            @Mock
+            public ComputeGroupMgr getComputeGroupMgr() {
+                return cgmgr;
+            }
         };
 
         List<Split> splits = new ArrayList<>();
@@ -771,11 +808,17 @@ public class FederationBackendPolicyTest {
         backend3.setAlive(true);
         service.addBackend(backend3);
 
+        ComputeGroupMgr cgmgr = new ComputeGroupMgr(service);
         new MockUp<Env>() {
             @Mock
             public SystemInfoService getCurrentSystemInfo() {
                 return service;
             }
+
+            @Mock
+            public ComputeGroupMgr getComputeGroupMgr() {
+                return cgmgr;
+            }
         };
 
         List<Split> splits = new ArrayList<>();
diff --git 
a/fe/fe-core/src/test/java/org/apache/doris/planner/ResourceTagQueryTest.java 
b/fe/fe-core/src/test/java/org/apache/doris/planner/ResourceTagQueryTest.java
index 850d8b27b06..1a8b27f149a 100644
--- 
a/fe/fe-core/src/test/java/org/apache/doris/planner/ResourceTagQueryTest.java
+++ 
b/fe/fe-core/src/test/java/org/apache/doris/planner/ResourceTagQueryTest.java
@@ -41,6 +41,9 @@ import org.apache.doris.mysql.privilege.Auth;
 import org.apache.doris.qe.ConnectContext;
 import org.apache.doris.qe.DdlExecutor;
 import org.apache.doris.resource.Tag;
+import org.apache.doris.resource.computegroup.AllBackendComputeGroup;
+import org.apache.doris.resource.computegroup.ComputeGroup;
+import org.apache.doris.resource.computegroup.MergedComputeGroup;
 import org.apache.doris.system.Backend;
 import org.apache.doris.thrift.TDisk;
 import org.apache.doris.thrift.TStorageMedium;
@@ -195,17 +198,21 @@ public class ResourceTagQueryTest {
         Database db = Env.getCurrentInternalCatalog().getDbNullable("test");
         OlapTable tbl = (OlapTable) db.getTableNullable("tbl1");
 
-        Set<Tag> userTags = 
Env.getCurrentEnv().getAuth().getResourceTags(Auth.ROOT_USER);
-        Assert.assertEquals(0, userTags.size());
+        ComputeGroup cg1 = 
Env.getCurrentEnv().getAuth().getComputeGroup(Auth.ROOT_USER);
+        Assert.assertTrue(cg1 instanceof AllBackendComputeGroup);
 
         // set default tag for root
         String setPropStr = "set property for 'root' 'resource_tags.location' 
= 'default';";
         ExceptionChecker.expectThrowsNoException(() -> 
setProperty(setPropStr));
-        userTags = 
Env.getCurrentEnv().getAuth().getResourceTags(Auth.ROOT_USER);
-        Assert.assertEquals(1, userTags.size());
+        ComputeGroup cg2 = 
Env.getCurrentEnv().getAuth().getComputeGroup(Auth.ROOT_USER);
+        Assert.assertTrue(cg2 instanceof MergedComputeGroup);
+        MergedComputeGroup cg3 = (MergedComputeGroup) cg2;
+        Set<String> cgNameSet1 = cg3.getComputeGroupNameSet();
+        Assert.assertTrue(cgNameSet1.size() == 1);
+        Assert.assertTrue(cgNameSet1.contains("default"));
 
         // update connection context and query
-        connectContext.setResourceTags(userTags);
+        connectContext.setComputeGroup(cg2);
         String queryStr = "explain select * from test.tbl1";
         String explainString = 
UtFrameUtils.getSQLPlanOrErrorMsg(connectContext, queryStr);
         System.out.println(explainString);
@@ -214,15 +221,15 @@ public class ResourceTagQueryTest {
         // set zone1 tag for root
         String setPropStr2 = "set property for 'root' 'resource_tags.location' 
= 'zone1';";
         ExceptionChecker.expectThrowsNoException(() -> 
setProperty(setPropStr2));
-        userTags = 
Env.getCurrentEnv().getAuth().getResourceTags(Auth.ROOT_USER);
-        Assert.assertEquals(1, userTags.size());
-        for (Tag tag : userTags) {
-            Assert.assertEquals(tag1, tag);
-        }
+        ComputeGroup cg4 = 
Env.getCurrentEnv().getAuth().getComputeGroup(Auth.ROOT_USER);
+        Assert.assertTrue(cg4 instanceof MergedComputeGroup);
+        MergedComputeGroup cg5 = (MergedComputeGroup) cg4;
+        Assert.assertTrue(cg5.getComputeGroupNameSet().size() == 1);
+        Assert.assertTrue(cg5.getComputeGroupNameSet().contains("zone1"));
 
         // update connection context and query, it will failed because no zone1 backend
-        connectContext.setResourceTags(userTags);
-        Assert.assertTrue(connectContext.isResourceTagsSet());
+        connectContext.setComputeGroup(cg4);
+        Assert.assertTrue(connectContext.isSetComputeGroup());
         queryStr = "explain select * from test.tbl1";
         String error = UtFrameUtils.getSQLPlanOrErrorMsg(connectContext, 
queryStr);
         Assert.assertTrue(error.contains("no queryable replicas"));
diff --git 
a/fe/fe-core/src/test/java/org/apache/doris/resource/ComputeGroupTest.java 
b/fe/fe-core/src/test/java/org/apache/doris/resource/ComputeGroupTest.java
new file mode 100644
index 00000000000..2ad93dfbf82
--- /dev/null
+++ b/fe/fe-core/src/test/java/org/apache/doris/resource/ComputeGroupTest.java
@@ -0,0 +1,636 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.resource;
+
+import org.apache.doris.analysis.Analyzer;
+import org.apache.doris.analysis.CreateUserStmt;
+import org.apache.doris.analysis.SetUserPropertyStmt;
+import org.apache.doris.analysis.UserDesc;
+import org.apache.doris.analysis.UserIdentity;
+import org.apache.doris.catalog.Env;
+import org.apache.doris.common.ExceptionChecker;
+import org.apache.doris.common.FeConstants;
+import org.apache.doris.common.LoadException;
+import org.apache.doris.common.MetaNotFoundException;
+import org.apache.doris.common.RandomIdentifierGenerator;
+import org.apache.doris.common.UserException;
+import org.apache.doris.datasource.FederationBackendPolicy;
+import org.apache.doris.datasource.InternalCatalog;
+import org.apache.doris.load.loadv2.BrokerLoadJob;
+import org.apache.doris.load.routineload.KafkaRoutineLoadJob;
+import org.apache.doris.load.routineload.RoutineLoadJob;
+import org.apache.doris.load.routineload.RoutineLoadManager;
+import org.apache.doris.mysql.privilege.AccessControllerManager;
+import org.apache.doris.mysql.privilege.Auth;
+import org.apache.doris.mysql.privilege.PrivPredicate;
+import org.apache.doris.qe.ConnectContext;
+import org.apache.doris.resource.computegroup.AllBackendComputeGroup;
+import org.apache.doris.resource.computegroup.CloudComputeGroup;
+import org.apache.doris.resource.computegroup.ComputeGroup;
+import org.apache.doris.resource.computegroup.ComputeGroupMgr;
+import org.apache.doris.resource.computegroup.MergedComputeGroup;
+import org.apache.doris.system.Backend;
+import org.apache.doris.system.BeSelectionPolicy;
+import org.apache.doris.system.SystemInfoService;
+import org.apache.doris.utframe.UtFrameUtils;
+
+import com.google.common.collect.Sets;
+import mockit.Expectations;
+import mockit.Mocked;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.Set;
+
+public class ComputeGroupTest {
+    private static final Logger LOG = 
LogManager.getLogger(ComputeGroupTest.class);
+
+    // use a unique dir so that it won't conflict with other unit tests which
+    // may also start a mocked Frontend
+    private static String runningDirBase = "fe";
+    private static ConnectContext connectContext;
+
+    private Auth auth;
+    @Mocked
+    public Env env;
+    @Mocked
+    private Analyzer analyzer;
+    @Mocked
+    AccessControllerManager accessManager;
+
+    @BeforeClass
+    public static void beforeClass() throws Exception {
+        FeConstants.runningUnitTest = true;
+        connectContext = UtFrameUtils.createDefaultCtx();
+    }
+
+    @Before
+    public void setUp() throws MetaNotFoundException {
+        auth = new Auth();
+        accessManager = new AccessControllerManager(auth);
+
+        new Expectations() {
+            {
+                Env.getCurrentEnv();
+                minTimes = 0;
+                result = env;
+
+                env.getAuth();
+                minTimes = 0;
+                result = auth;
+
+                analyzer.getDefaultCatalog();
+                minTimes = 0;
+                result = InternalCatalog.INTERNAL_CATALOG_NAME;
+
+                accessManager.checkGlobalPriv((ConnectContext) any, 
PrivPredicate.ADMIN);
+                minTimes = 0;
+                result = true;
+
+                accessManager.checkGlobalPriv((ConnectContext) any, 
PrivPredicate.GRANT);
+                minTimes = 0;
+                result = true;
+
+                accessManager.checkGlobalPriv((ConnectContext) any, 
PrivPredicate.OPERATOR);
+                minTimes = 0;
+                result = true;
+
+                accessManager.checkGlobalPriv((ConnectContext) any, 
PrivPredicate.CREATE);
+                minTimes = 0;
+                result = true;
+            }
+        };
+    }
+
+    // @AfterClass
+    // public static void tearDown() {
+    //     UtFrameUtils.cleanDorisFeDir(runningDirBase);
+    // }
+
+    private static void setProperty(String sql) throws Exception {
+        SetUserPropertyStmt setUserPropertyStmt = (SetUserPropertyStmt) 
UtFrameUtils.parseAndAnalyzeStmt(sql, connectContext);
+        Env.getCurrentEnv().getAuth().updateUserProperty(setUserPropertyStmt);
+    }
+
+    @Test
+    public void testGetSetResourceTagFromAuth() throws Exception {
+        new Expectations() {
+            {
+                Env.getCurrentEnv().getComputeGroupMgr();
+                minTimes = 0;
+                result = new ComputeGroupMgr(null);
+            }
+        };
+
+            // 1 get a user that does not exist
+            {
+                String invalidUser = 
RandomIdentifierGenerator.generateRandomIdentifier(8);
+                ComputeGroup cg1 = auth.getComputeGroup(invalidUser);
+                Assert.assertTrue(cg1 == ComputeGroup.INVALID_COMPUTE_GROUP);
+            }
+            // 2 get a non-admin user without resource tag
+            {
+                String nonAdminUserStr = "non_admin_user";
+                UserIdentity nonAdminUser = new UserIdentity(nonAdminUserStr, 
"%");
+                UserDesc nonAdminUserDesc = new UserDesc(nonAdminUser, 
"12345", true);
+
+                CreateUserStmt createNonAdminUser = new CreateUserStmt(false, 
nonAdminUserDesc, null);
+                createNonAdminUser.analyze(analyzer);
+                auth.createUser(createNonAdminUser);
+                ComputeGroup cg = auth.getComputeGroup(nonAdminUserStr);
+                Assert.assertTrue(cg instanceof MergedComputeGroup);
+                Assert.assertTrue(((MergedComputeGroup) 
cg).getComputeGroupNameSet().contains(Tag.VALUE_DEFAULT_TAG));
+
+                // 2.1 get a non-admin user with resource tag
+                String setPropStr = "set property for '" + nonAdminUserStr + 
"' 'resource_tags.location' = 'test_rg1';";
+                ExceptionChecker.expectThrowsNoException(() -> 
setProperty(setPropStr));
+                ComputeGroup cg2 = auth.getComputeGroup(nonAdminUserStr);
+                Assert.assertTrue(cg2 instanceof MergedComputeGroup);
+                Assert.assertTrue(((MergedComputeGroup) 
cg2).getComputeGroupNameSet().contains("test_rg1"));
+
+                // 2.2 get a non-admin user with multi-resource tag
+                String setPropStr2 = "set property for '" + nonAdminUserStr
+                        + "' 'resource_tags.location' = 'test_rg1,test_rg2';";
+                ExceptionChecker.expectThrowsNoException(() -> 
setProperty(setPropStr2));
+                ComputeGroup cg3 = auth.getComputeGroup(nonAdminUserStr);
+                Assert.assertTrue(cg3 instanceof MergedComputeGroup);
+                Set<String> cgNameSet = ((MergedComputeGroup) 
cg3).getComputeGroupNameSet();
+                Assert.assertTrue(cgNameSet.contains("test_rg1"));
+                Assert.assertTrue(cgNameSet.contains("test_rg2"));
+                Assert.assertTrue(cgNameSet.size() == 2);
+
+                // 2.3 get a non-admin user with empty tag
+                String setPropStr3 = "set property for '" + nonAdminUserStr
+                        + "' 'resource_tags.location' = '';";
+                ExceptionChecker.expectThrowsNoException(() -> 
setProperty(setPropStr3));
+                ComputeGroup cg4 = auth.getComputeGroup(nonAdminUserStr);
+                Assert.assertTrue(cg4 instanceof MergedComputeGroup);
+                Set<String> cgNameSets = ((MergedComputeGroup) 
cg4).getComputeGroupNameSet();
+                Assert.assertTrue(cgNameSets.size() == 1);
+                Assert.assertTrue(cgNameSets.contains("default"));
+            }
+
+            // 4 get an admin user without resource tag
+            {
+                ComputeGroup cg1 = auth.getComputeGroup("root");
+                Assert.assertTrue(cg1 instanceof AllBackendComputeGroup);
+
+                // 4.1 get an admin user with a resource tag
+                String setPropStr = "set property for 'root' 
'resource_tags.location' = 'test_rg2';";
+                ExceptionChecker.expectThrowsNoException(() -> 
setProperty(setPropStr));
+                ComputeGroup cg2 = auth.getComputeGroup("root");
+                Assert.assertTrue(cg2 instanceof MergedComputeGroup);
+                Assert.assertTrue(((MergedComputeGroup) 
cg2).getComputeGroupNameSet().contains("test_rg2"));
+
+
+                // 4.2 get an admin user with an empty resource tag
+                String setPropStr2 = "set property for 'root' 
'resource_tags.location' = '';";
+                ExceptionChecker.expectThrowsNoException(() -> 
setProperty(setPropStr2));
+                ComputeGroup cg3 = auth.getComputeGroup("root");
+                Assert.assertTrue(cg3 instanceof AllBackendComputeGroup);
+            }
+    }
+
+    @Test
+    public void testComputeGroup() {
+            // test invalid compute group
+            {
+                try {
+                    ComputeGroup.INVALID_COMPUTE_GROUP.getBackendList();
+                } catch (Exception e) {
+                    Assert.assertTrue(e.getMessage().contains("invalid compute 
group can not be used"));
+                }
+
+                try {
+                    ComputeGroup.INVALID_COMPUTE_GROUP.containsBackend("");
+                } catch (Exception e) {
+                    Assert.assertTrue(e.getMessage().contains("invalid compute 
group can not be used"));
+                }
+
+                String invalidCgToString = 
ComputeGroup.INVALID_COMPUTE_GROUP.toString();
+                String expectedCgString = String.format("%s id=%s, name=%s",
+                        
ComputeGroup.INVALID_COMPUTE_GROUP.getClass().getSimpleName(),
+                        ComputeGroup.INVALID_COMPUTE_GROUP_NAME, 
ComputeGroup.INVALID_COMPUTE_GROUP_NAME);
+                Assert.assertTrue(expectedCgString.equals(invalidCgToString));
+            }
+
+            // test Compute group
+            {
+                String cgId = "test_cg_id";
+                String cgName = "test_cg_1";
+                ComputeGroup cg1 = new ComputeGroup(cgId, cgName, null);
+                Assert.assertTrue(cgId.equals(cg1.getId()));
+                Assert.assertTrue(cgName.equals(cg1.getName()));
+                String cg1ToString = String.format("%s id=%s, name=%s", 
ComputeGroup.class.getSimpleName(), cgId, cgName);
+                Assert.assertTrue(cg1ToString.equals(cg1.toString()));
+                Assert.assertTrue(cg1.containsBackend(cgName));
+                Assert.assertFalse(cg1.containsBackend("123"));
+            }
+
+            // test Cloud Compute group
+            {
+                String cgId = "test_cloud_cg_id";
+                String cgName = "test_cloud_cg_name";
+                ComputeGroup cg1 = new CloudComputeGroup(cgId, cgName, null);
+                Assert.assertTrue(cgId.equals(cg1.getId()));
+                Assert.assertTrue(cgName.equals(cg1.getName()));
+                String cg1ToString = String.format("%s id=%s, name=%s", 
CloudComputeGroup.class.getSimpleName(), cgId,
+                        cgName);
+                Assert.assertTrue(cg1ToString.equals(cg1.toString()));
+                Assert.assertTrue(cg1.containsBackend(cgName));
+                Assert.assertFalse(cg1.containsBackend("123"));
+            }
+
+            // test MergedComputeGroup
+            {
+                Set<String> emptyTags = Sets.newHashSet();
+                String cgName = "merged_cg_name";
+                ComputeGroup emptyMergedCg = new MergedComputeGroup(emptyTags, 
null);
+                try {
+                    emptyMergedCg.getId();
+                } catch (Exception e) {
+                    
Assert.assertTrue(e.getMessage().contains("MergedComputeGroup not implements 
getId"));
+                }
+
+                try {
+                    emptyMergedCg.getName();
+                } catch (Exception e) {
+                    
Assert.assertTrue(e.getMessage().contains("MergedComputeGroup not implements 
getName"));
+                }
+
+                String mergedCgToString = String.format("%s %s", 
MergedComputeGroup.class.getSimpleName(),
+                        String.join(",", emptyTags));
+                
Assert.assertTrue(mergedCgToString.equals(emptyMergedCg.toString()));
+                Assert.assertFalse(emptyMergedCg.containsBackend(cgName));
+
+                Set<String> tags = Sets.newHashSet();
+                tags.add(cgName);
+                ComputeGroup notEmptyMergedCg = new MergedComputeGroup(tags, 
null);
+                String mergedCgToString2 = String.format("%s %s", 
MergedComputeGroup.class.getSimpleName(),
+                        String.join(",", tags));
+                
Assert.assertTrue(mergedCgToString2.equals(notEmptyMergedCg.toString()));
+                Assert.assertTrue(notEmptyMergedCg.containsBackend(cgName));
+            }
+
+            // test AllBackendComputeGroup
+            {
+                AllBackendComputeGroup allBeCg = new 
AllBackendComputeGroup(null);
+                try {
+                    allBeCg.getName();
+                } catch (Exception e) {
+                    
Assert.assertTrue(e.getMessage().contains("AllBackendComputeGroup not 
implements getName"));
+                }
+
+                try {
+                    allBeCg.getId();
+                } catch (Exception e) {
+                    
Assert.assertTrue(e.getMessage().contains("AllBackendComputeGroup not 
implements getId"));
+                }
+
+                
Assert.assertTrue(allBeCg.getClass().getSimpleName().equals(allBeCg.toString()));
+            }
+
+            // test equals
+            {
+                String cgName1 = "cg_name1";
+                String cgId1 = "cg_id1";
+                ComputeGroup cg1 = new ComputeGroup(cgId1, cgName1, null);
+
+                String cgName11 = "cg_name11";
+                ComputeGroup cg11 = new ComputeGroup(cgId1, cgName11, null);
+
+                String cgName2 = "cg_name2";
+                String cgId2 = "cg_id2";
+                ComputeGroup cg2 = new ComputeGroup(cgId2, cgName2, null);
+
+                ComputeGroup cg3 = new ComputeGroup(cgId1, cgName1, null);
+
+                Assert.assertFalse(cg1.equals(cg2));
+                Assert.assertFalse(cg2.equals(cg1));
+
+                Assert.assertFalse(cg1.equals(cg3));
+                Assert.assertFalse(cg3.equals(cg1));
+
+                Assert.assertFalse(cg1.equals(cg11));
+                Assert.assertFalse(cg11.equals(cg1));
+
+                Assert.assertFalse(cg1.equals(null));
+
+                CloudComputeGroup cloudCg1 = new CloudComputeGroup(cgId1, 
cgName1, null);
+                CloudComputeGroup cloudCg2 = new CloudComputeGroup(cgId2, 
cgName2, null);
+                Assert.assertFalse(cloudCg1.equals(cloudCg2));
+                Assert.assertFalse(cloudCg2.equals(cloudCg1));
+
+                AllBackendComputeGroup allBecg1 = new 
AllBackendComputeGroup(null);
+                AllBackendComputeGroup allBecg2 = new 
AllBackendComputeGroup(null);
+                Assert.assertFalse(allBecg1.equals(allBecg2));
+                Assert.assertFalse(allBecg2.equals(allBecg1));
+
+                MergedComputeGroup mergedCg1 = new MergedComputeGroup(null, 
null);
+                MergedComputeGroup mergedCg2 = new MergedComputeGroup(null, 
null);
+                Assert.assertFalse(mergedCg1.equals(mergedCg2));
+                Assert.assertFalse(mergedCg2.equals(mergedCg1));
+
+                // ComputeGroup vs others
+                Assert.assertTrue(cg1.equals(cg1));
+                Assert.assertFalse(cg1.equals(cloudCg1));
+                Assert.assertFalse(cg1.equals(allBecg1));
+                Assert.assertFalse(cg1.equals(mergedCg1));
+
+                // CloudComputeGroup vs others
+                Assert.assertTrue(cloudCg1.equals(cloudCg1));
+                Assert.assertFalse(cloudCg1.equals(cg1));
+                Assert.assertFalse(cloudCg1.equals(allBecg1));
+                Assert.assertFalse(cloudCg1.equals(mergedCg1));
+
+                // AllBackendComputeGroup vs others
+                Assert.assertTrue(allBecg1.equals(allBecg1));
+                Assert.assertFalse(allBecg1.equals(cg1));
+                Assert.assertFalse(allBecg1.equals(cloudCg1));
+                Assert.assertFalse(allBecg1.equals(mergedCg1));
+
+                // MergedComputeGroup vs others
+                Assert.assertTrue(mergedCg1.equals(mergedCg1));
+                Assert.assertFalse(mergedCg1.equals(cg1));
+                Assert.assertFalse(mergedCg1.equals(allBecg1));
+                Assert.assertFalse(mergedCg1.equals(cloudCg1));
+            }
+    }
+
+    @Test
+    public void testComputeGroupMgr() throws Exception {
+        SystemInfoService systemInfoService = new SystemInfoService();
+        ComputeGroupMgr cgmgr = new ComputeGroupMgr(systemInfoService);
+
+        Tag beTag1 = Tag.create(Tag.TYPE_LOCATION, "be_tag1");
+        Tag beTag2 = Tag.create(Tag.TYPE_LOCATION, "be_tag2");
+        Tag beTag3 = Tag.create(Tag.TYPE_LOCATION, "be_tag3");
+
+            {
+                Backend be1 = new Backend(10001, "192.168.1.1", 9050);
+                be1.setTagMap(beTag1.toMap());
+                be1.setAlive(true);
+
+                Backend be2 = new Backend(10002, "192.168.1.2", 9050);
+                be2.setTagMap(beTag1.toMap());
+                be2.setAlive(true);
+
+                systemInfoService.addBackend(be1);
+                systemInfoService.addBackend(be2);
+            }
+
+            {
+                Backend be1 = new Backend(10003, "192.168.1.3", 9050);
+                be1.setTagMap(beTag2.toMap());
+                be1.setAlive(true);
+
+                Backend be2 = new Backend(10004, "192.168.1.4", 9050);
+                be2.setTagMap(beTag2.toMap());
+                be2.setAlive(true);
+                systemInfoService.addBackend(be1);
+                systemInfoService.addBackend(be2);
+            }
+
+            {
+                Backend be3 = new Backend(10005, "192.168.1.5", 9050);
+                be3.setTagMap(beTag3.toMap());
+                be3.setAlive(true);
+                systemInfoService.addBackend(be3);
+            }
+
+        ComputeGroup cg1 = cgmgr.getComputeGroupByName(beTag1.value);
+        Assert.assertTrue(cg1.getBackendList().size() == 2);
+        ComputeGroup cg2 = cgmgr.getComputeGroupByName("abc");
+        Assert.assertTrue(cg2.getBackendList().size() == 0);
+
+        Set<Tag> tagSet1 = Sets.newHashSet(beTag1, beTag2);
+        
Assert.assertTrue(cgmgr.getComputeGroup(tagSet1).getBackendList().size() == 4);
+
+        Tag beTag4 = Tag.create(Tag.TYPE_LOCATION, "abc");
+        Set<Tag> tagset2 = Sets.newHashSet(beTag4);
+        
Assert.assertTrue(cgmgr.getComputeGroup(tagset2).getBackendList().size() == 0);
+
+        Set<Tag> emptyTagSet = Sets.newHashSet();
+        
Assert.assertTrue(cgmgr.getComputeGroup(emptyTagSet).getBackendList().size() == 
0);
+
+        
Assert.assertTrue(cgmgr.getAllBackendComputeGroup().getBackendList().size() == 
5);
+
+    }
+
+    @Test
+    public void testConnectContextToFederationBackendPolicy() throws 
UserException, IOException {
+        SystemInfoService systemInfoService = new SystemInfoService();
+
+        Tag beTag1 = Tag.create(Tag.TYPE_LOCATION, "be_tag1");
+        Tag beTag2 = Tag.DEFAULT_BACKEND_TAG;
+        Backend defaultBe = null;
+        Backend tag1Be = null;
+
+            {
+                tag1Be = new Backend(10001, "192.168.1.1", 9050);
+                tag1Be.setTagMap(beTag1.toMap());
+                tag1Be.setAlive(true);
+
+                defaultBe = new Backend(10002, "192.168.1.2", 9050);
+                defaultBe.setTagMap(beTag2.toMap());
+                defaultBe.setAlive(true);
+
+                systemInfoService.addBackend(tag1Be);
+                systemInfoService.addBackend(defaultBe);
+            }
+
+
+        ComputeGroupMgr cgmgr = new ComputeGroupMgr(systemInfoService);
+        new Expectations() {
+            {
+                Env.getCurrentEnv().getComputeGroupMgr();
+                minTimes = 0;
+                result = cgmgr;
+            }
+        };
+        BeSelectionPolicy beSelPolicy = new 
BeSelectionPolicy.Builder().build();
+
+
+            // 1 when connectctx is null, return default tag be.
+            {
+                ConnectContext.remove();
+                FederationBackendPolicy fbPolicy = new 
FederationBackendPolicy();
+                fbPolicy.init(beSelPolicy);
+                Assert.assertTrue(fbPolicy.getBackends().size() == 1);
+                Assert.assertTrue(fbPolicy.getBackends().contains(defaultBe));
+            }
+
+            // 2 get compute group from connect ctx
+            {
+                ConnectContext.remove();
+                ConnectContext context = new ConnectContext();
+                context.setThreadLocalInfo();
+                
context.setComputeGroup(cgmgr.getComputeGroupByName(beTag1.value));
+                FederationBackendPolicy fbPolicy = new 
FederationBackendPolicy();
+                fbPolicy.init(beSelPolicy);
+                Assert.assertTrue(fbPolicy.getBackends().size() == 1);
+                Assert.assertTrue(fbPolicy.getBackends().contains(tag1Be));
+            }
+
+            // 3 test set invalid compute group
+            {
+                ConnectContext.remove();
+                ConnectContext context = new ConnectContext();
+                context.setThreadLocalInfo();
+                context.setComputeGroup(ComputeGroup.INVALID_COMPUTE_GROUP);
+                FederationBackendPolicy fbPolicy = new 
FederationBackendPolicy();
+                try {
+                    fbPolicy.init(beSelPolicy);
+                } catch (UserException e) {
+                    
Assert.assertTrue(e.getMessage().contains(ComputeGroup.INVALID_COMPUTE_GROUP_ERR_MSG));
+                }
+            }
+
+            // 4 test get compute group from user property
+            {
+                ConnectContext.remove();
+                UtFrameUtils.createDefaultCtx();
+                String setPropStr = "set property for 'root' 
'resource_tags.location' = '" + beTag1.value + "';";
+                ExceptionChecker.expectThrowsNoException(() -> 
setProperty(setPropStr));
+                FederationBackendPolicy fbPolicy = new 
FederationBackendPolicy();
+                fbPolicy.init(beSelPolicy);
+                Assert.assertTrue(fbPolicy.getBackends().size() == 1);
+                Assert.assertTrue(fbPolicy.getBackends().contains(tag1Be));
+            }
+    }
+
+    @Test
+    public void testBrokerLoadToConnectContext() throws UserException, 
IOException {
+        new Expectations() {
+            {
+                Env.getCurrentEnv().getComputeGroupMgr();
+                minTimes = 0;
+                result = new ComputeGroupMgr(null);
+            }
+        };
+
+            // test user is empty string
+            {
+                ConnectContext.remove();
+                UserIdentity emptyUser = new UserIdentity("", "%");
+                emptyUser.setIsAnalyzed();
+                BrokerLoadJob brokerLoadJob =
+                        new BrokerLoadJob(1, null, null, null, emptyUser);
+                brokerLoadJob.setComputeGroup();
+                Assert.assertTrue(ConnectContext.get().getComputeGroupSafely() 
instanceof AllBackendComputeGroup);
+            }
+
+            // test invalid user
+            {
+                ConnectContext.remove();
+                UserIdentity emptyUser = new 
UserIdentity(RandomIdentifierGenerator.generateRandomIdentifier(8), "%");
+                emptyUser.setIsAnalyzed();
+                BrokerLoadJob brokerLoadJob =
+                        new BrokerLoadJob(1, null, null, null, emptyUser);
+                brokerLoadJob.setComputeGroup();
+                Assert.assertTrue(ConnectContext.get().getComputeGroupSafely() 
instanceof AllBackendComputeGroup);
+            }
+
+            // test get cg from user property
+            {
+
+                UtFrameUtils.createDefaultCtx();
+                String nonAdminUserStr = "non_admin_user";
+                UserIdentity nonAdminUser = new UserIdentity(nonAdminUserStr, 
"%");
+                UserDesc nonAdminUserDesc = new UserDesc(nonAdminUser, 
"12345", true);
+
+                CreateUserStmt createNonAdminUser = new CreateUserStmt(false, 
nonAdminUserDesc, null);
+                createNonAdminUser.analyze(analyzer);
+                auth.createUser(createNonAdminUser);
+
+                String tagName = "tag_rg_1";
+                String setPropStr = "set property for '" + nonAdminUserStr + 
"' 'resource_tags.location' = '" + tagName + "';";
+                ExceptionChecker.expectThrowsNoException(() -> 
setProperty(setPropStr));
+
+                BrokerLoadJob brokerLoadJob =
+                        new BrokerLoadJob(1, null, null, null, nonAdminUser);
+                brokerLoadJob.setComputeGroup();
+                ComputeGroup cg = ConnectContext.get().getComputeGroupSafely();
+                Assert.assertTrue(cg instanceof MergedComputeGroup);
+                Assert.assertTrue(((MergedComputeGroup) 
cg).getComputeGroupNameSet().contains(tagName));
+
+            }
+    }
+
+    /**
+     * Checks the compute group visible through
+     * {@code ConnectContext.get().getComputeGroupSafely()} after
+     * {@code RoutineLoadJob.setComputeGroup()} has been called, for three different
+     * qualified-user settings on the context, and then checks that
+     * {@code RoutineLoadManager.getAvailableBackendIdsForUt} rejects an unknown job id.
+     *
+     * <p>{@code Env.getCurrentEnv().getComputeGroupMgr()} and
+     * {@code Env.getCurrentSystemInfo()} are stubbed with fresh instances for the whole test.
+     */
+    @Test
+    public void testRoutineLoadToConnectContext() throws Exception {
+        new Expectations() {
+            {
+                Env.getCurrentEnv().getComputeGroupMgr();
+                minTimes = 0;
+                result = new ComputeGroupMgr(null);
+
+                Env.getCurrentSystemInfo();
+                minTimes = 0;
+                result = new SystemInfoService();
+            }
+        };
+
+        // 1 ctx's user is empty, return all backend
+        {
+            ConnectContext ctx = UtFrameUtils.createDefaultCtx();
+            ctx.setQualifiedUser(null);
+            RoutineLoadJob job = new KafkaRoutineLoadJob();
+            job.setComputeGroup();
+            ComputeGroup cg = ConnectContext.get().getComputeGroupSafely();
+            Assert.assertTrue(cg instanceof AllBackendComputeGroup);
+        }
+
+        // 2 set an invalid user, get an invalid compute group, then return all backends
+        {
+            ConnectContext.get().setQualifiedUser("xxxx");
+            RoutineLoadJob job = new KafkaRoutineLoadJob();
+            job.setComputeGroup();
+            ComputeGroup cg = ConnectContext.get().getComputeGroupSafely();
+            Assert.assertTrue(cg instanceof AllBackendComputeGroup);
+        }
+
+        // 3 get a valid compute group
+        {
+            ConnectContext.get().setQualifiedUser("root");
+            String setPropStr = "set property for 'root' 'resource_tags.location' = 'tag_rg_1';";
+            ExceptionChecker.expectThrowsNoException(() -> setProperty(setPropStr));
+            RoutineLoadJob job = new KafkaRoutineLoadJob();
+            job.setComputeGroup();
+            ComputeGroup cg = ConnectContext.get().getComputeGroupSafely();
+            Assert.assertTrue(cg instanceof MergedComputeGroup);
+            Assert.assertTrue(((MergedComputeGroup) cg).getComputeGroupNameSet().contains("tag_rg_1"));
+        }
+
+        // 4 get a null job
+        {
+            RoutineLoadManager routineLoadManager = new RoutineLoadManager();
+            try {
+                routineLoadManager.getAvailableBackendIdsForUt(1);
+                // Without this the scenario would pass silently if no exception were thrown.
+                Assert.fail("expected LoadException for a job id that was never added to the manager");
+            } catch (LoadException e) {
+                Assert.assertTrue(e.getMessage().contains("does not exist"));
+            }
+        }
+    }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org


Reply via email to