This is an automated email from the ASF dual-hosted git repository. xxyu pushed a commit to branch kylin5 in repository https://gitbox.apache.org/repos/asf/kylin.git
commit 96d806eae898e912494df714b03f3b12ce2a21ba Author: jlf <longfei.ji...@kyligence.io> AuthorDate: Fri Feb 24 22:27:19 2023 +0800 KYLIN-5532 When calcite optimize, can cancel query search --- .../org/apache/kylin/query/SlowQueryDetector.java | 9 ++++++-- .../apache/kylin/rest/service/QueryService.java | 1 + .../kylin/rest/service/QueryServiceTest.java | 24 ++++++++++++++++++++++ 3 files changed, 32 insertions(+), 2 deletions(-) diff --git a/src/query-common/src/main/java/org/apache/kylin/query/SlowQueryDetector.java b/src/query-common/src/main/java/org/apache/kylin/query/SlowQueryDetector.java index 3810c2d09f..136127a3b3 100644 --- a/src/query-common/src/main/java/org/apache/kylin/query/SlowQueryDetector.java +++ b/src/query-common/src/main/java/org/apache/kylin/query/SlowQueryDetector.java @@ -21,6 +21,7 @@ package org.apache.kylin.query; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; +import org.apache.calcite.util.CancelFlag; import org.apache.kylin.common.KylinConfig; import org.apache.kylin.common.QueryContext; import org.slf4j.Logger; @@ -78,8 +79,9 @@ public class SlowQueryDetector extends Thread { if (QueryContext.current().getQueryTagInfo().isAsyncQuery()) { return; } - runningQueries.put(currentThread(), new QueryEntry(System.currentTimeMillis(), currentThread(), - QueryContext.current().getQueryId(), QueryContext.current().getUserSQL(), stopId, false)); + runningQueries.put(currentThread(), + new QueryEntry(System.currentTimeMillis(), currentThread(), QueryContext.current().getQueryId(), + QueryContext.current().getUserSQL(), stopId, false, CancelFlag.getContextCancelFlag())); } public void queryEnd() { @@ -112,6 +114,7 @@ public class SlowQueryDetector extends Thread { // interrupt query thread if Stop By User but running for (QueryEntry e : runningQueries.values()) { if (e.isStopByUser) { + e.getPlannerCancelFlag().requestCancel(); e.getThread().interrupt(); logger.error("Trying to cancel query: {}", e.getThread().getName()); } @@ -162,6 +165,7 @@ public class SlowQueryDetector extends Thread { final String sql; final String stopId; boolean isStopByUser; + final CancelFlag plannerCancelFlag; public long getRunningTime() { return (System.currentTimeMillis() - startTime) / 1000; @@ -170,6 +174,7 @@ public class SlowQueryDetector extends Thread { private boolean setInterruptIfTimeout() { long runningMs = System.currentTimeMillis() - startTime; if (runningMs >= queryTimeoutMs) { + plannerCancelFlag.requestCancel(); thread.interrupt(); logger.error("Trying to cancel query: {}", thread.getName()); return true; diff --git a/src/query-service/src/main/java/org/apache/kylin/rest/service/QueryService.java b/src/query-service/src/main/java/org/apache/kylin/rest/service/QueryService.java index 42fa8425b5..9a3c19a10c 100644 --- a/src/query-service/src/main/java/org/apache/kylin/rest/service/QueryService.java +++ b/src/query-service/src/main/java/org/apache/kylin/rest/service/QueryService.java @@ -336,6 +336,7 @@ public class QueryService extends BasicService implements CacheSignatureQuerySup if (e.getStopId().equals(id)) { logger.error("Trying to cancel query: {}", e.getThread().getName()); e.setStopByUser(true); + e.getPlannerCancelFlag().requestCancel(); e.getThread().interrupt(); break; } diff --git a/src/query-service/src/test/java/org/apache/kylin/rest/service/QueryServiceTest.java b/src/query-service/src/test/java/org/apache/kylin/rest/service/QueryServiceTest.java index 4dd8eff89f..4f1c9c1fba 100644 --- a/src/query-service/src/test/java/org/apache/kylin/rest/service/QueryServiceTest.java +++ b/src/query-service/src/test/java/org/apache/kylin/rest/service/QueryServiceTest.java @@ -18,11 +18,13 @@ package org.apache.kylin.rest.service; +import static java.util.concurrent.TimeUnit.SECONDS; import static org.apache.kylin.common.QueryContext.PUSHDOWN_HIVE; import static org.apache.kylin.common.QueryTrace.EXECUTION; import static org.apache.kylin.common.QueryTrace.SPARK_JOB_EXECUTION; import static org.apache.kylin.common.exception.code.ErrorCodeServer.PROJECT_NOT_EXIST; import static org.apache.kylin.rest.metrics.QueryMetricsContextTest.getInfluxdbFields; +import static org.awaitility.Awaitility.await; import static org.springframework.security.acls.domain.BasePermission.ADMINISTRATION; import java.io.ByteArrayInputStream; @@ -101,6 +103,7 @@ import org.apache.kylin.metadata.querymeta.TableMeta; import org.apache.kylin.metadata.querymeta.TableMetaWithType; import org.apache.kylin.metadata.realization.IRealization; import org.apache.kylin.metadata.realization.RealizationStatusEnum; +import org.apache.kylin.query.SlowQueryDetector; import org.apache.kylin.metadata.user.ManagedUser; import org.apache.kylin.query.blacklist.SQLBlacklistItem; import org.apache.kylin.query.blacklist.SQLBlacklistManager; @@ -133,6 +136,7 @@ import org.apache.kylin.rest.util.QueryCacheSignatureUtil; import org.apache.kylin.rest.util.SpringContext; import org.apache.kylin.source.adhocquery.PushdownResult; import org.apache.spark.sql.SparkSession; +import org.awaitility.Duration; import org.junit.After; import org.junit.Assert; import org.junit.Before; @@ -2698,4 +2702,24 @@ public class QueryServiceTest extends NLocalFileMetadataTestCase { Object routeToCalcite2 = isCalciteEngineCapable.invoke(queryExec, rel2); Assert.assertEquals(true, routeToCalcite2); } + + @Test + public void testStop() { + val stopId = RandomUtil.randomUUIDStr(); + val execute = new Thread(() -> { + QueryContext.current().setQueryId(RandomUtil.randomUUIDStr()); + QueryContext.current().setProject("default"); + QueryContext.current().setUserSQL("select 1"); + queryService.slowQueryDetector.queryStart(stopId); + await().pollDelay(new Duration(5, SECONDS)).until(() -> true); + }); + execute.start(); + await().pollDelay(new Duration(1, SECONDS)).until(() -> true); + queryService.stopQuery(stopId); + val result = SlowQueryDetector.getRunningQueries().values().stream() + .filter(entry -> StringUtils.equals(stopId, entry.getStopId())).findFirst(); + Assert.assertTrue(result.isPresent()); + val queryEntry = result.get(); + Assert.assertTrue(queryEntry.getPlannerCancelFlag().isCancelRequested()); + } }