This is an automated email from the ASF dual-hosted git repository. morningman pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-doris.git
The following commit(s) were added to refs/heads/master by this push: new b5f447b [ResourceLimit] Add a property to limit user instance num. (#6159) b5f447b is described below commit b5f447b932146ddeca489d00047ecc44263931ba Author: Lijia Liu <liutang...@yeah.net> AuthorDate: Sat Jul 10 10:15:05 2021 +0800 [ResourceLimit] Add a property to limit user instance num. (#6159) Add a property to limit user instance num. --- docs/en/administrator-guide/config/fe_config.md | 7 ++ .../en/administrator-guide/config/user_property.md | 4 + .../Account Management/SET PROPERTY.md | 6 +- docs/zh-CN/administrator-guide/config/fe_config.md | 7 ++ .../administrator-guide/config/user_property.md | 4 + .../Account Management/SET PROPERTY.md | 4 + .../main/java/org/apache/doris/common/Config.java | 8 +- .../org/apache/doris/common/FeMetaVersion.java | 4 +- .../java/org/apache/doris/metric/MetricRepo.java | 13 +++ .../mysql/privilege/CommonUserProperties.java | 65 ++++++++++++++ .../org/apache/doris/mysql/privilege/PaloAuth.java | 9 ++ .../apache/doris/mysql/privilege/UserProperty.java | 98 ++++++++++++---------- .../doris/mysql/privilege/UserPropertyMgr.java | 9 ++ .../main/java/org/apache/doris/qe/Coordinator.java | 2 + .../main/java/org/apache/doris/qe/QeProcessor.java | 2 + .../java/org/apache/doris/qe/QeProcessorImpl.java | 61 +++++++++++++- .../org/apache/doris/catalog/UserPropertyTest.java | 4 + 17 files changed, 259 insertions(+), 48 deletions(-) diff --git a/docs/en/administrator-guide/config/fe_config.md b/docs/en/administrator-guide/config/fe_config.md index 1c8f260..cf5f66d 100644 --- a/docs/en/administrator-guide/config/fe_config.md +++ b/docs/en/administrator-guide/config/fe_config.md @@ -2014,3 +2014,10 @@ Load label cleaner will run every *label_clean_interval_second* to clean the out Default:30 the transaction will be cleaned after transaction_clean_interval_second seconds if the transaction is visible or aborted we should make this interval as short as possible and each clean cycle as soon as possible + + +### `default_max_query_instances` + +The default value when user property max_query_instances is equal or less than 0. This config is used to limit the max number of instances for a user. This parameter is less than or equal to 0 means unlimited. + +The default value is -1。 \ No newline at end of file diff --git a/docs/en/administrator-guide/config/user_property.md b/docs/en/administrator-guide/config/user_property.md index 872349f..27a2d36 100644 --- a/docs/en/administrator-guide/config/user_property.md +++ b/docs/en/administrator-guide/config/user_property.md @@ -60,6 +60,10 @@ User-level configuration items will only take effect for the specified users, an The maximum number of user connections, the default value is 100 In general, this parameter does not need to be changed unless the number of concurrent queries exceeds the default value. +### max_query_instances + + The maximum number of instances that the user can use at a certain point in time, The default value is -1, negative number means use default_max_query_instances config. + ### resource ### quota diff --git a/docs/en/sql-reference/sql-statements/Account Management/SET PROPERTY.md b/docs/en/sql-reference/sql-statements/Account Management/SET PROPERTY.md index c7c72cd..d70c9c4 100644 --- a/docs/en/sql-reference/sql-statements/Account Management/SET PROPERTY.md +++ b/docs/en/sql-reference/sql-statements/Account Management/SET PROPERTY.md @@ -38,7 +38,8 @@ Importing cluster is only applicable to Baidu internal users. 
key: Super user rights: -Max_user_connections: Maximum number of connections. +max_user_connections: Maximum number of connections. +max_query_instances: Maximum number of query instance user can use when query. resource.cpu_share: cpu resource assignment. Load_cluster. {cluster_name}. priority: assigns priority to a specified cluster, which can be HIGH or NORMAL @@ -77,6 +78,9 @@ SET PROPERTY FOR 'jack' 'default_load_cluster' = '{cluster_name}'; 7. Modify the cluster priority of user Jack to HIGH SET PROPERTY FOR 'jack' 'load_cluster.{cluster_name}.priority' = 'HIGH'; +8. Modify the maximum number of query instance for jack to 3000 +SET PROPERTY FOR 'jack' 'max_query_instances' = '3000'; + ## keyword SET, PROPERTY diff --git a/docs/zh-CN/administrator-guide/config/fe_config.md b/docs/zh-CN/administrator-guide/config/fe_config.md index b23a4cb..5cb3d70 100644 --- a/docs/zh-CN/administrator-guide/config/fe_config.md +++ b/docs/zh-CN/administrator-guide/config/fe_config.md @@ -2019,3 +2019,10 @@ load 标签清理器将每隔 `label_clean_interval_second` 运行一次以清 默认值:30 如果事务 visible 或者 aborted 状态,事务将在 `transaction_clean_interval_second` 秒后被清除 ,我们应该让这个间隔尽可能短,每个清洁周期都尽快 + + +### `default_max_query_instances` + +默认值:-1 + +用户属性max_query_instances小于等于0时,使用该配置,用来限制单个用户同一时刻可使用的查询instance个数。该参数小于等于0表示无限制。 \ No newline at end of file diff --git a/docs/zh-CN/administrator-guide/config/user_property.md b/docs/zh-CN/administrator-guide/config/user_property.md index 76ec9e5..ff57234 100644 --- a/docs/zh-CN/administrator-guide/config/user_property.md +++ b/docs/zh-CN/administrator-guide/config/user_property.md @@ -60,6 +60,10 @@ User 级别的配置项只会对指定用户生效,并不会影响其他用户 用户最大的连接数,默认值为100。一般情况不需要更改该参数,除非查询的并发数超过了默认值。 +### max_query_instances + + 用户同一时间点可使用的instance个数, 默认是-1,小于等于0将会使用配置default_max_query_instances. + ### resource ### quota diff --git a/docs/zh-CN/sql-reference/sql-statements/Account Management/SET PROPERTY.md b/docs/zh-CN/sql-reference/sql-statements/Account Management/SET PROPERTY.md index 41f294f..1f29a6c 100644 --- a/docs/zh-CN/sql-reference/sql-statements/Account Management/SET PROPERTY.md +++ b/docs/zh-CN/sql-reference/sql-statements/Account Management/SET PROPERTY.md @@ -39,6 +39,7 @@ under the License. 超级用户权限: max_user_connections: 最大连接数。 + max_query_instances: 用户同一时间点执行查询可以使用的instance个数。 resource.cpu_share: cpu资源分配。 load_cluster.{cluster_name}.priority: 为指定的cluster分配优先级,可以为 HIGH 或 NORMAL @@ -77,6 +78,9 @@ under the License. 7. 修改用户 jack 的集群优先级为 HIGH SET PROPERTY FOR 'jack' 'load_cluster.{cluster_name}.priority' = 'HIGH'; + 8. 修改用户jack的查询可用instance个数为3000 + SET PROPERTY FOR 'jack' 'max_query_instances' = '3000'; + ## keyword SET, PROPERTY diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/Config.java b/fe/fe-core/src/main/java/org/apache/doris/common/Config.java index 48a7a26..95287e0 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/common/Config.java +++ b/fe/fe-core/src/main/java/org/apache/doris/common/Config.java @@ -1419,9 +1419,15 @@ public class Config extends ConfigBase { @ConfField(mutable = true, masterOnly = true) public static int max_dynamic_partition_num = 500; - /* + /** * Control the max num of backup/restore job per db */ @ConfField(mutable = true, masterOnly = true) public static int max_backup_restore_job_num_per_db = 10; + + /** + * Control the default max num of the instance for a user. 
+ */ + @ConfField(mutable = true) + public static int default_max_query_instances = -1; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/FeMetaVersion.java b/fe/fe-core/src/main/java/org/apache/doris/common/FeMetaVersion.java index b26f372..02a7b30 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/common/FeMetaVersion.java +++ b/fe/fe-core/src/main/java/org/apache/doris/common/FeMetaVersion.java @@ -210,6 +210,8 @@ public final class FeMetaVersion { public static final int VERSION_98 = 98; // add audit steam load and change the serialization backend method to json public static final int VERSION_99 = 99; + // for max query instance + public static final int VERSION_100 = 100; // note: when increment meta version, should assign the latest version to VERSION_CURRENT - public static final int VERSION_CURRENT = VERSION_99; + public static final int VERSION_CURRENT = VERSION_100; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/metric/MetricRepo.java b/fe/fe-core/src/main/java/org/apache/doris/metric/MetricRepo.java index 9196eea..4d9b072 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/metric/MetricRepo.java +++ b/fe/fe-core/src/main/java/org/apache/doris/metric/MetricRepo.java @@ -32,10 +32,12 @@ import org.apache.doris.metric.Metric.MetricUnit; import org.apache.doris.monitor.jvm.JvmService; import org.apache.doris.monitor.jvm.JvmStats; import org.apache.doris.persist.EditLog; +import org.apache.doris.qe.QeProcessorImpl; import org.apache.doris.service.ExecuteEnv; import org.apache.doris.system.Backend; import org.apache.doris.system.SystemInfoService; +import com.codahale.metrics.Gauge; import com.codahale.metrics.Histogram; import com.codahale.metrics.MetricRegistry; import com.google.common.collect.Sets; @@ -49,6 +51,7 @@ import java.util.Set; import java.util.SortedMap; import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.TimeUnit; +import java.util.function.BinaryOperator; public final class MetricRepo { private static final Logger LOG = LogManager.getLogger(MetricRepo.class); @@ -302,6 +305,16 @@ public final class MetricRepo { HISTO_QUERY_LATENCY = METRIC_REGISTER.histogram(MetricRegistry.name("query", "latency", "ms")); HISTO_EDIT_LOG_WRITE_LATENCY = METRIC_REGISTER.histogram(MetricRegistry.name("editlog", "write", "latency", "ms")); + METRIC_REGISTER.register(MetricRegistry.name("palo", "fe", "query", "max_instances_num_per_user"), (Gauge<Integer>) () -> { + try{ + return ((QeProcessorImpl)QeProcessorImpl.INSTANCE).getInstancesNumPerUser().values().stream() + .reduce(-1, BinaryOperator.maxBy(Integer::compareTo)); + } catch (Throwable ex) { + LOG.warn("Get max_instances_num_per_user error", ex); + return -2; + } + }); + // init system metrics initSystemMetrics(); diff --git a/fe/fe-core/src/main/java/org/apache/doris/mysql/privilege/CommonUserProperties.java b/fe/fe-core/src/main/java/org/apache/doris/mysql/privilege/CommonUserProperties.java new file mode 100644 index 0000000..0695f8a --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/mysql/privilege/CommonUserProperties.java @@ -0,0 +1,65 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.mysql.privilege; + +import org.apache.doris.common.io.Text; +import org.apache.doris.common.io.Writable; +import org.apache.doris.persist.gson.GsonUtils; + +import com.google.gson.annotations.SerializedName; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; + +/** + * Used in + */ +public class CommonUserProperties implements Writable { + @SerializedName("maxConn") + private long maxConn = 100; + @SerializedName("maxQueryInstances") + private long maxQueryInstances = -1; + + long getMaxConn() { + return maxConn; + } + + long getMaxQueryInstances() { + return maxQueryInstances; + } + + void setMaxConn(long maxConn) { + this.maxConn = maxConn; + } + + void setMaxQueryInstances(long maxQueryInstances) { + this.maxQueryInstances = maxQueryInstances; + } + + public static CommonUserProperties read(DataInput in) throws IOException { + String json = Text.readString(in); + return GsonUtils.GSON.fromJson(json, CommonUserProperties.class); + } + + @Override + public void write(DataOutput out) throws IOException { + String json = GsonUtils.GSON.toJson(this); + Text.writeString(out, json); + } +} \ No newline at end of file diff --git a/fe/fe-core/src/main/java/org/apache/doris/mysql/privilege/PaloAuth.java b/fe/fe-core/src/main/java/org/apache/doris/mysql/privilege/PaloAuth.java index a49329e..2937413 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/mysql/privilege/PaloAuth.java +++ b/fe/fe-core/src/main/java/org/apache/doris/mysql/privilege/PaloAuth.java @@ -1048,6 +1048,15 @@ public class PaloAuth implements Writable { } } + public long getMaxQueryInstances(String qualifiedUser) { + readLock(); + try { + return propertyMgr.getMaxQueryInstances(qualifiedUser); + } finally { + readUnlock(); + } + } + public void getAllDomains(Set<String> allDomains) { readLock(); try { diff --git a/fe/fe-core/src/main/java/org/apache/doris/mysql/privilege/UserProperty.java b/fe/fe-core/src/main/java/org/apache/doris/mysql/privilege/UserProperty.java index 72c9b71..57578ae 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/mysql/privilege/UserProperty.java +++ b/fe/fe-core/src/main/java/org/apache/doris/mysql/privilege/UserProperty.java @@ -39,8 +39,6 @@ import com.google.common.collect.Maps; import com.google.common.collect.Sets; import org.apache.commons.lang.StringUtils; -import org.apache.logging.log4j.LogManager; -import org.apache.logging.log4j.Logger; import java.io.DataInput; import java.io.DataOutput; @@ -58,9 +56,12 @@ import java.util.regex.Pattern; * This user is just qualified by cluster name, not host which it connected from. 
*/ public class UserProperty implements Writable { - private static final Logger LOG = LogManager.getLogger(UserProperty.class); + // common properties private static final String PROP_MAX_USER_CONNECTIONS = "max_user_connections"; + private static final String PROP_MAX_QUERY_INSTANCES = "max_query_instances"; + // common properties end + private static final String PROP_RESOURCE = "resource"; private static final String PROP_QUOTA = "quota"; private static final String PROP_DEFAULT_LOAD_CLUSTER = "default_load_cluster"; @@ -73,7 +74,8 @@ public class UserProperty implements Writable { private String qualifiedUser; - private long maxConn = 100; + private CommonUserProperties commonProperties = new CommonUserProperties(); + // Resource belong to this user. private UserResource resource = new UserResource(1000); // load cluster @@ -87,20 +89,12 @@ public class UserProperty implements Writable { */ private WhiteList whiteList = new WhiteList(); - @Deprecated - private byte[] password; - @Deprecated - private boolean isAdmin = false; - @Deprecated - private boolean isSuperuser = false; - @Deprecated - private Map<String, AccessPrivilege> dbPrivMap = Maps.newHashMap(); - static { ADVANCED_PROPERTIES.add(Pattern.compile("^" + PROP_MAX_USER_CONNECTIONS + "$", Pattern.CASE_INSENSITIVE)); ADVANCED_PROPERTIES.add(Pattern.compile("^" + PROP_RESOURCE + ".", Pattern.CASE_INSENSITIVE)); ADVANCED_PROPERTIES.add(Pattern.compile("^" + PROP_LOAD_CLUSTER + "." + DppConfig.CLUSTER_NAME_REGEX + "." + DppConfig.PRIORITY + "$", Pattern.CASE_INSENSITIVE)); + ADVANCED_PROPERTIES.add(Pattern.compile("^" + PROP_MAX_QUERY_INSTANCES + "$", Pattern.CASE_INSENSITIVE)); COMMON_PROPERTIES.add(Pattern.compile("^" + PROP_QUOTA + ".", Pattern.CASE_INSENSITIVE)); COMMON_PROPERTIES.add(Pattern.compile("^" + PROP_DEFAULT_LOAD_CLUSTER + "$", Pattern.CASE_INSENSITIVE)); @@ -120,31 +114,15 @@ public class UserProperty implements Writable { } public long getMaxConn() { - return maxConn; - } - - public WhiteList getWhiteList() { - return whiteList; + return this.commonProperties.getMaxConn(); } - @Deprecated - public byte[] getPassword() { - return password; + public long getMaxQueryInstances() { + return commonProperties.getMaxQueryInstances();// maxQueryInstances; } - @Deprecated - public boolean isAdmin() { - return isAdmin; - } - - @Deprecated - public boolean isSuperuser() { - return isSuperuser; - } - - @Deprecated - public Map<String, AccessPrivilege> getDbPrivMap() { - return dbPrivMap; + public WhiteList getWhiteList() { + return whiteList; } public void setPasswordForDomain(String domain, byte[] password, boolean errOnExist) throws DdlException { @@ -163,7 +141,8 @@ public class UserProperty implements Writable { public void update(List<Pair<String, String>> properties) throws DdlException { // copy - long newMaxConn = maxConn; + long newMaxConn = this.commonProperties.getMaxConn(); + long newMaxQueryInstances = this.commonProperties.getMaxQueryInstances(); UserResource newResource = resource.getCopiedUserResource(); String newDefaultLoadCluster = defaultLoadCluster; Map<String, DppConfig> newDppConfigs = Maps.newHashMap(clusterToDppConfig); @@ -237,13 +216,25 @@ public class UserProperty implements Writable { } newDefaultLoadCluster = value; + } else if (keyArr[0].equalsIgnoreCase(PROP_MAX_QUERY_INSTANCES)) { + // set property "max_query_instances" = "1000" + if (keyArr.length != 1) { + throw new DdlException(PROP_MAX_QUERY_INSTANCES + " format error"); + } + + try { + newMaxQueryInstances = Long.parseLong(value); + } 
catch (NumberFormatException e) { + throw new DdlException(PROP_MAX_QUERY_INSTANCES + " is not number"); + } } else { throw new DdlException("Unknown user property(" + key + ")"); } } // set - maxConn = newMaxConn; + this.commonProperties.setMaxConn(newMaxConn); + this.commonProperties.setMaxQueryInstances(newMaxQueryInstances); resource = newResource; if (newDppConfigs.containsKey(newDefaultLoadCluster)) { defaultLoadCluster = newDefaultLoadCluster; @@ -326,7 +317,10 @@ public class UserProperty implements Writable { String dot = SetUserPropertyVar.DOT_SEPARATOR; // max user connections - result.add(Lists.newArrayList(PROP_MAX_USER_CONNECTIONS, String.valueOf(maxConn))); + result.add(Lists.newArrayList(PROP_MAX_USER_CONNECTIONS, String.valueOf(commonProperties.getMaxConn()))); + + // max query instance + result.add(Lists.newArrayList(PROP_MAX_QUERY_INSTANCES, String.valueOf(commonProperties.getMaxQueryInstances()))); // resource ResourceGroup group = resource.getResource(); @@ -404,10 +398,12 @@ public class UserProperty implements Writable { return userProperty; } + + @Override public void write(DataOutput out) throws IOException { + // user name Text.writeString(out, qualifiedUser); - out.writeLong(maxConn); // user resource resource.write(out); @@ -419,15 +415,19 @@ public class UserProperty implements Writable { out.writeBoolean(true); Text.writeString(out, defaultLoadCluster); } - out.writeInt(clusterToDppConfig.size()); for (Map.Entry<String, DppConfig> entry : clusterToDppConfig.entrySet()) { Text.writeString(out, entry.getKey()); entry.getValue().write(out); } + // whiteList whiteList.write(out); + + // common properties + commonProperties.write(out); } + public void readFields(DataInput in) throws IOException { if (Catalog.getCurrentCatalogJournalVersion() < FeMetaVersion.VERSION_43) { // consume the flag of empty user name @@ -443,19 +443,25 @@ public class UserProperty implements Writable { if (Catalog.getCurrentCatalogJournalVersion() < FeMetaVersion.VERSION_43) { int passwordLen = in.readInt(); - password = new byte[passwordLen]; + byte[] password = new byte[passwordLen]; in.readFully(password); - isAdmin = in.readBoolean(); + // boolean isAdmin + in.readBoolean(); if (Catalog.getCurrentCatalogJournalVersion() >= 1) { - isSuperuser = in.readBoolean(); + // boolean isSuperuser + in.readBoolean(); } } - maxConn = in.readLong(); + if (Catalog.getCurrentCatalogJournalVersion() < FeMetaVersion.VERSION_100) { + long maxConn = in.readLong(); + this.commonProperties.setMaxConn(maxConn); + } if (Catalog.getCurrentCatalogJournalVersion() < FeMetaVersion.VERSION_43) { + Map<String, AccessPrivilege> dbPrivMap = Maps.newHashMap(); int numPriv = in.readInt(); for (int i = 0; i < numPriv; ++i) { String dbName = null; @@ -487,6 +493,7 @@ public class UserProperty implements Writable { } } + // whiteList if (Catalog.getCurrentCatalogJournalVersion() >= FeMetaVersion.VERSION_21) { whiteList.readFields(in); if (Catalog.getCurrentCatalogJournalVersion() < FeMetaVersion.VERSION_69) { @@ -502,5 +509,10 @@ public class UserProperty implements Writable { } } } + + // common properties + if (Catalog.getCurrentCatalogJournalVersion() >= FeMetaVersion.VERSION_100) { + this.commonProperties = CommonUserProperties.read(in); + } } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/mysql/privilege/UserPropertyMgr.java b/fe/fe-core/src/main/java/org/apache/doris/mysql/privilege/UserPropertyMgr.java index ca05470..5f9cde8 100644 --- 
a/fe/fe-core/src/main/java/org/apache/doris/mysql/privilege/UserPropertyMgr.java +++ b/fe/fe-core/src/main/java/org/apache/doris/mysql/privilege/UserPropertyMgr.java @@ -19,6 +19,7 @@ package org.apache.doris.mysql.privilege; import org.apache.doris.analysis.UserIdentity; import org.apache.doris.common.AnalysisException; +import org.apache.doris.common.Config; import org.apache.doris.common.DdlException; import org.apache.doris.common.Pair; import org.apache.doris.common.io.Writable; @@ -122,6 +123,14 @@ public class UserPropertyMgr implements Writable { return existProperty.getMaxConn(); } + public long getMaxQueryInstances(String qualifiedUser) { + UserProperty existProperty = propertyMap.get(qualifiedUser); + if (existProperty == null) { + return Config.default_max_query_instances; + } + return existProperty.getMaxQueryInstances(); + } + public int getPropertyMapSize() { return propertyMap.size(); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java index e957dd2..10ae31f 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java @@ -412,6 +412,8 @@ public class Coordinator { traceInstance(); + QeProcessorImpl.INSTANCE.registerInstances(queryId, instanceIds.size()); + // create result receiver PlanFragmentId topId = fragments.get(0).getFragmentId(); FragmentExecParams topParams = fragmentExecParamsMap.get(topId); diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/QeProcessor.java b/fe/fe-core/src/main/java/org/apache/doris/qe/QeProcessor.java index e65af48..30ecbe6 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/QeProcessor.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/QeProcessor.java @@ -33,6 +33,8 @@ public interface QeProcessor { void registerQuery(TUniqueId queryId, QeProcessorImpl.QueryInfo info) throws UserException; + void registerInstances(TUniqueId queryId, Integer instancesNum) throws UserException; + void unregisterQuery(TUniqueId queryId); Map<String, QueryStatisticsItem> getQueryStatistics(); diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/QeProcessorImpl.java b/fe/fe-core/src/main/java/org/apache/doris/qe/QeProcessorImpl.java index 4105afd..c421b56 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/QeProcessorImpl.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/QeProcessorImpl.java @@ -17,6 +17,7 @@ package org.apache.doris.qe; +import org.apache.doris.common.Config; import org.apache.doris.common.ThreadPoolManager; import org.apache.doris.common.UserException; import org.apache.doris.common.util.DebugUtil; @@ -28,19 +29,25 @@ import org.apache.doris.thrift.TStatus; import org.apache.doris.thrift.TStatusCode; import org.apache.doris.thrift.TUniqueId; +import com.google.common.base.Strings; import com.google.common.collect.Maps; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutorService; +import java.util.concurrent.atomic.AtomicInteger; public final class QeProcessorImpl implements QeProcessor { private static final Logger LOG = LogManager.getLogger(QeProcessorImpl.class); private Map<TUniqueId, QueryInfo> coordinatorMap; + private Map<TUniqueId, Integer> queryToInstancesNum; + private Map<String, AtomicInteger> userToInstancesCount; + public static final QeProcessor INSTANCE; static { @@ -50,10 +57,12 @@ public 
final class QeProcessorImpl implements QeProcessor { private ExecutorService writeProfileExecutor; private QeProcessorImpl() { - coordinatorMap = Maps.newConcurrentMap(); + coordinatorMap = new ConcurrentHashMap<>(); // write profile to ProfileManager when query is running. writeProfileExecutor = ThreadPoolManager.newDaemonProfileThreadPool(1, 100, "profile-write-pool", true); + queryToInstancesNum = new ConcurrentHashMap<>(); + userToInstancesCount = new ConcurrentHashMap<>(); } @Override @@ -79,10 +88,58 @@ public final class QeProcessorImpl implements QeProcessor { } } + public void registerInstances(TUniqueId queryId, Integer instancesNum) throws UserException { + if (!coordinatorMap.containsKey(queryId)) { + throw new UserException("query not exists in coordinatorMap:" + DebugUtil.printId(queryId)); + } + QueryInfo queryInfo = coordinatorMap.get(queryId); + if (queryInfo.getConnectContext() != null && + !Strings.isNullOrEmpty(queryInfo.getConnectContext().getQualifiedUser()) + ) { + String user = queryInfo.getConnectContext().getQualifiedUser(); + long maxQueryInstances = queryInfo.getConnectContext().getCatalog().getAuth().getMaxQueryInstances(user); + if (maxQueryInstances <= 0) { + maxQueryInstances = Config.default_max_query_instances; + } + if (maxQueryInstances > 0) { + AtomicInteger currentCount = userToInstancesCount.computeIfAbsent(user, __ -> new AtomicInteger(0)); + // Many query can reach here. + if (instancesNum + currentCount.get() > maxQueryInstances) { + throw new UserException("reach max_query_instances " + maxQueryInstances); + } + } + queryToInstancesNum.put(queryId, instancesNum); + userToInstancesCount.computeIfAbsent(user, __ -> new AtomicInteger(0)).addAndGet(instancesNum); + } + } + + public Map<String, Integer> getInstancesNumPerUser() { + return Maps.transformEntries(userToInstancesCount, (__, value) -> value != null ? value.get() : 0); + } + @Override public void unregisterQuery(TUniqueId queryId) { - if (coordinatorMap.remove(queryId) != null) { + QueryInfo queryInfo = coordinatorMap.remove(queryId); + if (queryInfo != null) { LOG.info("deregister query id {}", DebugUtil.printId(queryId)); + if (queryInfo.getConnectContext() != null && + !Strings.isNullOrEmpty(queryInfo.getConnectContext().getQualifiedUser()) + ) { + Integer num = queryToInstancesNum.remove(queryId); + if (num != null) { + String user = queryInfo.getConnectContext().getQualifiedUser(); + AtomicInteger instancesNum = userToInstancesCount.get(user); + if (instancesNum == null) { + LOG.warn("WTF?? 
query {} in queryToInstancesNum but not in userToInstancesCount", + DebugUtil.printId(queryId) + ); + } else { + instancesNum.addAndGet(-num); + } + } + } + } else { + LOG.warn("not found query {} when unregisterQuery", DebugUtil.printId(queryId)); } } diff --git a/fe/fe-core/src/test/java/org/apache/doris/catalog/UserPropertyTest.java b/fe/fe-core/src/test/java/org/apache/doris/catalog/UserPropertyTest.java index bb1eee6..6fd1bf7 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/catalog/UserPropertyTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/catalog/UserPropertyTest.java @@ -64,6 +64,7 @@ public class UserPropertyTest { properties.add(Pair.create("quota.normal", "102")); properties.add(Pair.create("load_cluster.dpp-cluster.hadoop_palo_path", "/user/palo2")); properties.add(Pair.create("default_load_cluster", "dpp-cluster")); + properties.add(Pair.create("max_qUERY_instances", "3000")); UserProperty userProperty = new UserProperty(); userProperty.update(properties); @@ -72,6 +73,7 @@ public class UserPropertyTest { Assert.assertEquals(102, userProperty.getResource().getShareByGroup().get("normal").intValue()); Assert.assertEquals("/user/palo2", userProperty.getLoadClusterInfo("dpp-cluster").second.getPaloPath()); Assert.assertEquals("dpp-cluster", userProperty.getDefaultLoadCluster()); + Assert.assertEquals(3000, userProperty.getMaxQueryInstances()); // fetch property List<List<String>> rows = userProperty.fetchProperty(); @@ -89,6 +91,8 @@ public class UserPropertyTest { Assert.assertEquals("/user/palo2", value); } else if (key.equalsIgnoreCase("default_load_cluster")) { Assert.assertEquals("dpp-cluster", value); + } else if (key.equalsIgnoreCase("max_query_instances")) { + Assert.assertEquals("3000", value); } }
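
The core of this change is the per-user instance accounting added to QeProcessorImpl: Coordinator reports the instance count of each query on registration, and the count is charged against the user's max_query_instances budget until the query is unregistered. For illustration only, below is a minimal, self-contained sketch of that counting scheme. The class and method names here (InstanceLimiterSketch, register, unregister) are hypothetical, not the actual Doris API; it only shows how a ConcurrentHashMap of AtomicInteger counters can bound the number of concurrently running instances per user.

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical standalone sketch of the per-user instance counting scheme.
public class InstanceLimiterSketch {
    private final Map<String, AtomicInteger> userToInstancesCount = new ConcurrentHashMap<>();
    private final long maxQueryInstances; // <= 0 means unlimited, mirroring the property semantics

    public InstanceLimiterSketch(long maxQueryInstances) {
        this.maxQueryInstances = maxQueryInstances;
    }

    // Called when a query is scheduled: reject it if the user's running
    // instance count would exceed the limit, otherwise charge its instances.
    public void register(String user, int instancesNum) throws Exception {
        AtomicInteger current = userToInstancesCount.computeIfAbsent(user, k -> new AtomicInteger(0));
        if (maxQueryInstances > 0 && current.get() + instancesNum > maxQueryInstances) {
            throw new Exception("reach max_query_instances " + maxQueryInstances);
        }
        current.addAndGet(instancesNum);
    }

    // Called when the query finishes: return its instances to the user's budget.
    public void unregister(String user, int instancesNum) {
        AtomicInteger current = userToInstancesCount.get(user);
        if (current != null) {
            current.addAndGet(-instancesNum);
        }
    }

    public static void main(String[] args) throws Exception {
        InstanceLimiterSketch limiter = new InstanceLimiterSketch(3000);
        limiter.register("jack", 2000);     // ok: 2000 <= 3000
        try {
            limiter.register("jack", 1500); // rejected: 2000 + 1500 > 3000
        } catch (Exception e) {
            System.out.println(e.getMessage());
        }
        limiter.unregister("jack", 2000);   // first query finishes
        limiter.register("jack", 1500);     // ok again: 0 + 1500 <= 3000
    }
}

Note that in this sketch (and, as far as the diff shows, in registerInstances as well) the limit check and the increment are separate steps, so concurrent registrations can briefly overshoot the cap; max_query_instances is best read as a soft limit on per-user concurrency rather than a strict guarantee.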