APACHE-KYLIN-2902: add query pending request limit to control thread consumption for each project
Signed-off-by: Zhong <[email protected]> Signed-off-by: lidongsjtu <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/kylin/repo Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/f6b2d704 Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/f6b2d704 Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/f6b2d704 Branch: refs/heads/master Commit: f6b2d7042c27a8e5eba2842bf478fb85ef3cf874 Parents: e7a3245 Author: Wang Ken <[email protected]> Authored: Fri Dec 1 16:53:14 2017 +0800 Committer: lidongsjtu <[email protected]> Committed: Wed Dec 20 23:20:11 2017 +0800 ---------------------------------------------------------------------- .../apache/kylin/common/KylinConfigBase.java | 4 + .../kylin/metadata/project/ProjectInstance.java | 12 ++- .../kylin/metadata/project/ProjectManager.java | 2 +- .../java/org/apache/kylin/rest/msg/Message.java | 4 + .../apache/kylin/rest/service/QueryService.java | 33 ++++++-- .../org/apache/kylin/rest/util/RequestUtil.java | 84 ++++++++++++++++++++ 6 files changed, 131 insertions(+), 8 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kylin/blob/f6b2d704/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java ---------------------------------------------------------------------- diff --git a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java index 1b3aa03..4917e0e 100644 --- a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java +++ b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java @@ -1131,6 +1131,10 @@ abstract public class KylinConfigBase implements Serializable { return Integer.parseInt(getOptional("kylin.query.scan-threshold", "10000000")); } + public int getQueryConcurrentRunningThresholdForProject() { + return Integer.parseInt(getOptional("kylin.query.project-concurrent-running-threshold", "20")); + } + public long getQueryMaxScanBytes() { long value = Long.parseLong(getOptional("kylin.query.max-scan-bytes", "0")); return value > 0 ? value : Long.MAX_VALUE; http://git-wip-us.apache.org/repos/asf/kylin/blob/f6b2d704/core-metadata/src/main/java/org/apache/kylin/metadata/project/ProjectInstance.java ---------------------------------------------------------------------- diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/project/ProjectInstance.java b/core-metadata/src/main/java/org/apache/kylin/metadata/project/ProjectInstance.java index 1f54416..a7d37e7 100644 --- a/core-metadata/src/main/java/org/apache/kylin/metadata/project/ProjectInstance.java +++ b/core-metadata/src/main/java/org/apache/kylin/metadata/project/ProjectInstance.java @@ -27,6 +27,8 @@ import java.util.TreeSet; import javax.annotation.Nullable; import org.apache.commons.lang3.StringUtils; +import org.apache.kylin.common.KylinConfig; +import org.apache.kylin.common.KylinConfigExt; import org.apache.kylin.common.persistence.ResourceStore; import org.apache.kylin.common.persistence.RootPersistentEntity; import org.apache.kylin.metadata.realization.RealizationType; @@ -84,6 +86,8 @@ public class ProjectInstance extends RootPersistentEntity { @JsonInclude(JsonInclude.Include.NON_NULL) private LinkedHashMap<String, String> overrideKylinProps; + private KylinConfigExt config; + public String getResourcePath() { return concatResourcePath(name); } @@ -304,7 +308,7 @@ public class ProjectInstance extends RootPersistentEntity { this.overrideKylinProps = overrideKylinProps; } - public void init() { + public void init(KylinConfig config) { if (name == null) name = ProjectInstance.DEFAULT_PROJECT_NAME; @@ -321,6 +325,12 @@ public class ProjectInstance extends RootPersistentEntity { if (StringUtils.isBlank(this.name)) throw new IllegalStateException("Project name must not be blank"); + + this.config = KylinConfigExt.createInstance(config, overrideKylinProps); + } + + public KylinConfig getConfig() { + return config; } @Override http://git-wip-us.apache.org/repos/asf/kylin/blob/f6b2d704/core-metadata/src/main/java/org/apache/kylin/metadata/project/ProjectManager.java ---------------------------------------------------------------------- diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/project/ProjectManager.java b/core-metadata/src/main/java/org/apache/kylin/metadata/project/ProjectManager.java index b4431b4..4f2678f 100644 --- a/core-metadata/src/main/java/org/apache/kylin/metadata/project/ProjectManager.java +++ b/core-metadata/src/main/java/org/apache/kylin/metadata/project/ProjectManager.java @@ -155,7 +155,7 @@ public class ProjectManager { return null; } - projectInstance.init(); + projectInstance.init(config); projectMap.putLocal(projectInstance.getName(), projectInstance); clearL2Cache(); http://git-wip-us.apache.org/repos/asf/kylin/blob/f6b2d704/server-base/src/main/java/org/apache/kylin/rest/msg/Message.java ---------------------------------------------------------------------- diff --git a/server-base/src/main/java/org/apache/kylin/rest/msg/Message.java b/server-base/src/main/java/org/apache/kylin/rest/msg/Message.java index a881c86..02e4020 100644 --- a/server-base/src/main/java/org/apache/kylin/rest/msg/Message.java +++ b/server-base/src/main/java/org/apache/kylin/rest/msg/Message.java @@ -334,6 +334,10 @@ public class Message { return "Not Supported SQL."; } + public String getQUERY_TOO_MANY_RUNNING() { + return "Too many concurrent query requests."; + } + public String getTABLE_META_INCONSISTENT() { return "Table metadata inconsistent with JDBC meta."; } http://git-wip-us.apache.org/repos/asf/kylin/blob/f6b2d704/server-base/src/main/java/org/apache/kylin/rest/service/QueryService.java ---------------------------------------------------------------------- diff --git a/server-base/src/main/java/org/apache/kylin/rest/service/QueryService.java b/server-base/src/main/java/org/apache/kylin/rest/service/QueryService.java index 9c3d34f..2cdcafb 100644 --- a/server-base/src/main/java/org/apache/kylin/rest/service/QueryService.java +++ b/server-base/src/main/java/org/apache/kylin/rest/service/QueryService.java @@ -98,6 +98,7 @@ import org.apache.kylin.rest.request.PrepareSqlRequest; import org.apache.kylin.rest.request.SQLRequest; import org.apache.kylin.rest.response.SQLResponse; import org.apache.kylin.rest.util.AclEvaluate; +import org.apache.kylin.rest.util.RequestUtil; import org.apache.kylin.rest.util.TableauInterceptor; import org.apache.kylin.storage.hybrid.HybridInstance; import org.slf4j.Logger; @@ -392,6 +393,10 @@ public class QueryService extends BasicService { if (StringUtils.isBlank(sqlRequest.getProject())) { throw new BadRequestException(msg.getEMPTY_PROJECT_NAME()); } + ProjectInstance projectInstance = getProjectManager().getProject(sqlRequest.getProject()); + if (projectInstance == null) { + throw new BadRequestException(msg.getPROJECT_NOT_FOUND()); + } if (sqlRequest.getBackdoorToggles() != null) BackdoorToggles.addToggles(sqlRequest.getBackdoorToggles()); @@ -405,6 +410,19 @@ public class QueryService extends BasicService { logger.info("The original query: " + sql); final boolean isSelect = QueryUtil.isSelectStatement(sql); + final boolean isPushDownUpdateEnabled = kylinConfig.isPushDownEnabled() + && kylinConfig.isPushDownUpdateEnabled(); + + if (!isSelect && !isPushDownUpdateEnabled) { + logger.debug("Directly return exception as the sql is unsupported, and query pushdown is disabled"); + throw new BadRequestException(msg.getNOT_SUPPORTED_SQL()); + } + + if (!RequestUtil.openQueryRequest(projectInstance.getName(), + projectInstance.getConfig().getQueryConcurrentRunningThresholdForProject())) { + logger.warn("Directly return exception as too many concurrent query requests for project:" + project); + throw new BadRequestException(msg.getQUERY_TOO_MANY_RUNNING()); + } long startTime = System.currentTimeMillis(); @@ -417,19 +435,20 @@ public class QueryService extends BasicService { checkCondition(!BackdoorToggles.getDisableCache(), "query cache disabled in BackdoorToggles"); if (queryCacheEnabled) { - sqlResponse = searchQueryInCache(sqlRequest); + try { // to deal with the case that cache searching throws exception + sqlResponse = searchQueryInCache(sqlRequest); + } catch (Throwable e) { + RequestUtil.closeQueryRequest(projectInstance.getName()); + throw e; + } } try { if (null == sqlResponse) { if (isSelect) { sqlResponse = query(sqlRequest); - } else if (kylinConfig.isPushDownEnabled() && kylinConfig.isPushDownUpdateEnabled()) { + } else if (isPushDownUpdateEnabled) { sqlResponse = update(sqlRequest); - } else { - logger.debug( - "Directly return exception as the sql is unsupported, and query pushdown is disabled"); - throw new BadRequestException(msg.getNOT_SUPPORTED_SQL()); } long durationThreshold = kylinConfig.getQueryDurationCacheThreshold(); @@ -481,6 +500,8 @@ public class QueryService extends BasicService { Cache exceptionCache = cacheManager.getCache(EXCEPTION_QUERY_CACHE); exceptionCache.put(new Element(sqlRequest.getCacheKey(), sqlResponse)); } + } finally { + RequestUtil.closeQueryRequest(projectInstance.getName()); } logQuery(sqlRequest, sqlResponse); http://git-wip-us.apache.org/repos/asf/kylin/blob/f6b2d704/server-base/src/main/java/org/apache/kylin/rest/util/RequestUtil.java ---------------------------------------------------------------------- diff --git a/server-base/src/main/java/org/apache/kylin/rest/util/RequestUtil.java b/server-base/src/main/java/org/apache/kylin/rest/util/RequestUtil.java new file mode 100644 index 0000000..0155306 --- /dev/null +++ b/server-base/src/main/java/org/apache/kylin/rest/util/RequestUtil.java @@ -0,0 +1,84 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. +*/ + +package org.apache.kylin.rest.util; + +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.cache.CacheBuilder; +import com.google.common.cache.CacheLoader; +import com.google.common.cache.LoadingCache; +import com.google.common.cache.RemovalListener; +import com.google.common.cache.RemovalNotification; + +public class RequestUtil { + private static final Logger logger = LoggerFactory.getLogger(RequestUtil.class); + + private static LoadingCache<String, AtomicInteger> queryRequestMap = CacheBuilder.newBuilder() + .removalListener(new RemovalListener<String, AtomicInteger>() { + @Override + public void onRemoval(RemovalNotification<String, AtomicInteger> notification) { + logger.info("Current running query number " + notification.getValue().get() + " for project " + + notification.getKey() + " is removed due to " + notification.getCause()); + } + }).expireAfterWrite(1, TimeUnit.DAYS).build(new CacheLoader<String, AtomicInteger>() { + @Override + public AtomicInteger load(String s) throws Exception { + return new AtomicInteger(0); + } + }); + + public static boolean openQueryRequest(String project, int maxConcurrentQuery) { + try { + AtomicInteger nRunningQueries = queryRequestMap.get(project); + for (;;) { + int nRunning = nRunningQueries.get(); + if (nRunning < maxConcurrentQuery) { + if (nRunningQueries.compareAndSet(nRunning, nRunning + 1)) { + return true; + } + } else { + return false; + } + } + } catch (ExecutionException e) { + throw new RuntimeException(e); + } + } + + public static void closeQueryRequest(String project) { + AtomicInteger nRunningQueries = queryRequestMap.getIfPresent(project); + if (nRunningQueries != null) { + nRunningQueries.decrementAndGet(); + } + } + + public static Integer getCurrentRunningQuery(String project) { + AtomicInteger nRunningQueries = queryRequestMap.getIfPresent(project); + if (nRunningQueries != null) { + return nRunningQueries.get(); + } else { + return null; + } + } +}
