This is an automated email from the ASF dual-hosted git repository. xxyu pushed a commit to branch kylin5 in repository https://gitbox.apache.org/repos/asf/kylin.git
commit 3297beff726784e70753c8a2cf5b0bba48fdfdca Author: Jiawei Li <1019037...@qq.com> AuthorDate: Mon Mar 6 10:05:03 2023 +0800 KYLIN-5546 Add stop async query job api. --- .../config/initialize/ProcessStatusListener.java | 8 ++ .../apache/kylin/query/util/SlowQueryDetector.java | 50 ++++++-- .../org/apache/kylin/event/SchemaChangeTest.java | 8 +- .../apache/kylin/newten/SlowQueryDetectorTest.java | 135 +++++++++++++++++++++ .../rest/controller/NBuildAndQueryMetricsTest.java | 10 +- .../rest/controller/NAsyncQueryController.java | 31 +++++ .../rest/controller/NAsyncQueryControllerTest.java | 67 ++++++++++ .../apache/kylin/rest/service/QueryService.java | 20 ++- .../kylin/rest/service/QueryServiceTest.java | 4 +- 9 files changed, 300 insertions(+), 33 deletions(-) diff --git a/src/common-service/src/main/java/org/apache/kylin/rest/config/initialize/ProcessStatusListener.java b/src/common-service/src/main/java/org/apache/kylin/rest/config/initialize/ProcessStatusListener.java index 5bc16b914f..d18db060d5 100644 --- a/src/common-service/src/main/java/org/apache/kylin/rest/config/initialize/ProcessStatusListener.java +++ b/src/common-service/src/main/java/org/apache/kylin/rest/config/initialize/ProcessStatusListener.java @@ -33,6 +33,8 @@ import java.util.concurrent.locks.ReentrantLock; import java.util.stream.Collectors; import org.apache.commons.io.FileUtils; +import org.apache.kylin.cluster.ClusterManagerFactory; +import org.apache.kylin.cluster.IClusterManager; import org.apache.kylin.common.KapConfig; import org.apache.kylin.common.KylinConfig; import org.apache.kylin.common.util.CliCommandExecutor; @@ -105,6 +107,12 @@ public class ProcessStatusListener { @Subscribe public void destroyProcessByJobId(CliCommandExecutor.JobKilled jobKilled) { val jobId = jobKilled.getJobId(); + + IClusterManager clusterManager = ClusterManagerFactory.create(KylinConfig.getInstanceFromEnv()); + if (clusterManager.applicationExisted(jobId)) { + clusterManager.killApplication(jobId); + } + final Map<Integer, String> children; fileLock.lock(); try { diff --git a/src/core-common/src/main/java/org/apache/kylin/query/util/SlowQueryDetector.java b/src/core-common/src/main/java/org/apache/kylin/query/util/SlowQueryDetector.java index 387399d161..a0b1394265 100644 --- a/src/core-common/src/main/java/org/apache/kylin/query/util/SlowQueryDetector.java +++ b/src/core-common/src/main/java/org/apache/kylin/query/util/SlowQueryDetector.java @@ -23,6 +23,8 @@ import java.util.concurrent.ConcurrentMap; import org.apache.kylin.common.KylinConfig; import org.apache.kylin.common.QueryContext; +import org.apache.kylin.common.scheduler.EventBusFactory; +import org.apache.kylin.common.util.CliCommandExecutor; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -75,18 +77,41 @@ public class SlowQueryDetector extends Thread { } public void queryStart(String stopId) { - if (QueryContext.current().getQueryTagInfo().isAsyncQuery()) { - return; + runningQueries.put(currentThread(), new QueryEntry(System.currentTimeMillis(), currentThread(), + QueryContext.current().getQueryId(), QueryContext.current().getUserSQL(), stopId, false, + QueryContext.current().getQueryTagInfo().isAsyncQuery(), null, CancelFlag.getContextCancelFlag())); + } + + public void addJobIdForAsyncQueryJob(String jobId) { + QueryEntry queryEntry = runningQueries.get(currentThread()); + if (queryEntry != null) { + queryEntry.setJobId(jobId); } - runningQueries.put(currentThread(), - new QueryEntry(System.currentTimeMillis(), currentThread(), QueryContext.current().getQueryId(), - QueryContext.current().getUserSQL(), stopId, false, CancelFlag.getContextCancelFlag())); + } - public void queryEnd() { - if (QueryContext.current().getQueryTagInfo().isAsyncQuery()) { - return; + public void stopQuery(String id) { + for (SlowQueryDetector.QueryEntry e : SlowQueryDetector.getRunningQueries().values()) { + if ((e.isAsyncQuery() && id.equals(e.getQueryId())) || (!e.isAsyncQuery() && id.equals(e.getStopId()))) { + e.setStopByUser(true); + doStopQuery(e); + break; + } + } + } + + private void doStopQuery(QueryEntry e) { + if (e.getJobId() == null) { + e.getPlannerCancelFlag().requestCancel(); + logger.error("Trying to cancel query: {}", e.getThread().getName()); + e.getThread().interrupt(); + } else { + logger.error("Trying to cancel query job : {},{}", e.getThread().getName(), e.getJobId()); + EventBusFactory.getInstance().postSync(new CliCommandExecutor.JobKilled(e.getJobId())); } + } + + public void queryEnd() { QueryEntry entry = runningQueries.remove(currentThread()); if (null != entry && null != canceledSlowQueriesStatus.get(entry.queryId)) { canceledSlowQueriesStatus.remove(entry.queryId); @@ -113,9 +138,7 @@ public class SlowQueryDetector extends Thread { // interrupt query thread if Stop By User but running for (QueryEntry e : runningQueries.values()) { if (e.isStopByUser) { - e.getPlannerCancelFlag().requestCancel(); - e.getThread().interrupt(); - logger.error("Trying to cancel query: {}", e.getThread().getName()); + doStopQuery(e); } } } @@ -164,6 +187,8 @@ public class SlowQueryDetector extends Thread { final String sql; final String stopId; boolean isStopByUser; + final boolean isAsyncQuery; + String jobId; final CancelFlag plannerCancelFlag; public long getRunningTime() { @@ -171,6 +196,9 @@ public class SlowQueryDetector extends Thread { } private boolean setInterruptIfTimeout() { + if (isAsyncQuery) { + return false; + } long runningMs = System.currentTimeMillis() - startTime; if (runningMs >= queryTimeoutMs) { plannerCancelFlag.requestCancel(); diff --git a/src/kylin-it/src/test/java/org/apache/kylin/event/SchemaChangeTest.java b/src/kylin-it/src/test/java/org/apache/kylin/event/SchemaChangeTest.java index 00b58e7418..c3e3e36b99 100644 --- a/src/kylin-it/src/test/java/org/apache/kylin/event/SchemaChangeTest.java +++ b/src/kylin-it/src/test/java/org/apache/kylin/event/SchemaChangeTest.java @@ -231,21 +231,21 @@ public class SchemaChangeTest extends AbstractMVCIntegrationTestCase { } @Test - public void testAddColumn() throws Exception { + public void testAddColumn() throws Throwable { addColumn(TABLE_IDENTITY, new ColumnDesc("", "tmp1", "bigint", "", "", "", null)); tableService.reloadTable(getProject(), TABLE_IDENTITY, false, -1, true); assertSqls(); } @Test - public void testRemoveColumn() throws Exception { + public void testRemoveColumn() throws Throwable { removeColumn(TABLE_IDENTITY, "SRC_ID"); tableService.reloadTable(getProject(), TABLE_IDENTITY, false, -1, true); assertSqls(); } @Test - public void testChangeColumnType() throws Exception { + public void testChangeColumnType() throws Throwable { changeColumns(TABLE_IDENTITY, Sets.newHashSet("SRC_ID"), columnDesc -> columnDesc.setDatatype("string")); tableService.reloadTable(getProject(), TABLE_IDENTITY, false, -1, true); assertSqls(); @@ -265,7 +265,7 @@ public class SchemaChangeTest extends AbstractMVCIntegrationTestCase { Assert.assertEquals(0, pair.getSecond().size()); } - private void assertSqls() throws Exception { + private void assertSqls() throws Throwable { for (Pair<String, Boolean> pair : Arrays.asList(Pair.newPair(SQL_LOOKUP, false), Pair.newPair(SQL_DERIVED, false), Pair.newPair(SQL_LOOKUP2, true), Pair.newPair(SQL_DERIVED2, true))) { val req = new SQLRequest(); diff --git a/src/kylin-it/src/test/java/org/apache/kylin/newten/SlowQueryDetectorTest.java b/src/kylin-it/src/test/java/org/apache/kylin/newten/SlowQueryDetectorTest.java index 15301252ab..c863cd2632 100644 --- a/src/kylin-it/src/test/java/org/apache/kylin/newten/SlowQueryDetectorTest.java +++ b/src/kylin-it/src/test/java/org/apache/kylin/newten/SlowQueryDetectorTest.java @@ -21,7 +21,9 @@ package org.apache.kylin.newten; import static org.awaitility.Awaitility.await; import java.io.File; +import java.util.concurrent.Semaphore; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; import org.apache.commons.io.FileUtils; import org.apache.commons.lang3.exception.ExceptionUtils; @@ -98,6 +100,139 @@ public class SlowQueryDetectorTest extends NLocalWithSparkSessionTest { slowQueryDetector.queryEnd(); } + @Test + public void testStopQuery() throws Exception { + AtomicReference<InterruptedException> exception = new AtomicReference<>(); + Semaphore semaphore = new Semaphore(1); + semaphore.acquire(); + Thread query = new Thread(() -> { + slowQueryDetector.queryStart("stopId"); + try { + semaphore.acquire(); + Assert.fail(); + } catch (InterruptedException e) { + exception.set(e); + } finally { + slowQueryDetector.queryEnd(); + + } + + }); + query.start(); + await().atMost(2, TimeUnit.SECONDS).until(() -> slowQueryDetector.getRunningQueries().size() > 0); + slowQueryDetector.stopQuery("stopId"); + semaphore.release(); + query.join(); + assert exception.get() != null; + + } + + @Test + public void testStopQueryWrong() throws Exception { + AtomicReference<InterruptedException> exception = new AtomicReference<>(); + Semaphore semaphore = new Semaphore(1); + semaphore.acquire(); + Thread query = new Thread(() -> { + slowQueryDetector.queryStart("stopId"); + try { + semaphore.acquire(); + } catch (InterruptedException e) { + exception.set(e); + } finally { + slowQueryDetector.queryEnd(); + } + + }); + query.start(); + await().atMost(2, TimeUnit.SECONDS).until(() -> slowQueryDetector.getRunningQueries().size() > 0); + slowQueryDetector.stopQuery("stopId-wrong"); + semaphore.release(); + query.join(); + Assert.assertEquals(null, exception.get()); + + } + + @Test + public void testStopAsyncQuery() throws Exception { + AtomicReference<InterruptedException> exception = new AtomicReference<>(); + Semaphore semaphore = new Semaphore(1); + semaphore.acquire(); + Thread asyncQuery = new Thread(() -> { + try { + QueryContext.current().getQueryTagInfo().setAsyncQuery(true); + QueryContext.current().setQueryId("queryId-async"); + slowQueryDetector.queryStart(""); + semaphore.acquire(); + Assert.fail(); + } catch (InterruptedException e) { + exception.set(e); + } finally { + slowQueryDetector.queryEnd(); + } + + }); + asyncQuery.start(); + await().atMost(2, TimeUnit.SECONDS).until(() -> slowQueryDetector.getRunningQueries().size() > 0); + slowQueryDetector.stopQuery("queryId-async"); + semaphore.release(); + asyncQuery.join(); + assert exception.get() != null; + + } + + @Test + public void testStopAsyncQueryWrong() throws Exception { + AtomicReference<InterruptedException> exception = new AtomicReference<>(); + Semaphore semaphore = new Semaphore(1); + semaphore.acquire(); + Thread asyncQuery = new Thread(() -> { + try { + QueryContext.current().getQueryTagInfo().setAsyncQuery(true); + QueryContext.current().setQueryId("queryId"); + slowQueryDetector.queryStart(""); + semaphore.acquire(); + } catch (InterruptedException e) { + exception.set(e); + + } + slowQueryDetector.queryEnd(); + }); + asyncQuery.start(); + await().atMost(2, TimeUnit.SECONDS).until(() -> slowQueryDetector.getRunningQueries().size() > 0); + slowQueryDetector.stopQuery("queryId-error"); + semaphore.release(); + asyncQuery.join(); + Assert.assertEquals(null, exception.get()); + + } + + @Test + public void testStopAsyncQueryJob() throws Exception { + AtomicReference<Exception> exception = new AtomicReference<>(); + Semaphore semaphore = new Semaphore(1); + semaphore.acquire(); + Thread asyncQuery = new Thread(() -> { + try { + QueryContext.current().getQueryTagInfo().setAsyncQuery(true); + QueryContext.current().setQueryId("queryId"); + slowQueryDetector.addJobIdForAsyncQueryJob("jobId2"); + slowQueryDetector.queryStart(""); + slowQueryDetector.addJobIdForAsyncQueryJob("jobId"); + semaphore.acquire(); + } catch (InterruptedException e) { + exception.set(e); + } + slowQueryDetector.queryEnd(); + }); + asyncQuery.start(); + await().atMost(2, TimeUnit.SECONDS).until(() -> slowQueryDetector.getRunningQueries().size() > 0); + slowQueryDetector.stopQuery("queryId"); + semaphore.release(); + asyncQuery.join(); + Assert.assertEquals(null, exception.get()); + + } + @Test public void testSparderTimeoutCancelJob() throws Exception { val df = SparderEnv.getSparkSession().emptyDataFrame(); diff --git a/src/kylin-it/src/test/java/org/apache/kylin/rest/controller/NBuildAndQueryMetricsTest.java b/src/kylin-it/src/test/java/org/apache/kylin/rest/controller/NBuildAndQueryMetricsTest.java index 3af6d6773a..74ed6491e0 100644 --- a/src/kylin-it/src/test/java/org/apache/kylin/rest/controller/NBuildAndQueryMetricsTest.java +++ b/src/kylin-it/src/test/java/org/apache/kylin/rest/controller/NBuildAndQueryMetricsTest.java @@ -249,31 +249,31 @@ public class NBuildAndQueryMetricsTest extends AbstractMVCIntegrationTestCase { } @Test - public void testMetricsScanForPushDown() throws Exception { + public void testMetricsScanForPushDown() throws Throwable { String sql = "select account_id from test_account limit 30"; assertMetric(sql, 30); } @Test - public void testMetricsScanForTableIndex() throws Exception { + public void testMetricsScanForTableIndex() throws Throwable { String sql = "select count(distinct case when trans_id > 100 then order_id else 0 end)," + "sum(case when trans_id > 100 then price else 0 end), price from test_kylin_fact group by price limit 20"; assertMetric(sql, 10000); } @Test - public void testMetricsScanForTableIndex2() throws Exception { + public void testMetricsScanForTableIndex2() throws Throwable { String sql = "select trans_id from test_kylin_fact limit 20"; assertMetric(sql, 4096); } @Test - public void testMetricsScanForAggIndex() throws Exception { + public void testMetricsScanForAggIndex() throws Throwable { String sql = "select trans_id from test_kylin_fact group by trans_id limit 20"; assertMetric(sql, 10000); } - private void assertMetric(String sql, long scanRowsExpect) throws Exception { + private void assertMetric(String sql, long scanRowsExpect) throws Throwable { val req = new SQLRequest(); req.setSql(sql); req.setProject(getProject()); diff --git a/src/query-server/src/main/java/org/apache/kylin/rest/controller/NAsyncQueryController.java b/src/query-server/src/main/java/org/apache/kylin/rest/controller/NAsyncQueryController.java index 99a3f4ee7a..5616116f6e 100644 --- a/src/query-server/src/main/java/org/apache/kylin/rest/controller/NAsyncQueryController.java +++ b/src/query-server/src/main/java/org/apache/kylin/rest/controller/NAsyncQueryController.java @@ -22,7 +22,9 @@ import static org.apache.kylin.common.constant.HttpConstant.HTTP_VND_APACHE_KYLI import static org.apache.kylin.common.exception.ServerErrorCode.ACCESS_DENIED; import static org.apache.kylin.common.exception.code.ErrorCodeServer.ASYNC_QUERY_INCLUDE_HEADER_NOT_EMPTY; import static org.apache.kylin.common.exception.code.ErrorCodeServer.ASYNC_QUERY_PROJECT_NAME_EMPTY; +import static org.apache.kylin.common.exception.code.ErrorCodeServer.ASYNC_QUERY_RESULT_NOT_FOUND; import static org.apache.kylin.common.exception.code.ErrorCodeServer.ASYNC_QUERY_TIME_FORMAT_ERROR; +import static org.apache.kylin.rest.service.AsyncQueryService.QueryStatus.RUNNING; import java.io.IOException; import java.text.ParseException; @@ -43,6 +45,8 @@ import org.apache.kylin.common.exception.KylinException; import org.apache.kylin.common.exception.QueryErrorCode; import org.apache.kylin.common.msg.Message; import org.apache.kylin.common.msg.MsgPicker; +import org.apache.kylin.common.persistence.transaction.StopQueryBroadcastEventNotifier; +import org.apache.kylin.common.scheduler.EventBusFactory; import org.apache.kylin.metadata.project.NProjectManager; import org.apache.kylin.query.util.AsyncQueryUtil; import org.apache.kylin.rest.exception.ForbiddenException; @@ -245,6 +249,33 @@ public class NAsyncQueryController extends NBasicController { return new EnvelopeResponse<>(KylinException.CODE_SUCCESS, false, MsgPicker.getMsg().getCleanFolderFail()); } + @ApiOperation(value = "stopQuery", tags = { "QE" }) + @DeleteMapping(value = "/async_query/stop/{query_id:.+}") + @ResponseBody + public EnvelopeResponse<String> stopAsyncQuery(@PathVariable("query_id") String queryId, + @RequestParam(value = "project", required = false) String project) throws IOException { + if (project == null) { + throw new KylinException(ASYNC_QUERY_PROJECT_NAME_EMPTY); + } + aclEvaluate.checkProjectAdminPermission(project); + checkProjectName(project); + if (!asyncQueryService.hasPermission(queryId, project)) { + return new EnvelopeResponse<>(KylinException.CODE_UNAUTHORIZED, "Access denied.", + "Access denied. Only admin users can stop the query"); + } + AsyncQueryService.QueryStatus queryStatus = asyncQueryService.queryStatus(project, queryId); + if (queryStatus == AsyncQueryService.QueryStatus.MISS) { + throw new KylinException(ASYNC_QUERY_RESULT_NOT_FOUND); + } + if (queryStatus != RUNNING) { + return new EnvelopeResponse<>(KylinException.CODE_UNDEFINED, "Query is not running", + "Query is not running. please check"); + } + queryService.stopQuery(queryId); + EventBusFactory.getInstance().postAsync(new StopQueryBroadcastEventNotifier(queryId)); + return new EnvelopeResponse<>(KylinException.CODE_SUCCESS, "", ""); + } + @ApiOperation(value = "query", tags = { "QE" }, notes = "Update Response: query_id") @GetMapping(value = "/async_query/{query_id:.+}/status") @ResponseBody diff --git a/src/query-server/src/test/java/org/apache/kylin/rest/controller/NAsyncQueryControllerTest.java b/src/query-server/src/test/java/org/apache/kylin/rest/controller/NAsyncQueryControllerTest.java index 07bd68bd9c..b26a655067 100644 --- a/src/query-server/src/test/java/org/apache/kylin/rest/controller/NAsyncQueryControllerTest.java +++ b/src/query-server/src/test/java/org/apache/kylin/rest/controller/NAsyncQueryControllerTest.java @@ -169,6 +169,73 @@ public class NAsyncQueryControllerTest extends NLocalFileMetadataTestCase { Mockito.verify(nAsyncQueryController).deleteByQueryId(Mockito.anyString(), Mockito.any(), Mockito.any()); } + @Test + public void testStopByQueryId() throws Exception { + String queryId = "123XXX"; + + mockMvc.perform(MockMvcRequestBuilders.delete("/api/async_query/stop/{query_id}", "123") + .contentType(MediaType.APPLICATION_JSON) + .content(JsonUtil.writeValueAsString(mockAsyncQuerySQLRequest())) + .accept(MediaType.parseMediaType(HTTP_VND_APACHE_KYLIN_JSON))) + .andExpect(MockMvcResultMatchers.status().isInternalServerError()); + + Mockito.doThrow(new KylinException(ACCESS_DENIED, "Access is denied")).when(aclEvaluate) + .checkProjectAdminPermission(PROJECT); + + mockMvc.perform(MockMvcRequestBuilders.delete("/api/async_query/stop/{query_id}?project=default", queryId) + .contentType(MediaType.APPLICATION_JSON) + .content(JsonUtil.writeValueAsString(mockAsyncQuerySQLRequest())) + .accept(MediaType.parseMediaType(HTTP_VND_APACHE_KYLIN_JSON))) + .andExpect(MockMvcResultMatchers.status().isInternalServerError()); + + Mockito.doNothing().when(aclEvaluate).checkProjectAdminPermission(PROJECT); + + Mockito.doReturn(false).when(asyncQueryService).hasPermission(queryId, PROJECT); + mockMvc.perform(MockMvcRequestBuilders.delete("/api/async_query/stop/{query_id}?project=default", queryId) + .contentType(MediaType.APPLICATION_JSON) + .content(JsonUtil.writeValueAsString(mockAsyncQuerySQLRequest())) + .accept(MediaType.parseMediaType(HTTP_VND_APACHE_KYLIN_JSON))) + .andExpect(MockMvcResultMatchers.status().isOk()).andExpect(result -> { + result.getResponse().getContentAsString() + .contains("Access denied. Only admin users can stop the query"); + + }); + + Mockito.doReturn(true).when(asyncQueryService).hasPermission(queryId, PROJECT); + Mockito.doReturn(MISS).when(asyncQueryService).queryStatus(PROJECT, queryId); + mockMvc.perform(MockMvcRequestBuilders.delete("/api/async_query/stop/{query_id}?project=default", queryId) + .contentType(MediaType.APPLICATION_JSON) + .content(JsonUtil.writeValueAsString(mockAsyncQuerySQLRequest())) + .accept(MediaType.parseMediaType(HTTP_VND_APACHE_KYLIN_JSON))) + .andExpect(MockMvcResultMatchers.status().isInternalServerError()).andExpect(result -> { + result.getResponse().getContentAsString() + .contains("Can’t find the query by this query ID in this project"); + + }); + + Mockito.doReturn(SUCCESS).when(asyncQueryService).queryStatus(PROJECT, queryId); + mockMvc.perform(MockMvcRequestBuilders.delete("/api/async_query/stop/{query_id}?project=default", queryId) + .contentType(MediaType.APPLICATION_JSON) + .content(JsonUtil.writeValueAsString(mockAsyncQuerySQLRequest())) + .accept(MediaType.parseMediaType(HTTP_VND_APACHE_KYLIN_JSON))) + .andExpect(MockMvcResultMatchers.status().isOk()).andExpect(result -> { + result.getResponse().getContentAsString().contains("Query is not running"); + + }); + + Mockito.doReturn(RUNNING).when(asyncQueryService).queryStatus(PROJECT, queryId); + mockMvc.perform(MockMvcRequestBuilders.delete("/api/async_query/stop/{query_id}?project=default", queryId) + .contentType(MediaType.APPLICATION_JSON) + .content(JsonUtil.writeValueAsString(mockAsyncQuerySQLRequest())) + .accept(MediaType.parseMediaType(HTTP_VND_APACHE_KYLIN_JSON))) + .andExpect(MockMvcResultMatchers.status().isOk()).andExpect(result -> { + result.getResponse().getContentAsString().contains("000"); + + }); + Mockito.verify(kapQueryService).stopQuery(queryId); + + } + @Test public void testQueryStatusNoProjectPermission() throws Exception { Mockito.doThrow(new KylinException(ACCESS_DENIED, "Access is denied")).when(aclEvaluate) diff --git a/src/query-service/src/main/java/org/apache/kylin/rest/service/QueryService.java b/src/query-service/src/main/java/org/apache/kylin/rest/service/QueryService.java index 14eacb5d43..c22b2ee0d9 100644 --- a/src/query-service/src/main/java/org/apache/kylin/rest/service/QueryService.java +++ b/src/query-service/src/main/java/org/apache/kylin/rest/service/QueryService.java @@ -80,6 +80,7 @@ import org.apache.kylin.common.util.AddressUtil; import org.apache.kylin.common.util.JsonUtil; import org.apache.kylin.common.util.Pair; import org.apache.kylin.common.util.SetThreadName; +import org.apache.kylin.job.execution.ExecuteResult; import org.apache.kylin.metadata.MetadataConstants; import org.apache.kylin.metadata.acl.AclTCR; import org.apache.kylin.metadata.acl.AclTCRManager; @@ -260,7 +261,7 @@ public class QueryService extends BasicService implements CacheSignatureQuerySup } } - public SQLResponse query(SQLRequest sqlRequest) throws Exception { + public SQLResponse query(SQLRequest sqlRequest) throws Throwable { try { slowQueryDetector.queryStart(sqlRequest.getStopId()); markHighPriorityQueryIfNeeded(); @@ -305,7 +306,12 @@ public class QueryService extends BasicService implements CacheSignatureQuerySup } AsyncQueryJob asyncQueryJob = new AsyncQueryJob(); asyncQueryJob.setProject(queryParams.getProject()); - asyncQueryJob.submit(queryParams); + slowQueryDetector.addJobIdForAsyncQueryJob(asyncQueryJob.getId()); + ExecuteResult result = asyncQueryJob.submit(queryParams); + if (!result.succeed()) { + throw result.getThrowable(); + + } return buildSqlResponse(false, Collections.emptyList(), 0, Lists.newArrayList(), sqlRequest.getProject()); } @@ -332,15 +338,7 @@ public class QueryService extends BasicService implements CacheSignatureQuerySup } public void stopQuery(String id) { - for (SlowQueryDetector.QueryEntry e : SlowQueryDetector.getRunningQueries().values()) { - if (e.getStopId().equals(id)) { - logger.error("Trying to cancel query: {}", e.getThread().getName()); - e.setStopByUser(true); - e.getPlannerCancelFlag().requestCancel(); - e.getThread().interrupt(); - break; - } - } + slowQueryDetector.stopQuery(id); } private void markHighPriorityQueryIfNeeded() { diff --git a/src/query-service/src/test/java/org/apache/kylin/rest/service/QueryServiceTest.java b/src/query-service/src/test/java/org/apache/kylin/rest/service/QueryServiceTest.java index 2470fbecf4..fab3a328f4 100644 --- a/src/query-service/src/test/java/org/apache/kylin/rest/service/QueryServiceTest.java +++ b/src/query-service/src/test/java/org/apache/kylin/rest/service/QueryServiceTest.java @@ -682,7 +682,7 @@ public class QueryServiceTest extends NLocalFileMetadataTestCase { } @Test - public void testQueryWithTimeOutException() throws Exception { + public void testQueryWithTimeOutException() throws Throwable { final String sql = "select * from exception_table"; final String project = "newten"; @@ -2004,7 +2004,7 @@ public class QueryServiceTest extends NLocalFileMetadataTestCase { } @Test - public void testTableauIntercept() throws Exception { + public void testTableauIntercept() throws Throwable { List<String> sqlList = Files.walk(Paths.get("./src/test/resources/query/tableau_probing")) .filter(file -> Files.isRegularFile(file)).map(path -> { try {