APACHE-KYLIN-2902: disable this feature by default Signed-off-by: lidongsjtu <[email protected]>
Project: http://git-wip-us.apache.org/repos/asf/kylin/repo Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/7aef88ae Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/7aef88ae Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/7aef88ae Branch: refs/heads/master Commit: 7aef88aeeb8d4ab40c6aca4b403e187ab7c3a171 Parents: 1fbd51b Author: Zhong <[email protected]> Authored: Mon Dec 11 19:08:55 2017 +0800 Committer: lidongsjtu <[email protected]> Committed: Wed Dec 20 23:20:11 2017 +0800 ---------------------------------------------------------------------- .../apache/kylin/common/KylinConfigBase.java | 3 +- .../apache/kylin/rest/service/QueryService.java | 125 +++++++++---------- .../kylin/rest/util/QueryRequestUtil.java | 90 +++++++++++++ .../org/apache/kylin/rest/util/RequestUtil.java | 84 ------------- .../kylin/rest/util/QueryRequestUtilTest.java | 69 ++++++++++ .../apache/kylin/rest/util/RequestUtilTest.java | 69 ---------- 6 files changed, 222 insertions(+), 218 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kylin/blob/7aef88ae/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java ---------------------------------------------------------------------- diff --git a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java index 4917e0e..ce524b1 100644 --- a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java +++ b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java @@ -1132,7 +1132,8 @@ abstract public class KylinConfigBase implements Serializable { } public int getQueryConcurrentRunningThresholdForProject() { - return Integer.parseInt(getOptional("kylin.query.project-concurrent-running-threshold", "20")); + // by default there's no limitation + return Integer.parseInt(getOptional("kylin.query.project-concurrent-running-threshold", "0")); } public long getQueryMaxScanBytes() { http://git-wip-us.apache.org/repos/asf/kylin/blob/7aef88ae/server-base/src/main/java/org/apache/kylin/rest/service/QueryService.java ---------------------------------------------------------------------- diff --git a/server-base/src/main/java/org/apache/kylin/rest/service/QueryService.java b/server-base/src/main/java/org/apache/kylin/rest/service/QueryService.java index 2cdcafb..84a184e 100644 --- a/server-base/src/main/java/org/apache/kylin/rest/service/QueryService.java +++ b/server-base/src/main/java/org/apache/kylin/rest/service/QueryService.java @@ -98,7 +98,7 @@ import org.apache.kylin.rest.request.PrepareSqlRequest; import org.apache.kylin.rest.request.SQLRequest; import org.apache.kylin.rest.response.SQLResponse; import org.apache.kylin.rest.util.AclEvaluate; -import org.apache.kylin.rest.util.RequestUtil; +import org.apache.kylin.rest.util.QueryRequestUtil; import org.apache.kylin.rest.util.TableauInterceptor; import org.apache.kylin.storage.hybrid.HybridInstance; import org.slf4j.Logger; @@ -418,8 +418,8 @@ public class QueryService extends BasicService { throw new BadRequestException(msg.getNOT_SUPPORTED_SQL()); } - if (!RequestUtil.openQueryRequest(projectInstance.getName(), - projectInstance.getConfig().getQueryConcurrentRunningThresholdForProject())) { + int maxConcurrentQuery = projectInstance.getConfig().getQueryConcurrentRunningThresholdForProject(); + if (!QueryRequestUtil.openQueryRequest(projectInstance.getName(), maxConcurrentQuery)) { logger.warn("Directly return exception as too many concurrent query requests for project:" + project); throw new BadRequestException(msg.getQUERY_TOO_MANY_RUNNING()); } @@ -430,78 +430,75 @@ public class QueryService extends BasicService { OLAPContext.clearThreadLocalContexts(); SQLResponse sqlResponse = null; - boolean queryCacheEnabled = checkCondition(kylinConfig.isQueryCacheEnabled(), - "query cache disabled in KylinConfig") && // - checkCondition(!BackdoorToggles.getDisableCache(), "query cache disabled in BackdoorToggles"); - - if (queryCacheEnabled) { - try { // to deal with the case that cache searching throws exception + try { // to deal with the case that cache searching throws exception + boolean queryCacheEnabled = checkCondition(kylinConfig.isQueryCacheEnabled(), + "query cache disabled in KylinConfig") && // + checkCondition(!BackdoorToggles.getDisableCache(), "query cache disabled in BackdoorToggles"); + if (queryCacheEnabled) { sqlResponse = searchQueryInCache(sqlRequest); - } catch (Throwable e) { - RequestUtil.closeQueryRequest(projectInstance.getName()); - throw e; } - } - try { - if (null == sqlResponse) { - if (isSelect) { - sqlResponse = query(sqlRequest); - } else if (isPushDownUpdateEnabled) { - sqlResponse = update(sqlRequest); - } + try { + if (null == sqlResponse) { + if (isSelect) { + sqlResponse = query(sqlRequest); + } else if (isPushDownUpdateEnabled) { + sqlResponse = update(sqlRequest); + } - long durationThreshold = kylinConfig.getQueryDurationCacheThreshold(); - long scanCountThreshold = kylinConfig.getQueryScanCountCacheThreshold(); - long scanBytesThreshold = kylinConfig.getQueryScanBytesCacheThreshold(); - sqlResponse.setDuration(System.currentTimeMillis() - startTime); - logger.info("Stats of SQL response: isException: {}, duration: {}, total scan count {}", // - String.valueOf(sqlResponse.getIsException()), String.valueOf(sqlResponse.getDuration()), - String.valueOf(sqlResponse.getTotalScanCount())); - if (checkCondition(queryCacheEnabled, "query cache is disabled") // - && checkCondition(!sqlResponse.getIsException(), "query has exception") // - && checkCondition(!(sqlResponse.isPushDown() - && (isSelect == false || kylinConfig.isPushdownQueryCacheEnabled() == false)), - "query is executed with pushdown, but it is non-select, or the cache for pushdown is disabled") // - && checkCondition( - sqlResponse.getDuration() > durationThreshold - || sqlResponse.getTotalScanCount() > scanCountThreshold - || sqlResponse.getTotalScanBytes() > scanBytesThreshold, // - "query is too lightweight with duration: {} (threshold {}), scan count: {} (threshold {}), scan bytes: {} (threshold {})", - sqlResponse.getDuration(), durationThreshold, sqlResponse.getTotalScanCount(), - scanCountThreshold, sqlResponse.getTotalScanBytes(), scanBytesThreshold) - && checkCondition(sqlResponse.getResults().size() < kylinConfig.getLargeQueryThreshold(), - "query response is too large: {} ({})", sqlResponse.getResults().size(), - kylinConfig.getLargeQueryThreshold())) { - cacheManager.getCache(SUCCESS_QUERY_CACHE) - .put(new Element(sqlRequest.getCacheKey(), sqlResponse)); - } + long durationThreshold = kylinConfig.getQueryDurationCacheThreshold(); + long scanCountThreshold = kylinConfig.getQueryScanCountCacheThreshold(); + long scanBytesThreshold = kylinConfig.getQueryScanBytesCacheThreshold(); + sqlResponse.setDuration(System.currentTimeMillis() - startTime); + logger.info("Stats of SQL response: isException: {}, duration: {}, total scan count {}", // + String.valueOf(sqlResponse.getIsException()), String.valueOf(sqlResponse.getDuration()), + String.valueOf(sqlResponse.getTotalScanCount())); + if (checkCondition(queryCacheEnabled, "query cache is disabled") // + && checkCondition(!sqlResponse.getIsException(), "query has exception") // + && checkCondition(!(sqlResponse.isPushDown() + && (isSelect == false || kylinConfig.isPushdownQueryCacheEnabled() == false)), + "query is executed with pushdown, but it is non-select, or the cache for pushdown is disabled") // + && checkCondition( + sqlResponse.getDuration() > durationThreshold + || sqlResponse.getTotalScanCount() > scanCountThreshold + || sqlResponse.getTotalScanBytes() > scanBytesThreshold, // + "query is too lightweight with duration: {} (threshold {}), scan count: {} (threshold {}), scan bytes: {} (threshold {})", + sqlResponse.getDuration(), durationThreshold, sqlResponse.getTotalScanCount(), + scanCountThreshold, sqlResponse.getTotalScanBytes(), scanBytesThreshold) + && checkCondition( + sqlResponse.getResults().size() < kylinConfig.getLargeQueryThreshold(), + "query response is too large: {} ({})", sqlResponse.getResults().size(), + kylinConfig.getLargeQueryThreshold())) { + cacheManager.getCache(SUCCESS_QUERY_CACHE) + .put(new Element(sqlRequest.getCacheKey(), sqlResponse)); + } - } else { - sqlResponse.setDuration(System.currentTimeMillis() - startTime); - sqlResponse.setTotalScanCount(0); - sqlResponse.setTotalScanBytes(0); - } + } else { + sqlResponse.setDuration(System.currentTimeMillis() - startTime); + sqlResponse.setTotalScanCount(0); + sqlResponse.setTotalScanBytes(0); + } - checkQueryAuth(sqlResponse, project, secureEnabled); + checkQueryAuth(sqlResponse, project, secureEnabled); - } catch (Throwable e) { // calcite may throw AssertError - logger.error("Exception while executing query", e); - String errMsg = makeErrorMsgUserFriendly(e); + } catch (Throwable e) { // calcite may throw AssertError + logger.error("Exception while executing query", e); + String errMsg = makeErrorMsgUserFriendly(e); - sqlResponse = new SQLResponse(null, null, 0, true, errMsg); - sqlResponse.setThrowable(e.getCause() == null ? e : ExceptionUtils.getRootCause(e)); - sqlResponse.setTotalScanCount(queryContext.getScannedRows()); - sqlResponse.setTotalScanBytes(queryContext.getScannedBytes()); - sqlResponse.setCubeSegmentStatisticsList(queryContext.getCubeSegmentStatisticsResultList()); + sqlResponse = new SQLResponse(null, null, 0, true, errMsg); + sqlResponse.setThrowable(e.getCause() == null ? e : ExceptionUtils.getRootCause(e)); + sqlResponse.setTotalScanCount(queryContext.getScannedRows()); + sqlResponse.setTotalScanBytes(queryContext.getScannedBytes()); + sqlResponse.setCubeSegmentStatisticsList(queryContext.getCubeSegmentStatisticsResultList()); - if (queryCacheEnabled && e.getCause() != null - && ExceptionUtils.getRootCause(e) instanceof ResourceLimitExceededException) { - Cache exceptionCache = cacheManager.getCache(EXCEPTION_QUERY_CACHE); - exceptionCache.put(new Element(sqlRequest.getCacheKey(), sqlResponse)); + if (queryCacheEnabled && e.getCause() != null + && ExceptionUtils.getRootCause(e) instanceof ResourceLimitExceededException) { + Cache exceptionCache = cacheManager.getCache(EXCEPTION_QUERY_CACHE); + exceptionCache.put(new Element(sqlRequest.getCacheKey(), sqlResponse)); + } } } finally { - RequestUtil.closeQueryRequest(projectInstance.getName()); + QueryRequestUtil.closeQueryRequest(projectInstance.getName(), maxConcurrentQuery); } logQuery(sqlRequest, sqlResponse); http://git-wip-us.apache.org/repos/asf/kylin/blob/7aef88ae/server-base/src/main/java/org/apache/kylin/rest/util/QueryRequestUtil.java ---------------------------------------------------------------------- diff --git a/server-base/src/main/java/org/apache/kylin/rest/util/QueryRequestUtil.java b/server-base/src/main/java/org/apache/kylin/rest/util/QueryRequestUtil.java new file mode 100644 index 0000000..3eb1670 --- /dev/null +++ b/server-base/src/main/java/org/apache/kylin/rest/util/QueryRequestUtil.java @@ -0,0 +1,90 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. +*/ + +package org.apache.kylin.rest.util; + +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.cache.CacheBuilder; +import com.google.common.cache.CacheLoader; +import com.google.common.cache.LoadingCache; +import com.google.common.cache.RemovalListener; +import com.google.common.cache.RemovalNotification; + +public class QueryRequestUtil { + private static final Logger logger = LoggerFactory.getLogger(QueryRequestUtil.class); + + private static LoadingCache<String, AtomicInteger> queryRequestMap = CacheBuilder.newBuilder() + .removalListener(new RemovalListener<String, AtomicInteger>() { + @Override + public void onRemoval(RemovalNotification<String, AtomicInteger> notification) { + logger.info("Current running query number " + notification.getValue().get() + " for project " + + notification.getKey() + " is removed due to " + notification.getCause()); + } + }).expireAfterWrite(1, TimeUnit.DAYS).build(new CacheLoader<String, AtomicInteger>() { + @Override + public AtomicInteger load(String s) throws Exception { + return new AtomicInteger(0); + } + }); + + public static boolean openQueryRequest(String project, int maxConcurrentQuery) { + if (maxConcurrentQuery == 0) { + return true; + } + try { + AtomicInteger nRunningQueries = queryRequestMap.get(project); + for (;;) { + int nRunning = nRunningQueries.get(); + if (nRunning < maxConcurrentQuery) { + if (nRunningQueries.compareAndSet(nRunning, nRunning + 1)) { + return true; + } + } else { + return false; + } + } + } catch (ExecutionException e) { + throw new RuntimeException(e); + } + } + + public static void closeQueryRequest(String project, int maxConcurrentQuery) { + if (maxConcurrentQuery == 0) { + return; + } + AtomicInteger nRunningQueries = queryRequestMap.getIfPresent(project); + if (nRunningQueries != null) { + nRunningQueries.decrementAndGet(); + } + } + + public static Integer getCurrentRunningQuery(String project) { + AtomicInteger nRunningQueries = queryRequestMap.getIfPresent(project); + if (nRunningQueries != null) { + return nRunningQueries.get(); + } else { + return null; + } + } +} http://git-wip-us.apache.org/repos/asf/kylin/blob/7aef88ae/server-base/src/main/java/org/apache/kylin/rest/util/RequestUtil.java ---------------------------------------------------------------------- diff --git a/server-base/src/main/java/org/apache/kylin/rest/util/RequestUtil.java b/server-base/src/main/java/org/apache/kylin/rest/util/RequestUtil.java deleted file mode 100644 index 0155306..0000000 --- a/server-base/src/main/java/org/apache/kylin/rest/util/RequestUtil.java +++ /dev/null @@ -1,84 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. -*/ - -package org.apache.kylin.rest.util; - -import java.util.concurrent.ExecutionException; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicInteger; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import com.google.common.cache.CacheBuilder; -import com.google.common.cache.CacheLoader; -import com.google.common.cache.LoadingCache; -import com.google.common.cache.RemovalListener; -import com.google.common.cache.RemovalNotification; - -public class RequestUtil { - private static final Logger logger = LoggerFactory.getLogger(RequestUtil.class); - - private static LoadingCache<String, AtomicInteger> queryRequestMap = CacheBuilder.newBuilder() - .removalListener(new RemovalListener<String, AtomicInteger>() { - @Override - public void onRemoval(RemovalNotification<String, AtomicInteger> notification) { - logger.info("Current running query number " + notification.getValue().get() + " for project " - + notification.getKey() + " is removed due to " + notification.getCause()); - } - }).expireAfterWrite(1, TimeUnit.DAYS).build(new CacheLoader<String, AtomicInteger>() { - @Override - public AtomicInteger load(String s) throws Exception { - return new AtomicInteger(0); - } - }); - - public static boolean openQueryRequest(String project, int maxConcurrentQuery) { - try { - AtomicInteger nRunningQueries = queryRequestMap.get(project); - for (;;) { - int nRunning = nRunningQueries.get(); - if (nRunning < maxConcurrentQuery) { - if (nRunningQueries.compareAndSet(nRunning, nRunning + 1)) { - return true; - } - } else { - return false; - } - } - } catch (ExecutionException e) { - throw new RuntimeException(e); - } - } - - public static void closeQueryRequest(String project) { - AtomicInteger nRunningQueries = queryRequestMap.getIfPresent(project); - if (nRunningQueries != null) { - nRunningQueries.decrementAndGet(); - } - } - - public static Integer getCurrentRunningQuery(String project) { - AtomicInteger nRunningQueries = queryRequestMap.getIfPresent(project); - if (nRunningQueries != null) { - return nRunningQueries.get(); - } else { - return null; - } - } -} http://git-wip-us.apache.org/repos/asf/kylin/blob/7aef88ae/server-base/src/test/java/org/apache/kylin/rest/util/QueryRequestUtilTest.java ---------------------------------------------------------------------- diff --git a/server-base/src/test/java/org/apache/kylin/rest/util/QueryRequestUtilTest.java b/server-base/src/test/java/org/apache/kylin/rest/util/QueryRequestUtilTest.java new file mode 100644 index 0000000..fb6d2ff --- /dev/null +++ b/server-base/src/test/java/org/apache/kylin/rest/util/QueryRequestUtilTest.java @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. +*/ + +package org.apache.kylin.rest.util; + +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.atomic.AtomicInteger; + +import org.junit.Assert; +import org.junit.Test; + +public class QueryRequestUtilTest { + + @Test + public void testOpenAndCloseQueryRequest() { + int nThread = 5; + + final Integer maxConcurrentQuery = 2; + final String project = "test"; + + final AtomicInteger nQueryFailed = new AtomicInteger(0); + + Thread[] threads = new Thread[nThread]; + final CountDownLatch lock = new CountDownLatch(nThread); + for (int i = 0; i < nThread; i++) { + final int j = i; + threads[j] = new Thread(new Runnable() { + @Override + public void run() { + try { + boolean ifOpen = QueryRequestUtil.openQueryRequest(project, maxConcurrentQuery); + lock.countDown(); + if (ifOpen) { + lock.await(); + QueryRequestUtil.closeQueryRequest(project, maxConcurrentQuery); + } else { + nQueryFailed.incrementAndGet(); + } + } catch (InterruptedException e) { + } + } + }); + threads[j].start(); + } + for (int i = 0; i < nThread; i++) { + try { + threads[i].join(); + } catch (InterruptedException e) { + } + } + Assert.assertEquals(new Integer(0), QueryRequestUtil.getCurrentRunningQuery(project)); + Assert.assertEquals(nThread - maxConcurrentQuery, nQueryFailed.get()); + } +} http://git-wip-us.apache.org/repos/asf/kylin/blob/7aef88ae/server-base/src/test/java/org/apache/kylin/rest/util/RequestUtilTest.java ---------------------------------------------------------------------- diff --git a/server-base/src/test/java/org/apache/kylin/rest/util/RequestUtilTest.java b/server-base/src/test/java/org/apache/kylin/rest/util/RequestUtilTest.java deleted file mode 100644 index 5445a86..0000000 --- a/server-base/src/test/java/org/apache/kylin/rest/util/RequestUtilTest.java +++ /dev/null @@ -1,69 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. -*/ - -package org.apache.kylin.rest.util; - -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.atomic.AtomicInteger; - -import org.junit.Assert; -import org.junit.Test; - -public class RequestUtilTest { - - @Test - public void testOpenAndCloseQueryRequest() { - int nThread = 5; - - final Integer maxConcurrentQuery = 2; - final String project = "test"; - - final AtomicInteger nQueryFailed = new AtomicInteger(0); - - Thread[] threads = new Thread[nThread]; - final CountDownLatch lock = new CountDownLatch(nThread); - for (int i = 0; i < nThread; i++) { - final int j = i; - threads[j] = new Thread(new Runnable() { - @Override - public void run() { - try { - boolean ifOpen = RequestUtil.openQueryRequest(project, maxConcurrentQuery); - lock.countDown(); - if (ifOpen) { - lock.await(); - RequestUtil.closeQueryRequest(project); - } else { - nQueryFailed.incrementAndGet(); - } - } catch (InterruptedException e) { - } - } - }); - threads[j].start(); - } - for (int i = 0; i < nThread; i++) { - try { - threads[i].join(); - } catch (InterruptedException e) { - } - } - Assert.assertEquals(new Integer(0), RequestUtil.getCurrentRunningQuery(project)); - Assert.assertEquals(nThread - maxConcurrentQuery, nQueryFailed.get()); - } -}
