This is an automated email from the ASF dual-hosted git repository.

xxyu pushed a commit to branch kylin5
in repository https://gitbox.apache.org/repos/asf/kylin.git

commit 07a4be3795fbad66a66f92dfdb96462eccaa1061
Author: fanfanAlice <41991994+fanfanal...@users.noreply.github.com>
AuthorDate: Fri Mar 3 09:48:11 2023 +0800

    KYLIN-5545 Try best to interrupt running queries and limit the number of 
queries to run.
---
 .../rest/controller/HealthControllerTest.java      |   2 +-
 .../apache/kylin/rest/service/HealthService.java   |   2 +-
 .../kylin/rest/service/HealthServiceTest.java      |   2 +-
 .../org/apache/kylin/common/KylinConfigBase.java   |   4 +
 .../query/exception/UserStopQueryException.java    |   0
 .../org/apache/kylin/query/util/CancelFlag.java    |  49 +++++++
 .../kylin/query/util/QueryInterruptChecker.java    |  46 +++++++
 .../kylin/query/util}/SlowQueryDetector.java       |   3 +-
 .../apache/kylin/newten/SlowQueryDetectorTest.java |   2 +-
 .../org/apache/kylin/query/util/PushDownUtil.java  |   2 +-
 .../org/apache/kylin/query/util/QueryUtil.java     |  18 +--
 .../query/util/RestoreFromComputedColumn.java      |   4 +-
 .../apache/kylin/rest/service/QueryService.java    |   4 +-
 .../kylin/rest/service/QueryServiceTest.java       |   2 +-
 .../org/apache/kylin/query/engine/QueryExec.java   |   6 +-
 .../kylin/query/engine/QueryRoutingEngine.java     |  66 ++++++---
 .../query/util/PushDownQueryRequestLimits.java}    |  28 ++--
 .../apache/kylin/query/SlowQueryDetectorTest.java  |   1 +
 .../kylin/query/engine/QueryRoutingEngineTest.java | 148 +++++++++++++++++++++
 .../hive/utils/TestResourceDetectUtilsByMock.scala |  34 ++++-
 .../kylin/query/pushdown/SparkSqlClient.scala      |  19 +--
 .../kylin/query/runtime/plan/ResultPlan.scala      |  19 +--
 .../query/pushdown/PushdownJobCancelTest.java      |   4 +-
 .../kylin/query/runtime/plan/TestResultPlan.java   |  14 +-
 .../spark/sql/hive/utils/ResourceDetectUtils.scala |  54 +++++---
 25 files changed, 431 insertions(+), 102 deletions(-)

diff --git 
a/src/common-server/src/test/java/org/apache/kylin/rest/controller/HealthControllerTest.java
 
b/src/common-server/src/test/java/org/apache/kylin/rest/controller/HealthControllerTest.java
index a0f2aa269f..e6c2998605 100644
--- 
a/src/common-server/src/test/java/org/apache/kylin/rest/controller/HealthControllerTest.java
+++ 
b/src/common-server/src/test/java/org/apache/kylin/rest/controller/HealthControllerTest.java
@@ -20,7 +20,7 @@ package org.apache.kylin.rest.controller;
 import java.util.List;
 import java.util.concurrent.ConcurrentMap;
 
-import org.apache.kylin.query.SlowQueryDetector;
+import org.apache.kylin.query.util.SlowQueryDetector;
 import org.apache.kylin.rest.service.ServiceTestBase;
 import org.apache.kylin.rest.response.HealthResponse;
 import org.apache.spark.sql.SparderEnv;
diff --git 
a/src/common-service/src/main/java/org/apache/kylin/rest/service/HealthService.java
 
b/src/common-service/src/main/java/org/apache/kylin/rest/service/HealthService.java
index 264993f59c..afb27b583c 100644
--- 
a/src/common-service/src/main/java/org/apache/kylin/rest/service/HealthService.java
+++ 
b/src/common-service/src/main/java/org/apache/kylin/rest/service/HealthService.java
@@ -20,7 +20,7 @@ package org.apache.kylin.rest.service;
 import java.util.List;
 import java.util.Map;
 
-import org.apache.kylin.query.SlowQueryDetector;
+import org.apache.kylin.query.util.SlowQueryDetector;
 import org.apache.kylin.rest.response.HealthResponse;
 import org.apache.spark.sql.SparderEnv;
 import org.springframework.stereotype.Component;
diff --git 
a/src/common-service/src/test/java/org/apache/kylin/rest/service/HealthServiceTest.java
 
b/src/common-service/src/test/java/org/apache/kylin/rest/service/HealthServiceTest.java
index b598e3f4fb..d5f33aa471 100644
--- 
a/src/common-service/src/test/java/org/apache/kylin/rest/service/HealthServiceTest.java
+++ 
b/src/common-service/src/test/java/org/apache/kylin/rest/service/HealthServiceTest.java
@@ -20,7 +20,7 @@ package org.apache.kylin.rest.service;
 import java.util.List;
 import java.util.concurrent.ConcurrentMap;
 
-import org.apache.kylin.query.SlowQueryDetector;
+import org.apache.kylin.query.util.SlowQueryDetector;
 import org.apache.kylin.rest.response.HealthResponse;
 import org.apache.spark.sql.SparderEnv;
 import org.junit.After;
diff --git 
a/src/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java 
b/src/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
index fccc636084..f887fbd30a 100644
--- a/src/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
+++ b/src/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
@@ -3842,6 +3842,10 @@ public abstract class KylinConfigBase implements 
Serializable {
         return 
Long.parseLong(getOptional("kylin.query.max-measure-segment-pruner-before-days",
 "-1"));
     }
 
+    public int getQueryConcurrentRunningThresholdForPushDown() {
+        return 
Integer.parseInt(getOptional("kylin.query.pushdown-concurrent-running-threshold",
 "10"));
+    }
+
     // 
============================================================================
     // Cost based index Planner
     // 
============================================================================
diff --git 
a/src/query-common/src/main/java/org/apache/kylin/query/exception/UserStopQueryException.java
 
b/src/core-common/src/main/java/org/apache/kylin/query/exception/UserStopQueryException.java
similarity index 100%
copy from 
src/query-common/src/main/java/org/apache/kylin/query/exception/UserStopQueryException.java
copy to 
src/core-common/src/main/java/org/apache/kylin/query/exception/UserStopQueryException.java
diff --git 
a/src/core-common/src/main/java/org/apache/kylin/query/util/CancelFlag.java 
b/src/core-common/src/main/java/org/apache/kylin/query/util/CancelFlag.java
new file mode 100644
index 0000000000..1c1885fb60
--- /dev/null
+++ b/src/core-common/src/main/java/org/apache/kylin/query/util/CancelFlag.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.query.util;
+
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import com.google.common.base.Preconditions;
+
+public class CancelFlag {
+    public final AtomicBoolean atomicBoolean;
+    private static final ThreadLocal<CancelFlag> CONTEXT_CANCEL_FLAG = 
ThreadLocal
+            .withInitial(() -> new CancelFlag(new AtomicBoolean(false)));
+
+    public CancelFlag(AtomicBoolean atomicBoolean) {
+        this.atomicBoolean = Preconditions.checkNotNull(atomicBoolean);
+    }
+
+    public static CancelFlag getContextCancelFlag() {
+        return CONTEXT_CANCEL_FLAG.get();
+    }
+
+    public boolean isCancelRequested() {
+        return this.atomicBoolean.get();
+    }
+
+    public void requestCancel() {
+        this.atomicBoolean.compareAndSet(false, true);
+    }
+
+    public void clearCancel() {
+        this.atomicBoolean.compareAndSet(true, false);
+    }
+}
diff --git 
a/src/core-common/src/main/java/org/apache/kylin/query/util/QueryInterruptChecker.java
 
