KYLIN-3157, enhance Kylin's query timeout.
Project: http://git-wip-us.apache.org/repos/asf/kylin/repo Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/1f33dd93 Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/1f33dd93 Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/1f33dd93 Branch: refs/heads/master Commit: 1f33dd93123dd59795a16193a3b5064419f4b532 Parents: 87a0058 Author: Li Yang <liy...@apache.org> Authored: Fri Jan 26 18:39:57 2018 +0800 Committer: Li Yang <liy...@apache.org> Committed: Fri Jan 26 22:54:58 2018 +0800 ---------------------------------------------------------------------- .../apache/kylin/common/KylinConfigBase.java | 13 ++- .../org/apache/kylin/common/QueryContext.java | 21 +--- .../apache/kylin/storage/StorageFactory.java | 4 + .../gtrecord/GTCubeStorageQueryBase.java | 3 +- .../gtrecord/SequentialCubeTupleIterator.java | 5 +- .../kylin/rest/service/BadQueryDetector.java | 17 ++- .../apache/kylin/rest/service/QueryService.java | 1 + .../rest/service/BadQueryDetectorTest.java | 4 +- .../rest/service/KyilnQueryTimeoutTest.java | 104 +++++++++++++++++++ .../storage/hbase/cube/v2/CubeHBaseRPC.java | 7 +- 10 files changed, 144 insertions(+), 35 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kylin/blob/1f33dd93/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java ---------------------------------------------------------------------- diff --git a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java index 8efd260..b053daa 100644 --- a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java +++ b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java @@ -1266,7 +1266,11 @@ abstract public class KylinConfigBase implements Serializable { } public int getBadQueryDefaultDetectIntervalSeconds() { - return 
Integer.parseInt(getOptional("kylin.query.badquery-detect-interval", "60")); + int time = getQueryTimeoutSeconds() / 2; // half of query timeout + if (time == 0) { + time = 60; // 60 sec + } + return time; } public boolean getBadQueryPersistentEnabled() { @@ -1324,7 +1328,12 @@ abstract public class KylinConfigBase implements Serializable { } public int getQueryTimeoutSeconds() { - return Integer.parseInt(this.getOptional("kylin.query.timeout-seconds", "0")); + int time = Integer.parseInt(this.getOptional("kylin.query.timeout-seconds", "0")); + if (time != 0 && time <= 60) { + logger.warn("query timeout seconds less than 60 sec, set to 60 sec."); + time = 60; + } + return time; } public boolean isPushDownEnabled() { http://git-wip-us.apache.org/repos/asf/kylin/blob/1f33dd93/core-common/src/main/java/org/apache/kylin/common/QueryContext.java ---------------------------------------------------------------------- diff --git a/core-common/src/main/java/org/apache/kylin/common/QueryContext.java b/core-common/src/main/java/org/apache/kylin/common/QueryContext.java index cb1b09c..1aa94d3 100644 --- a/core-common/src/main/java/org/apache/kylin/common/QueryContext.java +++ b/core-common/src/main/java/org/apache/kylin/common/QueryContext.java @@ -45,7 +45,6 @@ public class QueryContext { } private long queryStartMillis; - private long deadline = Long.MAX_VALUE; private final String queryId; private String username; @@ -75,26 +74,10 @@ public class QueryContext { return queryStartMillis; } - public void setDeadline(long timeoutMillis) { - if (timeoutMillis > 0) { - deadline = queryStartMillis + timeoutMillis; - } - } - - public long getDeadline() { - return deadline; - } - - /** - * @return millis before deadline - * @throws KylinTimeoutException if deadline has passed - */ - public long checkMillisBeforeDeadline() { - long remain = deadline - System.currentTimeMillis(); - if (remain <= 0) { + public void checkMillisBeforeDeadline() { + if (Thread.interrupted()) { throw new 
KylinTimeoutException("Query timeout"); } - return remain; } public String getQueryId() { http://git-wip-us.apache.org/repos/asf/kylin/blob/1f33dd93/core-storage/src/main/java/org/apache/kylin/storage/StorageFactory.java ---------------------------------------------------------------------- diff --git a/core-storage/src/main/java/org/apache/kylin/storage/StorageFactory.java b/core-storage/src/main/java/org/apache/kylin/storage/StorageFactory.java index 79b93fe..3505708 100644 --- a/core-storage/src/main/java/org/apache/kylin/storage/StorageFactory.java +++ b/core-storage/src/main/java/org/apache/kylin/storage/StorageFactory.java @@ -39,6 +39,10 @@ public class StorageFactory { return current.get(aware.getStorageType()); } + public static void clearCache() { + storages.remove(); + } + public static IStorageQuery createQuery(IRealization realization) { return storage(realization).createQuery(realization); } http://git-wip-us.apache.org/repos/asf/kylin/blob/1f33dd93/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/GTCubeStorageQueryBase.java ---------------------------------------------------------------------- diff --git a/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/GTCubeStorageQueryBase.java b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/GTCubeStorageQueryBase.java index ae1f64f..2f69b76 100644 --- a/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/GTCubeStorageQueryBase.java +++ b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/GTCubeStorageQueryBase.java @@ -160,8 +160,7 @@ public abstract class GTCubeStorageQueryBase implements IStorageQuery { sqlDigest.aggregations, context); // set whether to aggregate results from multiple partitions enableStreamAggregateIfBeneficial(cuboid, groupsD, context); - // set and check query deadline - QueryContextFacade.current().setDeadline(cubeInstance.getConfig().getQueryTimeoutSeconds() * 1000); + // check query deadline 
QueryContextFacade.current().checkMillisBeforeDeadline(); // push down having clause filter if possible http://git-wip-us.apache.org/repos/asf/kylin/blob/1f33dd93/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/SequentialCubeTupleIterator.java ---------------------------------------------------------------------- diff --git a/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/SequentialCubeTupleIterator.java b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/SequentialCubeTupleIterator.java index 72417bf..c067e33 100644 --- a/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/SequentialCubeTupleIterator.java +++ b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/SequentialCubeTupleIterator.java @@ -144,10 +144,9 @@ public class SequentialCubeTupleIterator implements ITupleIterator { if (scanCount++ % 100 == 1) { QueryContextFacade.current().checkMillisBeforeDeadline(); } - - if (++scanCountDelta >= 1000) + if (++scanCountDelta >= 1000) { flushScanCountDelta(); - + } return tupleIterator.next(); } http://git-wip-us.apache.org/repos/asf/kylin/blob/1f33dd93/server-base/src/main/java/org/apache/kylin/rest/service/BadQueryDetector.java ---------------------------------------------------------------------- diff --git a/server-base/src/main/java/org/apache/kylin/rest/service/BadQueryDetector.java b/server-base/src/main/java/org/apache/kylin/rest/service/BadQueryDetector.java index 7410c9c..4f7bccf 100644 --- a/server-base/src/main/java/org/apache/kylin/rest/service/BadQueryDetector.java +++ b/server-base/src/main/java/org/apache/kylin/rest/service/BadQueryDetector.java @@ -44,7 +44,8 @@ public class BadQueryDetector extends Thread { private final int alertMB; private final int alertRunningSec; private KylinConfig kylinConfig; - private ArrayList<Notifier> notifiers = new ArrayList<Notifier>(); + private ArrayList<Notifier> notifiers = new ArrayList<>(); + private int queryTimeoutSeconds; public BadQueryDetector() { 
super("BadQueryDetector"); @@ -53,17 +54,19 @@ public class BadQueryDetector extends Thread { this.detectionInterval = kylinConfig.getBadQueryDefaultDetectIntervalSeconds() * 1000L; this.alertMB = 100; this.alertRunningSec = kylinConfig.getBadQueryDefaultAlertingSeconds(); + this.queryTimeoutSeconds = kylinConfig.getQueryTimeoutSeconds(); initNotifiers(); } - public BadQueryDetector(long detectionInterval, int alertMB, int alertRunningSec) { + public BadQueryDetector(long detectionInterval, int alertMB, int alertRunningSec, int queryTimeoutSeconds) { super("BadQueryDetector"); this.setDaemon(true); this.detectionInterval = detectionInterval; this.alertMB = alertMB; this.alertRunningSec = alertRunningSec; this.kylinConfig = KylinConfig.getInstanceFromEnv(); + this.queryTimeoutSeconds = queryTimeoutSeconds; initNotifiers(); } @@ -121,6 +124,7 @@ public class BadQueryDetector extends Thread { notify(badReason, entry); } + @Override public void run() { while (true) { try { @@ -147,6 +151,8 @@ public class BadQueryDetector extends Thread { // report if query running long for (Entry e : entries) { float runningSec = (float) (now - e.startTime) / 1000; + setQueryThreadInterrupted(e, runningSec); + if (runningSec >= alertRunningSec) { notify(BadQueryEntry.ADJ_SLOW, e); dumpStackTrace(e.thread); @@ -161,6 +167,13 @@ public class BadQueryDetector extends Thread { } } + private void setQueryThreadInterrupted(Entry e, float runningSec) { + if (queryTimeoutSeconds != 0 && runningSec >= queryTimeoutSeconds) { + e.thread.interrupt(); + logger.error("Trying to cancel query:" + e.thread.getName()); + } + } + // log the stack trace of bad query thread for further analysis private void dumpStackTrace(Thread t) { int maxStackTraceDepth = kylinConfig.getBadQueryStackTraceDepth(); http://git-wip-us.apache.org/repos/asf/kylin/blob/1f33dd93/server-base/src/main/java/org/apache/kylin/rest/service/QueryService.java ---------------------------------------------------------------------- diff 
--git a/server-base/src/main/java/org/apache/kylin/rest/service/QueryService.java b/server-base/src/main/java/org/apache/kylin/rest/service/QueryService.java index 7b30606..56fab34 100644 --- a/server-base/src/main/java/org/apache/kylin/rest/service/QueryService.java +++ b/server-base/src/main/java/org/apache/kylin/rest/service/QueryService.java @@ -194,6 +194,7 @@ public class QueryService extends BasicService { } finally { String badReason = (ret != null && ret.isPushDown()) ? BadQueryEntry.ADJ_PUSHDOWN : null; badQueryDetector.queryEnd(Thread.currentThread(), badReason); + Thread.interrupted(); //reset if interrupted } } http://git-wip-us.apache.org/repos/asf/kylin/blob/1f33dd93/server/src/test/java/org/apache/kylin/rest/service/BadQueryDetectorTest.java ---------------------------------------------------------------------- diff --git a/server/src/test/java/org/apache/kylin/rest/service/BadQueryDetectorTest.java b/server/src/test/java/org/apache/kylin/rest/service/BadQueryDetectorTest.java index fc18d92..d61dfbe 100644 --- a/server/src/test/java/org/apache/kylin/rest/service/BadQueryDetectorTest.java +++ b/server/src/test/java/org/apache/kylin/rest/service/BadQueryDetectorTest.java @@ -50,7 +50,7 @@ public class BadQueryDetectorTest extends LocalFileMetadataTestCase { String mockSql = "select * from just_a_test"; final ArrayList<String[]> alerts = new ArrayList<>(); - BadQueryDetector badQueryDetector = new BadQueryDetector(alertRunningSec * 1000, alertMB, alertRunningSec); + BadQueryDetector badQueryDetector = new BadQueryDetector(alertRunningSec * 1000, alertMB, alertRunningSec, 1000); badQueryDetector.registerNotifier(new BadQueryDetector.Notifier() { @Override public void badQueryFound(String adj, float runningSec, long startTime, String project, String sql, String user, Thread t) { @@ -72,7 +72,7 @@ public class BadQueryDetectorTest extends LocalFileMetadataTestCase { badQueryDetector.queryEnd(Thread.currentThread(), BadQueryEntry.ADJ_PUSHDOWN); } - 
badQueryDetector.stop(); + badQueryDetector.interrupt(); assertEquals(2, alerts.size()); // second check founds a Slow http://git-wip-us.apache.org/repos/asf/kylin/blob/1f33dd93/server/src/test/java/org/apache/kylin/rest/service/KyilnQueryTimeoutTest.java ---------------------------------------------------------------------- diff --git a/server/src/test/java/org/apache/kylin/rest/service/KyilnQueryTimeoutTest.java b/server/src/test/java/org/apache/kylin/rest/service/KyilnQueryTimeoutTest.java new file mode 100644 index 0000000..25ff75b --- /dev/null +++ b/server/src/test/java/org/apache/kylin/rest/service/KyilnQueryTimeoutTest.java @@ -0,0 +1,104 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.kylin.rest.service; +import java.sql.SQLException; + +import org.apache.kylin.common.KylinConfig; +import org.apache.kylin.common.exceptions.KylinTimeoutException; +import org.apache.kylin.common.util.LocalFileMetadataTestCase; +import org.apache.kylin.metadata.realization.IRealization; +import org.apache.kylin.metadata.realization.SQLDigest; +import org.apache.kylin.metadata.tuple.ITupleIterator; +import org.apache.kylin.metadata.tuple.TupleInfo; +import org.apache.kylin.query.security.QueryACLTestUtil; +import org.apache.kylin.rest.request.SQLRequest; +import org.apache.kylin.storage.IStorage; +import org.apache.kylin.storage.IStorageQuery; +import org.apache.kylin.storage.StorageContext; +import org.apache.kylin.storage.StorageFactory; +import org.hamcrest.CoreMatchers; +import org.junit.After; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.ExpectedException; + +public class KyilnQueryTimeoutTest extends LocalFileMetadataTestCase { + + @Rule + public ExpectedException thrown = ExpectedException.none(); + + @Before + public void setUp() { + this.createTestMetadata(); + KylinConfig config = KylinConfig.getInstanceFromEnv(); + config.setProperty("kylin.storage.provider.2", MockQueryTimeoutStorage.class.getName()); + config.setProperty("kylin.storage.default", "2"); + } + + @After + public void after() throws Exception { + this.cleanupTestMetadata(); + StorageFactory.clearCache(); + } + + @Test + public void testQueryTimeout() throws SQLException { + thrown.expectCause(CoreMatchers.isA(KylinTimeoutException.class)); + thrown.expectMessage(CoreMatchers.containsString("Kylin query timeout")); + StorageFactory.clearCache(); + BadQueryDetector detector = new BadQueryDetector(100, BadQueryDetector.getSystemAvailMB() * 2, 100, 1); + detector.start(); + SQLRequest request = new SQLRequest(); + request.setProject("default"); + request.setSql("select count(*) from STREAMING_TABLE"); + 
detector.queryStart(Thread.currentThread(), request, "ADMIN"); + try { + QueryACLTestUtil.mockQuery("default", "select * from STREAMING_TABLE"); + } finally{ + detector.queryEnd(Thread.currentThread(), "timeout"); + detector.interrupt(); + } + } + + public static class MockQueryTimeoutStorage implements IStorage { + + @Override + public IStorageQuery createQuery(IRealization realization) { + return new MockQueryTimeoutQuery(); + } + + @Override + public <I> I adaptToBuildEngine(Class<I> engineInterface) { + return null; + } + } + + private static class MockQueryTimeoutQuery implements IStorageQuery { + @Override + public ITupleIterator search(StorageContext context, SQLDigest sqlDigest, TupleInfo returnTupleInfo) { + try { + Thread.sleep(5 * 1000); + } catch (InterruptedException e) { + throw new KylinTimeoutException("Kylin query timeout"); + } + return null; + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/kylin/blob/1f33dd93/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseRPC.java ---------------------------------------------------------------------- diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseRPC.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseRPC.java index 2e82140..634a3cd 100644 --- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseRPC.java +++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseRPC.java @@ -311,11 +311,8 @@ public abstract class CubeHBaseRPC implements IGTStorage { coopTimeout = (long) (rpcTimeout * 0.9); } - long millisBeforeDeadline = queryContext.checkMillisBeforeDeadline(); - coopTimeout = Math.min(coopTimeout, millisBeforeDeadline); - - logger.debug("{} = {} ms, {} ms before deadline, use {} ms as timeout for coprocessor", - HConstants.HBASE_RPC_TIMEOUT_KEY, rpcTimeout, millisBeforeDeadline, coopTimeout); + queryContext.checkMillisBeforeDeadline(); 
+ logger.debug("{} = {} ms, use {} ms as timeout for coprocessor", HConstants.HBASE_RPC_TIMEOUT_KEY, rpcTimeout, coopTimeout); return coopTimeout; }