This is an automated email from the ASF dual-hosted git repository. shaofengshi pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/kylin.git
commit b9aab0105cbf00a3581409a357be3816d0cd89a6 Author: Ma,Gang <ga...@ebay.com> AuthorDate: Thu Oct 18 18:33:16 2018 +0800 KYLIN-2897 improve the query execution for a set of duplicate queries in a short period --- .../org/apache/kylin/common/KylinConfigBase.java | 8 ++++++ .../apache/kylin/rest/response/SQLResponse.java | 18 ++++++++++++ .../apache/kylin/rest/service/QueryService.java | 33 ++++++++++++++++++++++ 3 files changed, 59 insertions(+) diff --git a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java index b19f2e9..778b5bf 100644 --- a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java +++ b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java @@ -1363,6 +1363,14 @@ abstract public class KylinConfigBase implements Serializable { return Integer.parseInt(getOptional("kylin.query.scan-threshold", "10000000")); } + public boolean isLazyQueryEnabled() { + return Boolean.parseBoolean(getOptional("kylin.query.lazy-query-enabled", "false")); + } + + public long getLazyQueryWaitingTimeoutMilliSeconds() { + return Long.parseLong(getOptional("kylin.query.lazy-query-waiting-timeout-milliseconds", "60000L")); + } + public int getQueryConcurrentRunningThresholdForProject() { // by default there's no limitation return Integer.parseInt(getOptional("kylin.query.project-concurrent-running-threshold", "0")); diff --git a/server-base/src/main/java/org/apache/kylin/rest/response/SQLResponse.java b/server-base/src/main/java/org/apache/kylin/rest/response/SQLResponse.java index 0502798..1721efe 100644 --- a/server-base/src/main/java/org/apache/kylin/rest/response/SQLResponse.java +++ b/server-base/src/main/java/org/apache/kylin/rest/response/SQLResponse.java @@ -79,6 +79,10 @@ public class SQLResponse implements Serializable { // it's sql response signature for cache checking, no need to return and should be JsonIgnore protected String signature; + // it's a temporary flag, no need to return and should be JsonIgnore + // indicating the lazy query start time, -1 indicating not enabled + protected long lazyQueryStartTime = -1L; + public SQLResponse() { } @@ -219,6 +223,20 @@ public class SQLResponse implements Serializable { } @JsonIgnore + public boolean isRunning() { + return this.lazyQueryStartTime >= 0; + } + + @JsonIgnore + public long getLazyQueryStartTime() { + return lazyQueryStartTime; + } + + public void setLazyQueryStartTime(long lazyQueryStartTime) { + this.lazyQueryStartTime = lazyQueryStartTime; + } + + @JsonIgnore public List<QueryContext.CubeSegmentStatisticsResult> getCubeSegmentStatisticsList() { try { return queryStatistics == null ? Lists.<QueryContext.CubeSegmentStatisticsResult> newArrayList() diff --git a/server-base/src/main/java/org/apache/kylin/rest/service/QueryService.java b/server-base/src/main/java/org/apache/kylin/rest/service/QueryService.java index abcab7f..78068eb 100644 --- a/server-base/src/main/java/org/apache/kylin/rest/service/QueryService.java +++ b/server-base/src/main/java/org/apache/kylin/rest/service/QueryService.java @@ -443,8 +443,16 @@ public class QueryService extends BasicService { Message msg = MsgPicker.getMsg(); final QueryContext queryContext = QueryContextFacade.current(); + boolean isDummpyResponseEnabled = queryCacheEnabled && kylinConfig.isLazyQueryEnabled(); SQLResponse sqlResponse = null; try { + // Add dummy response which will be updated or evicted when query finishes + if (isDummpyResponseEnabled) { + SQLResponse dummyResponse = new SQLResponse(); + dummyResponse.setLazyQueryStartTime(System.currentTimeMillis()); + cacheManager.getCache(QUERY_CACHE).put(sqlRequest.getCacheKey(), dummyResponse); + } + final boolean isSelect = QueryUtil.isSelectStatement(sqlRequest.getSql()); if (isSelect) { sqlResponse = query(sqlRequest, queryContext.getQueryId()); @@ -481,6 +489,8 @@ public class QueryService extends BasicService { "query response is too large: {} ({})", sqlResponse.getResults().size(), kylinConfig.getLargeQueryThreshold())) { cacheManager.getCache(QUERY_CACHE).put(sqlRequest.getCacheKey(), sqlResponse); + } else if (isDummpyResponseEnabled) { + cacheManager.getCache(QUERY_CACHE).evict(sqlRequest.getCacheKey()); } } catch (Throwable e) { // calcite may throw AssertError @@ -529,6 +539,29 @@ public class QueryService extends BasicService { if (response == null) { return null; } + + // Check whether duplicate query exists + while (response.isRunning()) { + // Wait at most one minute + if (System.currentTimeMillis() - response.getLazyQueryStartTime() >= getConfig() + .getLazyQueryWaitingTimeoutMilliSeconds()) { + cache.evict(sqlRequest.getCacheKey()); + return null; + } + logger.info("Duplicated SQL request is running, waiting..."); + try { + Thread.sleep(100L); + } catch (InterruptedException e) { + } + wrapper = cache.get(sqlRequest.getCacheKey()); + if (wrapper == null) { + return null; + } + response = (SQLResponse) wrapper.get(); + if (response == null) { + return null; + } + } logger.info("The sqlResponse is found in QUERY_CACHE"); if (getConfig().isQueryCacheSignatureEnabled() && !SQLResponseSignatureUtil.checkSignature(getConfig(), response, sqlRequest.getProject())) {