b/src/core-common/src/main/java/org/apache/kylin/query/util/QueryInterruptChecker.java
new file mode 100644
index 0000000000..b163cc5a31
--- /dev/null
+++ 
b/src/core-common/src/main/java/org/apache/kylin/query/util/QueryInterruptChecker.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.query.util;
+
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.QueryContext;
+import org.apache.kylin.common.exception.KylinTimeoutException;
+import org.apache.kylin.query.exception.UserStopQueryException;
+
+import lombok.extern.slf4j.Slf4j;
+
+@Slf4j
+public class QueryInterruptChecker {
+
+    private QueryInterruptChecker() {
+        // This is Utils.
+    }
+
+    public static void checkThreadInterrupted(String errorMsgLog, String 
stepInfo) {
+        if (Thread.currentThread().isInterrupted()) {
+            log.error("{} {}", QueryContext.current().getQueryId(), 
errorMsgLog);
+            if 
(SlowQueryDetector.getRunningQueries().get(Thread.currentThread()).isStopByUser())
 {
+                throw new UserStopQueryException("");
+            }
+            QueryContext.current().getQueryTagInfo().setTimeout(true);
+            throw new KylinTimeoutException("The query exceeds the set time 
limit of "
+                    + 
KylinConfig.getInstanceFromEnv().getQueryTimeoutSeconds() + "s. " + stepInfo);
+        }
+    }
+}
diff --git 
a/src/query-common/src/main/java/org/apache/kylin/query/SlowQueryDetector.java 
b/src/core-common/src/main/java/org/apache/kylin/query/util/SlowQueryDetector.java
similarity index 98%
rename from 
src/query-common/src/main/java/org/apache/kylin/query/SlowQueryDetector.java
rename to 
src/core-common/src/main/java/org/apache/kylin/query/util/SlowQueryDetector.java
index 136127a3b3..387399d161 100644
--- 
a/src/query-common/src/main/java/org/apache/kylin/query/SlowQueryDetector.java
+++ 
b/src/core-common/src/main/java/org/apache/kylin/query/util/SlowQueryDetector.java
@@ -16,12 +16,11 @@
  * limitations under the License.
  */
 
-package org.apache.kylin.query;
+package org.apache.kylin.query.util;
 
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 
-import org.apache.calcite.util.CancelFlag;
 import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.QueryContext;
 import org.slf4j.Logger;
diff --git 
a/src/kylin-it/src/test/java/org/apache/kylin/newten/SlowQueryDetectorTest.java 
b/src/kylin-it/src/test/java/org/apache/kylin/newten/SlowQueryDetectorTest.java
index 3e478f940c..15301252ab 100644
--- 
a/src/kylin-it/src/test/java/org/apache/kylin/newten/SlowQueryDetectorTest.java
+++ 
b/src/kylin-it/src/test/java/org/apache/kylin/newten/SlowQueryDetectorTest.java
@@ -33,12 +33,12 @@ import 
org.apache.kylin.engine.spark.NLocalWithSparkSessionTest;
 import org.apache.kylin.job.engine.JobEngineConfig;
 import org.apache.kylin.job.impl.threadpool.NDefaultScheduler;
 import org.apache.kylin.metadata.project.NProjectManager;
-import org.apache.kylin.query.SlowQueryDetector;
 import org.apache.kylin.query.engine.QueryExec;
 import org.apache.kylin.query.pushdown.SparkSqlClient;
 import org.apache.kylin.query.runtime.plan.ResultPlan;
 import org.apache.kylin.query.util.QueryParams;
 import org.apache.kylin.query.util.QueryUtil;
+import org.apache.kylin.query.util.SlowQueryDetector;
 import org.apache.kylin.util.ExecAndComp;
 import org.apache.spark.sql.SparderEnv;
 import org.junit.After;
diff --git 
a/src/query-common/src/main/java/org/apache/kylin/query/util/PushDownUtil.java 
b/src/query-common/src/main/java/org/apache/kylin/query/util/PushDownUtil.java
index 60c02d73dc..10ca1ecf94 100644
--- 
a/src/query-common/src/main/java/org/apache/kylin/query/util/PushDownUtil.java
+++ 
b/src/query-common/src/main/java/org/apache/kylin/query/util/PushDownUtil.java
@@ -200,7 +200,7 @@ public class PushDownUtil {
                     .map(c -> 
c.getClass().getCanonicalName()).collect(Collectors.joining(",")));
         }
         for (IPushDownConverter converter : pushDownConverters) {
-            QueryUtil.checkThreadInterrupted("Interrupted sql transformation 
at the stage of " + converter.getClass(),
+            QueryInterruptChecker.checkThreadInterrupted("Interrupted sql 
transformation at the stage of " + converter.getClass(),
                     "Current step: Massage push-down sql. ");
             sql = converter.convert(sql, queryParams.getProject(), 
queryParams.getDefaultSchema());
         }
diff --git 
a/src/query-common/src/main/java/org/apache/kylin/query/util/QueryUtil.java 
b/src/query-common/src/main/java/org/apache/kylin/query/util/QueryUtil.java
index 77c7b2784d..69f4711099 100644
--- a/src/query-common/src/main/java/org/apache/kylin/query/util/QueryUtil.java
+++ b/src/query-common/src/main/java/org/apache/kylin/query/util/QueryUtil.java
@@ -32,14 +32,11 @@ import org.apache.commons.lang3.StringUtils;
 import org.apache.kylin.common.KapConfig;
 import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.QueryContext;
-import org.apache.kylin.common.exception.KylinTimeoutException;
 import org.apache.kylin.common.util.ClassUtil;
 import org.apache.kylin.common.util.StringHelper;
 import org.apache.kylin.metadata.project.NProjectManager;
 import org.apache.kylin.metadata.query.BigQueryThresholdUpdater;
 import org.apache.kylin.query.IQueryTransformer;
-import org.apache.kylin.query.SlowQueryDetector;
-import org.apache.kylin.query.exception.UserStopQueryException;
 import org.apache.kylin.query.security.AccessDeniedException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -218,7 +215,8 @@ public class QueryUtil {
         }
 
         for (IQueryTransformer t : transformers) {
-            QueryUtil.checkThreadInterrupted("Interrupted sql transformation 
at the stage of " + t.getClass(),
+            QueryInterruptChecker.checkThreadInterrupted(
+                    "Interrupted sql transformation at the stage of " + 
t.getClass(),
                     "Current step: SQL transformation.");
             sql = t.transform(sql, queryParams.getProject(), 
queryParams.getDefaultSchema());
         }
@@ -317,16 +315,4 @@ public class QueryUtil {
         }
         return sqlSelect;
     }
-
-    public static void checkThreadInterrupted(String errorMsgLog, String 
stepInfo) {
-        if (Thread.currentThread().isInterrupted()) {
-            log.error("{} {}", QueryContext.current().getQueryId(), 
errorMsgLog);
-            if 
(SlowQueryDetector.getRunningQueries().get(Thread.currentThread()).isStopByUser())
 {
-                throw new UserStopQueryException("");
-            }
-            QueryContext.current().getQueryTagInfo().setTimeout(true);
-            throw new KylinTimeoutException("The query exceeds the set time 
limit of "
-                    + 
KylinConfig.getInstanceFromEnv().getQueryTimeoutSeconds() + "s. " + stepInfo);
-        }
-    }
 }
diff --git 
a/src/query-common/src/main/java/org/apache/kylin/query/util/RestoreFromComputedColumn.java
 
