This is an automated email from the ASF dual-hosted git repository.

xxyu pushed a commit to branch kylin5
in repository https://gitbox.apache.org/repos/asf/kylin.git

commit 552f8f8fd40ab7bc23e568136ef50fcaaff8a7b3
Author: Jiawei Li <1019037...@qq.com>
AuthorDate: Tue Jan 17 13:55:25 2023 +0800

    KYLIN-5495 add async query count limit
---
 .../org/apache/kylin/common/KylinConfigBase.java   |  5 ++
 .../kylin/common/exception/QueryErrorCode.java     |  1 +
 .../org/apache/kylin/common/msg/CnMessage.java     |  8 ++-
 .../java/org/apache/kylin/common/msg/Message.java  |  4 ++
 .../resources/kylin_errorcode_conf_en.properties   |  1 +
 .../resources/kylin_errorcode_conf_zh.properties   |  1 +
 .../rest/controller/NAsyncQueryController.java     | 63 ++++++++++--------
 .../rest/controller/NAsyncQueryControllerTest.java | 74 +++++++++++++++++++++
 .../apache/kylin/rest/service/QueryService.java    | 13 ++--
 .../kylin/rest/util/AsyncQueryRequestLimits.java   | 68 +++++++++++++++++++
 .../rest/util/AsyncQueryRequestLimitsTest.java     | 77 ++++++++++++++++++++++
 11 files changed, 279 insertions(+), 36 deletions(-)

diff --git a/src/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java b/src/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
index d76736609f..b1f8541092 100644
--- a/src/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
+++ b/src/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
@@ -501,6 +501,11 @@ public abstract class KylinConfigBase implements Serializable {
         return 
Integer.parseInt(getOptional("kylin.query.project-concurrent-running-threshold",
 "0"));
     }
 
+    public int getAsyncQueryMaxConcurrentJobs() {
+        // by default there's no limitation
+        return 
Integer.parseInt(getOptional("kylin.query.async-query.max-concurrent-jobs", 
"0"));
+    }
+
     public boolean isAdminUserExportAllowed() {
         return 
Boolean.parseBoolean(getOptional("kylin.web.export-allow-admin", TRUE));
     }
