KYLIN-2902 minor refine
Project: http://git-wip-us.apache.org/repos/asf/kylin/repo Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/8690fd2d Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/8690fd2d Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/8690fd2d Branch: refs/heads/master Commit: 8690fd2d671ca7fa49576cc84eda41c3f24e539f Parents: 7aef88a Author: lidongsjtu <[email protected]> Authored: Wed Dec 20 15:24:10 2017 +0800 Committer: lidongsjtu <[email protected]> Committed: Wed Dec 20 23:20:11 2017 +0800 ---------------------------------------------------------------------- .../apache/kylin/rest/service/QueryService.java | 27 +++--- .../kylin/rest/util/QueryRequestLimits.java | 90 ++++++++++++++++++++ .../kylin/rest/util/QueryRequestUtil.java | 90 -------------------- .../kylin/rest/util/QueryRequestLimitsTest.java | 69 +++++++++++++++ .../kylin/rest/util/QueryRequestUtilTest.java | 69 --------------- 5 files changed, 174 insertions(+), 171 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kylin/blob/8690fd2d/server-base/src/main/java/org/apache/kylin/rest/service/QueryService.java ---------------------------------------------------------------------- diff --git a/server-base/src/main/java/org/apache/kylin/rest/service/QueryService.java b/server-base/src/main/java/org/apache/kylin/rest/service/QueryService.java index 84a184e..17f6b58 100644 --- a/server-base/src/main/java/org/apache/kylin/rest/service/QueryService.java +++ b/server-base/src/main/java/org/apache/kylin/rest/service/QueryService.java @@ -98,7 +98,7 @@ import org.apache.kylin.rest.request.PrepareSqlRequest; import org.apache.kylin.rest.request.SQLRequest; import org.apache.kylin.rest.response.SQLResponse; import org.apache.kylin.rest.util.AclEvaluate; -import org.apache.kylin.rest.util.QueryRequestUtil; +import org.apache.kylin.rest.util.QueryRequestLimits; import org.apache.kylin.rest.util.TableauInterceptor; import org.apache.kylin.storage.hybrid.HybridInstance; import org.slf4j.Logger; @@ -412,25 +412,28 @@ public class QueryService extends BasicService { final boolean isSelect = QueryUtil.isSelectStatement(sql); final boolean isPushDownUpdateEnabled = kylinConfig.isPushDownEnabled() && kylinConfig.isPushDownUpdateEnabled(); + final int maxConcurrentQuery = projectInstance.getConfig().getQueryConcurrentRunningThresholdForProject(); if (!isSelect && !isPushDownUpdateEnabled) { logger.debug("Directly return exception as the sql is unsupported, and query pushdown is disabled"); throw new BadRequestException(msg.getNOT_SUPPORTED_SQL()); } - int maxConcurrentQuery = projectInstance.getConfig().getQueryConcurrentRunningThresholdForProject(); - if (!QueryRequestUtil.openQueryRequest(projectInstance.getName(), maxConcurrentQuery)) { - logger.warn("Directly return exception as too many concurrent query requests for project:" + project); - throw new BadRequestException(msg.getQUERY_TOO_MANY_RUNNING()); - } + SQLResponse sqlResponse = null; + + try { + // Check project level query request concurrency limitation per query server + if (!QueryRequestLimits.openQueryRequest(projectInstance.getName(), maxConcurrentQuery)) { + logger.warn( + "Directly return exception as too many concurrent query requests for project:" + project); + throw new BadRequestException(msg.getQUERY_TOO_MANY_RUNNING()); + } - long startTime = System.currentTimeMillis(); + long startTime = System.currentTimeMillis(); - // force clear the query context before a new query - OLAPContext.clearThreadLocalContexts(); + // force clear the query context before a new query + OLAPContext.clearThreadLocalContexts(); - SQLResponse sqlResponse = null; - try { // to deal with the case that cache searching throws exception boolean queryCacheEnabled = checkCondition(kylinConfig.isQueryCacheEnabled(), "query cache disabled in KylinConfig") && // checkCondition(!BackdoorToggles.getDisableCache(), "query cache disabled in BackdoorToggles"); @@ -498,7 +501,7 @@ public class QueryService extends BasicService { } } } finally { - QueryRequestUtil.closeQueryRequest(projectInstance.getName(), maxConcurrentQuery); + QueryRequestLimits.closeQueryRequest(projectInstance.getName(), maxConcurrentQuery); } logQuery(sqlRequest, sqlResponse); http://git-wip-us.apache.org/repos/asf/kylin/blob/8690fd2d/server-base/src/main/java/org/apache/kylin/rest/util/QueryRequestLimits.java ---------------------------------------------------------------------- diff --git a/server-base/src/main/java/org/apache/kylin/rest/util/QueryRequestLimits.java b/server-base/src/main/java/org/apache/kylin/rest/util/QueryRequestLimits.java new file mode 100644 index 0000000..cddaa12 --- /dev/null +++ b/server-base/src/main/java/org/apache/kylin/rest/util/QueryRequestLimits.java @@ -0,0 +1,90 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. +*/ + +package org.apache.kylin.rest.util; + +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.cache.CacheBuilder; +import com.google.common.cache.CacheLoader; +import com.google.common.cache.LoadingCache; +import com.google.common.cache.RemovalListener; +import com.google.common.cache.RemovalNotification; + +public class QueryRequestLimits { + private static final Logger logger = LoggerFactory.getLogger(QueryRequestLimits.class); + + private static LoadingCache<String, AtomicInteger> runningStats = CacheBuilder.newBuilder() + .removalListener(new RemovalListener<String, AtomicInteger>() { + @Override + public void onRemoval(RemovalNotification<String, AtomicInteger> notification) { + logger.info("Current running query number " + notification.getValue().get() + " for project " + + notification.getKey() + " is removed due to " + notification.getCause()); + } + }).expireAfterWrite(1, TimeUnit.DAYS).build(new CacheLoader<String, AtomicInteger>() { + @Override + public AtomicInteger load(String s) throws Exception { + return new AtomicInteger(0); + } + }); + + public static boolean openQueryRequest(String project, int maxConcurrentQuery) { + if (maxConcurrentQuery == 0) { + return true; + } + try { + AtomicInteger nRunningQueries = runningStats.get(project); + for (;;) { + int nRunning = nRunningQueries.get(); + if (nRunning < maxConcurrentQuery) { + if (nRunningQueries.compareAndSet(nRunning, nRunning + 1)) { + return true; + } + } else { + return false; + } + } + } catch (ExecutionException e) { + throw new RuntimeException(e); + } + } + + public static void closeQueryRequest(String project, int maxConcurrentQuery) { + if (maxConcurrentQuery == 0) { + return; + } + AtomicInteger nRunningQueries = runningStats.getIfPresent(project); + if (nRunningQueries != null) { + nRunningQueries.decrementAndGet(); + } + } + + public static Integer getCurrentRunningQuery(String project) { + AtomicInteger nRunningQueries = runningStats.getIfPresent(project); + if (nRunningQueries != null) { + return nRunningQueries.get(); + } else { + return null; + } + } +} http://git-wip-us.apache.org/repos/asf/kylin/blob/8690fd2d/server-base/src/main/java/org/apache/kylin/rest/util/QueryRequestUtil.java ---------------------------------------------------------------------- diff --git a/server-base/src/main/java/org/apache/kylin/rest/util/QueryRequestUtil.java b/server-base/src/main/java/org/apache/kylin/rest/util/QueryRequestUtil.java deleted file mode 100644 index 3eb1670..0000000 --- a/server-base/src/main/java/org/apache/kylin/rest/util/QueryRequestUtil.java +++ /dev/null @@ -1,90 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. -*/ - -package org.apache.kylin.rest.util; - -import java.util.concurrent.ExecutionException; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicInteger; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import com.google.common.cache.CacheBuilder; -import com.google.common.cache.CacheLoader; -import com.google.common.cache.LoadingCache; -import com.google.common.cache.RemovalListener; -import com.google.common.cache.RemovalNotification; - -public class QueryRequestUtil { - private static final Logger logger = LoggerFactory.getLogger(QueryRequestUtil.class); - - private static LoadingCache<String, AtomicInteger> queryRequestMap = CacheBuilder.newBuilder() - .removalListener(new RemovalListener<String, AtomicInteger>() { - @Override - public void onRemoval(RemovalNotification<String, AtomicInteger> notification) { - logger.info("Current running query number " + notification.getValue().get() + " for project " - + notification.getKey() + " is removed due to " + notification.getCause()); - } - }).expireAfterWrite(1, TimeUnit.DAYS).build(new CacheLoader<String, AtomicInteger>() { - @Override - public AtomicInteger load(String s) throws Exception { - return new AtomicInteger(0); - } - }); - - public static boolean openQueryRequest(String project, int maxConcurrentQuery) { - if (maxConcurrentQuery == 0) { - return true; - } - try { - AtomicInteger nRunningQueries = queryRequestMap.get(project); - for (;;) { - int nRunning = nRunningQueries.get(); - if (nRunning < maxConcurrentQuery) { - if (nRunningQueries.compareAndSet(nRunning, nRunning + 1)) { - return true; - } - } else { - return false; - } - } - } catch (ExecutionException e) { - throw new RuntimeException(e); - } - } - - public static void closeQueryRequest(String project, int maxConcurrentQuery) { - if (maxConcurrentQuery == 0) { - return; - } - AtomicInteger nRunningQueries = queryRequestMap.getIfPresent(project); - if (nRunningQueries != null) { - nRunningQueries.decrementAndGet(); - } - } - - public static Integer getCurrentRunningQuery(String project) { - AtomicInteger nRunningQueries = queryRequestMap.getIfPresent(project); - if (nRunningQueries != null) { - return nRunningQueries.get(); - } else { - return null; - } - } -} http://git-wip-us.apache.org/repos/asf/kylin/blob/8690fd2d/server-base/src/test/java/org/apache/kylin/rest/util/QueryRequestLimitsTest.java ---------------------------------------------------------------------- diff --git a/server-base/src/test/java/org/apache/kylin/rest/util/QueryRequestLimitsTest.java b/server-base/src/test/java/org/apache/kylin/rest/util/QueryRequestLimitsTest.java new file mode 100644 index 0000000..021c057 --- /dev/null +++ b/server-base/src/test/java/org/apache/kylin/rest/util/QueryRequestLimitsTest.java @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. +*/ + +package org.apache.kylin.rest.util; + +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.atomic.AtomicInteger; + +import org.junit.Assert; +import org.junit.Test; + +public class QueryRequestLimitsTest { + + @Test + public void testOpenAndCloseQueryRequest() { + int nThread = 5; + + final Integer maxConcurrentQuery = 2; + final String project = "test"; + + final AtomicInteger nQueryFailed = new AtomicInteger(0); + + Thread[] threads = new Thread[nThread]; + final CountDownLatch lock = new CountDownLatch(nThread); + for (int i = 0; i < nThread; i++) { + final int j = i; + threads[j] = new Thread(new Runnable() { + @Override + public void run() { + try { + boolean ifOpen = QueryRequestLimits.openQueryRequest(project, maxConcurrentQuery); + lock.countDown(); + if (ifOpen) { + lock.await(); + QueryRequestLimits.closeQueryRequest(project, maxConcurrentQuery); + } else { + nQueryFailed.incrementAndGet(); + } + } catch (InterruptedException e) { + } + } + }); + threads[j].start(); + } + for (int i = 0; i < nThread; i++) { + try { + threads[i].join(); + } catch (InterruptedException e) { + } + } + Assert.assertEquals(new Integer(0), QueryRequestLimits.getCurrentRunningQuery(project)); + Assert.assertEquals(nThread - maxConcurrentQuery, nQueryFailed.get()); + } +} http://git-wip-us.apache.org/repos/asf/kylin/blob/8690fd2d/server-base/src/test/java/org/apache/kylin/rest/util/QueryRequestUtilTest.java ---------------------------------------------------------------------- diff --git a/server-base/src/test/java/org/apache/kylin/rest/util/QueryRequestUtilTest.java b/server-base/src/test/java/org/apache/kylin/rest/util/QueryRequestUtilTest.java deleted file mode 100644 index fb6d2ff..0000000 --- a/server-base/src/test/java/org/apache/kylin/rest/util/QueryRequestUtilTest.java +++ /dev/null @@ -1,69 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. -*/ - -package org.apache.kylin.rest.util; - -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.atomic.AtomicInteger; - -import org.junit.Assert; -import org.junit.Test; - -public class QueryRequestUtilTest { - - @Test - public void testOpenAndCloseQueryRequest() { - int nThread = 5; - - final Integer maxConcurrentQuery = 2; - final String project = "test"; - - final AtomicInteger nQueryFailed = new AtomicInteger(0); - - Thread[] threads = new Thread[nThread]; - final CountDownLatch lock = new CountDownLatch(nThread); - for (int i = 0; i < nThread; i++) { - final int j = i; - threads[j] = new Thread(new Runnable() { - @Override - public void run() { - try { - boolean ifOpen = QueryRequestUtil.openQueryRequest(project, maxConcurrentQuery); - lock.countDown(); - if (ifOpen) { - lock.await(); - QueryRequestUtil.closeQueryRequest(project, maxConcurrentQuery); - } else { - nQueryFailed.incrementAndGet(); - } - } catch (InterruptedException e) { - } - } - }); - threads[j].start(); - } - for (int i = 0; i < nThread; i++) { - try { - threads[i].join(); - } catch (InterruptedException e) { - } - } - Assert.assertEquals(new Integer(0), QueryRequestUtil.getCurrentRunningQuery(project)); - Assert.assertEquals(nThread - maxConcurrentQuery, nQueryFailed.get()); - } -}