b/src/query-common/src/main/java/org/apache/kylin/query/util/RestoreFromComputedColumn.java
index 8ae1337b11..406aa5ba5c 100644
--- 
a/src/query-common/src/main/java/org/apache/kylin/query/util/RestoreFromComputedColumn.java
+++ 
b/src/query-common/src/main/java/org/apache/kylin/query/util/RestoreFromComputedColumn.java
@@ -81,7 +81,7 @@ public class RestoreFromComputedColumn implements 
IPushDownConverter {
         int maxRecursionTimes = 
KapConfig.getInstanceFromEnv().getComputedColumnMaxRecursionTimes();
 
         while (recursionTimes < maxRecursionTimes) {
-            QueryUtil.checkThreadInterrupted("Interrupted sql transformation 
at the stage of RestoreFromComputedColumn",
+            QueryInterruptChecker.checkThreadInterrupted("Interrupted sql 
transformation at the stage of RestoreFromComputedColumn",
                     "Current step: SQL transformation");
             recursionTimes++;
             boolean recursionCompleted = true;
@@ -181,7 +181,7 @@ public class RestoreFromComputedColumn implements 
IPushDownConverter {
         for (NDataModel model : modelMap.values()) {
             QueryAliasMatchInfo info = 
model.getComputedColumnDescs().isEmpty() ? null
                     : queryAliasMatcher.match(model, sqlSelect);
-            QueryUtil.checkThreadInterrupted("Interrupted sql transformation 
at the stage of RestoreFromComputedColumn",
+            QueryInterruptChecker.checkThreadInterrupted("Interrupted sql 
transformation at the stage of RestoreFromComputedColumn",
                     "Current step: SQL transformation");
             if (info == null) {
                 continue;
diff --git 
a/src/query-service/src/main/java/org/apache/kylin/rest/service/QueryService.java
 
b/src/query-service/src/main/java/org/apache/kylin/rest/service/QueryService.java
index 9a3c19a10c..14eacb5d43 100644
--- 
a/src/query-service/src/main/java/org/apache/kylin/rest/service/QueryService.java
+++ 
b/src/query-service/src/main/java/org/apache/kylin/rest/service/QueryService.java
@@ -108,7 +108,6 @@ import org.apache.kylin.metadata.querymeta.TableMeta;
 import org.apache.kylin.metadata.querymeta.TableMetaWithType;
 import org.apache.kylin.metadata.realization.NoRealizationFoundException;
 import org.apache.kylin.metadata.realization.RoutingIndicatorException;
-import org.apache.kylin.query.SlowQueryDetector;
 import org.apache.kylin.query.blacklist.SQLBlacklistItem;
 import org.apache.kylin.query.blacklist.SQLBlacklistManager;
 import org.apache.kylin.query.calcite.KEDialect;
@@ -128,6 +127,7 @@ import org.apache.kylin.query.util.QueryParams;
 import org.apache.kylin.query.util.QueryUtil;
 import org.apache.kylin.query.util.RawSql;
 import org.apache.kylin.query.util.RawSqlParser;
+import org.apache.kylin.query.util.SlowQueryDetector;
 import org.apache.kylin.query.util.TokenMgrError;
 import org.apache.kylin.rest.aspect.Transaction;
 import org.apache.kylin.rest.cluster.ClusterManager;
@@ -930,7 +930,7 @@ public class QueryService extends BasicService implements 
CacheSignatureQuerySup
 
     private int getSqlConcurrentCount(SQLBlacklistItem sqlBlacklistItem) {
         int concurrentCount = 0;
-        Collection<SlowQueryDetector.QueryEntry> runningQueries = 
slowQueryDetector.getRunningQueries().values();
+        Collection<SlowQueryDetector.QueryEntry> runningQueries = 
SlowQueryDetector.getRunningQueries().values();
         for (SlowQueryDetector.QueryEntry query : runningQueries) {
             if (sqlBlacklistItem.match(query.getSql())) {
                 concurrentCount++;
diff --git 
a/src/query-service/src/test/java/org/apache/kylin/rest/service/QueryServiceTest.java
 
b/src/query-service/src/test/java/org/apache/kylin/rest/service/QueryServiceTest.java
index 46a0ca4b2c..2470fbecf4 100644
--- 
a/src/query-service/src/test/java/org/apache/kylin/rest/service/QueryServiceTest.java
+++ 
b/src/query-service/src/test/java/org/apache/kylin/rest/service/QueryServiceTest.java
@@ -103,7 +103,6 @@ import org.apache.kylin.metadata.querymeta.TableMeta;
 import org.apache.kylin.metadata.querymeta.TableMetaWithType;
 import org.apache.kylin.metadata.realization.IRealization;
 import org.apache.kylin.metadata.realization.RealizationStatusEnum;
-import org.apache.kylin.query.SlowQueryDetector;
 import org.apache.kylin.metadata.user.ManagedUser;
 import org.apache.kylin.query.blacklist.SQLBlacklistItem;
 import org.apache.kylin.query.blacklist.SQLBlacklistManager;
@@ -116,6 +115,7 @@ import 
org.apache.kylin.query.util.DateNumberFilterTransformer;
 import org.apache.kylin.query.util.QueryParams;
 import org.apache.kylin.query.util.QueryUtil;
 import org.apache.kylin.query.util.RawSqlParser;
+import org.apache.kylin.query.util.SlowQueryDetector;
 import org.apache.kylin.rest.cluster.ClusterManager;
 import org.apache.kylin.rest.cluster.DefaultClusterManager;
 import org.apache.kylin.rest.config.AppConfig;
diff --git 
a/src/query/src/main/java/org/apache/kylin/query/engine/QueryExec.java 
b/src/query/src/main/java/org/apache/kylin/query/engine/QueryExec.java
index ac2e8f673c..6ce92a61b5 100644
--- a/src/query/src/main/java/org/apache/kylin/query/engine/QueryExec.java
+++ b/src/query/src/main/java/org/apache/kylin/query/engine/QueryExec.java
@@ -68,6 +68,7 @@ import org.apache.kylin.query.relnode.OLAPContext;
 import org.apache.kylin.query.util.AsyncQueryUtil;
 import org.apache.kylin.query.util.CalcitePlanRouterVisitor;
 import org.apache.kylin.query.util.HepUtils;
+import org.apache.kylin.query.util.QueryInterruptChecker;
 import org.apache.kylin.query.util.QueryUtil;
 import org.apache.kylin.query.util.RelAggPushDownUtil;
 import org.slf4j.Logger;
@@ -410,12 +411,13 @@ public class QueryExec {
             RelAggPushDownUtil.clearUnmatchedJoinDigest();
             return new SparderQueryPlanExec().executeToIterable(transformed, 
dataContext);
         } catch (NoRealizationFoundException e) {
-            QueryUtil.checkThreadInterrupted("Interrupted 
SparderQueryOptimized NoRealizationFoundException",
+            QueryInterruptChecker.checkThreadInterrupted(
+                    "Interrupted SparderQueryOptimized 
NoRealizationFoundException",
                     "Current step: TableIndex join snapshot aggPushDown");
             tryTimes = tryTimes + 1;
             return sparderQueryOptimized(transformed, tryTimes, postOptRules);
         } catch (Exception e) {
-            QueryUtil.checkThreadInterrupted("Interrupted 
SparderQueryOptimized error",
+            QueryInterruptChecker.checkThreadInterrupted("Interrupted 
SparderQueryOptimized error",
                     "Current step: Table Index join snapshot aggPushDown");
             setSparderQueryOptimizedExceptionMsg(e.getMessage());
             return null;
diff --git 
a/src/query/src/main/java/org/apache/kylin/query/engine/QueryRoutingEngine.java 
b/src/query/src/main/java/org/apache/kylin/query/engine/QueryRoutingEngine.java
index cad6d87c86..e4ed2992d0 100644
--- 
a/src/query/src/main/java/org/apache/kylin/query/engine/QueryRoutingEngine.java
+++ 
b/src/query/src/main/java/org/apache/kylin/query/engine/QueryRoutingEngine.java
@@ -28,6 +28,8 @@ import java.util.HashMap;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.TimeUnit;
 
 import org.apache.calcite.avatica.ColumnMetaData;
 import org.apache.calcite.prepare.CalcitePrepareImpl;
@@ -50,10 +52,13 @@ import org.apache.kylin.metadata.query.StructField;
 import org.apache.kylin.metadata.querymeta.SelectedColumnMeta;
 import 
org.apache.kylin.metadata.realization.NoStreamingRealizationFoundException;
 import org.apache.kylin.query.engine.data.QueryResult;
+import org.apache.kylin.query.exception.BusyQueryException;
 import org.apache.kylin.query.exception.NotSupportedSQLException;
 import org.apache.kylin.query.mask.QueryResultMasks;
 import org.apache.kylin.query.relnode.OLAPContext;
+import org.apache.kylin.query.util.PushDownQueryRequestLimits;
 import org.apache.kylin.query.util.PushDownUtil;
+import org.apache.kylin.query.util.QueryInterruptChecker;
 import org.apache.kylin.query.util.QueryParams;
 import org.apache.kylin.query.util.QueryUtil;
 import org.apache.kylin.source.adhocquery.PushdownResult;
@@ -261,24 +266,55 @@ public class QueryRoutingEngine {
     public PushdownResult tryPushDownSelectQuery(QueryParams queryParams, 
SQLException sqlException, boolean isPrepare)
             throws Exception {
         
QueryContext.currentTrace().startSpan(QueryTrace.SQL_PUSHDOWN_TRANSFORMATION);
-        String sqlString = queryParams.getSql();
-        if (isPrepareStatementWithParams(queryParams)) {
-            sqlString = queryParams.getPrepareSql();
-        }
+        Semaphore semaphore = 
PushDownQueryRequestLimits.getSingletonInstance();
+        logger.info("Query: {} Before the current push down counter {}.", 
QueryContext.current().getQueryId(),
+                semaphore.availablePermits());
+        boolean acquired = false;
+        boolean asyncQuery = 
QueryContext.current().getQueryTagInfo().isAsyncQuery();
+        KylinConfig projectConfig = 
NProjectManager.getProjectConfig(queryParams.getProject());
+        try {
+            //SlowQueryDetector query timeout period is system-level
+            int queryTimeout = 
KylinConfig.getInstanceFromEnv().getQueryTimeoutSeconds();
+            if (!asyncQuery && projectConfig.isPushDownEnabled()) {
+                acquired = semaphore.tryAcquire(queryTimeout, 
TimeUnit.SECONDS);
+                if (!acquired) {
+                    logger.info("query: {} failed to get acquire.", 
QueryContext.current().getQueryId());
+                    throw new BusyQueryException("Query rejected. Caused by 
PushDown query server is too busy.");
+                } else {
+                    logger.info("query: {} success to get acquire.", 
QueryContext.current().getQueryId());
+                }
+            }
+            String sqlString = queryParams.getSql();
+            if (isPrepareStatementWithParams(queryParams)) {
+                sqlString = queryParams.getPrepareSql();
+            }
 
-        if (BackdoorToggles.getPrepareOnly()) {
-            sqlString = QueryUtil.addLimit(sqlString);
-        }
+            if (BackdoorToggles.getPrepareOnly()) {
+                sqlString = QueryUtil.addLimit(sqlString);
+            }
 
-        String massagedSql = 
QueryUtil.appendLimitOffset(queryParams.getProject(), sqlString, 
queryParams.getLimit(),
-                queryParams.getOffset());
-        if (isPrepareStatementWithParams(queryParams)) {
-            QueryContext.current().getMetrics().setCorrectedSql(massagedSql);
+            String massagedSql = 
QueryUtil.appendLimitOffset(queryParams.getProject(), sqlString,
+                    queryParams.getLimit(), queryParams.getOffset());
+            if (isPrepareStatementWithParams(queryParams)) {
+                
QueryContext.current().getMetrics().setCorrectedSql(massagedSql);
+            }
+            queryParams.setSql(massagedSql);
+            queryParams.setSqlException(sqlException);
+            queryParams.setPrepare(isPrepare);
+            return PushDownUtil.tryIterQuery(queryParams);
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            QueryInterruptChecker.checkThreadInterrupted("Interrupted sql push 
down at the stage of QueryRoutingEngine",
+                    "Current step: try push down select query");
+            throw e;
+        } finally {
+            if (acquired) {
+                semaphore.release();
+                logger.info("Query: {} success to release acquire", 
QueryContext.current().getQueryId());
+            }
+            logger.info("Query: {} After the current push down counter {}.", 
QueryContext.current().getQueryId(),
+                    semaphore.availablePermits());
         }
-        queryParams.setSql(massagedSql);
-        queryParams.setSqlException(sqlException);
-        queryParams.setPrepare(isPrepare);
-        return PushDownUtil.tryIterQuery(queryParams);
     }
 
     private boolean isPrepareStatementWithParams(QueryParams queryParams) {
diff --git 
a/src/query-common/src/main/java/org/apache/kylin/query/exception/UserStopQueryException.java
 
b/src/query/src/main/java/org/apache/kylin/query/util/PushDownQueryRequestLimits.java
similarity index 55%
rename from 
src/query-common/src/main/java/org/apache/kylin/query/exception/UserStopQueryException.java
rename to 
src/query/src/main/java/org/apache/kylin/query/util/PushDownQueryRequestLimits.java
index 01d5c565ce..5234b99b43 100644
--- 
a/src/query-common/src/main/java/org/apache/kylin/query/exception/UserStopQueryException.java
+++ 
b/src/query/src/main/java/org/apache/kylin/query/util/PushDownQueryRequestLimits.java
@@ -16,20 +16,28 @@
  * limitations under the License.
  */
 
-package org.apache.kylin.query.exception;
+package org.apache.kylin.query.util;
 
-import static org.apache.kylin.common.exception.QueryErrorCode.USER_STOP_QUERY;
+import java.util.concurrent.Semaphore;
 
-import org.apache.commons.lang3.exception.ExceptionUtils;
-import org.apache.kylin.common.exception.KylinException;
+import org.apache.kylin.common.KylinConfig;
 
-public class UserStopQueryException extends KylinException {
+public class PushDownQueryRequestLimits {
 
-    public UserStopQueryException(String message) {
-        super(USER_STOP_QUERY, message);
+    private PushDownQueryRequestLimits() {
     }
 
-    public static boolean causedByUserStop(Throwable e) {
-        return e instanceof UserStopQueryException || 
ExceptionUtils.getRootCause(e) instanceof UserStopQueryException;
+    private static volatile Semaphore instance;
+
+    public static Semaphore getSingletonInstance() {
+        if (instance == null) {
+            synchronized (PushDownQueryRequestLimits.class) {
+                if (instance == null) {
+                    instance = new Semaphore(
+                            
KylinConfig.getInstanceFromEnv().getQueryConcurrentRunningThresholdForPushDown(),
 true);
+                }
+            }
+        }
+        return instance;
     }
-}
\ No newline at end of file
+}
diff --git 
a/src/query/src/test/java/org/apache/kylin/query/SlowQueryDetectorTest.java 
b/src/query/src/test/java/org/apache/kylin/query/SlowQueryDetectorTest.java
index 8c14acd1ed..61defcba39 100644
--- a/src/query/src/test/java/org/apache/kylin/query/SlowQueryDetectorTest.java
+++ b/src/query/src/test/java/org/apache/kylin/query/SlowQueryDetectorTest.java
@@ -20,6 +20,7 @@ package org.apache.kylin.query;
 
 import java.util.concurrent.TimeUnit;
 
+import org.apache.kylin.query.util.SlowQueryDetector;
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
diff --git 
a/src/query/src/test/java/org/apache/kylin/query/engine/QueryRoutingEngineTest.java
 
b/src/query/src/test/java/org/apache/kylin/query/engine/QueryRoutingEngineTest.java
index 7599095bf6..44ab952e70 100644
--- 
a/src/query/src/test/java/org/apache/kylin/query/engine/QueryRoutingEngineTest.java
+++ 
b/src/query/src/test/java/org/apache/kylin/query/engine/QueryRoutingEngineTest.java
@@ -25,12 +25,18 @@ import java.sql.Date;
 import java.sql.SQLException;
 import java.sql.Time;
 import java.sql.Timestamp;
+import java.util.UUID;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.TimeUnit;
 
 import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.QueryContext;
+import org.apache.kylin.common.exception.CommonErrorCode;
 import org.apache.kylin.common.exception.KylinException;
+import org.apache.kylin.common.exception.KylinTimeoutException;
 import org.apache.kylin.common.exception.NewQueryRefuseException;
 import org.apache.kylin.common.exception.QueryErrorCode;
+import org.apache.kylin.common.exception.ServerErrorCode;
 import org.apache.kylin.common.exception.TargetSegmentNotFoundException;
 import org.apache.kylin.common.persistence.InMemResourceStore;
 import org.apache.kylin.common.persistence.ResourceStore;
@@ -39,8 +45,12 @@ import 
org.apache.kylin.common.util.NLocalFileMetadataTestCase;
 import 
org.apache.kylin.metadata.realization.NoStreamingRealizationFoundException;
 import org.apache.kylin.query.QueryExtension;
 import org.apache.kylin.query.engine.data.QueryResult;
+import org.apache.kylin.query.exception.BusyQueryException;
 import org.apache.kylin.query.exception.NotSupportedSQLException;
+import org.apache.kylin.query.exception.UserStopQueryException;
+import org.apache.kylin.query.util.PushDownQueryRequestLimits;
 import org.apache.kylin.query.util.QueryParams;
+import org.apache.kylin.query.util.SlowQueryDetector;
 import org.apache.kylin.source.adhocquery.PushdownResult;
 import org.apache.spark.SparkException;
 import org.junit.After;
@@ -48,6 +58,8 @@ import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
 import org.mockito.Mock;
+import org.mockito.MockSettings;
+import org.mockito.MockedStatic;
 import org.mockito.Mockito;
 
 public class QueryRoutingEngineTest extends NLocalFileMetadataTestCase {
@@ -320,4 +332,140 @@ public class QueryRoutingEngineTest extends 
NLocalFileMetadataTestCase {
 
         Assert.assertThrows(NotSupportedSQLException.class, () -> 
queryRoutingEngine.queryWithSqlMassage(queryParams));
     }
+
+    @Test
+    public void testQueryPushDownFail() {
+        final String sql = "SELECT 1";
+        final String project = "tpch";
+        KylinConfig kylinconfig = KylinConfig.getInstanceFromEnv();
+        kylinconfig.setProperty("kylin.query.timeout-seconds", "5");
+        Semaphore semaphore = new Semaphore(0, true);
+        try (MockedStatic<PushDownQueryRequestLimits> pushRequest = Mockito
+                .mockStatic(PushDownQueryRequestLimits.class)) {
+            pushRequest.when((MockedStatic.Verification) 
PushDownQueryRequestLimits.getSingletonInstance())
+                    .thenReturn(semaphore);
+            QueryParams queryParams = new QueryParams();
+            queryParams.setForcedToPushDown(true);
+            queryParams.setProject(project);
+            queryParams.setSql(sql);
+            queryParams.setKylinConfig(kylinconfig);
+            queryParams.setSelect(true);
+            QueryRoutingEngine queryRoutingEngine = 
Mockito.spy(QueryRoutingEngine.class);
+            try {
+                queryRoutingEngine.tryPushDownSelectQuery(queryParams, null, 
true);
+                Assert.fail("Query rejected. Caused by PushDown query server 
is too busy");
+            } catch (Exception e) {
+                Assert.assertTrue(e instanceof BusyQueryException);
+                Assert.assertEquals(QueryErrorCode.BUSY_QUERY.toErrorCode(), 
((BusyQueryException) e).getErrorCode());
+            }
+        }
+    }
+
+    @Test
+    public void testQueryPushDownSuccess() {
+        final String sql = "SELECT 1";
+        final String project = "tpch";
+        KylinConfig kylinconfig = KylinConfig.getInstanceFromEnv();
+        QueryParams queryParams = new QueryParams();
+        queryParams.setForcedToPushDown(true);
+        queryParams.setProject(project);
+        queryParams.setSql(sql);
+        queryParams.setKylinConfig(kylinconfig);
+        queryParams.setSelect(true);
+        try {
+            queryRoutingEngine.tryPushDownSelectQuery(queryParams, null, true);
+            Assert.fail("Can't complete the operation. Please check the Spark 
environment and try again.");
+        } catch (Exception e) {
+            Assert.assertTrue(e instanceof KylinException);
+            Assert.assertEquals(ServerErrorCode.SPARK_FAILURE.toErrorCode(), 
((KylinException) e).getErrorCode());
+        }
+    }
+
+    @Test
+    public void testQueryAsync() {
+        final String sql = "SELECT 1";
+        final String project = "tpch";
+        KylinConfig kylinconfig = KylinConfig.getInstanceFromEnv();
+        QueryParams queryParams = new QueryParams();
+        queryParams.setForcedToPushDown(true);
+        queryParams.setProject(project);
+        queryParams.setSql(sql);
+        queryParams.setKylinConfig(kylinconfig);
+        queryParams.setSelect(true);
+        try {
+            QueryContext.current().getQueryTagInfo().setAsyncQuery(true);
+            queryRoutingEngine.tryPushDownSelectQuery(queryParams, null, true);
+            Assert.fail("Can't complete the operation. Please check the Spark 
environment and try again.");
+        } catch (Exception e) {
+            Assert.assertTrue(e instanceof KylinException);
+            Assert.assertEquals(ServerErrorCode.SPARK_FAILURE.toErrorCode(), 
((KylinException) e).getErrorCode());
+        }
+    }
+
+    @Test
+    public void testQueryInterruptedTimeOut() {
+        final String sql = "SELECT 1";
+        final String project = "tpch";
+        KylinConfig kylinconfig = KylinConfig.getInstanceFromEnv();
+        QueryParams queryParams = new QueryParams();
+        queryParams.setForcedToPushDown(true);
+        queryParams.setProject(project);
+        queryParams.setSql(sql);
+        queryParams.setKylinConfig(kylinconfig);
+        queryParams.setSelect(true);
+        MockSettings mockSettings = 
Mockito.withSettings().defaultAnswer(Mockito.RETURNS_DEFAULTS);
+        Semaphore semaphore = Mockito.mock(Semaphore.class, mockSettings);
+        SlowQueryDetector slowQueryDetector = new SlowQueryDetector();
+        try (MockedStatic<PushDownQueryRequestLimits> pushRequest = Mockito
+                .mockStatic(PushDownQueryRequestLimits.class)) {
+            
pushRequest.when(PushDownQueryRequestLimits::getSingletonInstance).thenReturn(semaphore);
+            try {
+                UUID uuid = UUID.randomUUID();
+                slowQueryDetector.queryStart(uuid.toString());
+                Mockito.doThrow(new 
InterruptedException()).when(semaphore).tryAcquire(Mockito.anyLong(),
+                        Mockito.any(TimeUnit.class));
+                QueryRoutingEngine queryRoutingEngine = 
Mockito.spy(QueryRoutingEngine.class);
+                queryRoutingEngine.tryPushDownSelectQuery(queryParams, null, 
true);
+            } catch (Exception e) {
+                Assert.assertTrue(e instanceof KylinTimeoutException);
+                Assert.assertEquals(CommonErrorCode.TIMEOUT.toErrorCode(), 
((KylinException) e).getErrorCode());
+            }
+        } finally {
+            slowQueryDetector.queryEnd();
+        }
+    }
+
+    @Test
+    public void testQueryInterruptedUserStop() {
+        final String sql = "SELECT 1";
+        final String project = "tpch";
+        KylinConfig kylinconfig = KylinConfig.getInstanceFromEnv();
+        QueryParams queryParams = new QueryParams();
+        queryParams.setForcedToPushDown(true);
+        queryParams.setProject(project);
+        queryParams.setSql(sql);
+        queryParams.setKylinConfig(kylinconfig);
+        queryParams.setSelect(true);
+        MockSettings mockSettings = 
Mockito.withSettings().defaultAnswer(Mockito.RETURNS_DEFAULTS);
+        Semaphore semaphore = Mockito.mock(Semaphore.class, mockSettings);
+        SlowQueryDetector slowQueryDetector = new SlowQueryDetector();
+        try (MockedStatic<PushDownQueryRequestLimits> pushRequest = Mockito
+                .mockStatic(PushDownQueryRequestLimits.class)) {
+            
pushRequest.when(PushDownQueryRequestLimits::getSingletonInstance).thenReturn(semaphore);
+            try {
+                UUID uuid = UUID.randomUUID();
+                slowQueryDetector.queryStart(uuid.toString());
+                
SlowQueryDetector.getRunningQueries().get(Thread.currentThread()).setStopByUser(true);
+                Mockito.doThrow(new 
InterruptedException()).when(semaphore).tryAcquire(Mockito.anyLong(),
+                        Mockito.any(TimeUnit.class));
+                QueryRoutingEngine queryRoutingEngine = 
Mockito.spy(QueryRoutingEngine.class);
+                queryRoutingEngine.tryPushDownSelectQuery(queryParams, null, 
true);
+            } catch (Exception e) {
+                Assert.assertTrue(e instanceof UserStopQueryException);
+                
Assert.assertEquals(QueryErrorCode.USER_STOP_QUERY.toErrorCode(), 
((KylinException) e).getErrorCode());
+            }
+        } finally {
+            slowQueryDetector.queryEnd();
+        }
+    }
 }
diff --git 
a/src/spark-project/engine-spark/src/test/scala/org/apache/spark/sql/hive/utils/TestResourceDetectUtilsByMock.scala
 
b/src/spark-project/engine-spark/src/test/scala/org/apache/spark/sql/hive/utils/TestResourceDetectUtilsByMock.scala
index b93a98cb20..164022af61 100644
--- 
a/src/spark-project/engine-spark/src/test/scala/org/apache/spark/sql/hive/utils/TestResourceDetectUtilsByMock.scala
+++ 
b/src/spark-project/engine-spark/src/test/scala/org/apache/spark/sql/hive/utils/TestResourceDetectUtilsByMock.scala
@@ -21,7 +21,7 @@ package org.apache.spark.sql.hive.utils
 import org.apache.hadoop.fs.{FileStatus, Path}
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.common.SharedSparkSession
-import org.apache.spark.sql.execution.FileSourceScanExec
+import org.apache.spark.sql.execution.{FileSourceScanExec, 
LayoutFileSourceScanExec}
 import org.apache.spark.sql.execution.datasources.{FileIndex, 
HadoopFsRelation, PartitionDirectory}
 import org.apache.spark.sql.types.{IntegerType, StructField, StructType}
 import org.scalamock.scalatest.MockFactory
@@ -60,4 +60,36 @@ class TestResourceDetectUtilsByMock extends AnyWordSpec with 
MockFactory with Sh
       }
     }
   }
+
+  "getPaths" when {
+    "LayoutFileSourceScanExec" should {
+      "get root paths" in {
+        val paths = Seq(new Path("test"))
+        val fileIndex = mock[FileIndex]
+        (fileIndex.rootPaths _).expects().returning(paths).anyNumberOfTimes()
+        (fileIndex.partitionSchema _).expects().returning(new 
StructType()).anyNumberOfTimes()
+        val relation = HadoopFsRelation(fileIndex, new StructType(), new 
StructType(), null, null, null)(spark)
+        val sparkPlan = LayoutFileSourceScanExec(relation, Nil, 
relation.schema, Nil, None, None, Nil, None)
+        assert(paths == ResourceDetectUtils.getPaths(sparkPlan))
+      }
+    }
+  }
+
+  "getPaths" when {
+    "LayoutFileSourceScanExec" should {
+      "get partition paths" in {
+        val path = new Path("test")
+        val paths = Seq(path)
+        val fileIndex = mock[FileIndex]
+        val relation = HadoopFsRelation(fileIndex, new StructType(), new 
StructType(), null, null, null)(spark)
+        val sparkPlan = LayoutFileSourceScanExec(relation, Nil, 
relation.schema, null, None, None, Nil, None)
+        val dataFilters = Seq.empty
+        val fileStatus = new FileStatus()
+        fileStatus.setPath(path)
+        (fileIndex.partitionSchema 
_).expects().returning(StructType(StructField("f1", IntegerType, true) :: 
Nil)).anyNumberOfTimes()
+        (fileIndex.listFiles _).expects(null, 
dataFilters).returning(Seq(PartitionDirectory(null, 
Seq(fileStatus)))).anyNumberOfTimes()
+        assert(paths == ResourceDetectUtils.getPaths(sparkPlan))
+      }
+    }
+  }
 }
diff --git 
a/src/spark-project/sparder/src/main/scala/org/apache/kylin/query/pushdown/SparkSqlClient.scala
 
b/src/spark-project/sparder/src/main/scala/org/apache/kylin/query/pushdown/SparkSqlClient.scala
index b5640d7c29..48ef13da3b 100644
--- 
a/src/spark-project/sparder/src/main/scala/org/apache/kylin/query/pushdown/SparkSqlClient.scala
+++ 
b/src/spark-project/sparder/src/main/scala/org/apache/kylin/query/pushdown/SparkSqlClient.scala
@@ -18,8 +18,10 @@
 
 package org.apache.kylin.query.pushdown
 
-import com.google.common.collect.ImmutableList
-import io.kyligence.kap.guava20.shaded.common.collect.Lists
+import java.sql.Timestamp
+import java.util
+import java.util.{UUID, List => JList}
+
 import org.apache.commons.lang3.StringUtils
 import org.apache.kylin.common.util.{DateFormat, HadoopUtil, Pair}
 import org.apache.kylin.common.{KapConfig, KylinConfig, QueryContext}
@@ -28,7 +30,7 @@ import org.apache.kylin.metadata.query.StructField
 import org.apache.kylin.query.mask.QueryResultMasks
 import org.apache.kylin.query.runtime.plan.QueryToExecutionIDCache
 import org.apache.kylin.query.runtime.plan.ResultPlan.saveAsyncQueryResult
-import org.apache.kylin.query.util.{QueryUtil, SparkJobTrace}
+import org.apache.kylin.query.util.{QueryInterruptChecker, SparkJobTrace}
 import org.apache.spark.network.util.JavaUtils
 import org.apache.spark.sql.hive.QueryMetricUtils
 import org.apache.spark.sql.hive.utils.ResourceDetectUtils
@@ -36,12 +38,13 @@ import org.apache.spark.sql.util.SparderTypeUtil
 import org.apache.spark.sql.{DataFrame, Row, SparderEnv, SparkSession}
 import org.slf4j.{Logger, LoggerFactory}
 
-import java.sql.Timestamp
-import java.util
-import java.util.{UUID, List => JList}
 import scala.collection.JavaConverters._
 import scala.collection.{immutable, mutable}
 
+import com.google.common.collect.ImmutableList
+
+import io.kyligence.kap.guava20.shaded.common.collect.Lists
+
 object SparkSqlClient {
   val DEFAULT_DB: String = "spark.sql.default.database"
 
@@ -138,7 +141,7 @@ object SparkSqlClient {
         if (e.isInstanceOf[InterruptedException]) {
           Thread.currentThread.interrupt()
           ss.sparkContext.cancelJobGroup(jobGroup)
-          QueryUtil.checkThreadInterrupted("Interrupted at the stage of 
collecting result in SparkSqlClient.",
+          QueryInterruptChecker.checkThreadInterrupted("Interrupted at the 
stage of collecting result in SparkSqlClient.",
             "Current step: Collecting dataset of push-down.")
         }
         throw e
@@ -164,7 +167,7 @@ object SparkSqlClient {
           val row = resultRows.next()
           readRowSize += 1;
           if (checkInterrupt && readRowSize % checkInterruptSize == 0) {
-            QueryUtil.checkThreadInterrupted("Interrupted at the stage of 
collecting result in SparkSqlClient.",
+            QueryInterruptChecker.checkThreadInterrupted("Interrupted at the 
stage of collecting result in SparkSqlClient.",
               "Current step: Collecting dataset of push-down.")
           }
           row.toSeq.map(rawValueToString(_)).asJava
diff --git 
a/src/spark-project/sparder/src/main/scala/org/apache/kylin/query/runtime/plan/ResultPlan.scala
 
b/src/spark-project/sparder/src/main/scala/org/apache/kylin/query/runtime/plan/ResultPlan.scala
index 8cc3a6ba25..7815e981b0 100644
--- 
a/src/spark-project/sparder/src/main/scala/org/apache/kylin/query/runtime/plan/ResultPlan.scala
+++ 
b/src/spark-project/sparder/src/main/scala/org/apache/kylin/query/runtime/plan/ResultPlan.scala
@@ -18,8 +18,11 @@
 
 package org.apache.kylin.query.runtime.plan
 
-import com.google.common.cache.{Cache, CacheBuilder}
-import io.kyligence.kap.secondstorage.SecondStorageUtil
+import java.io.{File, FileOutputStream, OutputStreamWriter}
+import java.nio.charset.StandardCharsets
+import java.util.concurrent.atomic.AtomicLong
+import java.{lang, util}
+
 import org.apache.calcite.rel.`type`.{RelDataType, RelDataTypeField}
 import org.apache.commons.io.IOUtils
 import org.apache.hadoop.fs.Path
@@ -33,7 +36,7 @@ import 
org.apache.kylin.query.engine.RelColumnMetaDataExtractor
 import org.apache.kylin.query.engine.exec.ExecuteResult
 import org.apache.kylin.query.pushdown.SparkSqlClient.readPushDownResultRow
 import org.apache.kylin.query.relnode.OLAPContext
-import org.apache.kylin.query.util.{AsyncQueryUtil, QueryUtil, SparkJobTrace, 
SparkQueryJobManager}
+import org.apache.kylin.query.util.{AsyncQueryUtil, QueryInterruptChecker, 
SparkJobTrace, SparkQueryJobManager}
 import org.apache.poi.xssf.usermodel.{XSSFSheet, XSSFWorkbook}
 import org.apache.spark.SparkConf
 import org.apache.spark.sql.execution._
@@ -41,14 +44,14 @@ import org.apache.spark.sql.hive.QueryMetricUtils
 import org.apache.spark.sql.util.SparderTypeUtil
 import org.apache.spark.sql.{DataFrame, Row, SaveMode, SparderEnv}
 
-import java.io.{File, FileOutputStream, OutputStreamWriter}
-import java.nio.charset.StandardCharsets
-import java.util.concurrent.atomic.AtomicLong
-import java.{lang, util}
 import scala.collection.JavaConverters._
 import scala.collection.convert.ImplicitConversions.`iterator asScala`
 import scala.collection.mutable
 
+import com.google.common.cache.{Cache, CacheBuilder}
+
+import io.kyligence.kap.secondstorage.SecondStorageUtil
+
 // scalastyle:off
 object ResultType extends Enumeration {
   type ResultType = Value
@@ -153,7 +156,7 @@ object ResultPlan extends LogEx {
         if (e.isInstanceOf[InterruptedException]) {
           Thread.currentThread.interrupt()
           sparkContext.cancelJobGroup(jobGroup)
-          QueryUtil.checkThreadInterrupted("Interrupted at the stage of 
collecting result in ResultPlan.",
+          QueryInterruptChecker.checkThreadInterrupted("Interrupted at the 
stage of collecting result in ResultPlan.",
             "Current step: Collecting dataset for sparder.")
         }
         throw e
diff --git 
a/src/spark-project/sparder/src/test/java/org/apache/kylin/query/pushdown/PushdownJobCancelTest.java
 
b/src/spark-project/sparder/src/test/java/org/apache/kylin/query/pushdown/PushdownJobCancelTest.java
index d5a2d5df51..cd1e9e9956 100644
--- 
a/src/spark-project/sparder/src/test/java/org/apache/kylin/query/pushdown/PushdownJobCancelTest.java
+++ 
b/src/spark-project/sparder/src/test/java/org/apache/kylin/query/pushdown/PushdownJobCancelTest.java
@@ -22,10 +22,10 @@ import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicReference;
 
+import org.apache.kylin.common.util.NLocalFileMetadataTestCase;
 import org.apache.kylin.common.util.RandomUtil;
-import org.apache.kylin.query.SlowQueryDetector;
 import org.apache.kylin.query.exception.UserStopQueryException;
-import org.apache.kylin.common.util.NLocalFileMetadataTestCase;
+import org.apache.kylin.query.util.SlowQueryDetector;
 import org.apache.spark.scheduler.JobFailed;
 import org.apache.spark.scheduler.SparkListener;
 import org.apache.spark.scheduler.SparkListenerJobEnd;
diff --git 
a/src/spark-project/sparder/src/test/java/org/apache/kylin/query/runtime/plan/TestResultPlan.java
 
b/src/spark-project/sparder/src/test/java/org/apache/kylin/query/runtime/plan/TestResultPlan.java
index 5dfcc6e66e..917bc3ecf9 100644
--- 
a/src/spark-project/sparder/src/test/java/org/apache/kylin/query/runtime/plan/TestResultPlan.java
+++ 
b/src/spark-project/sparder/src/test/java/org/apache/kylin/query/runtime/plan/TestResultPlan.java
@@ -27,14 +27,14 @@ import org.apache.kylin.common.KapConfig;
 import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.QueryContext;
 import org.apache.kylin.common.exception.NewQueryRefuseException;
-import org.apache.kylin.query.SlowQueryDetector;
-import org.apache.kylin.query.exception.UserStopQueryException;
 import org.apache.kylin.common.state.StateSwitchConstant;
 import org.apache.kylin.common.util.AddressUtil;
 import org.apache.kylin.common.util.NLocalFileMetadataTestCase;
 import org.apache.kylin.metadata.query.BigQueryThresholdUpdater;
 import org.apache.kylin.metadata.state.QueryShareStateManager;
 import org.apache.kylin.query.MockContext;
+import org.apache.kylin.query.exception.UserStopQueryException;
+import org.apache.kylin.query.util.SlowQueryDetector;
 import org.apache.spark.SparkConf;
 import org.apache.spark.scheduler.JobFailed;
 import org.apache.spark.scheduler.SparkListener;
@@ -63,8 +63,7 @@ public class TestResultPlan extends 
NLocalFileMetadataTestCase {
                 
"test@jdbc,driverClassName=org.h2.Driver,url=jdbc:h2:mem:db_default;DB_CLOSE_DELAY=-1,username=sa,password=");
         
getTestConfig().setProperty("kylin.query.share-state-switch-implement", "jdbc");
         
getTestConfig().setProperty("kylin.query.big-query-source-scan-rows-threshold", 
"100000000");
-        ss = SparkSession.builder().appName("local").master("local[1]")
-                .getOrCreate();
+        ss = 
SparkSession.builder().appName("local").master("local[1]").getOrCreate();
         SparderEnv.setSparkSession(ss);
         StructType schema = new StructType();
         schema = schema.add("TRANS_ID", DataTypes.LongType, false);
@@ -134,8 +133,8 @@ public class TestResultPlan extends 
NLocalFileMetadataTestCase {
                 QueryShareStateManager.getInstance().setState(
                         
Collections.singletonList(AddressUtil.concatInstanceName()),
                         StateSwitchConstant.QUERY_LIMIT_STATE, "false");
-                QueryContext.current().getMetrics()
-                        
.addAccumSourceScanRows(KapConfig.getInstanceFromEnv().getBigQuerySourceScanRowsThreshold()
 + 1);
+                QueryContext.current().getMetrics().addAccumSourceScanRows(
+                        
KapConfig.getInstanceFromEnv().getBigQuerySourceScanRowsThreshold() + 1);
                 String sql = "select * from TEST_KYLIN_FACT";
                 ResultPlan.getResult(ss.sql(sql), null);
             } catch (Exception e) {
@@ -148,7 +147,8 @@ public class TestResultPlan extends 
NLocalFileMetadataTestCase {
         queryThread.join();
         isJobEnd.await(10, TimeUnit.SECONDS);
         Assert.assertTrue(sparkJobEnd.get().jobResult() instanceof JobFailed);
-        
Assert.assertTrue(((JobFailed)sparkJobEnd.get().jobResult()).exception().getMessage().contains("cancelled
 part of cancelled job group"));
+        Assert.assertTrue(((JobFailed) 
sparkJobEnd.get().jobResult()).exception().getMessage()
+                .contains("cancelled part of cancelled job group"));
 
         Thread queryThread2 = new Thread(() -> {
             try {
diff --git 
a/src/spark-project/spark-common/src/main/scala/org/apache/spark/sql/hive/utils/ResourceDetectUtils.scala
 
b/src/spark-project/spark-common/src/main/scala/org/apache/spark/sql/hive/utils/ResourceDetectUtils.scala
index e0bdefb303..2a27fc27ec 100644
--- 
a/src/spark-project/spark-common/src/main/scala/org/apache/spark/sql/hive/utils/ResourceDetectUtils.scala
+++ 
b/src/spark-project/spark-common/src/main/scala/org/apache/spark/sql/hive/utils/ResourceDetectUtils.scala
@@ -24,18 +24,18 @@ import java.util.concurrent.ForkJoinPool
 import java.util.concurrent.atomic.AtomicLong
 import java.util.{Map => JMap}
 
-import com.google.common.collect.Maps
-import com.google.gson.Gson
-import com.google.gson.reflect.TypeToken
 import org.apache.hadoop.conf.Configuration
-import org.apache.kylin.metadata.cube.model.{DimensionRangeInfo, LayoutEntity}
 import org.apache.hadoop.fs._
 import org.apache.kylin.common.KylinConfig
 import org.apache.kylin.common.util.HadoopUtil
+import org.apache.kylin.metadata.cube.model.{DimensionRangeInfo, LayoutEntity}
+import org.apache.kylin.query.util.QueryInterruptChecker
 import org.apache.spark.internal.Logging
+import org.apache.spark.sql.catalyst.expressions.Expression
+import org.apache.spark.sql.execution._
 import org.apache.spark.sql.execution.columnar.InMemoryTableScanExec
+import org.apache.spark.sql.execution.datasources.FileIndex
 import org.apache.spark.sql.execution.exchange.{BroadcastExchangeExec, 
ReusedExchangeExec}
-import org.apache.spark.sql.execution._
 import org.apache.spark.sql.hive.execution.HiveTableScanExec
 import org.apache.spark.sql.sources.NBaseRelation
 
@@ -43,36 +43,32 @@ import scala.collection.JavaConverters._
 import scala.collection.mutable
 import scala.collection.parallel.ForkJoinTaskSupport
 
+import com.google.common.collect.Maps
+import com.google.gson.Gson
+import com.google.gson.reflect.TypeToken
+
 object ResourceDetectUtils extends Logging {
   private val json = new Gson()
 
+  private val errorMsgLog: String = "Interrupted at the stage of get paths in 
ResourceDetectUtils."
+
   def getPaths(plan: SparkPlan): Seq[Path] = {
     var paths = Seq.empty[Path]
     plan.foreach {
       case plan: FileSourceScanExec =>
-        if (plan.relation.location.partitionSchema.nonEmpty) {
-          val selectedPartitions = 
plan.relation.location.listFiles(plan.partitionFilters, plan.dataFilters)
-          selectedPartitions.flatMap(partition => 
partition.files).foreach(file => {
-            paths :+= file.getPath
-          })
-        } else {
-          paths ++= plan.relation.location.rootPaths
-        }
+        val info = "Current step: get Partition file status of 
FileSourceScanExec."
+        paths ++= getFilePaths(plan.relation.location, plan.partitionFilters, 
plan.dataFilters, info)
       case plan: LayoutFileSourceScanExec =>
-        if (plan.relation.location.partitionSchema.nonEmpty) {
-          val selectedPartitions = 
plan.relation.location.listFiles(plan.partitionFilters, plan.dataFilters)
-          selectedPartitions.flatMap(partition => 
partition.files).foreach(file => {
-            paths :+= file.getPath
-          })
-        } else {
-          paths ++= plan.relation.location.rootPaths
-        }
+        val info = "Current step: get Partition file status of 
LayoutFileSourceScanExec."
+        paths ++= getFilePaths(plan.relation.location, plan.partitionFilters, 
plan.dataFilters, info)
       case plan: InMemoryTableScanExec =>
         val _plan = plan.relation.cachedPlan
         paths ++= getPaths(_plan)
       case plan: HiveTableScanExec =>
         if (plan.relation.isPartitioned) {
           plan.rawPartitions.foreach { partition =>
+            QueryInterruptChecker.checkThreadInterrupted(errorMsgLog,
+              "Current step: get Partition file status of HiveTableScanExec.")
             paths ++= partition.getPath
           }
         } else {
@@ -89,6 +85,22 @@ object ResourceDetectUtils extends Logging {
     paths
   }
 
+  def getFilePaths(fileIndex: FileIndex, partitionFilters: Seq[Expression], 
dataFilters: Seq[Expression], info: String): Seq[Path] = {
+    var paths = Seq.empty[Path]
+    if (fileIndex.partitionSchema.nonEmpty) {
+      val selectedPartitions = fileIndex.listFiles(partitionFilters, 
dataFilters)
+      selectedPartitions.flatMap(partition => {
+        QueryInterruptChecker.checkThreadInterrupted(errorMsgLog, info)
+        partition.files
+      }).foreach(file => {
+        paths :+= file.getPath
+      })
+    } else {
+      paths ++= fileIndex.rootPaths
+    }
+    paths
+  }
+
   def getPartitions(plan: SparkPlan): String = {
     val leafNodePartitionsLengthMap: mutable.Map[String, Int] = mutable.Map()
     var pNum = 0

Reply via email to