APACHE-KYLIN-2902: add query pending request limit to control thread 
consumption for each project

Signed-off-by: Zhong <[email protected]>
Signed-off-by: lidongsjtu <[email protected]>


Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/f6b2d704
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/f6b2d704
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/f6b2d704

Branch: refs/heads/master
Commit: f6b2d7042c27a8e5eba2842bf478fb85ef3cf874
Parents: e7a3245
Author: Wang Ken <[email protected]>
Authored: Fri Dec 1 16:53:14 2017 +0800
Committer: lidongsjtu <[email protected]>
Committed: Wed Dec 20 23:20:11 2017 +0800

----------------------------------------------------------------------
 .../apache/kylin/common/KylinConfigBase.java    |  4 +
 .../kylin/metadata/project/ProjectInstance.java | 12 ++-
 .../kylin/metadata/project/ProjectManager.java  |  2 +-
 .../java/org/apache/kylin/rest/msg/Message.java |  4 +
 .../apache/kylin/rest/service/QueryService.java | 33 ++++++--
 .../org/apache/kylin/rest/util/RequestUtil.java | 84 ++++++++++++++++++++
 6 files changed, 131 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kylin/blob/f6b2d704/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
----------------------------------------------------------------------
diff --git a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
index 1b3aa03..4917e0e 100644
--- a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
+++ b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
@@ -1131,6 +1131,10 @@ abstract public class KylinConfigBase implements Serializable {
         return Integer.parseInt(getOptional("kylin.query.scan-threshold", "10000000"));
     }
 