diff --git a/src/core-common/src/main/java/org/apache/kylin/common/exception/QueryErrorCode.java b/src/core-common/src/main/java/org/apache/kylin/common/exception/QueryErrorCode.java
index 56fe23380a..96e305316c 100644
--- a/src/core-common/src/main/java/org/apache/kylin/common/exception/QueryErrorCode.java
+++ b/src/core-common/src/main/java/org/apache/kylin/common/exception/QueryErrorCode.java
@@ -52,6 +52,7 @@ public enum QueryErrorCode implements ErrorCodeSupplier {
 
     // 20040XXX async query
     ASYNC_QUERY_ILLEGAL_PARAM("KE-020040001"),
+    TOO_MANY_ASYNC_QUERY("KE-020040002"),
 
     // 20050XXX invalid query params
     INVALID_QUERY_PARAMS("KE-020050001"),
diff --git a/src/core-common/src/main/java/org/apache/kylin/common/msg/CnMessage.java b/src/core-common/src/main/java/org/apache/kylin/common/msg/CnMessage.java
index 8403f70fa5..679e15176f 100644
--- a/src/core-common/src/main/java/org/apache/kylin/common/msg/CnMessage.java
+++ b/src/core-common/src/main/java/org/apache/kylin/common/msg/CnMessage.java
@@ -785,6 +785,11 @@ public class CnMessage extends Message {
         return "查询请求数量超过上限,无法提交。请稍后再试,或联系项目管理员修改设置。";
     }
 
+    @Override
+    public String getAsyncQueryTooManyRunning() {
+        return "查询失败,异步查询总数已达到管理员设置的上限,请等候并重试。";
+    }
+
     @Override
     public String getSelfDisableForbidden() {
         return "您不可以禁用您自己";
@@ -1692,7 +1697,6 @@ public class CnMessage extends Message {
 
     @Override
     public String getLoadLogicalViewError(String tableName, String project) {
-        return String.format(Locale.ROOT,
-            "无法加载表: %s , 仅支持在项目 %s 中加载此表", tableName, project);
+        return String.format(Locale.ROOT, "无法加载表: %s , 仅支持在项目 %s 中加载此表", 
tableName, project);
     }
 }
diff --git a/src/core-common/src/main/java/org/apache/kylin/common/msg/Message.java b/src/core-common/src/main/java/org/apache/kylin/common/msg/Message.java
index 98bd73637f..32ef6010b5 100644
--- a/src/core-common/src/main/java/org/apache/kylin/common/msg/Message.java
+++ b/src/core-common/src/main/java/org/apache/kylin/common/msg/Message.java
@@ -438,6 +438,10 @@ public class Message {
         return "Can’t submit query at the moment as there are too many ongoing 
queries. Please try again later, or contact project admin to adjust 
configuration.";
     }
 
+    public String getAsyncQueryTooManyRunning() {
+        return "Query failed cause the total number of async queries hit the 
upper limit set by Admin.Please wait and try again.";
+    }
+
     public String getExportResultNotAllowed() {
         return "Don’t have permission to export the query result. Please 
contact admin if needed.";
     }
diff --git a/src/core-common/src/main/resources/kylin_errorcode_conf_en.properties b/src/core-common/src/main/resources/kylin_errorcode_conf_en.properties
index 8d649d7813..664c1b8d01 100644
--- a/src/core-common/src/main/resources/kylin_errorcode_conf_en.properties
+++ b/src/core-common/src/main/resources/kylin_errorcode_conf_en.properties
@@ -191,6 +191,7 @@ KE-020007001=Empty Table
 KE-020030001=Invalid Parameter When PushDown
 KE-020032001=Busy Query
 KE-020040001=Asynchronous query illegal parameters
+KE-020040002=Busy Async Query
 #job
 KE-030001000=Job building error
 KE-030001001=Sanity Check
diff --git a/src/core-common/src/main/resources/kylin_errorcode_conf_zh.properties b/src/core-common/src/main/resources/kylin_errorcode_conf_zh.properties
index 1102001892..f5e12bc109 100644
--- a/src/core-common/src/main/resources/kylin_errorcode_conf_zh.properties
+++ b/src/core-common/src/main/resources/kylin_errorcode_conf_zh.properties
@@ -191,6 +191,7 @@ KE-020007001=空表
 KE-020030001=下压时具有非法参数
 KE-020032001=查询繁忙
 KE-020040001=异步查询非法参数
+KE-020040002=异步查询繁忙
 #任务
 KE-030001000=构建任务异常
 KE-030001001=数据检查
diff --git a/src/query-server/src/main/java/org/apache/kylin/rest/controller/NAsyncQueryController.java b/src/query-server/src/main/java/org/apache/kylin/rest/controller/NAsyncQueryController.java
index 3be9d9d753..8db117d415 100644
--- a/src/query-server/src/main/java/org/apache/kylin/rest/controller/NAsyncQueryController.java
+++ b/src/query-server/src/main/java/org/apache/kylin/rest/controller/NAsyncQueryController.java
@@ -43,6 +43,7 @@ import org.apache.kylin.common.exception.KylinException;
 import org.apache.kylin.common.exception.QueryErrorCode;
 import org.apache.kylin.common.msg.Message;
 import org.apache.kylin.common.msg.MsgPicker;
+import org.apache.kylin.metadata.project.NProjectManager;
 import org.apache.kylin.query.util.AsyncQueryUtil;
 import org.apache.kylin.rest.exception.ForbiddenException;
 import org.apache.kylin.rest.request.AsyncQuerySQLRequest;
@@ -52,6 +53,7 @@ import org.apache.kylin.rest.response.SQLResponse;
 import org.apache.kylin.rest.service.AsyncQueryService;
 import org.apache.kylin.rest.service.QueryService;
 import org.apache.kylin.rest.util.AclEvaluate;
+import org.apache.kylin.rest.util.AsyncQueryRequestLimits;
 import org.apache.spark.sql.SparderEnv;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -123,6 +125,9 @@ public class NAsyncQueryController extends NBasicController {
         if (StringUtils.isEmpty(sqlRequest.getSeparator())) {
             sqlRequest.setSeparator(",");
         }
+        if 
(NProjectManager.getProjectConfig(sqlRequest.getProject()).isUniqueAsyncQueryYarnQueue())
 {
+            AsyncQueryRequestLimits.checkCount();
+        }
 
         executorService.submit(Objects.requireNonNull(TtlRunnable.get(() -> {
             String format = sqlRequest.getFormat().toLowerCase(Locale.ROOT);
@@ -154,14 +159,14 @@ public class NAsyncQueryController extends NBasicController {
             } catch (Exception e) {
                 try {
                     logger.error("failed to run query {}", 
queryContext.getQueryId(), e);
-                    AsyncQueryUtil.createErrorFlag(sqlRequest.getProject(), 
queryContext.getQueryId(),
-                            e.getMessage());
+                    AsyncQueryUtil.createErrorFlag(sqlRequest.getProject(), 
queryContext.getQueryId(), e.getMessage());
                     exceptionHandle.set(e.getMessage());
                 } catch (Exception e1) {
                     exceptionHandle.set(exceptionHandle.get() + "\n" + 
e.getMessage());
                     throw new RuntimeException(e1);
                 }
             } finally {
+                logger.info("Async query with queryId: {} end", 
queryContext.getQueryId());
                 QueryContext.current().close();
             }
         })));
@@ -171,18 +176,18 @@ public class NAsyncQueryController extends NBasicController {
         }
 
         switch (asyncQueryService.queryStatus(sqlRequest.getProject(), 
sqlRequest.getQueryId())) {
-            case SUCCESS:
-                return new EnvelopeResponse<>(KylinException.CODE_SUCCESS,
-                        new AsyncQueryResponse(queryIdRef.get(), 
AsyncQueryResponse.Status.SUCCESSFUL, "query success"),
-                        "");
-            case FAILED:
-                return new EnvelopeResponse<>(KylinException.CODE_SUCCESS,
-                        new AsyncQueryResponse(queryIdRef.get(), 
AsyncQueryResponse.Status.FAILED, exceptionHandle.get()),
-                        "");
-            default:
-                return new EnvelopeResponse<>(KylinException.CODE_SUCCESS,
-                        new AsyncQueryResponse(queryIdRef.get(), 
AsyncQueryResponse.Status.RUNNING, "query still running"),
-                        "");
+        case SUCCESS:
+            return new EnvelopeResponse<>(KylinException.CODE_SUCCESS,
+                    new AsyncQueryResponse(queryIdRef.get(), 
AsyncQueryResponse.Status.SUCCESSFUL, "query success"),
+                    "");
+        case FAILED:
+            return new EnvelopeResponse<>(KylinException.CODE_SUCCESS,
+                    new AsyncQueryResponse(queryIdRef.get(), 
AsyncQueryResponse.Status.FAILED, exceptionHandle.get()),
+                    "");
+        default:
+            return new EnvelopeResponse<>(KylinException.CODE_SUCCESS,
+                    new AsyncQueryResponse(queryIdRef.get(), 
AsyncQueryResponse.Status.RUNNING, "query still running"),
+                    "");
         }
     }
 
@@ -258,21 +263,21 @@ public class NAsyncQueryController extends NBasicController {
         AsyncQueryService.QueryStatus queryStatus = 
asyncQueryService.queryStatus(project, queryId);
         AsyncQueryResponse asyncQueryResponse;
         switch (queryStatus) {
-            case SUCCESS:
-                asyncQueryResponse = new AsyncQueryResponse(queryId, 
AsyncQueryResponse.Status.SUCCESSFUL,
-                        "await fetching results");
-                break;
-            case RUNNING:
-                asyncQueryResponse = new AsyncQueryResponse(queryId, 
AsyncQueryResponse.Status.RUNNING, "still running");
-                break;
-            case FAILED:
-                asyncQueryResponse = new AsyncQueryResponse(queryId, 
AsyncQueryResponse.Status.FAILED,
-                        asyncQueryService.retrieveSavedQueryException(project, 
queryId));
-                break;
-            default:
-                asyncQueryResponse = new AsyncQueryResponse(queryId, 
AsyncQueryResponse.Status.MISSING,
-                        "query status is lost"); //
-                break;
+        case SUCCESS:
+            asyncQueryResponse = new AsyncQueryResponse(queryId, 
AsyncQueryResponse.Status.SUCCESSFUL,
+                    "await fetching results");
+            break;
+        case RUNNING:
+            asyncQueryResponse = new AsyncQueryResponse(queryId, 
AsyncQueryResponse.Status.RUNNING, "still running");
+            break;
+        case FAILED:
+            asyncQueryResponse = new AsyncQueryResponse(queryId, 
AsyncQueryResponse.Status.FAILED,
+                    asyncQueryService.retrieveSavedQueryException(project, 
queryId));
+            break;
+        default:
+            asyncQueryResponse = new AsyncQueryResponse(queryId, 
AsyncQueryResponse.Status.MISSING,
+                    "query status is lost"); //
+            break;
         }
 
         return new EnvelopeResponse<>(KylinException.CODE_SUCCESS, 
asyncQueryResponse, "");
diff --git a/src/query-server/src/test/java/org/apache/kylin/rest/controller/NAsyncQueryControllerTest.java b/src/query-server/src/test/java/org/apache/kylin/rest/controller/NAsyncQueryControllerTest.java
index 65d74a56f0..07bd68bd9c 100644
--- a/src/query-server/src/test/java/org/apache/kylin/rest/controller/NAsyncQueryControllerTest.java
+++ b/src/query-server/src/test/java/org/apache/kylin/rest/controller/NAsyncQueryControllerTest.java
@@ -19,6 +19,7 @@
 package org.apache.kylin.rest.controller;
 
 import static 
org.apache.kylin.common.constant.HttpConstant.HTTP_VND_APACHE_KYLIN_JSON;
+import static 
org.apache.kylin.common.exception.QueryErrorCode.TOO_MANY_ASYNC_QUERY;
 import static org.apache.kylin.common.exception.ServerErrorCode.ACCESS_DENIED;
 import static 
org.apache.kylin.common.exception.code.ErrorCodeServer.ASYNC_QUERY_PROJECT_NAME_EMPTY;
 import static 
org.apache.kylin.common.exception.code.ErrorCodeServer.ASYNC_QUERY_RESULT_NOT_FOUND;
@@ -29,6 +30,8 @@ import static org.apache.kylin.rest.service.AsyncQueryService.QueryStatus.RUNNIN
 import static 
org.apache.kylin.rest.service.AsyncQueryService.QueryStatus.SUCCESS;
 
 import java.io.IOException;
+import java.lang.reflect.Field;
+import java.lang.reflect.Modifier;
 import java.text.ParseException;
 
 import org.apache.kylin.common.KylinConfig;
@@ -45,6 +48,7 @@ import org.apache.kylin.rest.response.SQLResponse;
 import org.apache.kylin.rest.service.AsyncQueryService;
 import org.apache.kylin.rest.service.QueryService;
 import org.apache.kylin.rest.util.AclEvaluate;
+import org.apache.kylin.rest.util.AsyncQueryRequestLimits;
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
@@ -272,6 +276,76 @@ public class NAsyncQueryControllerTest extends NLocalFileMetadataTestCase {
         Mockito.verify(nAsyncQueryController).query(Mockito.any());
     }
 
+    @Test
+    public void testQueryReachLimit() throws Exception {
+        
Mockito.doReturn(SUCCESS).when(asyncQueryService).queryStatus(Mockito.anyString(),
 Mockito.anyString());
+        SQLResponse response = new SQLResponse();
+        response.setException(false);
+        
Mockito.doReturn(response).when(kapQueryService).queryWithCache(Mockito.any());
+        
getTestConfig().setProperty("kylin.query.unique-async-query-yarn-queue-enabled",
 "true");
+        
getTestConfig().setProperty("kylin.query.async-query.max-concurrent-jobs", "3");
+        reloadAsyncQueryRequestLimits();
+        AsyncQueryRequestLimits limit1 = new AsyncQueryRequestLimits();
+        AsyncQueryRequestLimits limit2 = new AsyncQueryRequestLimits();
+        
mockMvc.perform(MockMvcRequestBuilders.post("/api/async_query").contentType(MediaType.APPLICATION_JSON)
+                
.content(JsonUtil.writeValueAsString(mockAsyncQuerySQLRequest()))
+                .accept(MediaType.parseMediaType(HTTP_VND_APACHE_KYLIN_JSON)))
+                .andExpect(MockMvcResultMatchers.status().isOk());
+
+        Mockito.verify(nAsyncQueryController).query(Mockito.any());
+        AsyncQueryRequestLimits limits3 = new AsyncQueryRequestLimits();
+        
mockMvc.perform(MockMvcRequestBuilders.post("/api/async_query").contentType(MediaType.APPLICATION_JSON)
+                
.content(JsonUtil.writeValueAsString(mockAsyncQuerySQLRequest()))
+                .accept(MediaType.parseMediaType(HTTP_VND_APACHE_KYLIN_JSON)))
+                
.andExpect(MockMvcResultMatchers.status().is5xxServerError()).andExpect(result 
-> {
+                    Assert.assertTrue(result.getResolvedException() instanceof 
KylinException);
+                    
Assert.assertEquals(TOO_MANY_ASYNC_QUERY.toErrorCode().getCodeString(),
+                            ((KylinException) 
result.getResolvedException()).getErrorCode().getCodeString());
+                    
Assert.assertEquals(MsgPicker.getMsg().getAsyncQueryTooManyRunning(),
+                            result.getResolvedException().getMessage());
+                });
+        limits3.close();
+        
mockMvc.perform(MockMvcRequestBuilders.post("/api/async_query").contentType(MediaType.APPLICATION_JSON)
+                
.content(JsonUtil.writeValueAsString(mockAsyncQuerySQLRequest()))
+                .accept(MediaType.parseMediaType(HTTP_VND_APACHE_KYLIN_JSON)))
+                .andExpect(MockMvcResultMatchers.status().isOk());
+        limit1.close();
+        limit2.close();
+
+    }
+
+    @Test
+    public void testQueryReachLimitCase2() throws Exception {
+
+        
Mockito.doReturn(SUCCESS).when(asyncQueryService).queryStatus(Mockito.anyString(),
 Mockito.anyString());
+        SQLResponse response = new SQLResponse();
+        response.setException(false);
+        
Mockito.doReturn(response).when(kapQueryService).queryWithCache(Mockito.any());
+        
getTestConfig().setProperty("kylin.query.unique-async-query-yarn-queue-enabled",
 "true");
+        
getTestConfig().setProperty("kylin.query.async-query.max-concurrent-jobs", "0");
+        reloadAsyncQueryRequestLimits();
+
+        AsyncQueryRequestLimits limit1 = new AsyncQueryRequestLimits();
+        AsyncQueryRequestLimits limit2 = new AsyncQueryRequestLimits();
+        
mockMvc.perform(MockMvcRequestBuilders.post("/api/async_query").contentType(MediaType.APPLICATION_JSON)
+                
.content(JsonUtil.writeValueAsString(mockAsyncQuerySQLRequest()))
+                .accept(MediaType.parseMediaType(HTTP_VND_APACHE_KYLIN_JSON)))
+                .andExpect(MockMvcResultMatchers.status().isOk());
+        limit1.close();
+        limit2.close();
+
+    }
+
+    private void reloadAsyncQueryRequestLimits() throws Exception {
+        Field field = 
AsyncQueryRequestLimits.class.getDeclaredField("MAX_COUNT");
+        field.setAccessible(true);
+        Field modifiersField = Field.class.getDeclaredField("modifiers");
+        modifiersField.setAccessible(true);
+        modifiersField.setInt(field, field.getModifiers() & ~Modifier.FINAL);
+        int count = 
KylinConfig.getInstanceFromEnv().getAsyncQueryMaxConcurrentJobs();
+        field.setInt(null, count);
+    }
+
     @Test
     public void testAsyncQueryContextClean() throws Exception {
         AsyncQuerySQLRequest asyncQuerySQLRequest1 = new 
AsyncQuerySQLRequest();
diff --git a/src/query-service/src/main/java/org/apache/kylin/rest/service/QueryService.java b/src/query-service/src/main/java/org/apache/kylin/rest/service/QueryService.java
index 04dad61a83..bd6c676374 100644
--- a/src/query-service/src/main/java/org/apache/kylin/rest/service/QueryService.java
+++ b/src/query-service/src/main/java/org/apache/kylin/rest/service/QueryService.java
@@ -142,6 +142,7 @@ import org.apache.kylin.rest.response.TableMetaCacheResultV2;
 import org.apache.kylin.rest.security.MutableAclRecord;
 import org.apache.kylin.rest.util.AclEvaluate;
 import org.apache.kylin.rest.util.AclPermissionUtil;
+import org.apache.kylin.rest.util.AsyncQueryRequestLimits;
 import org.apache.kylin.rest.util.PrepareSQLUtils;
 import org.apache.kylin.rest.util.QueryCacheSignatureUtil;
 import org.apache.kylin.rest.util.QueryRequestLimits;
@@ -303,11 +304,13 @@ public class QueryService extends BasicService implements CacheSignatureQuerySup
                 if (StringUtils.isNotEmpty(sqlRequest.getSparkQueue())) {
                     queryParams.setSparkQueue(sqlRequest.getSparkQueue());
                 }
-                AsyncQueryJob asyncQueryJob = new AsyncQueryJob();
-                asyncQueryJob.setProject(queryParams.getProject());
-                asyncQueryJob.submit(queryParams);
-                return buildSqlResponse(false, Collections.emptyList(), 0, 
Lists.newArrayList(),
-                        sqlRequest.getProject());
+                try (AsyncQueryRequestLimits ignored = new 
AsyncQueryRequestLimits()) {
+                    AsyncQueryJob asyncQueryJob = new AsyncQueryJob();
+                    asyncQueryJob.setProject(queryParams.getProject());
+                    asyncQueryJob.submit(queryParams);
+                    return buildSqlResponse(false, Collections.emptyList(), 0, 
Lists.newArrayList(),
+                            sqlRequest.getProject());
+                }
             }
 
             SQLResponse fakeResponse = 
TableauInterceptor.tableauIntercept(queryParams.getSql());
diff --git a/src/query-service/src/main/java/org/apache/kylin/rest/util/AsyncQueryRequestLimits.java b/src/query-service/src/main/java/org/apache/kylin/rest/util/AsyncQueryRequestLimits.java
new file mode 100644
index 0000000000..ea3df6e9e4
--- /dev/null
+++ b/src/query-service/src/main/java/org/apache/kylin/rest/util/AsyncQueryRequestLimits.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.rest.util;
+
+import static 
org.apache.kylin.common.exception.QueryErrorCode.TOO_MANY_ASYNC_QUERY;
+
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.exception.KylinException;
+import org.apache.kylin.common.msg.MsgPicker;
+
+public class AsyncQueryRequestLimits implements AutoCloseable {
+    private static volatile AtomicInteger asyncQueryCount = new 
AtomicInteger(0);
+
+    private static final int MAX_COUNT = 
KylinConfig.getInstanceFromEnv().getAsyncQueryMaxConcurrentJobs();
+
+    private static void openAsyncQueryRequest() {
+        if (MAX_COUNT <= 0) {
+            return;
+        }
+        asyncQueryCount.incrementAndGet();
+
+    }
+
+    public static void checkCount() {
+        if (MAX_COUNT <= 0) {
+            return;
+        }
+        if (asyncQueryCount.get() >= MAX_COUNT) {
+            throw new KylinException(TOO_MANY_ASYNC_QUERY, 
MsgPicker.getMsg().getAsyncQueryTooManyRunning());
+        }
+
+    }
+
+    private static void closeAsyncQueryRequest() {
+        if (MAX_COUNT <= 0) {
+            return;
+        }
+        asyncQueryCount.decrementAndGet();
+
+    }
+
+    public AsyncQueryRequestLimits() {
+        openAsyncQueryRequest();
+    }
+
+    @Override
+    public void close() {
+        closeAsyncQueryRequest();
+    }
+}
diff --git a/src/query-service/src/test/java/org/apache/kylin/rest/util/AsyncQueryRequestLimitsTest.java b/src/query-service/src/test/java/org/apache/kylin/rest/util/AsyncQueryRequestLimitsTest.java
new file mode 100644
index 0000000000..6c1d37dca8
--- /dev/null
+++ b/src/query-service/src/test/java/org/apache/kylin/rest/util/AsyncQueryRequestLimitsTest.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.rest.util;
+
+import java.lang.reflect.Field;
+import java.lang.reflect.Modifier;
+
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.exception.KylinException;
+import org.apache.kylin.common.util.NLocalFileMetadataTestCase;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+public class AsyncQueryRequestLimitsTest extends NLocalFileMetadataTestCase {
+
+    @Before
+    public void setUp() {
+        createTestMetadata();
+    }
+
+    @Test
+    public void testException() throws Exception {
+        
getTestConfig().setProperty("kylin.query.async-query.max-concurrent-jobs", "3");
+        reloadAsyncQueryRequestLimits();
+        AsyncQueryRequestLimits limits1 = new AsyncQueryRequestLimits();
+        AsyncQueryRequestLimits limits2 = new AsyncQueryRequestLimits();
+        AsyncQueryRequestLimits limit3 = new AsyncQueryRequestLimits();
+        Assert.assertThrows(KylinException.class, () -> 
AsyncQueryRequestLimits.checkCount());
+        limit3.close();
+        AsyncQueryRequestLimits.checkCount();
+        limits1.close();
+        limits2.close();
+
+    }
+
+    @Test
+    public void testcase2() throws Exception {
+        
getTestConfig().setProperty("kylin.query.async-query.max-concurrent-jobs", "0");
+        reloadAsyncQueryRequestLimits();
+        AsyncQueryRequestLimits limits1 = new AsyncQueryRequestLimits();
+        AsyncQueryRequestLimits limits2 = new AsyncQueryRequestLimits();
+        AsyncQueryRequestLimits limits3 = new AsyncQueryRequestLimits();
+        AsyncQueryRequestLimits.checkCount();
+        limits1.close();
+        limits2.close();
+        limits3.close();
+
+    }
+
+    private void reloadAsyncQueryRequestLimits() throws Exception {
+        Field field = 
AsyncQueryRequestLimits.class.getDeclaredField("MAX_COUNT");
+        field.setAccessible(true);
+        Field modifiersField = Field.class.getDeclaredField("modifiers");
+        modifiersField.setAccessible(true);
+        modifiersField.setInt(field, field.getModifiers() & ~Modifier.FINAL);
+        int count = 
KylinConfig.getInstanceFromEnv().getAsyncQueryMaxConcurrentJobs();
+        field.setInt(null, count);
+    }
+
+}

Reply via email to