This is an automated email from the ASF dual-hosted git repository.

xxyu pushed a commit to branch kylin5
in repository https://gitbox.apache.org/repos/asf/kylin.git

commit 3297beff726784e70753c8a2cf5b0bba48fdfdca
Author: Jiawei Li <1019037...@qq.com>
AuthorDate: Mon Mar 6 10:05:03 2023 +0800

    KYLIN-5546 Add stop async query job api.
---
 .../config/initialize/ProcessStatusListener.java   |   8 ++
 .../apache/kylin/query/util/SlowQueryDetector.java |  50 ++++++--
 .../org/apache/kylin/event/SchemaChangeTest.java   |   8 +-
 .../apache/kylin/newten/SlowQueryDetectorTest.java | 135 +++++++++++++++++++++
 .../rest/controller/NBuildAndQueryMetricsTest.java |  10 +-
 .../rest/controller/NAsyncQueryController.java     |  31 +++++
 .../rest/controller/NAsyncQueryControllerTest.java |  67 ++++++++++
 .../apache/kylin/rest/service/QueryService.java    |  20 ++-
 .../kylin/rest/service/QueryServiceTest.java       |   4 +-
 9 files changed, 300 insertions(+), 33 deletions(-)

diff --git 
a/src/common-service/src/main/java/org/apache/kylin/rest/config/initialize/ProcessStatusListener.java
 
b/src/common-service/src/main/java/org/apache/kylin/rest/config/initialize/ProcessStatusListener.java
index 5bc16b914f..d18db060d5 100644
--- 
a/src/common-service/src/main/java/org/apache/kylin/rest/config/initialize/ProcessStatusListener.java
+++ 
b/src/common-service/src/main/java/org/apache/kylin/rest/config/initialize/ProcessStatusListener.java
@@ -33,6 +33,8 @@ import java.util.concurrent.locks.ReentrantLock;
 import java.util.stream.Collectors;
 
 import org.apache.commons.io.FileUtils;
+import org.apache.kylin.cluster.ClusterManagerFactory;
+import org.apache.kylin.cluster.IClusterManager;
 import org.apache.kylin.common.KapConfig;
 import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.util.CliCommandExecutor;