+    public int getQueryConcurrentRunningThresholdForProject() {
+        return Integer.parseInt(getOptional("kylin.query.project-concurrent-running-threshold", "20"));
+    }
+
     public long getQueryMaxScanBytes() {
         long value = Long.parseLong(getOptional("kylin.query.max-scan-bytes", "0"));
         return value > 0 ? value : Long.MAX_VALUE;

http://git-wip-us.apache.org/repos/asf/kylin/blob/f6b2d704/core-metadata/src/main/java/org/apache/kylin/metadata/project/ProjectInstance.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/project/ProjectInstance.java b/core-metadata/src/main/java/org/apache/kylin/metadata/project/ProjectInstance.java
index 1f54416..a7d37e7 100644
--- a/core-metadata/src/main/java/org/apache/kylin/metadata/project/ProjectInstance.java
+++ b/core-metadata/src/main/java/org/apache/kylin/metadata/project/ProjectInstance.java
@@ -27,6 +27,8 @@ import java.util.TreeSet;
 import javax.annotation.Nullable;
 
 import org.apache.commons.lang3.StringUtils;
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.KylinConfigExt;
 import org.apache.kylin.common.persistence.ResourceStore;
 import org.apache.kylin.common.persistence.RootPersistentEntity;
 import org.apache.kylin.metadata.realization.RealizationType;
@@ -84,6 +86,8 @@ public class ProjectInstance extends RootPersistentEntity {
     @JsonInclude(JsonInclude.Include.NON_NULL)
     private LinkedHashMap<String, String> overrideKylinProps;
 
+    private KylinConfigExt config;
+
     public String getResourcePath() {
         return concatResourcePath(name);
     }
@@ -304,7 +308,7 @@ public class ProjectInstance extends RootPersistentEntity {
         this.overrideKylinProps = overrideKylinProps;
     }
 
-    public void init() {
+    public void init(KylinConfig config) {
         if (name == null)
             name = ProjectInstance.DEFAULT_PROJECT_NAME;
 
@@ -321,6 +325,12 @@ public class ProjectInstance extends RootPersistentEntity {
 
         if (StringUtils.isBlank(this.name))
             throw new IllegalStateException("Project name must not be blank");
+
+        this.config = KylinConfigExt.createInstance(config, overrideKylinProps);
+    }
+
+    public KylinConfig getConfig() {
+        return config;
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/kylin/blob/f6b2d704/core-metadata/src/main/java/org/apache/kylin/metadata/project/ProjectManager.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/project/ProjectManager.java b/core-metadata/src/main/java/org/apache/kylin/metadata/project/ProjectManager.java
index b4431b4..4f2678f 100644
--- a/core-metadata/src/main/java/org/apache/kylin/metadata/project/ProjectManager.java
+++ b/core-metadata/src/main/java/org/apache/kylin/metadata/project/ProjectManager.java
@@ -155,7 +155,7 @@ public class ProjectManager {
             return null;
         }
 
-        projectInstance.init();
+        projectInstance.init(config);
 
         projectMap.putLocal(projectInstance.getName(), projectInstance);
         clearL2Cache();

http://git-wip-us.apache.org/repos/asf/kylin/blob/f6b2d704/server-base/src/main/java/org/apache/kylin/rest/msg/Message.java
----------------------------------------------------------------------
diff --git a/server-base/src/main/java/org/apache/kylin/rest/msg/Message.java b/server-base/src/main/java/org/apache/kylin/rest/msg/Message.java
index a881c86..02e4020 100644
--- a/server-base/src/main/java/org/apache/kylin/rest/msg/Message.java
+++ b/server-base/src/main/java/org/apache/kylin/rest/msg/Message.java
@@ -334,6 +334,10 @@ public class Message {
         return "Not Supported SQL.";
     }
 
+    public String getQUERY_TOO_MANY_RUNNING() {
+        return "Too many concurrent query requests.";
+    }
+
     public String getTABLE_META_INCONSISTENT() {
         return "Table metadata inconsistent with JDBC meta.";
     }

http://git-wip-us.apache.org/repos/asf/kylin/blob/f6b2d704/server-base/src/main/java/org/apache/kylin/rest/service/QueryService.java
----------------------------------------------------------------------
diff --git a/server-base/src/main/java/org/apache/kylin/rest/service/QueryService.java b/server-base/src/main/java/org/apache/kylin/rest/service/QueryService.java
index 9c3d34f..2cdcafb 100644
--- a/server-base/src/main/java/org/apache/kylin/rest/service/QueryService.java
+++ b/server-base/src/main/java/org/apache/kylin/rest/service/QueryService.java
@@ -98,6 +98,7 @@ import org.apache.kylin.rest.request.PrepareSqlRequest;
 import org.apache.kylin.rest.request.SQLRequest;
 import org.apache.kylin.rest.response.SQLResponse;
 import org.apache.kylin.rest.util.AclEvaluate;
+import org.apache.kylin.rest.util.RequestUtil;
 import org.apache.kylin.rest.util.TableauInterceptor;
 import org.apache.kylin.storage.hybrid.HybridInstance;
 import org.slf4j.Logger;
@@ -392,6 +393,10 @@ public class QueryService extends BasicService {
         if (StringUtils.isBlank(sqlRequest.getProject())) {
             throw new BadRequestException(msg.getEMPTY_PROJECT_NAME());
         }
+        ProjectInstance projectInstance = getProjectManager().getProject(sqlRequest.getProject());
+        if (projectInstance == null) {
+            throw new BadRequestException(msg.getPROJECT_NOT_FOUND());
+        }
 
         if (sqlRequest.getBackdoorToggles() != null)
             BackdoorToggles.addToggles(sqlRequest.getBackdoorToggles());
@@ -405,6 +410,19 @@ public class QueryService extends BasicService {
             logger.info("The original query:  " + sql);
 
             final boolean isSelect = QueryUtil.isSelectStatement(sql);
+            final boolean isPushDownUpdateEnabled = kylinConfig.isPushDownEnabled()
+                    && kylinConfig.isPushDownUpdateEnabled();
+
+            if (!isSelect && !isPushDownUpdateEnabled) {
+                logger.debug("Directly return exception as the sql is unsupported, and query pushdown is disabled");
+                throw new BadRequestException(msg.getNOT_SUPPORTED_SQL());
+            }
+
+            if (!RequestUtil.openQueryRequest(projectInstance.getName(),
+                    projectInstance.getConfig().getQueryConcurrentRunningThresholdForProject())) {
+                logger.warn("Directly return exception as too many concurrent query requests for project:" + project);
+                throw new BadRequestException(msg.getQUERY_TOO_MANY_RUNNING());
+            }
 
             long startTime = System.currentTimeMillis();
 
@@ -417,19 +435,20 @@ public class QueryService extends BasicService {
                     checkCondition(!BackdoorToggles.getDisableCache(), "query cache disabled in BackdoorToggles");
 
             if (queryCacheEnabled) {
-                sqlResponse = searchQueryInCache(sqlRequest);
+                try { // to deal with the case that cache searching throws exception
+                    sqlResponse = searchQueryInCache(sqlRequest);
+                } catch (Throwable e) {
+                    RequestUtil.closeQueryRequest(projectInstance.getName());
+                    throw e;
+                }
             }
 
             try {
                 if (null == sqlResponse) {
                     if (isSelect) {
                         sqlResponse = query(sqlRequest);
-                    } else if (kylinConfig.isPushDownEnabled() && kylinConfig.isPushDownUpdateEnabled()) {
+                    } else if (isPushDownUpdateEnabled) {
                         sqlResponse = update(sqlRequest);
-                    } else {
-                        logger.debug(
-                                "Directly return exception as the sql is unsupported, and query pushdown is disabled");
-                        throw new BadRequestException(msg.getNOT_SUPPORTED_SQL());
                     }
 
                     long durationThreshold = kylinConfig.getQueryDurationCacheThreshold();
@@ -481,6 +500,8 @@ public class QueryService extends BasicService {
                     Cache exceptionCache = cacheManager.getCache(EXCEPTION_QUERY_CACHE);
                     exceptionCache.put(new Element(sqlRequest.getCacheKey(), sqlResponse));
                 }
+            } finally {
+                RequestUtil.closeQueryRequest(projectInstance.getName());
             }
 
             logQuery(sqlRequest, sqlResponse);

http://git-wip-us.apache.org/repos/asf/kylin/blob/f6b2d704/server-base/src/main/java/org/apache/kylin/rest/util/RequestUtil.java
----------------------------------------------------------------------
diff --git a/server-base/src/main/java/org/apache/kylin/rest/util/RequestUtil.java b/server-base/src/main/java/org/apache/kylin/rest/util/RequestUtil.java
new file mode 100644
index 0000000..0155306
--- /dev/null
+++ b/server-base/src/main/java/org/apache/kylin/rest/util/RequestUtil.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.rest.util;
+
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.CacheLoader;
+import com.google.common.cache.LoadingCache;
+import com.google.common.cache.RemovalListener;
+import com.google.common.cache.RemovalNotification;
+
+public class RequestUtil {
+    private static final Logger logger = LoggerFactory.getLogger(RequestUtil.class);
+
+    private static LoadingCache<String, AtomicInteger> queryRequestMap = CacheBuilder.newBuilder()
+            .removalListener(new RemovalListener<String, AtomicInteger>() {
+                @Override
+                public void onRemoval(RemovalNotification<String, AtomicInteger> notification) {
+                    logger.info("Current running query number " + notification.getValue().get() + " for project "
+                            + notification.getKey() + " is removed due to " + notification.getCause());
+                }
+            }).expireAfterWrite(1, TimeUnit.DAYS).build(new CacheLoader<String, AtomicInteger>() {
+                @Override
+                public AtomicInteger load(String s) throws Exception {
+                    return new AtomicInteger(0);
+                }
+            });
+
+    public static boolean openQueryRequest(String project, int maxConcurrentQuery) {
+        try {
+            AtomicInteger nRunningQueries = queryRequestMap.get(project);
+            for (;;) {
+                int nRunning = nRunningQueries.get();
+                if (nRunning < maxConcurrentQuery) {
+                    if (nRunningQueries.compareAndSet(nRunning, nRunning + 1)) {
+                        return true;
+                    }
+                } else {
+                    return false;
+                }
+            }
+        } catch (ExecutionException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    public static void closeQueryRequest(String project) {
+        AtomicInteger nRunningQueries = queryRequestMap.getIfPresent(project);
+        if (nRunningQueries != null) {
+            nRunningQueries.decrementAndGet();
+        }
+    }
+
+    public static Integer getCurrentRunningQuery(String project) {
+        AtomicInteger nRunningQueries = queryRequestMap.getIfPresent(project);
+        if (nRunningQueries != null) {
+            return nRunningQueries.get();
+        } else {
+            return null;
+        }
+    }
+}

Reply via email to