This is an automated email from the ASF dual-hosted git repository.

xxyu pushed a commit to branch kylin5
in repository https://gitbox.apache.org/repos/asf/kylin.git

commit 96d806eae898e912494df714b03f3b12ce2a21ba
Author: jlf <longfei.ji...@kyligence.io>
AuthorDate: Fri Feb 24 22:27:19 2023 +0800

    KYLIN-5532 Allow a query to be cancelled while Calcite is optimizing it
---
 .../org/apache/kylin/query/SlowQueryDetector.java  |  9 ++++++--
 .../apache/kylin/rest/service/QueryService.java    |  1 +
 .../kylin/rest/service/QueryServiceTest.java       | 24 ++++++++++++++++++++++
 3 files changed, 32 insertions(+), 2 deletions(-)

diff --git a/src/query-common/src/main/java/org/apache/kylin/query/SlowQueryDetector.java b/src/query-common/src/main/java/org/apache/kylin/query/SlowQueryDetector.java
index 3810c2d09f..136127a3b3 100644
--- a/src/query-common/src/main/java/org/apache/kylin/query/SlowQueryDetector.java
+++ b/src/query-common/src/main/java/org/apache/kylin/query/SlowQueryDetector.java
@@ -21,6 +21,7 @@ package org.apache.kylin.query;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 
+import org.apache.calcite.util.CancelFlag;
 import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.QueryContext;
 import org.slf4j.Logger;
@@ -78,8 +79,9 @@ public class SlowQueryDetector extends Thread {
         if (QueryContext.current().getQueryTagInfo().isAsyncQuery()) {
             return;
         }
-        runningQueries.put(currentThread(), new 
QueryEntry(System.currentTimeMillis(), currentThread(),
-                QueryContext.current().getQueryId(), 
QueryContext.current().getUserSQL(), stopId, false));
+        runningQueries.put(currentThread(),
+                new QueryEntry(System.currentTimeMillis(), currentThread(), 
QueryContext.current().getQueryId(),
+                        QueryContext.current().getUserSQL(), stopId, false, 
CancelFlag.getContextCancelFlag()));
     }
 
     public void queryEnd() {
@@ -112,6 +114,7 @@ public class SlowQueryDetector extends Thread {
         // interrupt query thread if Stop By User but running
         for (QueryEntry e : runningQueries.values()) {
             if (e.isStopByUser) {
+                e.getPlannerCancelFlag().requestCancel();
                 e.getThread().interrupt();
                 logger.error("Trying to cancel query: {}", 
e.getThread().getName());
             }
@@ -162,6 +165,7 @@ public class SlowQueryDetector extends Thread {
         final String sql;
         final String stopId;
         boolean isStopByUser;
+        final CancelFlag plannerCancelFlag;
 
         public long getRunningTime() {
             return (System.currentTimeMillis() - startTime) / 1000;
@@ -170,6 +174,7 @@ public class SlowQueryDetector extends Thread {
         private boolean setInterruptIfTimeout() {
             long runningMs = System.currentTimeMillis() - startTime;
             if (runningMs >= queryTimeoutMs) {
+                plannerCancelFlag.requestCancel();
                 thread.interrupt();
                 logger.error("Trying to cancel query: {}", thread.getName());
                 return true;
diff --git a/src/query-service/src/main/java/org/apache/kylin/rest/service/QueryService.java b/src/query-service/src/main/java/org/apache/kylin/rest/service/QueryService.java
index 42fa8425b5..9a3c19a10c 100644
--- a/src/query-service/src/main/java/org/apache/kylin/rest/service/QueryService.java
+++ b/src/query-service/src/main/java/org/apache/kylin/rest/service/QueryService.java
@@ -336,6 +336,7 @@ public class QueryService extends BasicService implements 
CacheSignatureQuerySup
             if (e.getStopId().equals(id)) {
                 logger.error("Trying to cancel query: {}", 
e.getThread().getName());
                 e.setStopByUser(true);
+                e.getPlannerCancelFlag().requestCancel();
                 e.getThread().interrupt();
                 break;
             }
diff --git a/src/query-service/src/test/java/org/apache/kylin/rest/service/QueryServiceTest.java b/src/query-service/src/test/java/org/apache/kylin/rest/service/QueryServiceTest.java
index 4dd8eff89f..4f1c9c1fba 100644
--- a/src/query-service/src/test/java/org/apache/kylin/rest/service/QueryServiceTest.java
+++ b/src/query-service/src/test/java/org/apache/kylin/rest/service/QueryServiceTest.java
@@ -18,11 +18,13 @@
 
 package org.apache.kylin.rest.service;
 
+import static java.util.concurrent.TimeUnit.SECONDS;
 import static org.apache.kylin.common.QueryContext.PUSHDOWN_HIVE;
 import static org.apache.kylin.common.QueryTrace.EXECUTION;
 import static org.apache.kylin.common.QueryTrace.SPARK_JOB_EXECUTION;
 import static 
org.apache.kylin.common.exception.code.ErrorCodeServer.PROJECT_NOT_EXIST;
 import static 
org.apache.kylin.rest.metrics.QueryMetricsContextTest.getInfluxdbFields;
+import static org.awaitility.Awaitility.await;
 import static 
org.springframework.security.acls.domain.BasePermission.ADMINISTRATION;
 
 import java.io.ByteArrayInputStream;
@@ -101,6 +103,7 @@ import org.apache.kylin.metadata.querymeta.TableMeta;
 import org.apache.kylin.metadata.querymeta.TableMetaWithType;
 import org.apache.kylin.metadata.realization.IRealization;
 import org.apache.kylin.metadata.realization.RealizationStatusEnum;
+import org.apache.kylin.query.SlowQueryDetector;
 import org.apache.kylin.metadata.user.ManagedUser;
 import org.apache.kylin.query.blacklist.SQLBlacklistItem;
 import org.apache.kylin.query.blacklist.SQLBlacklistManager;
@@ -133,6 +136,7 @@ import org.apache.kylin.rest.util.QueryCacheSignatureUtil;
 import org.apache.kylin.rest.util.SpringContext;
 import org.apache.kylin.source.adhocquery.PushdownResult;
 import org.apache.spark.sql.SparkSession;
+import org.awaitility.Duration;
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
@@ -2698,4 +2702,24 @@ public class QueryServiceTest extends 
NLocalFileMetadataTestCase {
         Object routeToCalcite2 = isCalciteEngineCapable.invoke(queryExec, 
rel2);
         Assert.assertEquals(true, routeToCalcite2);
     }
+
+    @Test
+    public void testStop() {
+        val stopId = RandomUtil.randomUUIDStr();
+        val execute = new Thread(() -> {
+            QueryContext.current().setQueryId(RandomUtil.randomUUIDStr());
+            QueryContext.current().setProject("default");
+            QueryContext.current().setUserSQL("select 1");
+            queryService.slowQueryDetector.queryStart(stopId);
+            await().pollDelay(new Duration(5, SECONDS)).until(() -> true);
+        });
+        execute.start();
+        await().pollDelay(new Duration(1, SECONDS)).until(() -> true);
+        queryService.stopQuery(stopId);
+        val result = SlowQueryDetector.getRunningQueries().values().stream()
+                .filter(entry -> StringUtils.equals(stopId, 
entry.getStopId())).findFirst();
+        Assert.assertTrue(result.isPresent());
+        val queryEntry = result.get();
+        
Assert.assertTrue(queryEntry.getPlannerCancelFlag().isCancelRequested());
+    }
 }

Reply via email to