This is an automated email from the ASF dual-hosted git repository. xxyu pushed a commit to branch kylin5 in repository https://gitbox.apache.org/repos/asf/kylin.git
commit 552f8f8fd40ab7bc23e568136ef50fcaaff8a7b3 Author: Jiawei Li <1019037...@qq.com> AuthorDate: Tue Jan 17 13:55:25 2023 +0800 KYLIN-5495 add async query count limit --- .../org/apache/kylin/common/KylinConfigBase.java | 5 ++ .../kylin/common/exception/QueryErrorCode.java | 1 + .../org/apache/kylin/common/msg/CnMessage.java | 8 ++- .../java/org/apache/kylin/common/msg/Message.java | 4 ++ .../resources/kylin_errorcode_conf_en.properties | 1 + .../resources/kylin_errorcode_conf_zh.properties | 1 + .../rest/controller/NAsyncQueryController.java | 63 ++++++++++-------- .../rest/controller/NAsyncQueryControllerTest.java | 74 +++++++++++++++++++++ .../apache/kylin/rest/service/QueryService.java | 13 ++-- .../kylin/rest/util/AsyncQueryRequestLimits.java | 68 +++++++++++++++++++ .../rest/util/AsyncQueryRequestLimitsTest.java | 77 ++++++++++++++++++++++ 11 files changed, 279 insertions(+), 36 deletions(-) diff --git a/src/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java b/src/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java index d76736609f..b1f8541092 100644 --- a/src/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java +++ b/src/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java @@ -501,6 +501,11 @@ public abstract class KylinConfigBase implements Serializable { return Integer.parseInt(getOptional("kylin.query.project-concurrent-running-threshold", "0")); } + public int getAsyncQueryMaxConcurrentJobs() { + // by default there's no limitation + return Integer.parseInt(getOptional("kylin.query.async-query.max-concurrent-jobs", "0")); + } + public boolean isAdminUserExportAllowed() { return Boolean.parseBoolean(getOptional("kylin.web.export-allow-admin", TRUE)); } diff --git a/src/core-common/src/main/java/org/apache/kylin/common/exception/QueryErrorCode.java b/src/core-common/src/main/java/org/apache/kylin/common/exception/QueryErrorCode.java index 56fe23380a..96e305316c 100644 --- a/src/core-common/src/main/java/org/apache/kylin/common/exception/QueryErrorCode.java +++ b/src/core-common/src/main/java/org/apache/kylin/common/exception/QueryErrorCode.java @@ -52,6 +52,7 @@ public enum QueryErrorCode implements ErrorCodeSupplier { // 20040XXX async query ASYNC_QUERY_ILLEGAL_PARAM("KE-020040001"), + TOO_MANY_ASYNC_QUERY("KE-020040002"), // 20050XXX invalid query params INVALID_QUERY_PARAMS("KE-020050001"), diff --git a/src/core-common/src/main/java/org/apache/kylin/common/msg/CnMessage.java b/src/core-common/src/main/java/org/apache/kylin/common/msg/CnMessage.java index 8403f70fa5..679e15176f 100644 --- a/src/core-common/src/main/java/org/apache/kylin/common/msg/CnMessage.java +++ b/src/core-common/src/main/java/org/apache/kylin/common/msg/CnMessage.java @@ -785,6 +785,11 @@ public class CnMessage extends Message { return "查询请求数量超过上限,无法提交。请稍后再试,或联系项目管理员修改设置。"; } + @Override + public String getAsyncQueryTooManyRunning() { + return "查询失败,异步查询总数已达到管理员设置的上限,请等候并重试。"; + } + @Override public String getSelfDisableForbidden() { return "您不可以禁用您自己"; @@ -1692,7 +1697,6 @@ public class CnMessage extends Message { @Override public String getLoadLogicalViewError(String tableName, String project) { - return String.format(Locale.ROOT, - "无法加载表: %s , 仅支持在项目 %s 中加载此表", tableName, project); + return String.format(Locale.ROOT, "无法加载表: %s , 仅支持在项目 %s 中加载此表", tableName, project); } } diff --git a/src/core-common/src/main/java/org/apache/kylin/common/msg/Message.java b/src/core-common/src/main/java/org/apache/kylin/common/msg/Message.java index 98bd73637f..32ef6010b5 100644 --- a/src/core-common/src/main/java/org/apache/kylin/common/msg/Message.java +++ b/src/core-common/src/main/java/org/apache/kylin/common/msg/Message.java @@ -438,6 +438,10 @@ public class Message { return "Can’t submit query at the moment as there are too many ongoing queries. Please try again later, or contact project admin to adjust configuration."; } + public String getAsyncQueryTooManyRunning() { + return "Query failed cause the total number of async queries hit the upper limit set by Admin.Please wait and try again."; + } + public String getExportResultNotAllowed() { return "Don’t have permission to export the query result. Please contact admin if needed."; } diff --git a/src/core-common/src/main/resources/kylin_errorcode_conf_en.properties b/src/core-common/src/main/resources/kylin_errorcode_conf_en.properties index 8d649d7813..664c1b8d01 100644 --- a/src/core-common/src/main/resources/kylin_errorcode_conf_en.properties +++ b/src/core-common/src/main/resources/kylin_errorcode_conf_en.properties @@ -191,6 +191,7 @@ KE-020007001=Empty Table KE-020030001=Invalid Parameter When PushDown KE-020032001=Busy Query KE-020040001=Asynchronous query illegal parameters +KE-020040002=Busy Async Query #job KE-030001000=Job building error KE-030001001=Sanity Check diff --git a/src/core-common/src/main/resources/kylin_errorcode_conf_zh.properties b/src/core-common/src/main/resources/kylin_errorcode_conf_zh.properties index 1102001892..f5e12bc109 100644 --- a/src/core-common/src/main/resources/kylin_errorcode_conf_zh.properties +++ b/src/core-common/src/main/resources/kylin_errorcode_conf_zh.properties @@ -191,6 +191,7 @@ KE-020007001=空表 KE-020030001=下压时具有非法参数 KE-020032001=查询繁忙 KE-020040001=异步查询非法参数 +KE-020040002=异步查询繁忙 #任务 KE-030001000=构建任务异常 KE-030001001=数据检查 diff --git a/src/query-server/src/main/java/org/apache/kylin/rest/controller/NAsyncQueryController.java b/src/query-server/src/main/java/org/apache/kylin/rest/controller/NAsyncQueryController.java index 3be9d9d753..8db117d415 100644 --- a/src/query-server/src/main/java/org/apache/kylin/rest/controller/NAsyncQueryController.java +++ b/src/query-server/src/main/java/org/apache/kylin/rest/controller/NAsyncQueryController.java @@ -43,6 +43,7 @@ import org.apache.kylin.common.exception.KylinException; import org.apache.kylin.common.exception.QueryErrorCode; import org.apache.kylin.common.msg.Message; import org.apache.kylin.common.msg.MsgPicker; +import org.apache.kylin.metadata.project.NProjectManager; import org.apache.kylin.query.util.AsyncQueryUtil; import org.apache.kylin.rest.exception.ForbiddenException; import org.apache.kylin.rest.request.AsyncQuerySQLRequest; @@ -52,6 +53,7 @@ import org.apache.kylin.rest.response.SQLResponse; import org.apache.kylin.rest.service.AsyncQueryService; import org.apache.kylin.rest.service.QueryService; import org.apache.kylin.rest.util.AclEvaluate; +import org.apache.kylin.rest.util.AsyncQueryRequestLimits; import org.apache.spark.sql.SparderEnv; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -123,6 +125,9 @@ public class NAsyncQueryController extends NBasicController { if (StringUtils.isEmpty(sqlRequest.getSeparator())) { sqlRequest.setSeparator(","); } + if (NProjectManager.getProjectConfig(sqlRequest.getProject()).isUniqueAsyncQueryYarnQueue()) { + AsyncQueryRequestLimits.checkCount(); + } executorService.submit(Objects.requireNonNull(TtlRunnable.get(() -> { String format = sqlRequest.getFormat().toLowerCase(Locale.ROOT); @@ -154,14 +159,14 @@ public class NAsyncQueryController extends NBasicController { } catch (Exception e) { try { logger.error("failed to run query {}", queryContext.getQueryId(), e); - AsyncQueryUtil.createErrorFlag(sqlRequest.getProject(), queryContext.getQueryId(), - e.getMessage()); + AsyncQueryUtil.createErrorFlag(sqlRequest.getProject(), queryContext.getQueryId(), e.getMessage()); exceptionHandle.set(e.getMessage()); } catch (Exception e1) { exceptionHandle.set(exceptionHandle.get() + "\n" + e.getMessage()); throw new RuntimeException(e1); } } finally { + logger.info("Async query with queryId: {} end", queryContext.getQueryId()); QueryContext.current().close(); } }))); @@ -171,18 +176,18 @@ public class NAsyncQueryController extends NBasicController { } switch (asyncQueryService.queryStatus(sqlRequest.getProject(), sqlRequest.getQueryId())) { - case SUCCESS: - return new EnvelopeResponse<>(KylinException.CODE_SUCCESS, - new AsyncQueryResponse(queryIdRef.get(), AsyncQueryResponse.Status.SUCCESSFUL, "query success"), - ""); - case FAILED: - return new EnvelopeResponse<>(KylinException.CODE_SUCCESS, - new AsyncQueryResponse(queryIdRef.get(), AsyncQueryResponse.Status.FAILED, exceptionHandle.get()), - ""); - default: - return new EnvelopeResponse<>(KylinException.CODE_SUCCESS, - new AsyncQueryResponse(queryIdRef.get(), AsyncQueryResponse.Status.RUNNING, "query still running"), - ""); + case SUCCESS: + return new EnvelopeResponse<>(KylinException.CODE_SUCCESS, + new AsyncQueryResponse(queryIdRef.get(), AsyncQueryResponse.Status.SUCCESSFUL, "query success"), + ""); + case FAILED: + return new EnvelopeResponse<>(KylinException.CODE_SUCCESS, + new AsyncQueryResponse(queryIdRef.get(), AsyncQueryResponse.Status.FAILED, exceptionHandle.get()), + ""); + default: + return new EnvelopeResponse<>(KylinException.CODE_SUCCESS, + new AsyncQueryResponse(queryIdRef.get(), AsyncQueryResponse.Status.RUNNING, "query still running"), + ""); } } @@ -258,21 +263,21 @@ public class NAsyncQueryController extends NBasicController { AsyncQueryService.QueryStatus queryStatus = asyncQueryService.queryStatus(project, queryId); AsyncQueryResponse asyncQueryResponse; switch (queryStatus) { - case SUCCESS: - asyncQueryResponse = new AsyncQueryResponse(queryId, AsyncQueryResponse.Status.SUCCESSFUL, - "await fetching results"); - break; - case RUNNING: - asyncQueryResponse = new AsyncQueryResponse(queryId, AsyncQueryResponse.Status.RUNNING, "still running"); - break; - case FAILED: - asyncQueryResponse = new AsyncQueryResponse(queryId, AsyncQueryResponse.Status.FAILED, - asyncQueryService.retrieveSavedQueryException(project, queryId)); - break; - default: - asyncQueryResponse = new AsyncQueryResponse(queryId, AsyncQueryResponse.Status.MISSING, - "query status is lost"); // - break; + case SUCCESS: + asyncQueryResponse = new AsyncQueryResponse(queryId, AsyncQueryResponse.Status.SUCCESSFUL, + "await fetching results"); + break; + case RUNNING: + asyncQueryResponse = new AsyncQueryResponse(queryId, AsyncQueryResponse.Status.RUNNING, "still running"); + break; + case FAILED: + asyncQueryResponse = new AsyncQueryResponse(queryId, AsyncQueryResponse.Status.FAILED, + asyncQueryService.retrieveSavedQueryException(project, queryId)); + break; + default: + asyncQueryResponse = new AsyncQueryResponse(queryId, AsyncQueryResponse.Status.MISSING, + "query status is lost"); // + break; } return new EnvelopeResponse<>(KylinException.CODE_SUCCESS, asyncQueryResponse, ""); diff --git a/src/query-server/src/test/java/org/apache/kylin/rest/controller/NAsyncQueryControllerTest.java b/src/query-server/src/test/java/org/apache/kylin/rest/controller/NAsyncQueryControllerTest.java index 65d74a56f0..07bd68bd9c 100644 --- a/src/query-server/src/test/java/org/apache/kylin/rest/controller/NAsyncQueryControllerTest.java +++ b/src/query-server/src/test/java/org/apache/kylin/rest/controller/NAsyncQueryControllerTest.java @@ -19,6 +19,7 @@ package org.apache.kylin.rest.controller; import static org.apache.kylin.common.constant.HttpConstant.HTTP_VND_APACHE_KYLIN_JSON; +import static org.apache.kylin.common.exception.QueryErrorCode.TOO_MANY_ASYNC_QUERY; import static org.apache.kylin.common.exception.ServerErrorCode.ACCESS_DENIED; import static org.apache.kylin.common.exception.code.ErrorCodeServer.ASYNC_QUERY_PROJECT_NAME_EMPTY; import static org.apache.kylin.common.exception.code.ErrorCodeServer.ASYNC_QUERY_RESULT_NOT_FOUND; @@ -29,6 +30,8 @@ import static org.apache.kylin.rest.service.AsyncQueryService.QueryStatus.RUNNIN import static org.apache.kylin.rest.service.AsyncQueryService.QueryStatus.SUCCESS; import java.io.IOException; +import java.lang.reflect.Field; +import java.lang.reflect.Modifier; import java.text.ParseException; import org.apache.kylin.common.KylinConfig; @@ -45,6 +48,7 @@ import org.apache.kylin.rest.response.SQLResponse; import org.apache.kylin.rest.service.AsyncQueryService; import org.apache.kylin.rest.service.QueryService; import org.apache.kylin.rest.util.AclEvaluate; +import org.apache.kylin.rest.util.AsyncQueryRequestLimits; import org.junit.After; import org.junit.Assert; import org.junit.Before; @@ -272,6 +276,76 @@ public class NAsyncQueryControllerTest extends NLocalFileMetadataTestCase { Mockito.verify(nAsyncQueryController).query(Mockito.any()); } + @Test + public void testQueryReachLimit() throws Exception { + Mockito.doReturn(SUCCESS).when(asyncQueryService).queryStatus(Mockito.anyString(), Mockito.anyString()); + SQLResponse response = new SQLResponse(); + response.setException(false); + Mockito.doReturn(response).when(kapQueryService).queryWithCache(Mockito.any()); + getTestConfig().setProperty("kylin.query.unique-async-query-yarn-queue-enabled", "true"); + getTestConfig().setProperty("kylin.query.async-query.max-concurrent-jobs", "3"); + reloadAsyncQueryRequestLimits(); + AsyncQueryRequestLimits limit1 = new AsyncQueryRequestLimits(); + AsyncQueryRequestLimits limit2 = new AsyncQueryRequestLimits(); + mockMvc.perform(MockMvcRequestBuilders.post("/api/async_query").contentType(MediaType.APPLICATION_JSON) + .content(JsonUtil.writeValueAsString(mockAsyncQuerySQLRequest())) + .accept(MediaType.parseMediaType(HTTP_VND_APACHE_KYLIN_JSON))) + .andExpect(MockMvcResultMatchers.status().isOk()); + + Mockito.verify(nAsyncQueryController).query(Mockito.any()); + AsyncQueryRequestLimits limits3 = new AsyncQueryRequestLimits(); + mockMvc.perform(MockMvcRequestBuilders.post("/api/async_query").contentType(MediaType.APPLICATION_JSON) + .content(JsonUtil.writeValueAsString(mockAsyncQuerySQLRequest())) + .accept(MediaType.parseMediaType(HTTP_VND_APACHE_KYLIN_JSON))) + .andExpect(MockMvcResultMatchers.status().is5xxServerError()).andExpect(result -> { + Assert.assertTrue(result.getResolvedException() instanceof KylinException); + Assert.assertEquals(TOO_MANY_ASYNC_QUERY.toErrorCode().getCodeString(), + ((KylinException) result.getResolvedException()).getErrorCode().getCodeString()); + Assert.assertEquals(MsgPicker.getMsg().getAsyncQueryTooManyRunning(), + result.getResolvedException().getMessage()); + }); + limits3.close(); + mockMvc.perform(MockMvcRequestBuilders.post("/api/async_query").contentType(MediaType.APPLICATION_JSON) + .content(JsonUtil.writeValueAsString(mockAsyncQuerySQLRequest())) + .accept(MediaType.parseMediaType(HTTP_VND_APACHE_KYLIN_JSON))) + .andExpect(MockMvcResultMatchers.status().isOk()); + limit1.close(); + limit2.close(); + + } + + @Test + public void testQueryReachLimitCase2() throws Exception { + + Mockito.doReturn(SUCCESS).when(asyncQueryService).queryStatus(Mockito.anyString(), Mockito.anyString()); + SQLResponse response = new SQLResponse(); + response.setException(false); + Mockito.doReturn(response).when(kapQueryService).queryWithCache(Mockito.any()); + getTestConfig().setProperty("kylin.query.unique-async-query-yarn-queue-enabled", "true"); + getTestConfig().setProperty("kylin.query.async-query.max-concurrent-jobs", "0"); + reloadAsyncQueryRequestLimits(); + + AsyncQueryRequestLimits limit1 = new AsyncQueryRequestLimits(); + AsyncQueryRequestLimits limit2 = new AsyncQueryRequestLimits(); + mockMvc.perform(MockMvcRequestBuilders.post("/api/async_query").contentType(MediaType.APPLICATION_JSON) + .content(JsonUtil.writeValueAsString(mockAsyncQuerySQLRequest())) + .accept(MediaType.parseMediaType(HTTP_VND_APACHE_KYLIN_JSON))) + .andExpect(MockMvcResultMatchers.status().isOk()); + limit1.close(); + limit2.close(); + + } + + private void reloadAsyncQueryRequestLimits() throws Exception { + Field field = AsyncQueryRequestLimits.class.getDeclaredField("MAX_COUNT"); + field.setAccessible(true); + Field modifiersField = Field.class.getDeclaredField("modifiers"); + modifiersField.setAccessible(true); + modifiersField.setInt(field, field.getModifiers() & ~Modifier.FINAL); + int count = KylinConfig.getInstanceFromEnv().getAsyncQueryMaxConcurrentJobs(); + field.setInt(null, count); + } + @Test public void testAsyncQueryContextClean() throws Exception { AsyncQuerySQLRequest asyncQuerySQLRequest1 = new AsyncQuerySQLRequest(); diff --git a/src/query-service/src/main/java/org/apache/kylin/rest/service/QueryService.java b/src/query-service/src/main/java/org/apache/kylin/rest/service/QueryService.java index 04dad61a83..bd6c676374 100644 --- a/src/query-service/src/main/java/org/apache/kylin/rest/service/QueryService.java +++ b/src/query-service/src/main/java/org/apache/kylin/rest/service/QueryService.java @@ -142,6 +142,7 @@ import org.apache.kylin.rest.response.TableMetaCacheResultV2; import org.apache.kylin.rest.security.MutableAclRecord; import org.apache.kylin.rest.util.AclEvaluate; import org.apache.kylin.rest.util.AclPermissionUtil; +import org.apache.kylin.rest.util.AsyncQueryRequestLimits; import org.apache.kylin.rest.util.PrepareSQLUtils; import org.apache.kylin.rest.util.QueryCacheSignatureUtil; import org.apache.kylin.rest.util.QueryRequestLimits; @@ -303,11 +304,13 @@ public class QueryService extends BasicService implements CacheSignatureQuerySup if (StringUtils.isNotEmpty(sqlRequest.getSparkQueue())) { queryParams.setSparkQueue(sqlRequest.getSparkQueue()); } - AsyncQueryJob asyncQueryJob = new AsyncQueryJob(); - asyncQueryJob.setProject(queryParams.getProject()); - asyncQueryJob.submit(queryParams); - return buildSqlResponse(false, Collections.emptyList(), 0, Lists.newArrayList(), - sqlRequest.getProject()); + try (AsyncQueryRequestLimits ignored = new AsyncQueryRequestLimits()) { + AsyncQueryJob asyncQueryJob = new AsyncQueryJob(); + asyncQueryJob.setProject(queryParams.getProject()); + asyncQueryJob.submit(queryParams); + return buildSqlResponse(false, Collections.emptyList(), 0, Lists.newArrayList(), + sqlRequest.getProject()); + } } SQLResponse fakeResponse = TableauInterceptor.tableauIntercept(queryParams.getSql()); diff --git a/src/query-service/src/main/java/org/apache/kylin/rest/util/AsyncQueryRequestLimits.java b/src/query-service/src/main/java/org/apache/kylin/rest/util/AsyncQueryRequestLimits.java new file mode 100644 index 0000000000..ea3df6e9e4 --- /dev/null +++ b/src/query-service/src/main/java/org/apache/kylin/rest/util/AsyncQueryRequestLimits.java @@ -0,0 +1,68 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kylin.rest.util; + +import static org.apache.kylin.common.exception.QueryErrorCode.TOO_MANY_ASYNC_QUERY; + +import java.util.concurrent.atomic.AtomicInteger; + +import org.apache.kylin.common.KylinConfig; +import org.apache.kylin.common.exception.KylinException; +import org.apache.kylin.common.msg.MsgPicker; + +public class AsyncQueryRequestLimits implements AutoCloseable { + private static volatile AtomicInteger asyncQueryCount = new AtomicInteger(0); + + private static final int MAX_COUNT = KylinConfig.getInstanceFromEnv().getAsyncQueryMaxConcurrentJobs(); + + private static void openAsyncQueryRequest() { + if (MAX_COUNT <= 0) { + return; + } + asyncQueryCount.incrementAndGet(); + + } + + public static void checkCount() { + if (MAX_COUNT <= 0) { + return; + } + if (asyncQueryCount.get() >= MAX_COUNT) { + throw new KylinException(TOO_MANY_ASYNC_QUERY, MsgPicker.getMsg().getAsyncQueryTooManyRunning()); + } + + } + + private static void closeAsyncQueryRequest() { + if (MAX_COUNT <= 0) { + return; + } + asyncQueryCount.decrementAndGet(); + + } + + public AsyncQueryRequestLimits() { + openAsyncQueryRequest(); + } + + @Override + public void close() { + closeAsyncQueryRequest(); + } +} diff --git a/src/query-service/src/test/java/org/apache/kylin/rest/util/AsyncQueryRequestLimitsTest.java b/src/query-service/src/test/java/org/apache/kylin/rest/util/AsyncQueryRequestLimitsTest.java new file mode 100644 index 0000000000..6c1d37dca8 --- /dev/null +++ b/src/query-service/src/test/java/org/apache/kylin/rest/util/AsyncQueryRequestLimitsTest.java @@ -0,0 +1,77 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kylin.rest.util; + +import java.lang.reflect.Field; +import java.lang.reflect.Modifier; + +import org.apache.kylin.common.KylinConfig; +import org.apache.kylin.common.exception.KylinException; +import org.apache.kylin.common.util.NLocalFileMetadataTestCase; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +public class AsyncQueryRequestLimitsTest extends NLocalFileMetadataTestCase { + + @Before + public void setUp() { + createTestMetadata(); + } + + @Test + public void testException() throws Exception { + getTestConfig().setProperty("kylin.query.async-query.max-concurrent-jobs", "3"); + reloadAsyncQueryRequestLimits(); + AsyncQueryRequestLimits limits1 = new AsyncQueryRequestLimits(); + AsyncQueryRequestLimits limits2 = new AsyncQueryRequestLimits(); + AsyncQueryRequestLimits limit3 = new AsyncQueryRequestLimits(); + Assert.assertThrows(KylinException.class, () -> AsyncQueryRequestLimits.checkCount()); + limit3.close(); + AsyncQueryRequestLimits.checkCount(); + limits1.close(); + limits2.close(); + + } + + @Test + public void testcase2() throws Exception { + getTestConfig().setProperty("kylin.query.async-query.max-concurrent-jobs", "0"); + reloadAsyncQueryRequestLimits(); + AsyncQueryRequestLimits limits1 = new AsyncQueryRequestLimits(); + AsyncQueryRequestLimits limits2 = new AsyncQueryRequestLimits(); + AsyncQueryRequestLimits limits3 = new AsyncQueryRequestLimits(); + AsyncQueryRequestLimits.checkCount(); + limits1.close(); + limits2.close(); + limits3.close(); + + } + + private void reloadAsyncQueryRequestLimits() throws Exception { + Field field = AsyncQueryRequestLimits.class.getDeclaredField("MAX_COUNT"); + field.setAccessible(true); + Field modifiersField = Field.class.getDeclaredField("modifiers"); + modifiersField.setAccessible(true); + modifiersField.setInt(field, field.getModifiers() & ~Modifier.FINAL); + int count = KylinConfig.getInstanceFromEnv().getAsyncQueryMaxConcurrentJobs(); + field.setInt(null, count); + } + +}