This is an automated email from the ASF dual-hosted git repository. wangbo pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push: new 6b6acf53a54 [Improment]Add ComputeGroupMgr (#48187) 6b6acf53a54 is described below commit 6b6acf53a5483b293dd90e2fdab0b1a4d670fb18 Author: wangbo <wan...@selectdb.com> AuthorDate: Mon Mar 17 11:23:34 2025 +0800 [Improment]Add ComputeGroupMgr (#48187) --- .../main/java/org/apache/doris/catalog/Env.java | 8 + .../java/org/apache/doris/catalog/OlapTable.java | 18 +- .../doris/datasource/FederationBackendPolicy.java | 54 +- .../org/apache/doris/httpv2/rest/LoadAction.java | 21 +- .../apache/doris/load/loadv2/BrokerLoadJob.java | 36 ++ .../doris/load/routineload/RoutineLoadJob.java | 34 +- .../doris/load/routineload/RoutineLoadManager.java | 36 +- .../java/org/apache/doris/mysql/MysqlProto.java | 2 +- .../org/apache/doris/mysql/privilege/Auth.java | 6 +- .../apache/doris/mysql/privilege/UserProperty.java | 2 +- .../doris/mysql/privilege/UserPropertyMgr.java | 11 +- .../org/apache/doris/planner/OlapScanNode.java | 16 +- .../java/org/apache/doris/qe/ConnectContext.java | 52 +- .../java/org/apache/doris/qe/ConnectProcessor.java | 4 +- .../main/java/org/apache/doris/qe/Coordinator.java | 4 +- .../computegroup/AllBackendComputeGroup.java | 63 ++ .../resource/computegroup/CloudComputeGroup.java | 36 ++ .../doris/resource/computegroup/ComputeGroup.java | 102 ++++ .../resource/computegroup/ComputeGroupMgr.java | 59 ++ .../resource/computegroup/MergedComputeGroup.java | 66 +++ .../org/apache/doris/system/BeSelectionPolicy.java | 3 +- .../org/apache/doris/system/SystemInfoService.java | 34 +- .../org/apache/doris/catalog/OlapTableTest.java | 10 +- .../doris/planner/FederationBackendPolicyTest.java | 43 ++ .../apache/doris/planner/ResourceTagQueryTest.java | 31 +- .../apache/doris/resource/ComputeGroupTest.java | 636 +++++++++++++++++++++ 26 files changed, 1264 insertions(+), 123 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java index f88653ade36..404590a786d 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java @@ -259,6 +259,7 @@ import org.apache.doris.qe.StmtExecutor; import org.apache.doris.qe.VariableMgr; import org.apache.doris.resource.AdmissionControl; import org.apache.doris.resource.Tag; +import org.apache.doris.resource.computegroup.ComputeGroupMgr; import org.apache.doris.resource.workloadgroup.WorkloadGroupMgr; import org.apache.doris.resource.workloadschedpolicy.WorkloadRuntimeStatusMgr; import org.apache.doris.resource.workloadschedpolicy.WorkloadSchedPolicyMgr; @@ -537,6 +538,8 @@ public class Env { private WorkloadGroupMgr workloadGroupMgr; + private ComputeGroupMgr computeGroupMgr; + private WorkloadSchedPolicyMgr workloadSchedPolicyMgr; private WorkloadRuntimeStatusMgr workloadRuntimeStatusMgr; @@ -821,6 +824,7 @@ public class Env { this.statisticsJobAppender = new StatisticsJobAppender(); this.globalFunctionMgr = new GlobalFunctionMgr(); this.workloadGroupMgr = new WorkloadGroupMgr(); + this.computeGroupMgr = new ComputeGroupMgr(systemInfo); this.workloadSchedPolicyMgr = new WorkloadSchedPolicyMgr(); this.workloadRuntimeStatusMgr = new WorkloadRuntimeStatusMgr(); this.admissionControl = new AdmissionControl(systemInfo); @@ -964,6 +968,10 @@ public class Env { return auditEventProcessor; } + public ComputeGroupMgr getComputeGroupMgr() { + return computeGroupMgr; + } + public WorkloadGroupMgr getWorkloadGroupMgr() { return workloadGroupMgr; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java index 868909dc9b8..060f77aca84 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java @@ -66,6 +66,7 @@ import org.apache.doris.persist.gson.GsonUtils; import org.apache.doris.qe.ConnectContext; import org.apache.doris.qe.OriginStatement; import org.apache.doris.resource.Tag; +import org.apache.doris.resource.computegroup.ComputeGroup; import org.apache.doris.rpc.RpcException; import org.apache.doris.statistics.AnalysisInfo; import org.apache.doris.statistics.AnalysisInfo.AnalysisType; @@ -114,7 +115,6 @@ import java.util.Collection; import java.util.Collections; import java.util.Comparator; import java.util.HashMap; -import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Map.Entry; @@ -3105,19 +3105,23 @@ public class OlapTable extends Table implements MTMVRelatedTableIf, GsonPostProc fetchOption.setFetchRowStore(useStoreRow); fetchOption.setUseTwoPhaseFetch(true); - // get backend by tag - Set<Tag> tagSet = new HashSet<>(); ConnectContext context = ConnectContext.get(); - if (context != null) { - tagSet = context.getResourceTags(); + if (context == null) { + context = new ConnectContext(); } BeSelectionPolicy policy = new BeSelectionPolicy.Builder() .needQueryAvailable() .setRequireAliveBe() - .addTags(tagSet) .build(); + TPaloNodesInfo nodesInfo = new TPaloNodesInfo(); - for (Backend backend : Env.getCurrentSystemInfo().getBackendsByPolicy(policy)) { + ComputeGroup computeGroup = context.getComputeGroupSafely(); + + if (ComputeGroup.INVALID_COMPUTE_GROUP.equals(computeGroup)) { + throw new RuntimeException(ComputeGroup.INVALID_COMPUTE_GROUP_ERR_MSG); + } + + for (Backend backend : policy.getCandidateBackends(computeGroup.getBackendList())) { nodesInfo.addToNodes(new TNodeInfo(backend.getId(), 0, backend.getHost(), backend.getBrpcPort())); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/FederationBackendPolicy.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/FederationBackendPolicy.java index 1e1787c1f64..288b55448af 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/FederationBackendPolicy.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/FederationBackendPolicy.java @@ -20,15 +20,15 @@ package org.apache.doris.datasource; -import org.apache.doris.catalog.Env; +import org.apache.doris.common.AnalysisException; import org.apache.doris.common.Config; import org.apache.doris.common.IndexedPriorityQueue; +import org.apache.doris.common.LoadException; import org.apache.doris.common.ResettableRandomizedIterator; import org.apache.doris.common.UserException; import org.apache.doris.common.util.ConsistentHash; -import org.apache.doris.mysql.privilege.UserProperty; import org.apache.doris.qe.ConnectContext; -import org.apache.doris.resource.Tag; +import org.apache.doris.resource.computegroup.ComputeGroup; import org.apache.doris.spi.Split; import org.apache.doris.system.Backend; import org.apache.doris.system.BeSelectionPolicy; @@ -36,7 +36,6 @@ import org.apache.doris.system.SystemInfoService; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; -import com.google.common.base.Strings; import com.google.common.cache.CacheBuilder; import com.google.common.cache.CacheLoader; import com.google.common.cache.LoadingCache; @@ -46,7 +45,6 @@ import com.google.common.collect.ListMultimap; import com.google.common.collect.Lists; import com.google.common.collect.Maps; import com.google.common.collect.Multimap; -import com.google.common.collect.Sets; import com.google.common.hash.Funnel; import com.google.common.hash.Hashing; import com.google.common.hash.PrimitiveSink; @@ -154,41 +152,35 @@ public class FederationBackendPolicy { } public void init(List<String> preLocations) throws UserException { - Set<Tag> tags = Sets.newHashSet(); - if (ConnectContext.get() != null && ConnectContext.get().getCurrentUserIdentity() != null) { - String qualifiedUser = ConnectContext.get().getCurrentUserIdentity().getQualifiedUser(); - // Some request from stream load(eg, mysql load) may not set user info in ConnectContext - // just ignore it. - if (!Strings.isNullOrEmpty(qualifiedUser)) { - tags = Env.getCurrentEnv().getAuth().getResourceTags(qualifiedUser); - if (tags == UserProperty.INVALID_RESOURCE_TAGS) { - throw new UserException("No valid resource tag for user: " + qualifiedUser); - } - } - } else { - if (LOG.isDebugEnabled()) { - LOG.debug("user info in ExternalFileScanNode should not be null, add log to observer"); - } - } - // scan node is used for query - BeSelectionPolicy policy = new BeSelectionPolicy.Builder() - .needQueryAvailable() + BeSelectionPolicy.Builder builder = new BeSelectionPolicy.Builder(); + builder.needQueryAvailable() .needLoadAvailable() - .addTags(tags) .preferComputeNode(Config.prefer_compute_node_for_external_table) .assignExpectBeNum(Config.min_backend_num_for_external_table) - .addPreLocations(preLocations) - .build(); - init(policy); + .addPreLocations(preLocations); + init(builder.build()); } public void init(BeSelectionPolicy policy) throws UserException { - backends.addAll(policy.getCandidateBackends(Env.getCurrentSystemInfo() - .getBackendsByCurrentCluster().values().asList())); + ConnectContext ctx = ConnectContext.get(); + if (ctx == null) { + if (Config.isCloudMode()) { + throw new AnalysisException("ConnectContext is null"); + } else { + ctx = new ConnectContext(); + } + + } + ComputeGroup computeGroup = ctx.getComputeGroup(); + if (Config.isNotCloudMode() && computeGroup.equals(ComputeGroup.INVALID_COMPUTE_GROUP)) { + throw new LoadException(ComputeGroup.INVALID_COMPUTE_GROUP_ERR_MSG); + } + + backends.addAll(policy.getCandidateBackends(computeGroup.getBackendList())); if (backends.isEmpty()) { throw new UserException("No available backends, " - + "in cloud maybe this cluster has been dropped, please `use @otherClusterName` switch it"); + + "in cloud maybe this cluster has been dropped, please `use @otherClusterName` switch it"); } for (Backend backend : backends) { assignedWeightPerBackend.put(backend, 0L); diff --git a/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/LoadAction.java b/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/LoadAction.java index 48776da1569..8401eb32ab5 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/LoadAction.java +++ b/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/LoadAction.java @@ -47,7 +47,7 @@ import org.apache.doris.mysql.privilege.PrivPredicate; import org.apache.doris.planner.GroupCommitPlanner; import org.apache.doris.qe.ConnectContext; import org.apache.doris.qe.SessionVariable; -import org.apache.doris.resource.Tag; +import org.apache.doris.resource.computegroup.ComputeGroup; import org.apache.doris.service.ExecuteEnv; import org.apache.doris.system.Backend; import org.apache.doris.system.BeSelectionPolicy; @@ -439,19 +439,16 @@ public class LoadAction extends RestBaseController { throws LoadException { Backend backend = null; BeSelectionPolicy policy = null; - String qualifiedUser = ConnectContext.get().getQualifiedUser(); - Set<Tag> userTags = Env.getCurrentEnv().getAuth().getResourceTags(qualifiedUser); - policy = new BeSelectionPolicy.Builder() - .addTags(userTags) - .setEnableRoundRobin(true) - .needLoadAvailable().build(); + ConnectContext ctx = ConnectContext.get(); + if (ctx == null) { + throw new LoadException("ConnectContext should not be null"); + } + ComputeGroup computeGroup = ctx.getComputeGroupSafely(); + policy = new BeSelectionPolicy.Builder().setEnableRoundRobin(true).needLoadAvailable().build(); policy.nextRoundRobinIndex = getLastSelectedBackendIndexAndUpdate(); List<Long> backendIds; - if (groupCommit) { - backendIds = Env.getCurrentSystemInfo().selectBackendIdsByPolicy(policy, -1); - } else { - backendIds = Env.getCurrentSystemInfo().selectBackendIdsByPolicy(policy, 1); - } + int number = groupCommit ? -1 : 1; + backendIds = Env.getCurrentSystemInfo().selectBackendIdsByPolicy(policy, number, computeGroup.getBackendList()); if (backendIds.isEmpty()) { throw new LoadException(SystemInfoService.NO_BACKEND_LOAD_AVAILABLE_MSG + ", policy: " + policy); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/BrokerLoadJob.java b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/BrokerLoadJob.java index 45ad61a1e30..dab9ee32624 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/BrokerLoadJob.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/BrokerLoadJob.java @@ -52,6 +52,7 @@ import org.apache.doris.load.FailMsg; import org.apache.doris.qe.ConnectContext; import org.apache.doris.qe.OriginStatement; import org.apache.doris.qe.SessionVariable; +import org.apache.doris.resource.computegroup.ComputeGroup; import org.apache.doris.service.ExecuteEnv; import org.apache.doris.service.FrontendOptions; import org.apache.doris.thrift.TStatusCode; @@ -63,6 +64,7 @@ import org.apache.doris.transaction.TransactionState.TxnSourceType; import com.google.common.base.Joiner; import com.google.common.collect.Lists; +import org.apache.commons.lang3.StringUtils; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -219,6 +221,37 @@ public class BrokerLoadJob extends BulkLoadJob { loadStartTimestamp = System.currentTimeMillis(); } + // make public for UT + public void setComputeGroup() { + ComputeGroup computeGroup = null; + ConnectContext context = ConnectContext.get(); + try { + if (context == null) { + context = new ConnectContext(); + context.setThreadLocalInfo(); + } + + String currentUser = getUserInfo().getQualifiedUser(); + // user is null or get an invalid compute group should not be normal case, + // broker load job can get all backends when meets it. + if (StringUtils.isEmpty(currentUser)) { + computeGroup = Env.getCurrentEnv().getComputeGroupMgr().getAllBackendComputeGroup(); + LOG.warn("can not find user in broker load, then skip compute group."); + } else { + computeGroup = Env.getCurrentEnv().getAuth().getComputeGroup(currentUser); + if (ComputeGroup.INVALID_COMPUTE_GROUP.equals(computeGroup)) { + LOG.warn("get an invalid compute group in broker load job."); + computeGroup = Env.getCurrentEnv().getComputeGroupMgr().getAllBackendComputeGroup(); + } + } + } catch (Throwable t) { + LOG.warn("error happens when set compute group for broker load", t); + computeGroup = Env.getCurrentEnv().getComputeGroupMgr().getAllBackendComputeGroup(); + } + + context.setComputeGroup(computeGroup); + } + protected LoadLoadingTask createTask(Database db, OlapTable table, List<BrokerFileGroup> brokerFileGroups, boolean isEnableMemtableOnSinkNode, int batchSize, FileGroupAggKey aggKey, BrokerPendingTaskAttachment attachment) throws UserException { @@ -231,6 +264,9 @@ public class BrokerLoadJob extends BulkLoadJob { UUID uuid = UUID.randomUUID(); TUniqueId loadId = new TUniqueId(uuid.getMostSignificantBits(), uuid.getLeastSignificantBits()); + + setComputeGroup(); + task.init(loadId, attachment.getFileStatusByTable(aggKey), attachment.getFileNumByTable(aggKey), getUserInfo()); task.settWorkloadGroups(tWorkloadGroups); diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java index 859d5df43e6..b30e100a38f 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java @@ -61,6 +61,7 @@ import org.apache.doris.qe.ConnectContext; import org.apache.doris.qe.OriginStatement; import org.apache.doris.qe.SessionVariable; import org.apache.doris.qe.SqlModeHelper; +import org.apache.doris.resource.computegroup.ComputeGroup; import org.apache.doris.task.LoadTaskInfo; import org.apache.doris.thrift.TFileFormatType; import org.apache.doris.thrift.TFileType; @@ -71,7 +72,6 @@ import org.apache.doris.transaction.TransactionException; import org.apache.doris.transaction.TransactionState; import org.apache.doris.transaction.TransactionStatus; -import com.aliyuncs.utils.StringUtils; import com.google.common.base.Joiner; import com.google.common.base.Preconditions; import com.google.common.base.Strings; @@ -83,6 +83,7 @@ import com.google.gson.GsonBuilder; import com.google.gson.annotations.SerializedName; import lombok.Getter; import lombok.Setter; +import org.apache.commons.lang3.StringUtils; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -1013,6 +1014,35 @@ public abstract class RoutineLoadJob // derived class can override this. public abstract void prepare() throws UserException; + // make this public here just for UT. + public void setComputeGroup() { + ComputeGroup computeGroup = null; + try { + if (ConnectContext.get() == null) { + ConnectContext ctx = new ConnectContext(); + ctx.setThreadLocalInfo(); + } + String currentUser = ConnectContext.get().getQualifiedUser(); + if (StringUtils.isEmpty(currentUser)) { + currentUser = getUserIdentity().getQualifiedUser(); + } + if (StringUtils.isEmpty(currentUser)) { + LOG.warn("can not find user in routine load"); + computeGroup = Env.getCurrentEnv().getComputeGroupMgr().getAllBackendComputeGroup(); + } else { + computeGroup = Env.getCurrentEnv().getAuth().getComputeGroup(currentUser); + } + if (ComputeGroup.INVALID_COMPUTE_GROUP.equals(computeGroup)) { + LOG.warn("get an invalid compute group in routine load"); + computeGroup = Env.getCurrentEnv().getComputeGroupMgr().getAllBackendComputeGroup(); + } + } catch (Throwable t) { + LOG.warn("error happens when set compute group for routine load", t); + computeGroup = Env.getCurrentEnv().getComputeGroupMgr().getAllBackendComputeGroup(); + } + ConnectContext.get().setComputeGroup(computeGroup); + } + public TPipelineFragmentParams plan(StreamLoadPlanner planner, TUniqueId loadId, long txnId) throws UserException { Preconditions.checkNotNull(planner); Database db = Env.getCurrentInternalCatalog().getDbOrMetaException(dbId); @@ -1037,6 +1067,8 @@ public abstract class RoutineLoadJob } else { ConnectContext.get().setCloudCluster(clusterName); } + } else { + setComputeGroup(); } TPipelineFragmentParams planParams = planner.plan(loadId); diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadManager.java b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadManager.java index bfe42ad7695..3bb25e85619 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadManager.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadManager.java @@ -44,11 +44,11 @@ import org.apache.doris.common.util.LogBuilder; import org.apache.doris.common.util.LogKey; import org.apache.doris.datasource.InternalCatalog; import org.apache.doris.mysql.privilege.PrivPredicate; -import org.apache.doris.mysql.privilege.UserProperty; import org.apache.doris.persist.AlterRoutineLoadJobOperationLog; import org.apache.doris.persist.RoutineLoadOperation; import org.apache.doris.qe.ConnectContext; import org.apache.doris.resource.Tag; +import org.apache.doris.resource.computegroup.ComputeGroup; import org.apache.doris.system.Backend; import org.apache.doris.system.BeSelectionPolicy; @@ -553,6 +553,11 @@ public class RoutineLoadManager implements Writable { } } + // just for UT + public List<Long> getAvailableBackendIdsForUt(long jobId) throws LoadException { + return getAvailableBackendIds(jobId); + } + /** * The routine load task can only be scheduled on backends which has proper resource tags. * The tags should be got from user property. @@ -565,24 +570,43 @@ public class RoutineLoadManager implements Writable { * @throws LoadException */ protected List<Long> getAvailableBackendIds(long jobId) throws LoadException { + // Usually Cloud node could not reach here(refer CloudRoutineLoadManager.getAvailableBackendIds), + // check cloud mode here is just to be on the safe side. + if (Config.isCloudMode()) { + throw new LoadException("cloud mode should not reach here"); + } + RoutineLoadJob job = getJob(jobId); if (job == null) { throw new LoadException("job " + jobId + " does not exist"); } - Set<Tag> tags; + Set<Tag> tags = null; + ComputeGroup computeGroup = null; if (job.getUserIdentity() == null) { // For old job, there may be no user info. So we have to use tags from replica allocation tags = getTagsFromReplicaAllocation(job.getDbId(), job.getTableId()); + BeSelectionPolicy policy = new BeSelectionPolicy.Builder().addTags(tags).needLoadAvailable().build(); + return Env.getCurrentSystemInfo() + .selectBackendIdsByPolicy(policy, -1 /* as many as possible */); } else { - tags = Env.getCurrentEnv().getAuth().getResourceTags(job.getUserIdentity().getQualifiedUser()); - if (tags == UserProperty.INVALID_RESOURCE_TAGS) { + computeGroup = Env.getCurrentEnv().getAuth().getComputeGroup(job.getUserIdentity().getQualifiedUser()); + if (ComputeGroup.INVALID_COMPUTE_GROUP.equals(computeGroup)) { // user may be dropped, or may not set resource tag property. // Here we fall back to use replica tag tags = getTagsFromReplicaAllocation(job.getDbId(), job.getTableId()); } + + if (computeGroup != null && !ComputeGroup.INVALID_COMPUTE_GROUP.equals(computeGroup)) { + BeSelectionPolicy policy = new BeSelectionPolicy.Builder().needLoadAvailable().build(); + return Env.getCurrentSystemInfo() + .selectBackendIdsByPolicy(policy, -1 /* as many as possible */, + computeGroup.getBackendList()); + } else { + BeSelectionPolicy policy = new BeSelectionPolicy.Builder().addTags(tags).needLoadAvailable().build(); + return Env.getCurrentSystemInfo() + .selectBackendIdsByPolicy(policy, -1 /* as many as possible */); + } } - BeSelectionPolicy policy = new BeSelectionPolicy.Builder().needLoadAvailable().addTags(tags).build(); - return Env.getCurrentSystemInfo().selectBackendIdsByPolicy(policy, -1 /* as many as possible */); } private Set<Tag> getTagsFromReplicaAllocation(long dbId, long tblId) throws LoadException { diff --git a/fe/fe-core/src/main/java/org/apache/doris/mysql/MysqlProto.java b/fe/fe-core/src/main/java/org/apache/doris/mysql/MysqlProto.java index a672a217a33..92154779661 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/mysql/MysqlProto.java +++ b/fe/fe-core/src/main/java/org/apache/doris/mysql/MysqlProto.java @@ -266,7 +266,7 @@ public class MysqlProto { } // set resource tag if has - context.setResourceTags(Env.getCurrentEnv().getAuth().getResourceTags(qualifiedUser)); + context.setComputeGroup(Env.getCurrentEnv().getAuth().getComputeGroup(qualifiedUser)); return true; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/mysql/privilege/Auth.java b/fe/fe-core/src/main/java/org/apache/doris/mysql/privilege/Auth.java index 5d3324bca2b..590b4970be7 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/mysql/privilege/Auth.java +++ b/fe/fe-core/src/main/java/org/apache/doris/mysql/privilege/Auth.java @@ -66,7 +66,7 @@ import org.apache.doris.persist.AlterUserOperationLog; import org.apache.doris.persist.LdapInfo; import org.apache.doris.persist.PrivInfo; import org.apache.doris.qe.ConnectContext; -import org.apache.doris.resource.Tag; +import org.apache.doris.resource.computegroup.ComputeGroup; import org.apache.doris.resource.workloadgroup.WorkloadGroupMgr; import org.apache.doris.thrift.TPrivilegeStatus; @@ -1229,10 +1229,10 @@ public class Auth implements Writable { } } - public Set<Tag> getResourceTags(String qualifiedUser) { + public ComputeGroup getComputeGroup(String qualifiedUser) { readLock(); try { - return propertyMgr.getResourceTags(qualifiedUser); + return propertyMgr.getComputeGroup(qualifiedUser); } finally { readUnlock(); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/mysql/privilege/UserProperty.java b/fe/fe-core/src/main/java/org/apache/doris/mysql/privilege/UserProperty.java index a637fb6c182..01f8fac9dec 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/mysql/privilege/UserProperty.java +++ b/fe/fe-core/src/main/java/org/apache/doris/mysql/privilege/UserProperty.java @@ -259,7 +259,7 @@ public class UserProperty implements Writable { } newDefaultLoadCluster = value; - } else if (keyArr[0].equalsIgnoreCase(DEFAULT_CLOUD_CLUSTER)) { + } else if (keyArr[0].equalsIgnoreCase(DEFAULT_CLOUD_CLUSTER)) { newDefaultCloudCluster = checkCloudDefaultCluster(keyArr, value, DEFAULT_CLOUD_CLUSTER, isReplay); } else if (keyArr[0].equalsIgnoreCase(DEFAULT_COMPUTE_GROUP)) { newDefaultCloudCluster = checkCloudDefaultCluster(keyArr, value, DEFAULT_COMPUTE_GROUP, isReplay); diff --git a/fe/fe-core/src/main/java/org/apache/doris/mysql/privilege/UserPropertyMgr.java b/fe/fe-core/src/main/java/org/apache/doris/mysql/privilege/UserPropertyMgr.java index 29ae1f438a1..67415abb7a5 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/mysql/privilege/UserPropertyMgr.java +++ b/fe/fe-core/src/main/java/org/apache/doris/mysql/privilege/UserPropertyMgr.java @@ -32,6 +32,7 @@ import org.apache.doris.load.DppConfig; import org.apache.doris.mysql.authenticate.AuthenticateType; import org.apache.doris.persist.gson.GsonUtils; import org.apache.doris.resource.Tag; +import org.apache.doris.resource.computegroup.ComputeGroup; import com.google.common.collect.Maps; import com.google.common.collect.Sets; @@ -161,11 +162,11 @@ public class UserPropertyMgr implements Writable { return existProperty.getParallelFragmentExecInstanceNum(); } - public Set<Tag> getResourceTags(String qualifiedUser) { + public ComputeGroup getComputeGroup(String qualifiedUser) { UserProperty existProperty = propertyMap.get(qualifiedUser); existProperty = getPropertyIfNull(qualifiedUser, existProperty); if (existProperty == null) { - return UserProperty.INVALID_RESOURCE_TAGS; + return ComputeGroup.INVALID_COMPUTE_GROUP; } Set<Tag> tags = existProperty.getCopiedResourceTags(); // only root and admin can return empty tag. @@ -176,7 +177,11 @@ public class UserPropertyMgr implements Writable { && Config.force_olap_table_replication_allocation.isEmpty()) { tags = Sets.newHashSet(Tag.DEFAULT_BACKEND_TAG); } - return tags; + if (!tags.isEmpty()) { + return Env.getCurrentEnv().getComputeGroupMgr().getComputeGroup(tags); + } else { + return Env.getCurrentEnv().getComputeGroupMgr().getAllBackendComputeGroup(); + } } public Pair<String, DppConfig> getLoadClusterInfo(String qualifiedUser, String cluster) throws DdlException { diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java index 5a1aaec5085..5e8c8e5df67 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java @@ -70,7 +70,7 @@ import org.apache.doris.nereids.glue.translator.PlanTranslatorContext; import org.apache.doris.planner.normalize.Normalizer; import org.apache.doris.planner.normalize.PartitionRangePredicateNormalizer; import org.apache.doris.qe.ConnectContext; -import org.apache.doris.resource.Tag; +import org.apache.doris.resource.computegroup.ComputeGroup; import org.apache.doris.statistics.StatisticalType; import org.apache.doris.statistics.StatsDeriveResult; import org.apache.doris.statistics.StatsRecursiveDerive; @@ -757,14 +757,12 @@ public class OlapScanNode extends ScanNode { } String visibleVersionStr = String.valueOf(visibleVersion); - Set<Tag> allowedTags = Sets.newHashSet(); int useFixReplica = -1; - boolean needCheckTags = false; boolean skipMissingVersion = false; ConnectContext context = ConnectContext.get(); + ComputeGroup computeGroup = null; if (context != null) { - allowedTags = context.getResourceTags(); - needCheckTags = context.isResourceTagsSet(); + computeGroup = context.getComputeGroupSafely(); useFixReplica = context.getSessionVariable().useFixReplica; if (useFixReplica == -1 && context.getState().isNereids() && context.getSessionVariable().getEnableQueryCache()) { @@ -915,10 +913,12 @@ public class OlapScanNode extends ScanNode { if (!backend.isMixNode()) { continue; } - if (needCheckTags && !allowedTags.isEmpty() && !allowedTags.contains(backend.getLocationTag())) { + String beTagName = backend.getLocationTag().value; + if ((ComputeGroup.INVALID_COMPUTE_GROUP.equals(computeGroup)) || (computeGroup != null + && !Config.isCloudMode() && !computeGroup.containsBackend(beTagName))) { String err = String.format( - "Replica on backend %d with tag %s," + " which is not in user's resource tags: %s", - backend.getId(), backend.getLocationTag(), allowedTags); + "Replica on backend %d with tag %s," + " which is not in user's resource tag: %s", + backend.getId(), beTagName, computeGroup.toString()); if (LOG.isDebugEnabled()) { LOG.debug(err); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectContext.java b/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectContext.java index 14d7d03b0f5..dc3e5841ef2 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectContext.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectContext.java @@ -64,6 +64,8 @@ import org.apache.doris.plsql.Exec; import org.apache.doris.plsql.executor.PlSqlOperation; import org.apache.doris.plugin.AuditEvent.AuditEventBuilder; import org.apache.doris.resource.Tag; +import org.apache.doris.resource.computegroup.ComputeGroup; +import org.apache.doris.resource.computegroup.ComputeGroupMgr; import org.apache.doris.service.arrowflight.results.FlightSqlChannel; import org.apache.doris.service.arrowflight.results.FlightSqlEndpointsLocation; import org.apache.doris.statistics.ColumnStatistic; @@ -79,7 +81,6 @@ import org.apache.doris.transaction.TransactionStatus; import com.google.common.base.Strings; import com.google.common.collect.Lists; import com.google.common.collect.Maps; -import com.google.common.collect.Sets; import io.netty.util.concurrent.FastThreadLocal; import lombok.Getter; import lombok.Setter; @@ -211,14 +212,11 @@ public class ConnectContext { // If set to true, the nondeterministic function will not be rewrote to constant. private boolean notEvalNondeterministicFunction = false; - // The resource tag is used to limit the node resources that the user can use for query. + // The compute group tag is used to limit the node resources that the user can use for query. // The default is empty, that is, unlimited. // This property is obtained from UserProperty when the client connection is created. // Only when the connection is created again, the new resource tags will be retrieved from the UserProperty - private Set<Tag> resourceTags = Sets.newHashSet(); - // If set to true, the resource tags set in resourceTags will be used to limit the query resources. - // If set to false, the system will not restrict query resources. - private boolean isResourceTagsSet = false; + private ComputeGroup computeGroup = null; private PlSqlOperation plSqlOperation = null; @@ -1086,17 +1084,12 @@ public class ConnectContext { return threadInfo; } - public boolean isResourceTagsSet() { - return isResourceTagsSet; + public boolean isSetComputeGroup() { + return computeGroup != null; } - public Set<Tag> getResourceTags() { - return resourceTags; - } - - public void setResourceTags(Set<Tag> resourceTags) { - this.resourceTags = resourceTags; - this.isResourceTagsSet = !this.resourceTags.isEmpty(); + public void setComputeGroup(ComputeGroup computeGroup) { + this.computeGroup = computeGroup; } public void setCurrentConnectedFEIp(String ip) { @@ -1360,6 +1353,35 @@ public class ConnectContext { : new CloudClusterResult(hasAuthCluster.get(0), CloudClusterResult.Comment.FOUND_BY_FRIST_CLUSTER_HAS_AUTH); } + public ComputeGroup getComputeGroupSafely() { + try { + return getComputeGroup(); + } catch (UserException e) { + throw new RuntimeException(e); + } + } + + public ComputeGroup getComputeGroup() throws UserException { + ComputeGroupMgr cgMgr = Env.getCurrentEnv().getComputeGroupMgr(); + if (Config.isCloudMode()) { + return cgMgr.getComputeGroupByName(getCurrentCloudCluster()); + } else { + // In order to be compatible with resource tag's old logic, + // when a user login in FE by mysql client, then its tags are set in ConnectContext which + // means isSetComputeGroup = true + if (this.isSetComputeGroup()) { + return computeGroup; + } else { + String currentUser = getQualifiedUser(); + if (!StringUtils.isEmpty(currentUser)) { + return Env.getCurrentEnv().getAuth().getComputeGroup(currentUser); + } else { + return Env.getCurrentEnv().getComputeGroupMgr().getComputeGroupByName(Tag.VALUE_DEFAULT_TAG); + } + } + } + } + /** * Tries to choose an available cluster in the following order * 1. Do nothing if a cluster has been chosen for current session. It may be diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectProcessor.java b/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectProcessor.java index a415f2dccc3..2cbdd92f7e4 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectProcessor.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectProcessor.java @@ -622,8 +622,8 @@ public abstract class ConnectProcessor { ctx.setUserVars(userVariableFromThrift(request.getUserVariables())); } - // set resource tag - ctx.setResourceTags(Env.getCurrentEnv().getAuth().getResourceTags(ctx.qualifiedUser)); + // set compute group + ctx.setComputeGroup(Env.getCurrentEnv().getAuth().getComputeGroup(ctx.qualifiedUser)); ctx.setThreadLocalInfo(); StmtExecutor executor = null; diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java index 795e24b4704..110385ed382 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java @@ -1740,7 +1740,7 @@ public class Coordinator implements CoordInterface { TNetworkAddress execHostport; if (groupCommitBackend != null) { execHostport = getGroupCommitBackend(addressToBackendID); - } else if (((ConnectContext.get() != null && ConnectContext.get().isResourceTagsSet()) || ( + } else if (((ConnectContext.get() != null && ConnectContext.get().isSetComputeGroup()) || ( isAllExternalScan && Config.prefer_compute_node_for_external_table)) && !addressToBackendID.isEmpty()) { // 2 cases: @@ -1932,7 +1932,7 @@ public class Coordinator implements CoordInterface { TNetworkAddress execHostport; if (groupCommitBackend != null) { execHostport = getGroupCommitBackend(addressToBackendID); - } else if (ConnectContext.get() != null && ConnectContext.get().isResourceTagsSet() + } else if (ConnectContext.get() != null && ConnectContext.get().isSetComputeGroup() && !addressToBackendID.isEmpty()) { // In this case, we only use the BE where the replica selected by the tag is located to // execute this query. Otherwise, except for the scan node, the rest of the execution nodes diff --git a/fe/fe-core/src/main/java/org/apache/doris/resource/computegroup/AllBackendComputeGroup.java b/fe/fe-core/src/main/java/org/apache/doris/resource/computegroup/AllBackendComputeGroup.java new file mode 100644 index 00000000000..665fd9601b0 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/resource/computegroup/AllBackendComputeGroup.java @@ -0,0 +1,63 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.resource.computegroup; + +import org.apache.doris.common.Config; +import org.apache.doris.system.Backend; +import org.apache.doris.system.SystemInfoService; + +import com.google.common.collect.ImmutableList; + +public class AllBackendComputeGroup extends ComputeGroup { + + public AllBackendComputeGroup(SystemInfoService systemInfoService) { + super(AllBackendComputeGroup.class.getSimpleName(), AllBackendComputeGroup.class.getSimpleName(), + systemInfoService); + } + + @Override + public boolean containsBackend(String beTag) { + if (Config.isCloudMode()) { + throw new RuntimeException("AllBackendComputeGroup not implements containsBackend in cloud mode."); + } + // currently AllBackendComputeGroup is used when admin/root user not specifies a resource tag, + // then they can get all backends + return true; + } + + @Override + public String getId() { + throw new RuntimeException("AllBackendComputeGroup not implements getId."); + } + + @Override + public String getName() { + throw new RuntimeException("AllBackendComputeGroup not implements getName."); + } + + @Override + public ImmutableList<Backend> getBackendList() { + return systemInfoService.getAllClusterBackendsNoException().values().asList(); + } + + @Override + public String toString() { + return AllBackendComputeGroup.class.getSimpleName(); + } + +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/resource/computegroup/CloudComputeGroup.java b/fe/fe-core/src/main/java/org/apache/doris/resource/computegroup/CloudComputeGroup.java new file mode 100644 index 00000000000..8d2a670d170 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/resource/computegroup/CloudComputeGroup.java @@ -0,0 +1,36 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.resource.computegroup; + +import org.apache.doris.cloud.system.CloudSystemInfoService; +import org.apache.doris.system.Backend; + +import java.util.List; + +public class CloudComputeGroup extends ComputeGroup { + + public CloudComputeGroup(String id, String name, CloudSystemInfoService systemInfoService) { + super(id, name, systemInfoService); + } + + @Override + public List<Backend> getBackendList() { + return ((CloudSystemInfoService) (super.systemInfoService)).getBackendsByClusterName(name); + } + +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/resource/computegroup/ComputeGroup.java b/fe/fe-core/src/main/java/org/apache/doris/resource/computegroup/ComputeGroup.java new file mode 100644 index 00000000000..8a9fd7665a8 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/resource/computegroup/ComputeGroup.java @@ -0,0 +1,102 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.resource.computegroup; + +import org.apache.doris.system.Backend; +import org.apache.doris.system.SystemInfoService; + +import org.apache.commons.lang3.StringUtils; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +import java.util.HashSet; +import java.util.List; +import java.util.Set; + +public class ComputeGroup { + + private static final Logger LOG = LogManager.getLogger(ComputeGroup.class); + + // NOTE: Why need an invalid compute group + // invalid compute group is mainly used in local-mode(not cloud mode), + // it comes from invalid resource tag(UserPropertyMgr.getComputeGroup), when trying to find a resource tag + // by user, but the user has not a property(maybe user is invalid), + // so introduce an invalid compute group to cover this case. + // caller should deal the invalid compute case by case; + // eg, if routine load cant not find a valid compute group when finding Slot BE, it could find BE by replica's tag. + // some other case may throw an Exception. + public static final String INVALID_COMPUTE_GROUP_NAME = "_invalid_compute_group_name"; + + public static final ComputeGroup INVALID_COMPUTE_GROUP = new ComputeGroup(INVALID_COMPUTE_GROUP_NAME, + INVALID_COMPUTE_GROUP_NAME, null); + + public static final String INVALID_COMPUTE_GROUP_ERR_MSG + = "can not find a valid compute group, please check whether current user is valid."; + + protected SystemInfoService systemInfoService; + + protected String id; + + protected String name; + + public ComputeGroup(String id, String name, SystemInfoService systemInfoService) { + this.id = id; + this.name = name; + this.systemInfoService = systemInfoService; + } + + public String getId() { + return id; + } + + public String getName() { + return name; + } + + public boolean containsBackend(String backendTag) { + checkInvalidComputeGroup(); + return StringUtils.equals(this.name, backendTag); + } + + public List<Backend> getBackendList() { + checkInvalidComputeGroup(); + Set<String> cgSet = new HashSet<>(); + cgSet.add(name); + return systemInfoService.getBackendListByComputeGroup(cgSet); + } + + @Override + public String toString() { + return String.format("%s id=%s, name=%s", this.getClass().getSimpleName(), id, name); + } + + @Override + public boolean equals(Object obj) { + // NOTE: currently equals is used to compare INVALID_COMPUTE_GROUP, just using ```==``` is enough. + return (this == obj); + } + + private void checkInvalidComputeGroup() { + if (this.getName() == INVALID_COMPUTE_GROUP_NAME) { + throw new RuntimeException( + "invalid compute group can not be used, please check whether current user is valid."); + } + } + +} + diff --git a/fe/fe-core/src/main/java/org/apache/doris/resource/computegroup/ComputeGroupMgr.java b/fe/fe-core/src/main/java/org/apache/doris/resource/computegroup/ComputeGroupMgr.java new file mode 100644 index 00000000000..167b6cc5040 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/resource/computegroup/ComputeGroupMgr.java @@ -0,0 +1,59 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.resource.computegroup; + +import org.apache.doris.cloud.system.CloudSystemInfoService; +import org.apache.doris.common.Config; +import org.apache.doris.resource.Tag; +import org.apache.doris.system.SystemInfoService; + +import com.google.common.collect.Sets; + +import java.util.Set; + +public class ComputeGroupMgr { + + private SystemInfoService systemInfoService; + + public ComputeGroupMgr(SystemInfoService systemInfoService) { + this.systemInfoService = systemInfoService; + } + + public ComputeGroup getComputeGroupByName(String name) { + if (Config.isCloudMode()) { + return new CloudComputeGroup("", name, (CloudSystemInfoService) systemInfoService); + } else { + return new ComputeGroup("", name, systemInfoService); + } + } + + public ComputeGroup getComputeGroup(Set<Tag> rgTags) { + Set<String> tagStrSet = Sets.newHashSet(); + for (Tag tag : rgTags) { + tagStrSet.add(tag.value); + } + return new MergedComputeGroup(tagStrSet, systemInfoService); + } + + // to be compatible with resource tag's logic, if root/admin user not specify a resource tag, + // which means return all backends. + public ComputeGroup getAllBackendComputeGroup() { + return new AllBackendComputeGroup(systemInfoService); + } + +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/resource/computegroup/MergedComputeGroup.java b/fe/fe-core/src/main/java/org/apache/doris/resource/computegroup/MergedComputeGroup.java new file mode 100644 index 00000000000..bebc8a37bdd --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/resource/computegroup/MergedComputeGroup.java @@ -0,0 +1,66 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.resource.computegroup; + +import org.apache.doris.system.Backend; +import org.apache.doris.system.SystemInfoService; + +import com.google.common.collect.ImmutableList; + +import java.util.Set; + +public class MergedComputeGroup extends ComputeGroup { + + private Set<String> computeGroupSet; + + public MergedComputeGroup(Set<String> computeGroupSet, SystemInfoService systemInfoService) { + super(MergedComputeGroup.class.getSimpleName(), MergedComputeGroup.class.getSimpleName(), systemInfoService); + this.computeGroupSet = computeGroupSet; + } + + @Override + public boolean containsBackend(String computeGroupName) { + return computeGroupSet.contains(computeGroupName); + } + + @Override + public ImmutableList<Backend> getBackendList() { + return systemInfoService.getBackendListByComputeGroup(computeGroupSet); + } + + @Override + public String getId() { + throw new RuntimeException("MergedComputeGroup not implements getId."); + } + + @Override + public String getName() { + throw new RuntimeException("MergedComputeGroup not implements getName."); + } + + // current main for UT + public Set<String> getComputeGroupNameSet() { + return computeGroupSet; + } + + @Override + public String toString() { + return String.format("%s %s", MergedComputeGroup.class.getSimpleName(), String.join(",", computeGroupSet)); + } + +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/system/BeSelectionPolicy.java b/fe/fe-core/src/main/java/org/apache/doris/system/BeSelectionPolicy.java index 0d72c40cf3f..384b126aba0 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/system/BeSelectionPolicy.java +++ b/fe/fe-core/src/main/java/org/apache/doris/system/BeSelectionPolicy.java @@ -185,7 +185,8 @@ public class BeSelectionPolicy { } public List<Backend> getCandidateBackends(Collection<Backend> backends) { - List<Backend> filterBackends = backends.stream().filter(this::isMatch).collect(Collectors.toList()); + List<Backend> filterBackends = backends.stream().filter(this::isMatch) + .collect(Collectors.toList()); List<Backend> preLocationFilterBackends = filterBackends.stream() .filter(iterm -> preferredLocations.contains(iterm.getHost())).collect(Collectors.toList()); // If preLocations were chosen, use the preLocation backends. Otherwise we just ignore this filter. diff --git a/fe/fe-core/src/main/java/org/apache/doris/system/SystemInfoService.java b/fe/fe-core/src/main/java/org/apache/doris/system/SystemInfoService.java index 3d0df46ca69..20e625ad239 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/system/SystemInfoService.java +++ b/fe/fe-core/src/main/java/org/apache/doris/system/SystemInfoService.java @@ -41,6 +41,7 @@ import org.apache.doris.thrift.TStorageMedium; import com.google.common.base.Joiner; import com.google.common.base.Preconditions; import com.google.common.base.Strings; +import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import com.google.common.collect.Lists; import com.google.common.collect.Maps; @@ -566,17 +567,24 @@ public class SystemInfoService { return sb.toString(); } + public List<Long> selectBackendIdsByPolicy(BeSelectionPolicy policy, int number) { + return selectBackendIdsByPolicy(policy, number, getAllClusterBackendsNoException().values().asList()); + } + /** * Select a set of backends by the given policy. * * @param policy if policy is enableRoundRobin, will update its nextRoundRobinIndex * @param number number of backends which need to be selected. -1 means return as many as possible. * @return return #number of backend ids, - * or empty set if no backends match the policy, or the number of matched backends is less than "number"; + * or empty set if no backends match the policy, or the number of matched backends is less than "number"; */ - public List<Long> selectBackendIdsByPolicy(BeSelectionPolicy policy, int number) { + public List<Long> selectBackendIdsByPolicy(BeSelectionPolicy policy, int number, + List<Backend> backendList) { Preconditions.checkArgument(number >= -1); - List<Backend> candidates = policy.getCandidateBackends(getAllClusterBackendsNoException().values()); + + List<Backend> candidates = policy.getCandidateBackends(backendList); + if (candidates.size() < number || candidates.isEmpty()) { if (LOG.isDebugEnabled()) { LOG.debug("Not match policy: {}. candidates num: {}, expected: {}", policy, candidates.size(), number); @@ -1085,6 +1093,16 @@ public class SystemInfoService { return idToBackendRef; } + public ImmutableList<Backend> getBackendListByComputeGroup(Set<String> cgSet) { + List<Backend> result = new ArrayList<>(); + for (Backend be : idToBackendRef.values()) { + if (cgSet.contains(be.getLocationTag().value)) { + result.add(be); + } + } + return ImmutableList.copyOf(result); + } + // Cloud and NonCloud get all bes public ImmutableMap<Long, Backend> getAllBackendsByAllCluster() throws AnalysisException { return idToBackendRef; @@ -1118,14 +1136,4 @@ public class SystemInfoService { return Env.getCurrentInvertedIndex().getTabletNumByBackendId(beId); } - public List<Backend> getBackendsByPolicy(BeSelectionPolicy beSelectionPolicy) { - try { - return beSelectionPolicy.getCandidateBackends(Env.getCurrentSystemInfo() - .getBackendsByCurrentCluster().values().asList()); - } catch (Throwable t) { - LOG.warn("get backends by policy failed, policy: {}", beSelectionPolicy.toString()); - } - return Lists.newArrayList(); - } - } diff --git a/fe/fe-core/src/test/java/org/apache/doris/catalog/OlapTableTest.java b/fe/fe-core/src/test/java/org/apache/doris/catalog/OlapTableTest.java index 3f6230b99cf..b190229bf71 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/catalog/OlapTableTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/catalog/OlapTableTest.java @@ -24,6 +24,7 @@ import org.apache.doris.common.io.FastByteArrayOutputStream; import org.apache.doris.common.util.UnitTestUtil; import org.apache.doris.qe.ConnectContext; import org.apache.doris.resource.Tag; +import org.apache.doris.resource.computegroup.ComputeGroup; import org.apache.doris.system.Backend; import org.apache.doris.thrift.TFetchOption; import org.apache.doris.thrift.TStorageType; @@ -39,7 +40,6 @@ import org.junit.Test; import java.io.DataInputStream; import java.io.DataOutputStream; import java.io.IOException; -import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Set; @@ -269,15 +269,15 @@ public class OlapTableTest { Env.getCurrentSystemInfo().addBackend(be1); Env.getCurrentSystemInfo().addBackend(be2); + ConnectContext connectContext = UtFrameUtils.createDefaultCtx(); + connectContext.setQualifiedUser("root"); + OlapTable tab = new OlapTable(); TFetchOption tfetchOption = tab.generateTwoPhaseReadOption(-1); Assert.assertTrue(tfetchOption.nodes_info.nodes.size() == 2); - ConnectContext connectContext = UtFrameUtils.createDefaultCtx(); - Set<Tag> tagSet = new HashSet<>(); - tagSet.add(taga); + connectContext.setComputeGroup(new ComputeGroup("taga", "taga", Env.getCurrentSystemInfo())); - connectContext.setResourceTags(tagSet); TFetchOption tfetchOption2 = tab.generateTwoPhaseReadOption(-1); Assert.assertTrue(tfetchOption2.nodes_info.nodes.size() == 1); ConnectContext.remove(); diff --git a/fe/fe-core/src/test/java/org/apache/doris/planner/FederationBackendPolicyTest.java b/fe/fe-core/src/test/java/org/apache/doris/planner/FederationBackendPolicyTest.java index 3b3e2eeedf7..04e097211c9 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/planner/FederationBackendPolicyTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/planner/FederationBackendPolicyTest.java @@ -24,6 +24,7 @@ import org.apache.doris.common.util.LocationPath; import org.apache.doris.datasource.FederationBackendPolicy; import org.apache.doris.datasource.FileSplit; import org.apache.doris.datasource.NodeSelectionStrategy; +import org.apache.doris.resource.computegroup.ComputeGroupMgr; import org.apache.doris.spi.Split; import org.apache.doris.system.Backend; import org.apache.doris.system.SystemInfoService; @@ -69,11 +70,17 @@ public class FederationBackendPolicyTest { backend3.setAlive(true); service.addBackend(backend3); + ComputeGroupMgr cgmgr = new ComputeGroupMgr(service); new MockUp<Env>() { @Mock public SystemInfoService getCurrentSystemInfo() { return service; } + + @Mock + public ComputeGroupMgr getComputeGroupMgr() { + return cgmgr; + } }; List<Split> splits = new ArrayList<>(); @@ -134,11 +141,17 @@ public class FederationBackendPolicyTest { backend3.setAlive(true); service.addBackend(backend3); + ComputeGroupMgr cgmgr = new ComputeGroupMgr(service); new MockUp<Env>() { @Mock public SystemInfoService getCurrentSystemInfo() { return service; } + + @Mock + public ComputeGroupMgr getComputeGroupMgr() { + return cgmgr; + } }; List<Split> splits = new ArrayList<>(); @@ -228,11 +241,17 @@ public class FederationBackendPolicyTest { backend3.setAlive(true); service.addBackend(backend3); + ComputeGroupMgr cgmgr = new ComputeGroupMgr(service); new MockUp<Env>() { @Mock public SystemInfoService getCurrentSystemInfo() { return service; } + + @Mock + public ComputeGroupMgr getComputeGroupMgr() { + return cgmgr; + } }; List<Split> splits = new ArrayList<>(); @@ -307,11 +326,17 @@ public class FederationBackendPolicyTest { @Test public void testGenerateRandomly() throws UserException { SystemInfoService service = new SystemInfoService(); + ComputeGroupMgr cgmgr = new ComputeGroupMgr(service); new MockUp<Env>() { @Mock public SystemInfoService getCurrentSystemInfo() { return service; } + + @Mock + public ComputeGroupMgr getComputeGroupMgr() { + return cgmgr; + } }; Random random = new Random(); @@ -425,11 +450,17 @@ public class FederationBackendPolicyTest { @Test public void testNonAliveNodes() throws UserException { SystemInfoService service = new SystemInfoService(); + ComputeGroupMgr cgmgr = new ComputeGroupMgr(service); new MockUp<Env>() { @Mock public SystemInfoService getCurrentSystemInfo() { return service; } + + @Mock + public ComputeGroupMgr getComputeGroupMgr() { + return cgmgr; + } }; Random random = new Random(); @@ -599,11 +630,17 @@ public class FederationBackendPolicyTest { backend3.setAlive(true); service.addBackend(backend3); + ComputeGroupMgr cgmgr = new ComputeGroupMgr(service); new MockUp<Env>() { @Mock public SystemInfoService getCurrentSystemInfo() { return service; } + + @Mock + public ComputeGroupMgr getComputeGroupMgr() { + return cgmgr; + } }; List<Split> splits = new ArrayList<>(); @@ -771,11 +808,17 @@ public class FederationBackendPolicyTest { backend3.setAlive(true); service.addBackend(backend3); + ComputeGroupMgr cgmgr = new ComputeGroupMgr(service); new MockUp<Env>() { @Mock public SystemInfoService getCurrentSystemInfo() { return service; } + + @Mock + public ComputeGroupMgr getComputeGroupMgr() { + return cgmgr; + } }; List<Split> splits = new ArrayList<>(); diff --git a/fe/fe-core/src/test/java/org/apache/doris/planner/ResourceTagQueryTest.java b/fe/fe-core/src/test/java/org/apache/doris/planner/ResourceTagQueryTest.java index 850d8b27b06..1a8b27f149a 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/planner/ResourceTagQueryTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/planner/ResourceTagQueryTest.java @@ -41,6 +41,9 @@ import org.apache.doris.mysql.privilege.Auth; import org.apache.doris.qe.ConnectContext; import org.apache.doris.qe.DdlExecutor; import org.apache.doris.resource.Tag; +import org.apache.doris.resource.computegroup.AllBackendComputeGroup; +import org.apache.doris.resource.computegroup.ComputeGroup; +import org.apache.doris.resource.computegroup.MergedComputeGroup; import org.apache.doris.system.Backend; import org.apache.doris.thrift.TDisk; import org.apache.doris.thrift.TStorageMedium; @@ -195,17 +198,21 @@ public class ResourceTagQueryTest { Database db = Env.getCurrentInternalCatalog().getDbNullable("test"); OlapTable tbl = (OlapTable) db.getTableNullable("tbl1"); - Set<Tag> userTags = Env.getCurrentEnv().getAuth().getResourceTags(Auth.ROOT_USER); - Assert.assertEquals(0, userTags.size()); + ComputeGroup cg1 = Env.getCurrentEnv().getAuth().getComputeGroup(Auth.ROOT_USER); + Assert.assertTrue(cg1 instanceof AllBackendComputeGroup); // set default tag for root String setPropStr = "set property for 'root' 'resource_tags.location' = 'default';"; ExceptionChecker.expectThrowsNoException(() -> setProperty(setPropStr)); - userTags = Env.getCurrentEnv().getAuth().getResourceTags(Auth.ROOT_USER); - Assert.assertEquals(1, userTags.size()); + ComputeGroup cg2 = Env.getCurrentEnv().getAuth().getComputeGroup(Auth.ROOT_USER); + Assert.assertTrue(cg2 instanceof MergedComputeGroup); + MergedComputeGroup cg3 = (MergedComputeGroup) cg2; + Set<String> cgNameSet1 = cg3.getComputeGroupNameSet(); + Assert.assertTrue(cgNameSet1.size() == 1); + Assert.assertTrue(cgNameSet1.contains("default")); // update connection context and query - connectContext.setResourceTags(userTags); + connectContext.setComputeGroup(cg2); String queryStr = "explain select * from test.tbl1"; String explainString = UtFrameUtils.getSQLPlanOrErrorMsg(connectContext, queryStr); System.out.println(explainString); @@ -214,15 +221,15 @@ public class ResourceTagQueryTest { // set zone1 tag for root String setPropStr2 = "set property for 'root' 'resource_tags.location' = 'zone1';"; ExceptionChecker.expectThrowsNoException(() -> setProperty(setPropStr2)); - userTags = Env.getCurrentEnv().getAuth().getResourceTags(Auth.ROOT_USER); - Assert.assertEquals(1, userTags.size()); - for (Tag tag : userTags) { - Assert.assertEquals(tag1, tag); - } + ComputeGroup cg4 = Env.getCurrentEnv().getAuth().getComputeGroup(Auth.ROOT_USER); + Assert.assertTrue(cg4 instanceof MergedComputeGroup); + MergedComputeGroup cg5 = (MergedComputeGroup) cg4; + Assert.assertTrue(cg5.getComputeGroupNameSet().size() == 1); + Assert.assertTrue(cg5.getComputeGroupNameSet().contains("zone1")); // update connection context and query, it will failed because no zone1 backend - connectContext.setResourceTags(userTags); - Assert.assertTrue(connectContext.isResourceTagsSet()); + connectContext.setComputeGroup(cg4); + Assert.assertTrue(connectContext.isSetComputeGroup()); queryStr = "explain select * from test.tbl1"; String error = UtFrameUtils.getSQLPlanOrErrorMsg(connectContext, queryStr); Assert.assertTrue(error.contains("no queryable replicas")); diff --git a/fe/fe-core/src/test/java/org/apache/doris/resource/ComputeGroupTest.java b/fe/fe-core/src/test/java/org/apache/doris/resource/ComputeGroupTest.java new file mode 100644 index 00000000000..2ad93dfbf82 --- /dev/null +++ b/fe/fe-core/src/test/java/org/apache/doris/resource/ComputeGroupTest.java @@ -0,0 +1,636 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.resource; + +import org.apache.doris.analysis.Analyzer; +import org.apache.doris.analysis.CreateUserStmt; +import org.apache.doris.analysis.SetUserPropertyStmt; +import org.apache.doris.analysis.UserDesc; +import org.apache.doris.analysis.UserIdentity; +import org.apache.doris.catalog.Env; +import org.apache.doris.common.ExceptionChecker; +import org.apache.doris.common.FeConstants; +import org.apache.doris.common.LoadException; +import org.apache.doris.common.MetaNotFoundException; +import org.apache.doris.common.RandomIdentifierGenerator; +import org.apache.doris.common.UserException; +import org.apache.doris.datasource.FederationBackendPolicy; +import org.apache.doris.datasource.InternalCatalog; +import org.apache.doris.load.loadv2.BrokerLoadJob; +import org.apache.doris.load.routineload.KafkaRoutineLoadJob; +import org.apache.doris.load.routineload.RoutineLoadJob; +import org.apache.doris.load.routineload.RoutineLoadManager; +import org.apache.doris.mysql.privilege.AccessControllerManager; +import org.apache.doris.mysql.privilege.Auth; +import org.apache.doris.mysql.privilege.PrivPredicate; +import org.apache.doris.qe.ConnectContext; +import org.apache.doris.resource.computegroup.AllBackendComputeGroup; +import org.apache.doris.resource.computegroup.CloudComputeGroup; +import org.apache.doris.resource.computegroup.ComputeGroup; +import org.apache.doris.resource.computegroup.ComputeGroupMgr; +import org.apache.doris.resource.computegroup.MergedComputeGroup; +import org.apache.doris.system.Backend; +import org.apache.doris.system.BeSelectionPolicy; +import org.apache.doris.system.SystemInfoService; +import org.apache.doris.utframe.UtFrameUtils; + +import com.google.common.collect.Sets; +import mockit.Expectations; +import mockit.Mocked; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.junit.Assert; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; + +import java.io.IOException; +import java.util.Set; + +public class ComputeGroupTest { + private static final Logger LOG = LogManager.getLogger(ComputeGroupTest.class); + + // use a unique dir so that it won't be conflict with other unit test which + // may also start a Mocked Frontend + private static String runningDirBase = "fe"; + private static ConnectContext connectContext; + + private Auth auth; + @Mocked + public Env env; + @Mocked + private Analyzer analyzer; + @Mocked + AccessControllerManager accessManager; + + @BeforeClass + public static void beforeClass() throws Exception { + FeConstants.runningUnitTest = true; + connectContext = UtFrameUtils.createDefaultCtx(); + } + + @Before + public void setUp() throws MetaNotFoundException { + auth = new Auth(); + accessManager = new AccessControllerManager(auth); + + new Expectations() { + { + Env.getCurrentEnv(); + minTimes = 0; + result = env; + + env.getAuth(); + minTimes = 0; + result = auth; + + analyzer.getDefaultCatalog(); + minTimes = 0; + result = InternalCatalog.INTERNAL_CATALOG_NAME; + + accessManager.checkGlobalPriv((ConnectContext) any, PrivPredicate.ADMIN); + minTimes = 0; + result = true; + + accessManager.checkGlobalPriv((ConnectContext) any, PrivPredicate.GRANT); + minTimes = 0; + result = true; + + accessManager.checkGlobalPriv((ConnectContext) any, PrivPredicate.OPERATOR); + minTimes = 0; + result = true; + + accessManager.checkGlobalPriv((ConnectContext) any, PrivPredicate.CREATE); + minTimes = 0; + result = true; + } + }; + } + + // @AfterClass + // public static void tearDown() { + // UtFrameUtils.cleanDorisFeDir(runningDirBase); + // } + + private static void setProperty(String sql) throws Exception { + SetUserPropertyStmt setUserPropertyStmt = (SetUserPropertyStmt) UtFrameUtils.parseAndAnalyzeStmt(sql, connectContext); + Env.getCurrentEnv().getAuth().updateUserProperty(setUserPropertyStmt); + } + + @Test + public void testGetSetResourceTagFromAuth() throws Exception { + new Expectations() { + { + Env.getCurrentEnv().getComputeGroupMgr(); + minTimes = 0; + result = new ComputeGroupMgr(null); + } + }; + + // 1 get no user + { + String invalidUser = RandomIdentifierGenerator.generateRandomIdentifier(8); + ComputeGroup cg1 = auth.getComputeGroup(invalidUser); + Assert.assertTrue(cg1 == ComputeGroup.INVALID_COMPUTE_GROUP); + } + // 2 get a non-admin user without resource tag + { + String nonAdminUserStr = "non_admin_user"; + UserIdentity nonAdminUser = new UserIdentity(nonAdminUserStr, "%"); + UserDesc nonAdminUserDesc = new UserDesc(nonAdminUser, "12345", true); + + CreateUserStmt createNonAdminUser = new CreateUserStmt(false, nonAdminUserDesc, null); + createNonAdminUser.analyze(analyzer); + auth.createUser(createNonAdminUser); + ComputeGroup cg = auth.getComputeGroup(nonAdminUserStr); + Assert.assertTrue(cg instanceof MergedComputeGroup); + Assert.assertTrue(((MergedComputeGroup) cg).getComputeGroupNameSet().contains(Tag.VALUE_DEFAULT_TAG)); + + // 2.1 get a non-admin user with resource tag + String setPropStr = "set property for '" + nonAdminUserStr + "' 'resource_tags.location' = 'test_rg1';"; + ExceptionChecker.expectThrowsNoException(() -> setProperty(setPropStr)); + ComputeGroup cg2 = auth.getComputeGroup(nonAdminUserStr); + Assert.assertTrue(cg2 instanceof MergedComputeGroup); + Assert.assertTrue(((MergedComputeGroup) cg2).getComputeGroupNameSet().contains("test_rg1")); + + // 2.2 get a non-admin user with multi-resource tag + String setPropStr2 = "set property for '" + nonAdminUserStr + + "' 'resource_tags.location' = 'test_rg1,test_rg2';"; + ExceptionChecker.expectThrowsNoException(() -> setProperty(setPropStr2)); + ComputeGroup cg3 = auth.getComputeGroup(nonAdminUserStr); + Assert.assertTrue(cg3 instanceof MergedComputeGroup); + Set<String> cgNameSet = ((MergedComputeGroup) cg3).getComputeGroupNameSet(); + Assert.assertTrue(cgNameSet.contains("test_rg1")); + Assert.assertTrue(cgNameSet.contains("test_rg2")); + Assert.assertTrue(cgNameSet.size() == 2); + + // 2.3 get a non-admin user with empty tag + String setPropStr3 = "set property for '" + nonAdminUserStr + + "' 'resource_tags.location' = '';"; + ExceptionChecker.expectThrowsNoException(() -> setProperty(setPropStr3)); + ComputeGroup cg4 = auth.getComputeGroup(nonAdminUserStr); + Assert.assertTrue(cg4 instanceof MergedComputeGroup); + Set<String> cgNameSets = ((MergedComputeGroup) cg4).getComputeGroupNameSet(); + Assert.assertTrue(cgNameSets.size() == 1); + Assert.assertTrue(cgNameSets.contains("default")); + } + + // 4 get an admin user without resource tag + { + ComputeGroup cg1 = auth.getComputeGroup("root"); + Assert.assertTrue(cg1 instanceof AllBackendComputeGroup); + + // 4.1 get an admin user with a resource tag + String setPropStr = "set property for 'root' 'resource_tags.location' = 'test_rg2';"; + ExceptionChecker.expectThrowsNoException(() -> setProperty(setPropStr)); + ComputeGroup cg2 = auth.getComputeGroup("root"); + Assert.assertTrue(cg2 instanceof MergedComputeGroup); + Assert.assertTrue(((MergedComputeGroup) cg2).getComputeGroupNameSet().contains("test_rg2")); + + + // 4.2 get an admin user with an empty resource tag + String setPropStr2 = "set property for 'root' 'resource_tags.location' = '';"; + ExceptionChecker.expectThrowsNoException(() -> setProperty(setPropStr2)); + ComputeGroup cg3 = auth.getComputeGroup("root"); + Assert.assertTrue(cg3 instanceof AllBackendComputeGroup); + } + } + + @Test + public void testComputeGroup() { + // test invalid compute group + { + try { + ComputeGroup.INVALID_COMPUTE_GROUP.getBackendList(); + } catch (Exception e) { + Assert.assertTrue(e.getMessage().contains("invalid compute group can not be used")); + } + + try { + ComputeGroup.INVALID_COMPUTE_GROUP.containsBackend(""); + } catch (Exception e) { + Assert.assertTrue(e.getMessage().contains("invalid compute group can not be used")); + } + + String invalidCgToString = ComputeGroup.INVALID_COMPUTE_GROUP.toString(); + String expectedCgString = String.format("%s id=%s, name=%s", + ComputeGroup.INVALID_COMPUTE_GROUP.getClass().getSimpleName(), + ComputeGroup.INVALID_COMPUTE_GROUP_NAME, ComputeGroup.INVALID_COMPUTE_GROUP_NAME); + Assert.assertTrue(expectedCgString.equals(invalidCgToString)); + } + + // test Compute group + { + String cgId = "test_cg_id"; + String cgName = "test_cg_1"; + ComputeGroup cg1 = new ComputeGroup(cgId, cgName, null); + Assert.assertTrue(cgId.equals(cg1.getId())); + Assert.assertTrue(cgName.equals(cg1.getName())); + String cg1ToString = String.format("%s id=%s, name=%s", ComputeGroup.class.getSimpleName(), cgId, cgName); + Assert.assertTrue(cg1ToString.equals(cg1.toString())); + Assert.assertTrue(cg1.containsBackend(cgName)); + Assert.assertFalse(cg1.containsBackend("123")); + } + + // test Cloud Compute group + { + String cgId = "test_cloud_cg_id"; + String cgName = "test_cloud_cg_name"; + ComputeGroup cg1 = new CloudComputeGroup(cgId, cgName, null); + Assert.assertTrue(cgId.equals(cg1.getId())); + Assert.assertTrue(cgName.equals(cg1.getName())); + String cg1ToString = String.format("%s id=%s, name=%s", CloudComputeGroup.class.getSimpleName(), cgId, + cgName); + Assert.assertTrue(cg1ToString.equals(cg1.toString())); + Assert.assertTrue(cg1.containsBackend(cgName)); + Assert.assertFalse(cg1.containsBackend("123")); + } + + // test MergedComputeGroup + { + Set<String> emptyTags = Sets.newHashSet(); + String cgName = "merged_cg_name"; + ComputeGroup emptyMergedCg = new MergedComputeGroup(emptyTags, null); + try { + emptyMergedCg.getId(); + } catch (Exception e) { + Assert.assertTrue(e.getMessage().contains("MergedComputeGroup not implements getId")); + } + + try { + emptyMergedCg.getName(); + } catch (Exception e) { + Assert.assertTrue(e.getMessage().contains("MergedComputeGroup not implements getName")); + } + + String mergedCgToString = String.format("%s %s", MergedComputeGroup.class.getSimpleName(), + String.join(",", emptyTags)); + Assert.assertTrue(mergedCgToString.equals(emptyMergedCg.toString())); + Assert.assertFalse(emptyMergedCg.containsBackend(cgName)); + + Set<String> tags = Sets.newHashSet(); + tags.add(cgName); + ComputeGroup notEmptyMergedCg = new MergedComputeGroup(tags, null); + String mergedCgToString2 = String.format("%s %s", MergedComputeGroup.class.getSimpleName(), + String.join(",", tags)); + Assert.assertTrue(mergedCgToString2.equals(notEmptyMergedCg.toString())); + Assert.assertTrue(notEmptyMergedCg.containsBackend(cgName)); + } + + // test AllBackendComputeGroup + { + AllBackendComputeGroup allBeCg = new AllBackendComputeGroup(null); + try { + allBeCg.getName(); + } catch (Exception e) { + Assert.assertTrue(e.getMessage().contains("AllBackendComputeGroup not implements getName")); + } + + try { + allBeCg.getId(); + } catch (Exception e) { + Assert.assertTrue(e.getMessage().contains("AllBackendComputeGroup not implements getId")); + } + + Assert.assertTrue(allBeCg.getClass().getSimpleName().equals(allBeCg.toString())); + } + + // test equals + { + String cgName1 = "cg_name1"; + String cgId1 = "cg_id1"; + ComputeGroup cg1 = new ComputeGroup(cgId1, cgName1, null); + + String cgName11 = "cg_name11"; + ComputeGroup cg11 = new ComputeGroup(cgId1, cgName11, null); + + String cgName2 = "cg_name2"; + String cgId2 = "cg_id2"; + ComputeGroup cg2 = new ComputeGroup(cgId2, cgName2, null); + + ComputeGroup cg3 = new ComputeGroup(cgId1, cgName1, null); + + Assert.assertFalse(cg1.equals(cg2)); + Assert.assertFalse(cg2.equals(cg1)); + + Assert.assertFalse(cg1.equals(cg3)); + Assert.assertFalse(cg3.equals(cg1)); + + Assert.assertFalse(cg1.equals(cg11)); + Assert.assertFalse(cg11.equals(cg1)); + + Assert.assertFalse(cg1.equals(null)); + + CloudComputeGroup cloudCg1 = new CloudComputeGroup(cgId1, cgName1, null); + CloudComputeGroup cloudCg2 = new CloudComputeGroup(cgId2, cgName2, null); + Assert.assertFalse(cloudCg1.equals(cloudCg2)); + Assert.assertFalse(cloudCg2.equals(cloudCg1)); + + AllBackendComputeGroup allBecg1 = new AllBackendComputeGroup(null); + AllBackendComputeGroup allBecg2 = new AllBackendComputeGroup(null); + Assert.assertFalse(allBecg1.equals(allBecg2)); + Assert.assertFalse(allBecg2.equals(allBecg1)); + + MergedComputeGroup mergedCg1 = new MergedComputeGroup(null, null); + MergedComputeGroup mergedCg2 = new MergedComputeGroup(null, null); + Assert.assertFalse(mergedCg1.equals(mergedCg2)); + Assert.assertFalse(mergedCg2.equals(mergedCg1)); + + // ComputeGroup vs others + Assert.assertTrue(cg1.equals(cg1)); + Assert.assertFalse(cg1.equals(cloudCg1)); + Assert.assertFalse(cg1.equals(allBecg1)); + Assert.assertFalse(cg1.equals(mergedCg1)); + + // CloudComputeGroup vs others + Assert.assertTrue(cloudCg1.equals(cloudCg1)); + Assert.assertFalse(cloudCg1.equals(cg1)); + Assert.assertFalse(cloudCg1.equals(allBecg1)); + Assert.assertFalse(cloudCg1.equals(mergedCg1)); + + // AllBackendComputeGroup vs others + Assert.assertTrue(allBecg1.equals(allBecg1)); + Assert.assertFalse(allBecg1.equals(cg1)); + Assert.assertFalse(allBecg1.equals(cloudCg1)); + Assert.assertFalse(allBecg1.equals(mergedCg1)); + + // MergedComputeGroup vs others + Assert.assertTrue(mergedCg1.equals(mergedCg1)); + Assert.assertFalse(mergedCg1.equals(cg1)); + Assert.assertFalse(mergedCg1.equals(allBecg1)); + Assert.assertFalse(mergedCg1.equals(cloudCg1)); + } + } + + @Test + public void testComputeGroupMgr() throws Exception { + SystemInfoService systemInfoService = new SystemInfoService(); + ComputeGroupMgr cgmgr = new ComputeGroupMgr(systemInfoService); + + Tag beTag1 = Tag.create(Tag.TYPE_LOCATION, "be_tag1"); + Tag beTag2 = Tag.create(Tag.TYPE_LOCATION, "be_tag2"); + Tag beTag3 = Tag.create(Tag.TYPE_LOCATION, "be_tag3"); + + { + Backend be1 = new Backend(10001, "192.168.1.1", 9050); + be1.setTagMap(beTag1.toMap()); + be1.setAlive(true); + + Backend be2 = new Backend(10002, "192.168.1.2", 9050); + be2.setTagMap(beTag1.toMap()); + be2.setAlive(true); + + systemInfoService.addBackend(be1); + systemInfoService.addBackend(be2); + } + + { + Backend be1 = new Backend(10003, "192.168.1.3", 9050); + be1.setTagMap(beTag2.toMap()); + be1.setAlive(true); + + Backend be2 = new Backend(10004, "192.168.1.4", 9050); + be2.setTagMap(beTag2.toMap()); + be2.setAlive(true); + systemInfoService.addBackend(be1); + systemInfoService.addBackend(be2); + } + + { + Backend be3 = new Backend(10005, "192.168.1.5", 9050); + be3.setTagMap(beTag3.toMap()); + be3.setAlive(true); + systemInfoService.addBackend(be3); + } + + ComputeGroup cg1 = cgmgr.getComputeGroupByName(beTag1.value); + Assert.assertTrue(cg1.getBackendList().size() == 2); + ComputeGroup cg2 = cgmgr.getComputeGroupByName("abc"); + Assert.assertTrue(cg2.getBackendList().size() == 0); + + Set<Tag> tagSet1 = Sets.newHashSet(beTag1, beTag2); + Assert.assertTrue(cgmgr.getComputeGroup(tagSet1).getBackendList().size() == 4); + + Tag beTag4 = Tag.create(Tag.TYPE_LOCATION, "abc"); + Set<Tag> tagset2 = Sets.newHashSet(beTag4); + Assert.assertTrue(cgmgr.getComputeGroup(tagset2).getBackendList().size() == 0); + + Set<Tag> emptyTagSet = Sets.newHashSet(); + Assert.assertTrue(cgmgr.getComputeGroup(emptyTagSet).getBackendList().size() == 0); + + Assert.assertTrue(cgmgr.getAllBackendComputeGroup().getBackendList().size() == 5); + + } + + @Test + public void testConnectContextToFederationBackendPolicy() throws UserException, IOException { + SystemInfoService systemInfoService = new SystemInfoService(); + + Tag beTag1 = Tag.create(Tag.TYPE_LOCATION, "be_tag1"); + Tag beTag2 = Tag.DEFAULT_BACKEND_TAG; + Backend defaultBe = null; + Backend tag1Be = null; + + { + tag1Be = new Backend(10001, "192.168.1.1", 9050); + tag1Be.setTagMap(beTag1.toMap()); + tag1Be.setAlive(true); + + defaultBe = new Backend(10002, "192.168.1.2", 9050); + defaultBe.setTagMap(beTag2.toMap()); + defaultBe.setAlive(true); + + systemInfoService.addBackend(tag1Be); + systemInfoService.addBackend(defaultBe); + } + + + ComputeGroupMgr cgmgr = new ComputeGroupMgr(systemInfoService); + new Expectations() { + { + Env.getCurrentEnv().getComputeGroupMgr(); + minTimes = 0; + result = cgmgr; + } + }; + BeSelectionPolicy beSelPolicy = new BeSelectionPolicy.Builder().build(); + + + // 1 when connectctx is null, return default tag be. + { + ConnectContext.remove(); + FederationBackendPolicy fbPolicy = new FederationBackendPolicy(); + fbPolicy.init(beSelPolicy); + Assert.assertTrue(fbPolicy.getBackends().size() == 1); + Assert.assertTrue(fbPolicy.getBackends().contains(defaultBe)); + } + + // 2 get compute group from connect ctx + { + ConnectContext.remove(); + ConnectContext context = new ConnectContext(); + context.setThreadLocalInfo(); + context.setComputeGroup(cgmgr.getComputeGroupByName(beTag1.value)); + FederationBackendPolicy fbPolicy = new FederationBackendPolicy(); + fbPolicy.init(beSelPolicy); + Assert.assertTrue(fbPolicy.getBackends().size() == 1); + Assert.assertTrue(fbPolicy.getBackends().contains(tag1Be)); + } + + // 3 test set invalid compute group + { + ConnectContext.remove(); + ConnectContext context = new ConnectContext(); + context.setThreadLocalInfo(); + context.setComputeGroup(ComputeGroup.INVALID_COMPUTE_GROUP); + FederationBackendPolicy fbPolicy = new FederationBackendPolicy(); + try { + fbPolicy.init(beSelPolicy); + } catch (UserException e) { + Assert.assertTrue(e.getMessage().contains(ComputeGroup.INVALID_COMPUTE_GROUP_ERR_MSG)); + } + } + + // 4 test get compute group from user property + { + ConnectContext.remove(); + UtFrameUtils.createDefaultCtx(); + String setPropStr = "set property for 'root' 'resource_tags.location' = '" + beTag1.value + "';"; + ExceptionChecker.expectThrowsNoException(() -> setProperty(setPropStr)); + FederationBackendPolicy fbPolicy = new FederationBackendPolicy(); + fbPolicy.init(beSelPolicy); + Assert.assertTrue(fbPolicy.getBackends().size() == 1); + Assert.assertTrue(fbPolicy.getBackends().contains(tag1Be)); + } + } + + @Test + public void testBrokerLoadToConnectContext() throws UserException, IOException { + new Expectations() { + { + Env.getCurrentEnv().getComputeGroupMgr(); + minTimes = 0; + result = new ComputeGroupMgr(null); + } + }; + + // test user is empty string + { + ConnectContext.remove(); + UserIdentity emptyUser = new UserIdentity("", "%"); + emptyUser.setIsAnalyzed(); + BrokerLoadJob brokerLoadJob = + new BrokerLoadJob(1, null, null, null, emptyUser); + brokerLoadJob.setComputeGroup(); + Assert.assertTrue(ConnectContext.get().getComputeGroupSafely() instanceof AllBackendComputeGroup); + } + + // test invalid user + { + ConnectContext.remove(); + UserIdentity emptyUser = new UserIdentity(RandomIdentifierGenerator.generateRandomIdentifier(8), "%"); + emptyUser.setIsAnalyzed(); + BrokerLoadJob brokerLoadJob = + new BrokerLoadJob(1, null, null, null, emptyUser); + brokerLoadJob.setComputeGroup(); + Assert.assertTrue(ConnectContext.get().getComputeGroupSafely() instanceof AllBackendComputeGroup); + } + + // test get cg from user property + { + + UtFrameUtils.createDefaultCtx(); + String nonAdminUserStr = "non_admin_user"; + UserIdentity nonAdminUser = new UserIdentity(nonAdminUserStr, "%"); + UserDesc nonAdminUserDesc = new UserDesc(nonAdminUser, "12345", true); + + CreateUserStmt createNonAdminUser = new CreateUserStmt(false, nonAdminUserDesc, null); + createNonAdminUser.analyze(analyzer); + auth.createUser(createNonAdminUser); + + String tagName = "tag_rg_1"; + String setPropStr = "set property for '" + nonAdminUserStr + "' 'resource_tags.location' = '" + tagName + "';"; + ExceptionChecker.expectThrowsNoException(() -> setProperty(setPropStr)); + + BrokerLoadJob brokerLoadJob = + new BrokerLoadJob(1, null, null, null, nonAdminUser); + brokerLoadJob.setComputeGroup(); + ComputeGroup cg = ConnectContext.get().getComputeGroupSafely(); + Assert.assertTrue(cg instanceof MergedComputeGroup); + Assert.assertTrue(((MergedComputeGroup) cg).getComputeGroupNameSet().contains(tagName)); + + } + } + + @Test + public void testRoutineLoadToConnectContext() throws Exception { + new Expectations() { + { + Env.getCurrentEnv().getComputeGroupMgr(); + minTimes = 0; + result = new ComputeGroupMgr(null); + + Env.getCurrentSystemInfo(); + minTimes = 0; + result = new SystemInfoService(); + } + }; + + + + // 1 ctx's user is empty, return all backend + { + ConnectContext ctx = UtFrameUtils.createDefaultCtx(); + ctx.setQualifiedUser(null); + RoutineLoadJob job = new KafkaRoutineLoadJob(); + job.setComputeGroup(); + Assert.assertTrue(ConnectContext.get().getComputeGroupSafely() instanceof AllBackendComputeGroup); + } + + + // 2 set an invalid user, get an invalid compute group, then return all backends + { + ConnectContext.get().setQualifiedUser("xxxx"); + RoutineLoadJob job = new KafkaRoutineLoadJob(); + job.setComputeGroup(); + Assert.assertTrue(ConnectContext.get().getComputeGroupSafely() instanceof AllBackendComputeGroup); + } + + // 3 get a valid compute group + { + ConnectContext.get().setQualifiedUser("root"); + String setPropStr = "set property for 'root' 'resource_tags.location' = 'tag_rg_1';"; + ExceptionChecker.expectThrowsNoException(() -> setProperty(setPropStr)); + RoutineLoadJob job = new KafkaRoutineLoadJob(); + job.setComputeGroup(); + ComputeGroup cg = ConnectContext.get().getComputeGroupSafely(); + Assert.assertTrue(cg instanceof MergedComputeGroup); + Assert.assertTrue(((MergedComputeGroup) cg).getComputeGroupNameSet().contains("tag_rg_1")); + } + + // 4 get a null job + { + RoutineLoadManager routineLoadManager = new RoutineLoadManager(); + try { + routineLoadManager.getAvailableBackendIdsForUt(1); + } catch (LoadException e) { + Assert.assertTrue(e.getMessage().contains("does not exist")); + } + } + } +} --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org