KYLIN-2881 code review
Project: http://git-wip-us.apache.org/repos/asf/kylin/repo Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/e1d8b71b Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/e1d8b71b Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/e1d8b71b Branch: refs/heads/KYLIN-2881-review Commit: e1d8b71bbd02d04405ab138a8a91d9dc064cb676 Parents: fcac5fc Author: lidongsjtu <lid...@apache.org> Authored: Mon Jan 8 11:29:35 2018 +0800 Committer: lidongsjtu <lid...@apache.org> Committed: Sun Jan 21 20:01:34 2018 +0800 ---------------------------------------------------------------------- .../org/apache/kylin/common/QueryContext.java | 8 +- .../apache/kylin/common/QueryContextFacade.java | 101 +++++++++++++++++ .../kylin/common/QueryContextManager.java | 109 ------------------- .../gtrecord/GTCubeStorageQueryBase.java | 6 +- .../gtrecord/SequentialCubeTupleIterator.java | 4 +- .../apache/kylin/query/ITFailfastQueryTest.java | 4 +- .../apache/kylin/query/ITKylinQueryTest.java | 4 +- .../apache/kylin/query/ITMassInQueryTest.java | 4 +- .../org/apache/kylin/query/KylinTestBase.java | 36 ++---- .../kylin/query/enumerator/OLAPQuery.java | 4 +- .../kylin/rest/controller/QueryController.java | 18 +-- .../kylin/rest/metrics/QueryMetricsFacade.java | 5 +- .../apache/kylin/rest/service/QueryService.java | 20 ++-- server/src/main/resources/kylinSecurity.xml | 4 + .../kylin/rest/metrics/QueryMetricsTest.java | 4 +- .../kylin/rest/service/QueryServiceTest.java | 4 +- .../storage/hbase/cube/v2/CubeHBaseRPC.java | 4 +- 17 files changed, 161 insertions(+), 178 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kylin/blob/e1d8b71b/core-common/src/main/java/org/apache/kylin/common/QueryContext.java ---------------------------------------------------------------------- diff --git a/core-common/src/main/java/org/apache/kylin/common/QueryContext.java b/core-common/src/main/java/org/apache/kylin/common/QueryContext.java index d36b332..718f590 100644 --- a/core-common/src/main/java/org/apache/kylin/common/QueryContext.java +++ b/core-common/src/main/java/org/apache/kylin/common/QueryContext.java @@ -62,8 +62,12 @@ public class QueryContext { private Map<Integer, CubeSegmentStatisticsResult> cubeSegmentStatisticsResultMap = Maps.newConcurrentMap(); QueryContext() { + this(System.currentTimeMillis()); + } + + QueryContext(long startMills) { queryId = UUID.randomUUID().toString(); - queryStartMillis = System.currentTimeMillis(); + queryStartMillis = startMills; } public long getQueryStartMillis() { @@ -72,7 +76,7 @@ public class QueryContext { public void setDeadline(long timeoutMillis) { if (timeoutMillis > 0) { - deadline = queryStartMillis + timeoutMillis; + deadline = queryStartMillis + timeoutMillis; } } http://git-wip-us.apache.org/repos/asf/kylin/blob/e1d8b71b/core-common/src/main/java/org/apache/kylin/common/QueryContextFacade.java ---------------------------------------------------------------------- diff --git a/core-common/src/main/java/org/apache/kylin/common/QueryContextFacade.java b/core-common/src/main/java/org/apache/kylin/common/QueryContextFacade.java new file mode 100644 index 0000000..e1cf54b --- /dev/null +++ b/core-common/src/main/java/org/apache/kylin/common/QueryContextFacade.java @@ -0,0 +1,101 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. +*/ + +package org.apache.kylin.common; + +import java.util.Comparator; +import java.util.SortedSet; +import java.util.TreeSet; +import java.util.concurrent.ConcurrentMap; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.collect.Maps; +import com.google.common.collect.Sets; + +public class QueryContextFacade { + + private static final Logger logger = LoggerFactory.getLogger(QueryContextFacade.class); + + private static final ConcurrentMap<String, QueryContext> RUNNING_CTX_MAP = Maps.newConcurrentMap(); + private static final ThreadLocal<QueryContext> CURRENT_CTX = new ThreadLocal<QueryContext>() { + @Override + protected QueryContext initialValue() { + QueryContext queryContext = new QueryContext(); + RUNNING_CTX_MAP.put(queryContext.getQueryId(), queryContext); + return queryContext; + } + }; + + public static QueryContext current() { + return CURRENT_CTX.get(); + } + + /** + * invoked by program + */ + public static void resetCurrent() { + QueryContext queryContext = CURRENT_CTX.get(); + if (queryContext != null) { + RUNNING_CTX_MAP.remove(queryContext.getQueryId()); + CURRENT_CTX.remove(); + } + } + + /** + * invoked by user to let query stop early + * @link resetCurrent() should be finally invoked + */ + public static void stopQuery(String queryId, String info) { + QueryContext queryContext = RUNNING_CTX_MAP.get(queryId); + if (queryContext != null) { + queryContext.stopEarly(info); + } else { + logger.info("the query:{} is not existed", queryId); + } + } + + public static TreeSet<QueryContext> getAllRunningQueries() { + TreeSet<QueryContext> runningQueries = Sets.newTreeSet(new Comparator<QueryContext>() { + @Override + public int compare(QueryContext o1, QueryContext o2) { + if (o2.getAccumulatedMillis() > o1.getAccumulatedMillis()) { + return 1; + } else if (o2.getAccumulatedMillis() < o1.getAccumulatedMillis()) { + return -1; + } else { + return o1.getQueryId().compareTo(o2.getQueryId()); + } + } + }); + + runningQueries.addAll(RUNNING_CTX_MAP.values()); + return runningQueries; + } + + /** + * @param runningTime in milliseconds + * @return running queries that have run more than specified time + */ + public static TreeSet<QueryContext> getLongRunningQueries(long runningTime) { + SortedSet<QueryContext> allRunningQueries = getAllRunningQueries(); + QueryContext tmpCtx = new QueryContext(runningTime + 1L); // plus 1 to include those contexts in same accumulatedMills but different uuid + return (TreeSet<QueryContext>) allRunningQueries.headSet(tmpCtx); + } +} http://git-wip-us.apache.org/repos/asf/kylin/blob/e1d8b71b/core-common/src/main/java/org/apache/kylin/common/QueryContextManager.java ---------------------------------------------------------------------- diff --git a/core-common/src/main/java/org/apache/kylin/common/QueryContextManager.java b/core-common/src/main/java/org/apache/kylin/common/QueryContextManager.java deleted file mode 100644 index d08557e..0000000 --- a/core-common/src/main/java/org/apache/kylin/common/QueryContextManager.java +++ /dev/null @@ -1,109 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. -*/ - -package org.apache.kylin.common; - -import java.util.Comparator; -import java.util.List; -import java.util.TreeSet; -import java.util.concurrent.ConcurrentMap; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import com.google.common.collect.Lists; -import com.google.common.collect.Maps; - -public class QueryContextManager { - - private static final Logger logger = LoggerFactory.getLogger(QueryContextManager.class); - - private static final ConcurrentMap<String, QueryContext> idContextMap = Maps.newConcurrentMap(); - private static final ThreadLocal<QueryContext> contexts = new ThreadLocal<QueryContext>() { - @Override - protected QueryContext initialValue() { - QueryContext queryContext = new QueryContext(); - idContextMap.put(queryContext.getQueryId(), queryContext); - return queryContext; - } - }; - - public static QueryContext current() { - return contexts.get(); - } - - /** - * invoked by program - */ - public static void resetCurrent() { - QueryContext queryContext = contexts.get(); - if (queryContext != null) { - idContextMap.remove(queryContext.getQueryId()); - contexts.remove(); - } - } - - /** - * invoked by user to let query stop early - * @link resetCurrent() should be finally invoked - */ - public static void stopQuery(String queryId, String info) { - QueryContext queryContext = idContextMap.get(queryId); - if (queryContext != null) { - queryContext.stopEarly(info); - } else { - logger.info("the query:{} is not existed", queryId); - } - } - - public static List<QueryContext> getAllRunningQueries() { - // Sort by descending order - TreeSet<QueryContext> queriesSet = new TreeSet<>(new Comparator<QueryContext>() { - @Override - public int compare(QueryContext o1, QueryContext o2) { - if (o2.getAccumulatedMillis() > o1.getAccumulatedMillis()) { - return 1; - } else if (o2.getAccumulatedMillis() < o1.getAccumulatedMillis()) { - return -1; - } else { - return 0; - } - } - }); - - for (QueryContext runningQuery : idContextMap.values()) { - queriesSet.add(runningQuery); - } - return Lists.newArrayList(queriesSet); - } - - /** - * @param runningTime in milliseconds - * @return running queries that have run more than specified time - */ - public static List<QueryContext> getLongRunningQueries(int runningTime) { - List<QueryContext> allRunningQueries = getAllRunningQueries(); - int i = 0; - for (; i < allRunningQueries.size(); i++) { - if (allRunningQueries.get(i).getAccumulatedMillis() < runningTime) { - break; - } - } - return allRunningQueries.subList(0, i); - } -} http://git-wip-us.apache.org/repos/asf/kylin/blob/e1d8b71b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/GTCubeStorageQueryBase.java ---------------------------------------------------------------------- diff --git a/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/GTCubeStorageQueryBase.java b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/GTCubeStorageQueryBase.java index 483facd..ae1f64f 100644 --- a/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/GTCubeStorageQueryBase.java +++ b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/GTCubeStorageQueryBase.java @@ -26,7 +26,7 @@ import java.util.List; import java.util.Map; import java.util.Set; -import org.apache.kylin.common.QueryContextManager; +import org.apache.kylin.common.QueryContextFacade; import org.apache.kylin.common.util.Pair; import org.apache.kylin.cube.CubeInstance; import org.apache.kylin.cube.CubeManager; @@ -161,8 +161,8 @@ public abstract class GTCubeStorageQueryBase implements IStorageQuery { // set whether to aggregate results from multiple partitions enableStreamAggregateIfBeneficial(cuboid, groupsD, context); // set and check query deadline - QueryContextManager.current().setDeadline(cubeInstance.getConfig().getQueryTimeoutSeconds() * 1000); - QueryContextManager.current().checkMillisBeforeDeadline(); + QueryContextFacade.current().setDeadline(cubeInstance.getConfig().getQueryTimeoutSeconds() * 1000); + QueryContextFacade.current().checkMillisBeforeDeadline(); // push down having clause filter if possible TupleFilter havingFilter = checkHavingCanPushDown(sqlDigest.havingFilter, groupsD, sqlDigest.aggregations, http://git-wip-us.apache.org/repos/asf/kylin/blob/e1d8b71b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/SequentialCubeTupleIterator.java ---------------------------------------------------------------------- diff --git a/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/SequentialCubeTupleIterator.java b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/SequentialCubeTupleIterator.java index f45f02b..72417bf 100644 --- a/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/SequentialCubeTupleIterator.java +++ b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/SequentialCubeTupleIterator.java @@ -25,7 +25,7 @@ import java.util.List; import java.util.Set; import java.util.TreeSet; -import org.apache.kylin.common.QueryContextManager; +import org.apache.kylin.common.QueryContextFacade; import org.apache.kylin.cube.cuboid.Cuboid; import org.apache.kylin.metadata.model.FunctionDesc; import org.apache.kylin.metadata.model.TblColRef; @@ -142,7 +142,7 @@ public class SequentialCubeTupleIterator implements ITupleIterator { @Override public ITuple next() { if (scanCount++ % 100 == 1) { - QueryContextManager.current().checkMillisBeforeDeadline(); + QueryContextFacade.current().checkMillisBeforeDeadline(); } if (++scanCountDelta >= 1000) http://git-wip-us.apache.org/repos/asf/kylin/blob/e1d8b71b/kylin-it/src/test/java/org/apache/kylin/query/ITFailfastQueryTest.java ---------------------------------------------------------------------- diff --git a/kylin-it/src/test/java/org/apache/kylin/query/ITFailfastQueryTest.java b/kylin-it/src/test/java/org/apache/kylin/query/ITFailfastQueryTest.java index e4b8b43..f7f0752 100644 --- a/kylin-it/src/test/java/org/apache/kylin/query/ITFailfastQueryTest.java +++ b/kylin-it/src/test/java/org/apache/kylin/query/ITFailfastQueryTest.java @@ -21,7 +21,7 @@ import java.io.File; import java.util.Map; import org.apache.kylin.common.KylinConfig; -import org.apache.kylin.common.QueryContextManager; +import org.apache.kylin.common.QueryContextFacade; import org.apache.kylin.common.exceptions.ResourceLimitExceededException; import org.apache.kylin.metadata.realization.RealizationType; import org.apache.kylin.query.routing.Candidate; @@ -57,7 +57,7 @@ public class ITFailfastQueryTest extends KylinTestBase { @After public void cleanUp() { - QueryContextManager.resetCurrent(); + QueryContextFacade.resetCurrent(); } @AfterClass http://git-wip-us.apache.org/repos/asf/kylin/blob/e1d8b71b/kylin-it/src/test/java/org/apache/kylin/query/ITKylinQueryTest.java ---------------------------------------------------------------------- diff --git a/kylin-it/src/test/java/org/apache/kylin/query/ITKylinQueryTest.java b/kylin-it/src/test/java/org/apache/kylin/query/ITKylinQueryTest.java index 02a50ce..4edfb3d 100644 --- a/kylin-it/src/test/java/org/apache/kylin/query/ITKylinQueryTest.java +++ b/kylin-it/src/test/java/org/apache/kylin/query/ITKylinQueryTest.java @@ -156,7 +156,7 @@ public class ITKylinQueryTest extends KylinTestBase { String sql = getTextFromFile(sqlFile); IDatabaseConnection kylinConn = new DatabaseConnection(cubeConnection); - execQueryUsingKylin(kylinConn, queryFileName, sql, true); + executeQuery(kylinConn, queryFileName, sql, true); } @Ignore @@ -403,7 +403,7 @@ public class ITKylinQueryTest extends KylinTestBase { // execute Kylin logger.info("Query Result from Kylin - " + queryName); IDatabaseConnection kylinConn = new DatabaseConnection(cubeConnection); - ITable kylinTable = execQueryUsingKylin(kylinConn, queryName, sql, false); + ITable kylinTable = executeQuery(kylinConn, queryName, sql, false); String queriedVersion = String.valueOf(kylinTable.getValue(0, "version")); // compare the result http://git-wip-us.apache.org/repos/asf/kylin/blob/e1d8b71b/kylin-it/src/test/java/org/apache/kylin/query/ITMassInQueryTest.java ---------------------------------------------------------------------- diff --git a/kylin-it/src/test/java/org/apache/kylin/query/ITMassInQueryTest.java b/kylin-it/src/test/java/org/apache/kylin/query/ITMassInQueryTest.java index 16395fc..cca0be6 100644 --- a/kylin-it/src/test/java/org/apache/kylin/query/ITMassInQueryTest.java +++ b/kylin-it/src/test/java/org/apache/kylin/query/ITMassInQueryTest.java @@ -118,7 +118,7 @@ public class ITMassInQueryTest extends KylinTestBase { // execute Kylin logger.info("Query Result from Kylin - " + queryName + " (" + queryFolder + ")"); IDatabaseConnection kylinConn = new DatabaseConnection(cubeConnection); - ITable kylinTable = execQueryUsingKylin(kylinConn, queryName, sql, needSort); + ITable kylinTable = executeQuery(kylinConn, queryName, sql, needSort); printResult(kylinTable); } @@ -139,7 +139,7 @@ public class ITMassInQueryTest extends KylinTestBase { // execute Kylin logger.info("Query Result from Kylin - " + queryName + " (" + queryFolder + ")"); IDatabaseConnection kylinConn = new DatabaseConnection(cubeConnection); - ITable kylinTable = execQueryUsingKylin(kylinConn, queryName, sql, needSort); + ITable kylinTable = executeQuery(kylinConn, queryName, sql, needSort); // execute H2 sql = sql.replace("massin(test_kylin_fact.SELLER_ID,'vip_customers')", "test_kylin_fact.SELLER_ID in ( " + org.apache.commons.lang.StringUtils.join(vipSellers, ",") + ")"); http://git-wip-us.apache.org/repos/asf/kylin/blob/e1d8b71b/kylin-it/src/test/java/org/apache/kylin/query/KylinTestBase.java ---------------------------------------------------------------------- diff --git a/kylin-it/src/test/java/org/apache/kylin/query/KylinTestBase.java b/kylin-it/src/test/java/org/apache/kylin/query/KylinTestBase.java index e38bb1a..42ec917 100644 --- a/kylin-it/src/test/java/org/apache/kylin/query/KylinTestBase.java +++ b/kylin-it/src/test/java/org/apache/kylin/query/KylinTestBase.java @@ -44,7 +44,7 @@ import java.util.logging.LogManager; import org.apache.commons.lang3.StringUtils; import org.apache.kylin.common.KylinConfig; -import org.apache.kylin.common.QueryContextManager; +import org.apache.kylin.common.QueryContextFacade; import org.apache.kylin.common.debug.BackdoorToggles; import org.apache.kylin.common.util.HBaseMetadataTestCase; import org.apache.kylin.common.util.Pair; @@ -229,19 +229,10 @@ public class KylinTestBase { // //////////////////////////////////////////////////////////////////////////////////////// // execute - private void initExecQueryUsingKylin(String sql) { - QueryContextManager.resetCurrent(); - QueryContextManager.current(); - } - - protected ITable execQueryUsingKylin(IDatabaseConnection dbConn, String queryName, String sql, boolean needSort) - throws Exception { - initExecQueryUsingKylin(sql); - return executeQuery(dbConn, queryName, sql, needSort); - } protected ITable executeQuery(IDatabaseConnection dbConn, String queryName, String sql, boolean needSort) throws Exception { + QueryContextFacade.resetCurrent(); // change join type to match current setting sql = changeJoinType(sql, joinType); @@ -262,7 +253,6 @@ public class KylinTestBase { } protected int executeQuery(String sql, boolean needDisplay) throws Exception { - initExecQueryUsingKylin(sql); // change join type to match current setting sql = changeJoinType(sql, joinType); @@ -314,12 +304,6 @@ public class KylinTestBase { return PushDownUtil.tryPushDownNonSelectQuery(ProjectInstance.DEFAULT_PROJECT_NAME, sql, "DEFAULT", isPrepare); } - protected ITable execDynamicQueryUsingKylin(IDatabaseConnection dbConn, String queryName, String sql, - List<String> parameters, boolean needSort) throws Exception { - initExecQueryUsingKylin(sql); - return executeDynamicQuery(dbConn, queryName, sql, parameters, needSort); - } - protected ITable executeDynamicQuery(IDatabaseConnection dbConn, String queryName, String sql, List<String> parameters, boolean needSort) throws Exception { @@ -400,7 +384,7 @@ public class KylinTestBase { // execute Kylin logger.info("Query Result from Kylin - " + queryName + " (" + queryFolder + ")"); IDatabaseConnection kylinConn = new DatabaseConnection(cubeConnection); - ITable kylinTable = execQueryUsingKylin(kylinConn, queryName, sql, false); + ITable kylinTable = executeQuery(kylinConn, queryName, sql, false); // compare the result if (BackdoorToggles.getPrepareOnly()) @@ -444,7 +428,7 @@ public class KylinTestBase { // execute Kylin logger.info("Query Result from Kylin - " + queryName + " (" + queryFolder + ")"); IDatabaseConnection kylinConn = new DatabaseConnection(cubeConnection); - ITable kylinTable = execQueryUsingKylin(kylinConn, queryName, sql, false); + ITable kylinTable = executeQuery(kylinConn, queryName, sql, false); // compare the result assertTableEquals(expectTable, kylinTable); @@ -467,7 +451,7 @@ public class KylinTestBase { // execute Kylin logger.info("Query Result from Kylin - " + queryName + " (" + queryFolder + ")"); IDatabaseConnection kylinConn = new DatabaseConnection(cubeConnection); - ITable kylinTable = execQueryUsingKylin(kylinConn, queryName, sql, needSort); + ITable kylinTable = executeQuery(kylinConn, queryName, sql, needSort); // execute H2 logger.info("Query Result from H2 - " + queryName); @@ -496,7 +480,7 @@ public class KylinTestBase { // execute Kylin logger.info("Query Result from Kylin - " + sql); IDatabaseConnection kylinConn = new DatabaseConnection(cubeConnection); - ITable kylinTable = execQueryUsingKylin(kylinConn, sql, sql, false); + ITable kylinTable = executeQuery(kylinConn, sql, sql, false); try { // compare the result @@ -528,7 +512,7 @@ public class KylinTestBase { // execute Kylin logger.info("Query Result from Kylin - " + queryName + " (" + queryFolder + ")"); IDatabaseConnection kylinConn = new DatabaseConnection(cubeConnection); - ITable kylinTable = execQueryUsingKylin(kylinConn, queryName, sqlWithLimit, false); + ITable kylinTable = executeQuery(kylinConn, queryName, sqlWithLimit, false); // execute H2 logger.info("Query Result from H2 - " + queryName); @@ -579,7 +563,7 @@ public class KylinTestBase { // execute Kylin logger.info("Query Result from Kylin - " + queryName + " (" + queryFolder + ")"); IDatabaseConnection kylinConn = new DatabaseConnection(cubeConnection); - ITable kylinTable = execQueryUsingKylin(kylinConn, queryName, sql1, needSort); + ITable kylinTable = executeQuery(kylinConn, queryName, sql1, needSort); // execute H2 logger.info("Query Result from H2 - " + queryName); @@ -619,7 +603,7 @@ public class KylinTestBase { // execute Kylin logger.info("Query Result from Kylin - " + queryName + " (" + queryFolder + ")"); IDatabaseConnection kylinConn = new DatabaseConnection(cubeConnection); - ITable kylinTable = execDynamicQueryUsingKylin(kylinConn, queryName, sql, parameters, needSort); + ITable kylinTable = executeDynamicQuery(kylinConn, queryName, sql, parameters, needSort); // execute H2 logger.info("Query Result from H2 - " + queryName); @@ -727,7 +711,7 @@ public class KylinTestBase { //setup cube conn String project = ProjectInstance.DEFAULT_PROJECT_NAME; - cubeConnection = QueryDataSource.create(project, config).getConnection(); + cubeConnection = QueryConnection.getConnection(project); //setup h2 h2Connection = DriverManager.getConnection("jdbc:h2:mem:db" + (h2InstanceCount++) + ";CACHE_SIZE=32072", "sa", http://git-wip-us.apache.org/repos/asf/kylin/blob/e1d8b71b/query/src/main/java/org/apache/kylin/query/enumerator/OLAPQuery.java ---------------------------------------------------------------------- diff --git a/query/src/main/java/org/apache/kylin/query/enumerator/OLAPQuery.java b/query/src/main/java/org/apache/kylin/query/enumerator/OLAPQuery.java index f0759ab..84ac5cf 100644 --- a/query/src/main/java/org/apache/kylin/query/enumerator/OLAPQuery.java +++ b/query/src/main/java/org/apache/kylin/query/enumerator/OLAPQuery.java @@ -22,7 +22,7 @@ import org.apache.calcite.DataContext; import org.apache.calcite.linq4j.AbstractEnumerable; import org.apache.calcite.linq4j.Enumerable; import org.apache.calcite.linq4j.Enumerator; -import org.apache.kylin.common.QueryContextManager; +import org.apache.kylin.common.QueryContextFacade; import org.apache.kylin.common.debug.BackdoorToggles; import org.apache.kylin.query.relnode.OLAPContext; import org.slf4j.Logger; @@ -49,7 +49,7 @@ public class OLAPQuery extends AbstractEnumerable<Object[]> implements Enumerabl this.type = type; this.contextId = ctxId; - QueryContextManager.current().addContext(ctxId, type.toString(), + QueryContextFacade.current().addContext(ctxId, type.toString(), type == EnumeratorTypeEnum.OLAP); } http://git-wip-us.apache.org/repos/asf/kylin/blob/e1d8b71b/server-base/src/main/java/org/apache/kylin/rest/controller/QueryController.java ---------------------------------------------------------------------- diff --git a/server-base/src/main/java/org/apache/kylin/rest/controller/QueryController.java b/server-base/src/main/java/org/apache/kylin/rest/controller/QueryController.java index b7c5650..f7c3720 100644 --- a/server-base/src/main/java/org/apache/kylin/rest/controller/QueryController.java +++ b/server-base/src/main/java/org/apache/kylin/rest/controller/QueryController.java @@ -25,13 +25,14 @@ import java.util.ArrayList; import java.util.Date; import java.util.List; import java.util.Map; +import java.util.TreeSet; import javax.servlet.http.HttpServletResponse; import org.apache.commons.io.IOUtils; -import org.apache.kylin.common.QueryContext; -import org.apache.kylin.common.QueryContextManager; import org.apache.kylin.common.KylinConfig; +import org.apache.kylin.common.QueryContext; +import org.apache.kylin.common.QueryContextFacade; import org.apache.kylin.common.debug.BackdoorToggles; import org.apache.kylin.metadata.querymeta.SelectedColumnMeta; import org.apache.kylin.metadata.querymeta.TableMeta; @@ -181,11 +182,12 @@ public class QueryController extends BasicController { */ @RequestMapping(value = "/query/runningQueries", method = RequestMethod.GET) @ResponseBody - public List<QueryContext> getRunningQueries(@RequestParam(value = "runTimeMoreThan", required = false, defaultValue = "-1") int runTimeMoreThan) { + public TreeSet<QueryContext> getRunningQueries( + @RequestParam(value = "runTimeMoreThan", required = false, defaultValue = "-1") int runTimeMoreThan) { if (runTimeMoreThan == -1) { - return QueryContextManager.getAllRunningQueries(); - }else { - return QueryContextManager.getLongRunningQueries(runTimeMoreThan * 1000); + return QueryContextFacade.getAllRunningQueries(); + } else { + return QueryContextFacade.getLongRunningQueries(runTimeMoreThan * 1000); } } @@ -193,8 +195,8 @@ public class QueryController extends BasicController { @ResponseBody public void stopQuery(@PathVariable String queryId) { final String user = SecurityContextHolder.getContext().getAuthentication().getName(); - logger.info("{} stop the query: {}", new Object[] { user, queryId }); - QueryContextManager.stopQuery(queryId, "stopped by " + user); + logger.info("{} tries to stop the query: {}, but not guaranteed to succeed.", user, queryId); + QueryContextFacade.stopQuery(queryId, "stopped by " + user); } public void setQueryService(QueryService queryService) { http://git-wip-us.apache.org/repos/asf/kylin/blob/e1d8b71b/server-base/src/main/java/org/apache/kylin/rest/metrics/QueryMetricsFacade.java ---------------------------------------------------------------------- diff --git a/server-base/src/main/java/org/apache/kylin/rest/metrics/QueryMetricsFacade.java b/server-base/src/main/java/org/apache/kylin/rest/metrics/QueryMetricsFacade.java index 09ccc07..40fc5ef 100644 --- a/server-base/src/main/java/org/apache/kylin/rest/metrics/QueryMetricsFacade.java +++ b/server-base/src/main/java/org/apache/kylin/rest/metrics/QueryMetricsFacade.java @@ -28,8 +28,7 @@ import org.apache.hadoop.metrics2.MetricsException; import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem; import org.apache.kylin.common.KylinConfig; import org.apache.kylin.common.QueryContext; -import org.apache.kylin.common.QueryContextManager; -import org.apache.kylin.metadata.project.ProjectInstance; +import org.apache.kylin.common.QueryContextFacade; import org.apache.kylin.metrics.MetricsManager; import org.apache.kylin.metrics.lib.impl.RecordEvent; import org.apache.kylin.metrics.lib.impl.TimedRecordEvent; @@ -100,7 +99,7 @@ public class QueryMetricsFacade { if (user == null) { user = "unknown"; } - for (QueryContext.RPCStatistics entry : QueryContextManager.current().getRpcStatisticsList()) { + for (QueryContext.RPCStatistics entry : QueryContextFacade.current().getRpcStatisticsList()) { RecordEvent rpcMetricsEvent = new TimedRecordEvent( KylinConfig.getInstanceFromEnv().getKylinMetricsSubjectQueryRpcCall()); setRPCWrapper(rpcMetricsEvent, // http://git-wip-us.apache.org/repos/asf/kylin/blob/e1d8b71b/server-base/src/main/java/org/apache/kylin/rest/service/QueryService.java ---------------------------------------------------------------------- diff --git a/server-base/src/main/java/org/apache/kylin/rest/service/QueryService.java b/server-base/src/main/java/org/apache/kylin/rest/service/QueryService.java index 71b54e3..7b30606 100644 --- a/server-base/src/main/java/org/apache/kylin/rest/service/QueryService.java +++ b/server-base/src/main/java/org/apache/kylin/rest/service/QueryService.java @@ -58,7 +58,7 @@ import org.apache.commons.lang.StringUtils; import org.apache.commons.lang3.exception.ExceptionUtils; import org.apache.kylin.common.KylinConfig; import org.apache.kylin.common.QueryContext; -import org.apache.kylin.common.QueryContextManager; +import org.apache.kylin.common.QueryContextFacade; import org.apache.kylin.common.debug.BackdoorToggles; import org.apache.kylin.common.exceptions.ResourceLimitExceededException; import org.apache.kylin.common.htrace.HtraceInit; @@ -408,7 +408,7 @@ public class QueryService extends BasicService { if (sqlRequest.getBackdoorToggles() != null) BackdoorToggles.addToggles(sqlRequest.getBackdoorToggles()); - final QueryContext queryContext = QueryContextManager.current(); + final QueryContext queryContext = QueryContextFacade.current(); TraceScope scope = null; if (kylinConfig.isHtraceTracingEveryQuery() || BackdoorToggles.getHtraceEnabled()) { @@ -419,8 +419,6 @@ public class QueryService extends BasicService { String traceUrl = getTraceUrl(scope); try (SetThreadName ignored = new SetThreadName("Query %s", queryContext.getQueryId())) { - long startTime = System.currentTimeMillis(); - SQLResponse sqlResponse = null; String sql = sqlRequest.getSql(); String project = sqlRequest.getProject(); @@ -452,7 +450,7 @@ public class QueryService extends BasicService { // real execution if required if (sqlResponse == null) { try (QueryRequestLimits limit = new QueryRequestLimits(sqlRequest.getProject())) { - sqlResponse = queryAndUpdateCache(sqlRequest, startTime, isQueryCacheEnabled); + sqlResponse = queryAndUpdateCache(sqlRequest, isQueryCacheEnabled); } } else { Trace.addTimelineAnnotation("response without real execution"); @@ -462,7 +460,7 @@ public class QueryService extends BasicService { if (!sqlResponse.getIsException()) checkQueryAuth(sqlResponse, project); - sqlResponse.setDuration(System.currentTimeMillis() - startTime); + sqlResponse.setDuration(queryContext.getAccumulatedMillis()); sqlResponse.setTraceUrl(traceUrl); logQuery(queryContext.getQueryId(), sqlRequest, sqlResponse); try { @@ -477,17 +475,17 @@ public class QueryService extends BasicService { } finally { BackdoorToggles.cleanToggles(); - QueryContextManager.resetCurrent(); + QueryContextFacade.resetCurrent(); if (scope != null) { scope.close(); } } } - private SQLResponse queryAndUpdateCache(SQLRequest sqlRequest, long startTime, boolean queryCacheEnabled) { + private SQLResponse queryAndUpdateCache(SQLRequest sqlRequest, boolean queryCacheEnabled) { KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv(); Message msg = MsgPicker.getMsg(); - final QueryContext queryContext = QueryContextManager.current(); + final QueryContext queryContext = QueryContextFacade.current(); SQLResponse sqlResponse = null; try { @@ -624,7 +622,7 @@ public class QueryService extends BasicService { conn = QueryConnection.getConnection(sqlRequest.getProject()); String userInfo = SecurityContextHolder.getContext().getAuthentication().getName(); - QueryContext context = QueryContextManager.current(); + QueryContext context = QueryContextFacade.current(); context.setUsername(userInfo); context.setGroups(AclPermissionUtil.getCurrentUserGroups()); final Collection<? extends GrantedAuthority> grantedAuthorities = SecurityContextHolder.getContext() @@ -1046,7 +1044,7 @@ public class QueryService extends BasicService { boolean isPartialResult = false; StringBuilder cubeSb = new StringBuilder(); StringBuilder logSb = new StringBuilder("Processed rows for each storageContext: "); - QueryContext queryContext = QueryContextManager.current(); + QueryContext queryContext = QueryContextFacade.current(); if (OLAPContext.getThreadLocalContexts() != null) { // contexts can be null in case of 'explain plan for' for (OLAPContext ctx : OLAPContext.getThreadLocalContexts()) { String realizationName = "NULL"; http://git-wip-us.apache.org/repos/asf/kylin/blob/e1d8b71b/server/src/main/resources/kylinSecurity.xml ---------------------------------------------------------------------- diff --git a/server/src/main/resources/kylinSecurity.xml b/server/src/main/resources/kylinSecurity.xml index 3d5b686..f9c0d71 100644 --- a/server/src/main/resources/kylinSecurity.xml +++ b/server/src/main/resources/kylinSecurity.xml @@ -236,6 +236,8 @@ <scr:http-basic entry-point-ref="unauthorisedEntryPoint"/> <scr:intercept-url pattern="/api/user/authentication*/**" access="permitAll"/> + <scr:intercept-url pattern="/api/query/runningQueries" access="hasRole('ROLE_ADMIN')"/> + <scr:intercept-url pattern="/api/query/*/stop" access="hasRole('ROLE_ADMIN')"/> <scr:intercept-url pattern="/api/query*/**" access="isAuthenticated()"/> <scr:intercept-url pattern="/api/metadata*/**" access="isAuthenticated()"/> <scr:intercept-url pattern="/api/**/metrics" access="permitAll"/> @@ -279,6 +281,8 @@ <scr:http-basic entry-point-ref="unauthorisedEntryPoint"/> <scr:intercept-url pattern="/api/user/authentication*/**" access="permitAll"/> + <scr:intercept-url pattern="/api/query/runningQueries" access="hasRole('ROLE_ADMIN')"/> + <scr:intercept-url pattern="/api/query/*/stop" access="hasRole('ROLE_ADMIN')"/> <scr:intercept-url pattern="/api/query*/**" access="isAuthenticated()"/> <scr:intercept-url pattern="/api/metadata*/**" access="isAuthenticated()"/> <scr:intercept-url pattern="/api/**/metrics" access="permitAll"/> http://git-wip-us.apache.org/repos/asf/kylin/blob/e1d8b71b/server/src/test/java/org/apache/kylin/rest/metrics/QueryMetricsTest.java ---------------------------------------------------------------------- diff --git a/server/src/test/java/org/apache/kylin/rest/metrics/QueryMetricsTest.java b/server/src/test/java/org/apache/kylin/rest/metrics/QueryMetricsTest.java index d4a16f8..8cd7489 100644 --- a/server/src/test/java/org/apache/kylin/rest/metrics/QueryMetricsTest.java +++ b/server/src/test/java/org/apache/kylin/rest/metrics/QueryMetricsTest.java @@ -26,7 +26,7 @@ import javax.management.MBeanServer; import javax.management.ObjectName; import org.apache.kylin.common.QueryContext; -import org.apache.kylin.common.QueryContextManager; +import org.apache.kylin.common.QueryContextFacade; import org.apache.kylin.rest.request.SQLRequest; import org.apache.kylin.rest.response.SQLResponse; import org.apache.kylin.rest.service.ServiceTestBase; @@ -122,7 +122,7 @@ public class QueryMetricsTest extends ServiceTestBase { sqlRequest.setSql("select * from TEST_KYLIN_FACT"); sqlRequest.setProject("default"); - QueryContext context = QueryContextManager.current(); + QueryContext context = QueryContextFacade.current(); SQLResponse sqlResponse = new SQLResponse(); sqlResponse.setDuration(10); http://git-wip-us.apache.org/repos/asf/kylin/blob/e1d8b71b/server/src/test/java/org/apache/kylin/rest/service/QueryServiceTest.java ---------------------------------------------------------------------- diff --git a/server/src/test/java/org/apache/kylin/rest/service/QueryServiceTest.java b/server/src/test/java/org/apache/kylin/rest/service/QueryServiceTest.java index 061e622..5c633a3 100644 --- a/server/src/test/java/org/apache/kylin/rest/service/QueryServiceTest.java +++ b/server/src/test/java/org/apache/kylin/rest/service/QueryServiceTest.java @@ -22,8 +22,8 @@ import java.io.IOException; import java.sql.SQLException; import org.apache.kylin.common.QueryContext; -import org.apache.kylin.common.QueryContextManager; import org.apache.kylin.common.KylinConfig; +import org.apache.kylin.common.QueryContextFacade; import org.apache.kylin.job.exception.JobException; import org.apache.kylin.metadata.project.ProjectInstance; import org.apache.kylin.query.QueryConnection; @@ -65,7 +65,7 @@ public class QueryServiceTest extends ServiceTestBase { SQLRequest request = new SQLRequest(); request.setSql("select * from test_table"); request.setAcceptPartial(true); - QueryContext queryContext = QueryContextManager.current(); + QueryContext queryContext = QueryContextFacade.current(); SQLResponse response = new SQLResponse(); response.setHitExceptionCache(true); queryService.logQuery(queryContext.getQueryId(), request, response); http://git-wip-us.apache.org/repos/asf/kylin/blob/e1d8b71b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseRPC.java ---------------------------------------------------------------------- diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseRPC.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseRPC.java index c660cad..1e2fbd6 100644 --- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseRPC.java +++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseRPC.java @@ -30,7 +30,7 @@ import org.apache.hadoop.hbase.filter.FilterList; import org.apache.hadoop.hbase.filter.FuzzyRowFilter; import org.apache.kylin.common.KylinConfig; import org.apache.kylin.common.QueryContext; -import org.apache.kylin.common.QueryContextManager; +import org.apache.kylin.common.QueryContextFacade; import org.apache.kylin.common.debug.BackdoorToggles; import org.apache.kylin.common.util.Bytes; import org.apache.kylin.common.util.ImmutableBitSet; @@ -77,7 +77,7 @@ public abstract class CubeHBaseRPC implements IGTStorage { this.cubeSeg = (CubeSegment) segment; this.cuboid = cuboid; this.fullGTInfo = fullGTInfo; - this.queryContext = QueryContextManager.current(); + this.queryContext = QueryContextFacade.current(); this.storageContext = context; this.fuzzyKeyEncoder = new FuzzyKeyEncoder(cubeSeg, cuboid);