@@ -105,6 +107,12 @@ public class ProcessStatusListener {
     @Subscribe
     public void destroyProcessByJobId(CliCommandExecutor.JobKilled jobKilled) {
         val jobId = jobKilled.getJobId();
+
+        IClusterManager clusterManager = 
ClusterManagerFactory.create(KylinConfig.getInstanceFromEnv());
+        if (clusterManager.applicationExisted(jobId)) {
+            clusterManager.killApplication(jobId);
+        }
+
         final Map<Integer, String> children;
         fileLock.lock();
         try {
diff --git 
a/src/core-common/src/main/java/org/apache/kylin/query/util/SlowQueryDetector.java
 
b/src/core-common/src/main/java/org/apache/kylin/query/util/SlowQueryDetector.java
index 387399d161..a0b1394265 100644
--- 
a/src/core-common/src/main/java/org/apache/kylin/query/util/SlowQueryDetector.java
+++ 
b/src/core-common/src/main/java/org/apache/kylin/query/util/SlowQueryDetector.java
@@ -23,6 +23,8 @@ import java.util.concurrent.ConcurrentMap;
 
 import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.QueryContext;
+import org.apache.kylin.common.scheduler.EventBusFactory;
+import org.apache.kylin.common.util.CliCommandExecutor;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -75,18 +77,41 @@ public class SlowQueryDetector extends Thread {
     }
 
     public void queryStart(String stopId) {
-        if (QueryContext.current().getQueryTagInfo().isAsyncQuery()) {
-            return;
+        runningQueries.put(currentThread(), new 
QueryEntry(System.currentTimeMillis(), currentThread(),
+                QueryContext.current().getQueryId(), 
QueryContext.current().getUserSQL(), stopId, false,
+                QueryContext.current().getQueryTagInfo().isAsyncQuery(), null, 
CancelFlag.getContextCancelFlag()));
+    }
+
+    public void addJobIdForAsyncQueryJob(String jobId) {
+        QueryEntry queryEntry = runningQueries.get(currentThread());
+        if (queryEntry != null) {
+            queryEntry.setJobId(jobId);
         }
-        runningQueries.put(currentThread(),
-                new QueryEntry(System.currentTimeMillis(), currentThread(), 
QueryContext.current().getQueryId(),
-                        QueryContext.current().getUserSQL(), stopId, false, 
CancelFlag.getContextCancelFlag()));
+
     }
 
-    public void queryEnd() {
-        if (QueryContext.current().getQueryTagInfo().isAsyncQuery()) {
-            return;
+    public void stopQuery(String id) {
+        for (SlowQueryDetector.QueryEntry e : 
SlowQueryDetector.getRunningQueries().values()) {
+            if ((e.isAsyncQuery() && id.equals(e.getQueryId())) || 
(!e.isAsyncQuery() && id.equals(e.getStopId()))) {
+                e.setStopByUser(true);
+                doStopQuery(e);
+                break;
+            }
+        }
+    }
+
+    private void doStopQuery(QueryEntry e) {
+        if (e.getJobId() == null) {
+            e.getPlannerCancelFlag().requestCancel();
+            logger.error("Trying to cancel query: {}", 
e.getThread().getName());
+            e.getThread().interrupt();
+        } else {
+            logger.error("Trying to cancel query job : {},{}", 
e.getThread().getName(), e.getJobId());
+            EventBusFactory.getInstance().postSync(new 
CliCommandExecutor.JobKilled(e.getJobId()));
         }
+    }
+
+    public void queryEnd() {
         QueryEntry entry = runningQueries.remove(currentThread());
         if (null != entry && null != 
canceledSlowQueriesStatus.get(entry.queryId)) {
             canceledSlowQueriesStatus.remove(entry.queryId);
@@ -113,9 +138,7 @@ public class SlowQueryDetector extends Thread {
         // interrupt query thread if Stop By User but running
         for (QueryEntry e : runningQueries.values()) {
             if (e.isStopByUser) {
-                e.getPlannerCancelFlag().requestCancel();
-                e.getThread().interrupt();
-                logger.error("Trying to cancel query: {}", 
e.getThread().getName());
+                doStopQuery(e);
             }
         }
     }
@@ -164,6 +187,8 @@ public class SlowQueryDetector extends Thread {
         final String sql;
         final String stopId;
         boolean isStopByUser;
+        final boolean isAsyncQuery;
+        String jobId;
         final CancelFlag plannerCancelFlag;
 
         public long getRunningTime() {
@@ -171,6 +196,9 @@ public class SlowQueryDetector extends Thread {
         }
 
         private boolean setInterruptIfTimeout() {
+            if (isAsyncQuery) {
+                return false;
+            }
             long runningMs = System.currentTimeMillis() - startTime;
             if (runningMs >= queryTimeoutMs) {
                 plannerCancelFlag.requestCancel();
diff --git 
a/src/kylin-it/src/test/java/org/apache/kylin/event/SchemaChangeTest.java 
b/src/kylin-it/src/test/java/org/apache/kylin/event/SchemaChangeTest.java
index 00b58e7418..c3e3e36b99 100644
--- a/src/kylin-it/src/test/java/org/apache/kylin/event/SchemaChangeTest.java
+++ b/src/kylin-it/src/test/java/org/apache/kylin/event/SchemaChangeTest.java
@@ -231,21 +231,21 @@ public class SchemaChangeTest extends 
AbstractMVCIntegrationTestCase {
     }
 
     @Test
-    public void testAddColumn() throws Exception {
+    public void testAddColumn() throws Throwable {
         addColumn(TABLE_IDENTITY, new ColumnDesc("", "tmp1", "bigint", "", "", 
"", null));
         tableService.reloadTable(getProject(), TABLE_IDENTITY, false, -1, 
true);
         assertSqls();
     }
 
     @Test
-    public void testRemoveColumn() throws Exception {
+    public void testRemoveColumn() throws Throwable {
         removeColumn(TABLE_IDENTITY, "SRC_ID");
         tableService.reloadTable(getProject(), TABLE_IDENTITY, false, -1, 
true);
         assertSqls();
     }
 
     @Test
-    public void testChangeColumnType() throws Exception {
+    public void testChangeColumnType() throws Throwable {
         changeColumns(TABLE_IDENTITY, Sets.newHashSet("SRC_ID"), columnDesc -> 
columnDesc.setDatatype("string"));
         tableService.reloadTable(getProject(), TABLE_IDENTITY, false, -1, 
true);
         assertSqls();
@@ -265,7 +265,7 @@ public class SchemaChangeTest extends 
AbstractMVCIntegrationTestCase {
         Assert.assertEquals(0, pair.getSecond().size());
     }
 
-    private void assertSqls() throws Exception {
+    private void assertSqls() throws Throwable {
         for (Pair<String, Boolean> pair : 
Arrays.asList(Pair.newPair(SQL_LOOKUP, false),
                 Pair.newPair(SQL_DERIVED, false), Pair.newPair(SQL_LOOKUP2, 
true), Pair.newPair(SQL_DERIVED2, true))) {
             val req = new SQLRequest();
diff --git 
a/src/kylin-it/src/test/java/org/apache/kylin/newten/SlowQueryDetectorTest.java 
b/src/kylin-it/src/test/java/org/apache/kylin/newten/SlowQueryDetectorTest.java
index 15301252ab..c863cd2632 100644
--- 
a/src/kylin-it/src/test/java/org/apache/kylin/newten/SlowQueryDetectorTest.java
+++ 
b/src/kylin-it/src/test/java/org/apache/kylin/newten/SlowQueryDetectorTest.java
@@ -21,7 +21,9 @@ package org.apache.kylin.newten;
 import static org.awaitility.Awaitility.await;
 
 import java.io.File;
+import java.util.concurrent.Semaphore;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
 
 import org.apache.commons.io.FileUtils;
 import org.apache.commons.lang3.exception.ExceptionUtils;
@@ -98,6 +100,139 @@ public class SlowQueryDetectorTest extends 
NLocalWithSparkSessionTest {
         slowQueryDetector.queryEnd();
     }
 
+    @Test
+    public void testStopQuery() throws Exception {
+        AtomicReference<InterruptedException> exception = new 
AtomicReference<>();
+        Semaphore semaphore = new Semaphore(1);
+        semaphore.acquire();
+        Thread query = new Thread(() -> {
+            slowQueryDetector.queryStart("stopId");
+            try {
+                semaphore.acquire();
+                Assert.fail();
+            } catch (InterruptedException e) {
+                exception.set(e);
+            } finally {
+                slowQueryDetector.queryEnd();
+
+            }
+
+        });
+        query.start();
+        await().atMost(2, TimeUnit.SECONDS).until(() -> 
slowQueryDetector.getRunningQueries().size() > 0);
+        slowQueryDetector.stopQuery("stopId");
+        semaphore.release();
+        query.join();
+        assert exception.get() != null;
+
+    }
+
+    @Test
+    public void testStopQueryWrong() throws Exception {
+        AtomicReference<InterruptedException> exception = new 
AtomicReference<>();
+        Semaphore semaphore = new Semaphore(1);
+        semaphore.acquire();
+        Thread query = new Thread(() -> {
+            slowQueryDetector.queryStart("stopId");
+            try {
+                semaphore.acquire();
+            } catch (InterruptedException e) {
+                exception.set(e);
+            } finally {
+                slowQueryDetector.queryEnd();
+            }
+
+        });
+        query.start();
+        await().atMost(2, TimeUnit.SECONDS).until(() -> 
slowQueryDetector.getRunningQueries().size() > 0);
+        slowQueryDetector.stopQuery("stopId-wrong");
+        semaphore.release();
+        query.join();
+        Assert.assertEquals(null, exception.get());
+
+    }
+
+    @Test
+    public void testStopAsyncQuery() throws Exception {
+        AtomicReference<InterruptedException> exception = new 
AtomicReference<>();
+        Semaphore semaphore = new Semaphore(1);
+        semaphore.acquire();
+        Thread asyncQuery = new Thread(() -> {
+            try {
+                QueryContext.current().getQueryTagInfo().setAsyncQuery(true);
+                QueryContext.current().setQueryId("queryId-async");
+                slowQueryDetector.queryStart("");
+                semaphore.acquire();
+                Assert.fail();
+            } catch (InterruptedException e) {
+                exception.set(e);
+            } finally {
+                slowQueryDetector.queryEnd();
+            }
+
+        });
+        asyncQuery.start();
+        await().atMost(2, TimeUnit.SECONDS).until(() -> 
slowQueryDetector.getRunningQueries().size() > 0);
+        slowQueryDetector.stopQuery("queryId-async");
+        semaphore.release();
+        asyncQuery.join();
+        assert exception.get() != null;
+
+    }
+
+    @Test
+    public void testStopAsyncQueryWrong() throws Exception {
+        AtomicReference<InterruptedException> exception = new 
AtomicReference<>();
+        Semaphore semaphore = new Semaphore(1);
+        semaphore.acquire();
+        Thread asyncQuery = new Thread(() -> {
+            try {
+                QueryContext.current().getQueryTagInfo().setAsyncQuery(true);
+                QueryContext.current().setQueryId("queryId");
+                slowQueryDetector.queryStart("");
+                semaphore.acquire();
+            } catch (InterruptedException e) {
+                exception.set(e);
+
+            }
+            slowQueryDetector.queryEnd();
+        });
+        asyncQuery.start();
+        await().atMost(2, TimeUnit.SECONDS).until(() -> 
slowQueryDetector.getRunningQueries().size() > 0);
+        slowQueryDetector.stopQuery("queryId-error");
+        semaphore.release();
+        asyncQuery.join();
+        Assert.assertEquals(null, exception.get());
+
+    }
+
+    @Test
+    public void testStopAsyncQueryJob() throws Exception {
+        AtomicReference<Exception> exception = new AtomicReference<>();
+        Semaphore semaphore = new Semaphore(1);
+        semaphore.acquire();
+        Thread asyncQuery = new Thread(() -> {
+            try {
+                QueryContext.current().getQueryTagInfo().setAsyncQuery(true);
+                QueryContext.current().setQueryId("queryId");
+                slowQueryDetector.addJobIdForAsyncQueryJob("jobId2");
+                slowQueryDetector.queryStart("");
+                slowQueryDetector.addJobIdForAsyncQueryJob("jobId");
+                semaphore.acquire();
+            } catch (InterruptedException e) {
+                exception.set(e);
+            }
+            slowQueryDetector.queryEnd();
+        });
+        asyncQuery.start();
+        await().atMost(2, TimeUnit.SECONDS).until(() -> 
slowQueryDetector.getRunningQueries().size() > 0);
+        slowQueryDetector.stopQuery("queryId");
+        semaphore.release();
+        asyncQuery.join();
+        Assert.assertEquals(null, exception.get());
+
+    }
+
     @Test
     public void testSparderTimeoutCancelJob() throws Exception {
         val df = SparderEnv.getSparkSession().emptyDataFrame();
diff --git 
a/src/kylin-it/src/test/java/org/apache/kylin/rest/controller/NBuildAndQueryMetricsTest.java
 
b/src/kylin-it/src/test/java/org/apache/kylin/rest/controller/NBuildAndQueryMetricsTest.java
index 3af6d6773a..74ed6491e0 100644
--- 
a/src/kylin-it/src/test/java/org/apache/kylin/rest/controller/NBuildAndQueryMetricsTest.java
+++ 
b/src/kylin-it/src/test/java/org/apache/kylin/rest/controller/NBuildAndQueryMetricsTest.java
@@ -249,31 +249,31 @@ public class NBuildAndQueryMetricsTest extends 
AbstractMVCIntegrationTestCase {
     }
 
     @Test
-    public void testMetricsScanForPushDown() throws Exception {
+    public void testMetricsScanForPushDown() throws Throwable {
         String sql = "select account_id from test_account limit 30";
         assertMetric(sql, 30);
     }
 
     @Test
-    public void testMetricsScanForTableIndex() throws Exception {
+    public void testMetricsScanForTableIndex() throws Throwable {
         String sql = "select count(distinct case when trans_id > 100 then 
order_id else 0 end),"
                 + "sum(case when trans_id > 100 then price else 0 end), price 
from test_kylin_fact group by price limit 20";
         assertMetric(sql, 10000);
     }
 
     @Test
-    public void testMetricsScanForTableIndex2() throws Exception {
+    public void testMetricsScanForTableIndex2() throws Throwable {
         String sql = "select trans_id from test_kylin_fact limit 20";
         assertMetric(sql, 4096);
     }
 
     @Test
-    public void testMetricsScanForAggIndex() throws Exception {
+    public void testMetricsScanForAggIndex() throws Throwable {
         String sql = "select trans_id from test_kylin_fact group by trans_id 
limit 20";
         assertMetric(sql, 10000);
     }
 
-    private void assertMetric(String sql, long scanRowsExpect) throws 
Exception {
+    private void assertMetric(String sql, long scanRowsExpect) throws 
Throwable {
         val req = new SQLRequest();
         req.setSql(sql);
         req.setProject(getProject());
diff --git 
a/src/query-server/src/main/java/org/apache/kylin/rest/controller/NAsyncQueryController.java
 
b/src/query-server/src/main/java/org/apache/kylin/rest/controller/NAsyncQueryController.java
index 99a3f4ee7a..5616116f6e 100644
--- 
a/src/query-server/src/main/java/org/apache/kylin/rest/controller/NAsyncQueryController.java
+++ 
b/src/query-server/src/main/java/org/apache/kylin/rest/controller/NAsyncQueryController.java
@@ -22,7 +22,9 @@ import static 
org.apache.kylin.common.constant.HttpConstant.HTTP_VND_APACHE_KYLI
 import static org.apache.kylin.common.exception.ServerErrorCode.ACCESS_DENIED;
 import static 
org.apache.kylin.common.exception.code.ErrorCodeServer.ASYNC_QUERY_INCLUDE_HEADER_NOT_EMPTY;
 import static 
org.apache.kylin.common.exception.code.ErrorCodeServer.ASYNC_QUERY_PROJECT_NAME_EMPTY;
+import static 
org.apache.kylin.common.exception.code.ErrorCodeServer.ASYNC_QUERY_RESULT_NOT_FOUND;
 import static 
org.apache.kylin.common.exception.code.ErrorCodeServer.ASYNC_QUERY_TIME_FORMAT_ERROR;
+import static 
org.apache.kylin.rest.service.AsyncQueryService.QueryStatus.RUNNING;
 
 import java.io.IOException;
 import java.text.ParseException;
@@ -43,6 +45,8 @@ import org.apache.kylin.common.exception.KylinException;
 import org.apache.kylin.common.exception.QueryErrorCode;
 import org.apache.kylin.common.msg.Message;
 import org.apache.kylin.common.msg.MsgPicker;
+import 
org.apache.kylin.common.persistence.transaction.StopQueryBroadcastEventNotifier;
+import org.apache.kylin.common.scheduler.EventBusFactory;
 import org.apache.kylin.metadata.project.NProjectManager;
 import org.apache.kylin.query.util.AsyncQueryUtil;
 import org.apache.kylin.rest.exception.ForbiddenException;
@@ -245,6 +249,33 @@ public class NAsyncQueryController extends 
NBasicController {
             return new EnvelopeResponse<>(KylinException.CODE_SUCCESS, false, 
MsgPicker.getMsg().getCleanFolderFail());
     }
 
+    @ApiOperation(value = "stopQuery", tags = { "QE" })
+    @DeleteMapping(value = "/async_query/stop/{query_id:.+}")
+    @ResponseBody
+    public EnvelopeResponse<String> stopAsyncQuery(@PathVariable("query_id") 
String queryId,
+            @RequestParam(value = "project", required = false) String project) 
throws IOException {
+        if (project == null) {
+            throw new KylinException(ASYNC_QUERY_PROJECT_NAME_EMPTY);
+        }
+        aclEvaluate.checkProjectAdminPermission(project);
+        checkProjectName(project);
+        if (!asyncQueryService.hasPermission(queryId, project)) {
+            return new EnvelopeResponse<>(KylinException.CODE_UNAUTHORIZED, 
"Access denied.",
+                    "Access denied. Only admin users can stop the query");
+        }
+        AsyncQueryService.QueryStatus queryStatus = 
asyncQueryService.queryStatus(project, queryId);
+        if (queryStatus == AsyncQueryService.QueryStatus.MISS) {
+            throw new KylinException(ASYNC_QUERY_RESULT_NOT_FOUND);
+        }
+        if (queryStatus != RUNNING) {
+            return new EnvelopeResponse<>(KylinException.CODE_UNDEFINED, 
"Query is not running",
+                    "Query is not running. please check");
+        }
+        queryService.stopQuery(queryId);
+        EventBusFactory.getInstance().postAsync(new 
StopQueryBroadcastEventNotifier(queryId));
+        return new EnvelopeResponse<>(KylinException.CODE_SUCCESS, "", "");
+    }
+
     @ApiOperation(value = "query", tags = { "QE" }, notes = "Update Response: 
query_id")
     @GetMapping(value = "/async_query/{query_id:.+}/status")
     @ResponseBody
diff --git 
a/src/query-server/src/test/java/org/apache/kylin/rest/controller/NAsyncQueryControllerTest.java
 
b/src/query-server/src/test/java/org/apache/kylin/rest/controller/NAsyncQueryControllerTest.java
index 07bd68bd9c..b26a655067 100644
--- 
a/src/query-server/src/test/java/org/apache/kylin/rest/controller/NAsyncQueryControllerTest.java
+++ 
b/src/query-server/src/test/java/org/apache/kylin/rest/controller/NAsyncQueryControllerTest.java
@@ -169,6 +169,73 @@ public class NAsyncQueryControllerTest extends 
NLocalFileMetadataTestCase {
         
Mockito.verify(nAsyncQueryController).deleteByQueryId(Mockito.anyString(), 
Mockito.any(), Mockito.any());
     }
 
+    @Test
+    public void testStopByQueryId() throws Exception {
+        String queryId = "123XXX";
+
+        
mockMvc.perform(MockMvcRequestBuilders.delete("/api/async_query/stop/{query_id}",
 "123")
+                .contentType(MediaType.APPLICATION_JSON)
+                
.content(JsonUtil.writeValueAsString(mockAsyncQuerySQLRequest()))
+                .accept(MediaType.parseMediaType(HTTP_VND_APACHE_KYLIN_JSON)))
+                
.andExpect(MockMvcResultMatchers.status().isInternalServerError());
+
+        Mockito.doThrow(new KylinException(ACCESS_DENIED, "Access is 
denied")).when(aclEvaluate)
+                .checkProjectAdminPermission(PROJECT);
+
+        
mockMvc.perform(MockMvcRequestBuilders.delete("/api/async_query/stop/{query_id}?project=default",
 queryId)
+                .contentType(MediaType.APPLICATION_JSON)
+                
.content(JsonUtil.writeValueAsString(mockAsyncQuerySQLRequest()))
+                .accept(MediaType.parseMediaType(HTTP_VND_APACHE_KYLIN_JSON)))
+                
.andExpect(MockMvcResultMatchers.status().isInternalServerError());
+
+        
Mockito.doNothing().when(aclEvaluate).checkProjectAdminPermission(PROJECT);
+
+        Mockito.doReturn(false).when(asyncQueryService).hasPermission(queryId, 
PROJECT);
+        
mockMvc.perform(MockMvcRequestBuilders.delete("/api/async_query/stop/{query_id}?project=default",
 queryId)
+                .contentType(MediaType.APPLICATION_JSON)
+                
.content(JsonUtil.writeValueAsString(mockAsyncQuerySQLRequest()))
+                .accept(MediaType.parseMediaType(HTTP_VND_APACHE_KYLIN_JSON)))
+                
.andExpect(MockMvcResultMatchers.status().isOk()).andExpect(result -> {
+                    result.getResponse().getContentAsString()
+                            .contains("Access denied. Only admin users can 
stop the query");
+
+                });
+
+        Mockito.doReturn(true).when(asyncQueryService).hasPermission(queryId, 
PROJECT);
+        Mockito.doReturn(MISS).when(asyncQueryService).queryStatus(PROJECT, 
queryId);
+        
mockMvc.perform(MockMvcRequestBuilders.delete("/api/async_query/stop/{query_id}?project=default",
 queryId)
+                .contentType(MediaType.APPLICATION_JSON)
+                
.content(JsonUtil.writeValueAsString(mockAsyncQuerySQLRequest()))
+                .accept(MediaType.parseMediaType(HTTP_VND_APACHE_KYLIN_JSON)))
+                
.andExpect(MockMvcResultMatchers.status().isInternalServerError()).andExpect(result
 -> {
+                    result.getResponse().getContentAsString()
+                            .contains("Can’t find the query by this query ID 
in this project");
+
+                });
+
+        Mockito.doReturn(SUCCESS).when(asyncQueryService).queryStatus(PROJECT, 
queryId);
+        
mockMvc.perform(MockMvcRequestBuilders.delete("/api/async_query/stop/{query_id}?project=default",
 queryId)
+                .contentType(MediaType.APPLICATION_JSON)
+                
.content(JsonUtil.writeValueAsString(mockAsyncQuerySQLRequest()))
+                .accept(MediaType.parseMediaType(HTTP_VND_APACHE_KYLIN_JSON)))
+                
.andExpect(MockMvcResultMatchers.status().isOk()).andExpect(result -> {
+                    result.getResponse().getContentAsString().contains("Query 
is not running");
+
+                });
+
+        Mockito.doReturn(RUNNING).when(asyncQueryService).queryStatus(PROJECT, 
queryId);
+        
mockMvc.perform(MockMvcRequestBuilders.delete("/api/async_query/stop/{query_id}?project=default",
 queryId)
+                .contentType(MediaType.APPLICATION_JSON)
+                
.content(JsonUtil.writeValueAsString(mockAsyncQuerySQLRequest()))
+                .accept(MediaType.parseMediaType(HTTP_VND_APACHE_KYLIN_JSON)))
+                
.andExpect(MockMvcResultMatchers.status().isOk()).andExpect(result -> {
+                    result.getResponse().getContentAsString().contains("000");
+
+                });
+        Mockito.verify(kapQueryService).stopQuery(queryId);
+
+    }
+
     @Test
     public void testQueryStatusNoProjectPermission() throws Exception {
         Mockito.doThrow(new KylinException(ACCESS_DENIED, "Access is 
denied")).when(aclEvaluate)
diff --git 
a/src/query-service/src/main/java/org/apache/kylin/rest/service/QueryService.java
 
b/src/query-service/src/main/java/org/apache/kylin/rest/service/QueryService.java
index 14eacb5d43..c22b2ee0d9 100644
--- 
a/src/query-service/src/main/java/org/apache/kylin/rest/service/QueryService.java
+++ 
b/src/query-service/src/main/java/org/apache/kylin/rest/service/QueryService.java
@@ -80,6 +80,7 @@ import org.apache.kylin.common.util.AddressUtil;
 import org.apache.kylin.common.util.JsonUtil;
 import org.apache.kylin.common.util.Pair;
 import org.apache.kylin.common.util.SetThreadName;
+import org.apache.kylin.job.execution.ExecuteResult;
 import org.apache.kylin.metadata.MetadataConstants;
 import org.apache.kylin.metadata.acl.AclTCR;
 import org.apache.kylin.metadata.acl.AclTCRManager;
@@ -260,7 +261,7 @@ public class QueryService extends BasicService implements 
CacheSignatureQuerySup
         }
     }
 
-    public SQLResponse query(SQLRequest sqlRequest) throws Exception {
+    public SQLResponse query(SQLRequest sqlRequest) throws Throwable {
         try {
             slowQueryDetector.queryStart(sqlRequest.getStopId());
             markHighPriorityQueryIfNeeded();
@@ -305,7 +306,12 @@ public class QueryService extends BasicService implements 
CacheSignatureQuerySup
                 }
                 AsyncQueryJob asyncQueryJob = new AsyncQueryJob();
                 asyncQueryJob.setProject(queryParams.getProject());
-                asyncQueryJob.submit(queryParams);
+                
slowQueryDetector.addJobIdForAsyncQueryJob(asyncQueryJob.getId());
+                ExecuteResult result = asyncQueryJob.submit(queryParams);
+                if (!result.succeed()) {
+                    throw result.getThrowable();
+
+                }
                 return buildSqlResponse(false, Collections.emptyList(), 0, 
Lists.newArrayList(),
                         sqlRequest.getProject());
             }
@@ -332,15 +338,7 @@ public class QueryService extends BasicService implements 
CacheSignatureQuerySup
     }
 
     public void stopQuery(String id) {
-        for (SlowQueryDetector.QueryEntry e : 
SlowQueryDetector.getRunningQueries().values()) {
-            if (e.getStopId().equals(id)) {
-                logger.error("Trying to cancel query: {}", 
e.getThread().getName());
-                e.setStopByUser(true);
-                e.getPlannerCancelFlag().requestCancel();
-                e.getThread().interrupt();
-                break;
-            }
-        }
+        slowQueryDetector.stopQuery(id);
     }
 
     private void markHighPriorityQueryIfNeeded() {
diff --git 
a/src/query-service/src/test/java/org/apache/kylin/rest/service/QueryServiceTest.java
 
b/src/query-service/src/test/java/org/apache/kylin/rest/service/QueryServiceTest.java
index 2470fbecf4..fab3a328f4 100644
--- 
a/src/query-service/src/test/java/org/apache/kylin/rest/service/QueryServiceTest.java
+++ 
b/src/query-service/src/test/java/org/apache/kylin/rest/service/QueryServiceTest.java
@@ -682,7 +682,7 @@ public class QueryServiceTest extends 
NLocalFileMetadataTestCase {
     }
 
     @Test
-    public void testQueryWithTimeOutException() throws Exception {
+    public void testQueryWithTimeOutException() throws Throwable {
         final String sql = "select * from exception_table";
         final String project = "newten";
 
@@ -2004,7 +2004,7 @@ public class QueryServiceTest extends 
NLocalFileMetadataTestCase {
     }
 
     @Test
-    public void testTableauIntercept() throws Exception {
+    public void testTableauIntercept() throws Throwable {
         List<String> sqlList = 
Files.walk(Paths.get("./src/test/resources/query/tableau_probing"))
                 .filter(file -> Files.isRegularFile(file)).map(path -> {
                     try {

Reply via email to