This is an automated email from the ASF dual-hosted git repository. xxyu pushed a commit to branch kylin5 in repository https://gitbox.apache.org/repos/asf/kylin.git
commit 07a4be3795fbad66a66f92dfdb96462eccaa1061 Author: fanfanAlice <41991994+fanfanal...@users.noreply.github.com> AuthorDate: Fri Mar 3 09:48:11 2023 +0800 KYLIN-5545 Try best to interrupt running queries and limit the number of queries to run. --- .../rest/controller/HealthControllerTest.java | 2 +- .../apache/kylin/rest/service/HealthService.java | 2 +- .../kylin/rest/service/HealthServiceTest.java | 2 +- .../org/apache/kylin/common/KylinConfigBase.java | 4 + .../query/exception/UserStopQueryException.java | 0 .../org/apache/kylin/query/util/CancelFlag.java | 49 +++++++ .../kylin/query/util/QueryInterruptChecker.java | 46 +++++++ .../kylin/query/util}/SlowQueryDetector.java | 3 +- .../apache/kylin/newten/SlowQueryDetectorTest.java | 2 +- .../org/apache/kylin/query/util/PushDownUtil.java | 2 +- .../org/apache/kylin/query/util/QueryUtil.java | 18 +-- .../query/util/RestoreFromComputedColumn.java | 4 +- .../apache/kylin/rest/service/QueryService.java | 4 +- .../kylin/rest/service/QueryServiceTest.java | 2 +- .../org/apache/kylin/query/engine/QueryExec.java | 6 +- .../kylin/query/engine/QueryRoutingEngine.java | 66 ++++++--- .../query/util/PushDownQueryRequestLimits.java} | 28 ++-- .../apache/kylin/query/SlowQueryDetectorTest.java | 1 + .../kylin/query/engine/QueryRoutingEngineTest.java | 148 +++++++++++++++++++++ .../hive/utils/TestResourceDetectUtilsByMock.scala | 34 ++++- .../kylin/query/pushdown/SparkSqlClient.scala | 19 +-- .../kylin/query/runtime/plan/ResultPlan.scala | 19 +-- .../query/pushdown/PushdownJobCancelTest.java | 4 +- .../kylin/query/runtime/plan/TestResultPlan.java | 14 +- .../spark/sql/hive/utils/ResourceDetectUtils.scala | 54 +++++--- 25 files changed, 431 insertions(+), 102 deletions(-) diff --git a/src/common-server/src/test/java/org/apache/kylin/rest/controller/HealthControllerTest.java b/src/common-server/src/test/java/org/apache/kylin/rest/controller/HealthControllerTest.java index a0f2aa269f..e6c2998605 100644 --- a/src/common-server/src/test/java/org/apache/kylin/rest/controller/HealthControllerTest.java +++ b/src/common-server/src/test/java/org/apache/kylin/rest/controller/HealthControllerTest.java @@ -20,7 +20,7 @@ package org.apache.kylin.rest.controller; import java.util.List; import java.util.concurrent.ConcurrentMap; -import org.apache.kylin.query.SlowQueryDetector; +import org.apache.kylin.query.util.SlowQueryDetector; import org.apache.kylin.rest.service.ServiceTestBase; import org.apache.kylin.rest.response.HealthResponse; import org.apache.spark.sql.SparderEnv; diff --git a/src/common-service/src/main/java/org/apache/kylin/rest/service/HealthService.java b/src/common-service/src/main/java/org/apache/kylin/rest/service/HealthService.java index 264993f59c..afb27b583c 100644 --- a/src/common-service/src/main/java/org/apache/kylin/rest/service/HealthService.java +++ b/src/common-service/src/main/java/org/apache/kylin/rest/service/HealthService.java @@ -20,7 +20,7 @@ package org.apache.kylin.rest.service; import java.util.List; import java.util.Map; -import org.apache.kylin.query.SlowQueryDetector; +import org.apache.kylin.query.util.SlowQueryDetector; import org.apache.kylin.rest.response.HealthResponse; import org.apache.spark.sql.SparderEnv; import org.springframework.stereotype.Component; diff --git a/src/common-service/src/test/java/org/apache/kylin/rest/service/HealthServiceTest.java b/src/common-service/src/test/java/org/apache/kylin/rest/service/HealthServiceTest.java index b598e3f4fb..d5f33aa471 100644 --- a/src/common-service/src/test/java/org/apache/kylin/rest/service/HealthServiceTest.java +++ b/src/common-service/src/test/java/org/apache/kylin/rest/service/HealthServiceTest.java @@ -20,7 +20,7 @@ package org.apache.kylin.rest.service; import java.util.List; import java.util.concurrent.ConcurrentMap; -import org.apache.kylin.query.SlowQueryDetector; +import org.apache.kylin.query.util.SlowQueryDetector; import org.apache.kylin.rest.response.HealthResponse; import org.apache.spark.sql.SparderEnv; import org.junit.After; diff --git a/src/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java b/src/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java index fccc636084..f887fbd30a 100644 --- a/src/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java +++ b/src/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java @@ -3842,6 +3842,10 @@ public abstract class KylinConfigBase implements Serializable { return Long.parseLong(getOptional("kylin.query.max-measure-segment-pruner-before-days", "-1")); } + public int getQueryConcurrentRunningThresholdForPushDown() { + return Integer.parseInt(getOptional("kylin.query.pushdown-concurrent-running-threshold", "10")); + } + // ============================================================================ // Cost based index Planner // ============================================================================ diff --git a/src/query-common/src/main/java/org/apache/kylin/query/exception/UserStopQueryException.java b/src/core-common/src/main/java/org/apache/kylin/query/exception/UserStopQueryException.java similarity index 100% copy from src/query-common/src/main/java/org/apache/kylin/query/exception/UserStopQueryException.java copy to src/core-common/src/main/java/org/apache/kylin/query/exception/UserStopQueryException.java diff --git a/src/core-common/src/main/java/org/apache/kylin/query/util/CancelFlag.java b/src/core-common/src/main/java/org/apache/kylin/query/util/CancelFlag.java new file mode 100644 index 0000000000..1c1885fb60 --- /dev/null +++ b/src/core-common/src/main/java/org/apache/kylin/query/util/CancelFlag.java @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kylin.query.util; + +import java.util.concurrent.atomic.AtomicBoolean; + +import com.google.common.base.Preconditions; + +public class CancelFlag { + public final AtomicBoolean atomicBoolean; + private static final ThreadLocal<CancelFlag> CONTEXT_CANCEL_FLAG = ThreadLocal + .withInitial(() -> new CancelFlag(new AtomicBoolean(false))); + + public CancelFlag(AtomicBoolean atomicBoolean) { + this.atomicBoolean = Preconditions.checkNotNull(atomicBoolean); + } + + public static CancelFlag getContextCancelFlag() { + return CONTEXT_CANCEL_FLAG.get(); + } + + public boolean isCancelRequested() { + return this.atomicBoolean.get(); + } + + public void requestCancel() { + this.atomicBoolean.compareAndSet(false, true); + } + + public void clearCancel() { + this.atomicBoolean.compareAndSet(true, false); + } +} diff --git a/src/core-common/src/main/java/org/apache/kylin/query/util/QueryInterruptChecker.java b/src/core-common/src/main/java/org/apache/kylin/query/util/QueryInterruptChecker.java new file mode 100644 index 0000000000..b163cc5a31 --- /dev/null +++ b/src/core-common/src/main/java/org/apache/kylin/query/util/QueryInterruptChecker.java @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kylin.query.util; + +import org.apache.kylin.common.KylinConfig; +import org.apache.kylin.common.QueryContext; +import org.apache.kylin.common.exception.KylinTimeoutException; +import org.apache.kylin.query.exception.UserStopQueryException; + +import lombok.extern.slf4j.Slf4j; + +@Slf4j +public class QueryInterruptChecker { + + private QueryInterruptChecker() { + // This is Utils. + } + + public static void checkThreadInterrupted(String errorMsgLog, String stepInfo) { + if (Thread.currentThread().isInterrupted()) { + log.error("{} {}", QueryContext.current().getQueryId(), errorMsgLog); + if (SlowQueryDetector.getRunningQueries().get(Thread.currentThread()).isStopByUser()) { + throw new UserStopQueryException(""); + } + QueryContext.current().getQueryTagInfo().setTimeout(true); + throw new KylinTimeoutException("The query exceeds the set time limit of " + + KylinConfig.getInstanceFromEnv().getQueryTimeoutSeconds() + "s. " + stepInfo); + } + } +} diff --git a/src/query-common/src/main/java/org/apache/kylin/query/SlowQueryDetector.java b/src/core-common/src/main/java/org/apache/kylin/query/util/SlowQueryDetector.java similarity index 98% rename from src/query-common/src/main/java/org/apache/kylin/query/SlowQueryDetector.java rename to src/core-common/src/main/java/org/apache/kylin/query/util/SlowQueryDetector.java index 136127a3b3..387399d161 100644 --- a/src/query-common/src/main/java/org/apache/kylin/query/SlowQueryDetector.java +++ b/src/core-common/src/main/java/org/apache/kylin/query/util/SlowQueryDetector.java @@ -16,12 +16,11 @@ * limitations under the License. */ -package org.apache.kylin.query; +package org.apache.kylin.query.util; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; -import org.apache.calcite.util.CancelFlag; import org.apache.kylin.common.KylinConfig; import org.apache.kylin.common.QueryContext; import org.slf4j.Logger; diff --git a/src/kylin-it/src/test/java/org/apache/kylin/newten/SlowQueryDetectorTest.java b/src/kylin-it/src/test/java/org/apache/kylin/newten/SlowQueryDetectorTest.java index 3e478f940c..15301252ab 100644 --- a/src/kylin-it/src/test/java/org/apache/kylin/newten/SlowQueryDetectorTest.java +++ b/src/kylin-it/src/test/java/org/apache/kylin/newten/SlowQueryDetectorTest.java @@ -33,12 +33,12 @@ import org.apache.kylin.engine.spark.NLocalWithSparkSessionTest; import org.apache.kylin.job.engine.JobEngineConfig; import org.apache.kylin.job.impl.threadpool.NDefaultScheduler; import org.apache.kylin.metadata.project.NProjectManager; -import org.apache.kylin.query.SlowQueryDetector; import org.apache.kylin.query.engine.QueryExec; import org.apache.kylin.query.pushdown.SparkSqlClient; import org.apache.kylin.query.runtime.plan.ResultPlan; import org.apache.kylin.query.util.QueryParams; import org.apache.kylin.query.util.QueryUtil; +import org.apache.kylin.query.util.SlowQueryDetector; import org.apache.kylin.util.ExecAndComp; import org.apache.spark.sql.SparderEnv; import org.junit.After; diff --git a/src/query-common/src/main/java/org/apache/kylin/query/util/PushDownUtil.java b/src/query-common/src/main/java/org/apache/kylin/query/util/PushDownUtil.java index 60c02d73dc..10ca1ecf94 100644 --- a/src/query-common/src/main/java/org/apache/kylin/query/util/PushDownUtil.java +++ b/src/query-common/src/main/java/org/apache/kylin/query/util/PushDownUtil.java @@ -200,7 +200,7 @@ public class PushDownUtil { .map(c -> c.getClass().getCanonicalName()).collect(Collectors.joining(","))); } for (IPushDownConverter converter : pushDownConverters) { - QueryUtil.checkThreadInterrupted("Interrupted sql transformation at the stage of " + converter.getClass(), + QueryInterruptChecker.checkThreadInterrupted("Interrupted sql transformation at the stage of " + converter.getClass(), "Current step: Massage push-down sql. "); sql = converter.convert(sql, queryParams.getProject(), queryParams.getDefaultSchema()); } diff --git a/src/query-common/src/main/java/org/apache/kylin/query/util/QueryUtil.java b/src/query-common/src/main/java/org/apache/kylin/query/util/QueryUtil.java index 77c7b2784d..69f4711099 100644 --- a/src/query-common/src/main/java/org/apache/kylin/query/util/QueryUtil.java +++ b/src/query-common/src/main/java/org/apache/kylin/query/util/QueryUtil.java @@ -32,14 +32,11 @@ import org.apache.commons.lang3.StringUtils; import org.apache.kylin.common.KapConfig; import org.apache.kylin.common.KylinConfig; import org.apache.kylin.common.QueryContext; -import org.apache.kylin.common.exception.KylinTimeoutException; import org.apache.kylin.common.util.ClassUtil; import org.apache.kylin.common.util.StringHelper; import org.apache.kylin.metadata.project.NProjectManager; import org.apache.kylin.metadata.query.BigQueryThresholdUpdater; import org.apache.kylin.query.IQueryTransformer; -import org.apache.kylin.query.SlowQueryDetector; -import org.apache.kylin.query.exception.UserStopQueryException; import org.apache.kylin.query.security.AccessDeniedException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -218,7 +215,8 @@ public class QueryUtil { } for (IQueryTransformer t : transformers) { - QueryUtil.checkThreadInterrupted("Interrupted sql transformation at the stage of " + t.getClass(), + QueryInterruptChecker.checkThreadInterrupted( + "Interrupted sql transformation at the stage of " + t.getClass(), "Current step: SQL transformation."); sql = t.transform(sql, queryParams.getProject(), queryParams.getDefaultSchema()); } @@ -317,16 +315,4 @@ public class QueryUtil { } return sqlSelect; } - - public static void checkThreadInterrupted(String errorMsgLog, String stepInfo) { - if (Thread.currentThread().isInterrupted()) { - log.error("{} {}", QueryContext.current().getQueryId(), errorMsgLog); - if (SlowQueryDetector.getRunningQueries().get(Thread.currentThread()).isStopByUser()) { - throw new UserStopQueryException(""); - } - QueryContext.current().getQueryTagInfo().setTimeout(true); - throw new KylinTimeoutException("The query exceeds the set time limit of " - + KylinConfig.getInstanceFromEnv().getQueryTimeoutSeconds() + "s. " + stepInfo); - } - } } diff --git a/src/query-common/src/main/java/org/apache/kylin/query/util/RestoreFromComputedColumn.java b/src/query-common/src/main/java/org/apache/kylin/query/util/RestoreFromComputedColumn.java index 8ae1337b11..406aa5ba5c 100644 --- a/src/query-common/src/main/java/org/apache/kylin/query/util/RestoreFromComputedColumn.java +++ b/src/query-common/src/main/java/org/apache/kylin/query/util/RestoreFromComputedColumn.java @@ -81,7 +81,7 @@ public class RestoreFromComputedColumn implements IPushDownConverter { int maxRecursionTimes = KapConfig.getInstanceFromEnv().getComputedColumnMaxRecursionTimes(); while (recursionTimes < maxRecursionTimes) { - QueryUtil.checkThreadInterrupted("Interrupted sql transformation at the stage of RestoreFromComputedColumn", + QueryInterruptChecker.checkThreadInterrupted("Interrupted sql transformation at the stage of RestoreFromComputedColumn", "Current step: SQL transformation"); recursionTimes++; boolean recursionCompleted = true; @@ -181,7 +181,7 @@ public class RestoreFromComputedColumn implements IPushDownConverter { for (NDataModel model : modelMap.values()) { QueryAliasMatchInfo info = model.getComputedColumnDescs().isEmpty() ? null : queryAliasMatcher.match(model, sqlSelect); - QueryUtil.checkThreadInterrupted("Interrupted sql transformation at the stage of RestoreFromComputedColumn", + QueryInterruptChecker.checkThreadInterrupted("Interrupted sql transformation at the stage of RestoreFromComputedColumn", "Current step: SQL transformation"); if (info == null) { continue; diff --git a/src/query-service/src/main/java/org/apache/kylin/rest/service/QueryService.java b/src/query-service/src/main/java/org/apache/kylin/rest/service/QueryService.java index 9a3c19a10c..14eacb5d43 100644 --- a/src/query-service/src/main/java/org/apache/kylin/rest/service/QueryService.java +++ b/src/query-service/src/main/java/org/apache/kylin/rest/service/QueryService.java @@ -108,7 +108,6 @@ import org.apache.kylin.metadata.querymeta.TableMeta; import org.apache.kylin.metadata.querymeta.TableMetaWithType; import org.apache.kylin.metadata.realization.NoRealizationFoundException; import org.apache.kylin.metadata.realization.RoutingIndicatorException; -import org.apache.kylin.query.SlowQueryDetector; import org.apache.kylin.query.blacklist.SQLBlacklistItem; import org.apache.kylin.query.blacklist.SQLBlacklistManager; import org.apache.kylin.query.calcite.KEDialect; @@ -128,6 +127,7 @@ import org.apache.kylin.query.util.QueryParams; import org.apache.kylin.query.util.QueryUtil; import org.apache.kylin.query.util.RawSql; import org.apache.kylin.query.util.RawSqlParser; +import org.apache.kylin.query.util.SlowQueryDetector; import org.apache.kylin.query.util.TokenMgrError; import org.apache.kylin.rest.aspect.Transaction; import org.apache.kylin.rest.cluster.ClusterManager; @@ -930,7 +930,7 @@ public class QueryService extends BasicService implements CacheSignatureQuerySup private int getSqlConcurrentCount(SQLBlacklistItem sqlBlacklistItem) { int concurrentCount = 0; - Collection<SlowQueryDetector.QueryEntry> runningQueries = slowQueryDetector.getRunningQueries().values(); + Collection<SlowQueryDetector.QueryEntry> runningQueries = SlowQueryDetector.getRunningQueries().values(); for (SlowQueryDetector.QueryEntry query : runningQueries) { if (sqlBlacklistItem.match(query.getSql())) { concurrentCount++; diff --git a/src/query-service/src/test/java/org/apache/kylin/rest/service/QueryServiceTest.java b/src/query-service/src/test/java/org/apache/kylin/rest/service/QueryServiceTest.java index 46a0ca4b2c..2470fbecf4 100644 --- a/src/query-service/src/test/java/org/apache/kylin/rest/service/QueryServiceTest.java +++ b/src/query-service/src/test/java/org/apache/kylin/rest/service/QueryServiceTest.java @@ -103,7 +103,6 @@ import org.apache.kylin.metadata.querymeta.TableMeta; import org.apache.kylin.metadata.querymeta.TableMetaWithType; import org.apache.kylin.metadata.realization.IRealization; import org.apache.kylin.metadata.realization.RealizationStatusEnum; -import org.apache.kylin.query.SlowQueryDetector; import org.apache.kylin.metadata.user.ManagedUser; import org.apache.kylin.query.blacklist.SQLBlacklistItem; import org.apache.kylin.query.blacklist.SQLBlacklistManager; @@ -116,6 +115,7 @@ import org.apache.kylin.query.util.DateNumberFilterTransformer; import org.apache.kylin.query.util.QueryParams; import org.apache.kylin.query.util.QueryUtil; import org.apache.kylin.query.util.RawSqlParser; +import org.apache.kylin.query.util.SlowQueryDetector; import org.apache.kylin.rest.cluster.ClusterManager; import org.apache.kylin.rest.cluster.DefaultClusterManager; import org.apache.kylin.rest.config.AppConfig; diff --git a/src/query/src/main/java/org/apache/kylin/query/engine/QueryExec.java b/src/query/src/main/java/org/apache/kylin/query/engine/QueryExec.java index ac2e8f673c..6ce92a61b5 100644 --- a/src/query/src/main/java/org/apache/kylin/query/engine/QueryExec.java +++ b/src/query/src/main/java/org/apache/kylin/query/engine/QueryExec.java @@ -68,6 +68,7 @@ import org.apache.kylin.query.relnode.OLAPContext; import org.apache.kylin.query.util.AsyncQueryUtil; import org.apache.kylin.query.util.CalcitePlanRouterVisitor; import org.apache.kylin.query.util.HepUtils; +import org.apache.kylin.query.util.QueryInterruptChecker; import org.apache.kylin.query.util.QueryUtil; import org.apache.kylin.query.util.RelAggPushDownUtil; import org.slf4j.Logger; @@ -410,12 +411,13 @@ public class QueryExec { RelAggPushDownUtil.clearUnmatchedJoinDigest(); return new SparderQueryPlanExec().executeToIterable(transformed, dataContext); } catch (NoRealizationFoundException e) { - QueryUtil.checkThreadInterrupted("Interrupted SparderQueryOptimized NoRealizationFoundException", + QueryInterruptChecker.checkThreadInterrupted( + "Interrupted SparderQueryOptimized NoRealizationFoundException", "Current step: TableIndex join snapshot aggPushDown"); tryTimes = tryTimes + 1; return sparderQueryOptimized(transformed, tryTimes, postOptRules); } catch (Exception e) { - QueryUtil.checkThreadInterrupted("Interrupted SparderQueryOptimized error", + QueryInterruptChecker.checkThreadInterrupted("Interrupted SparderQueryOptimized error", "Current step: Table Index join snapshot aggPushDown"); setSparderQueryOptimizedExceptionMsg(e.getMessage()); return null; diff --git a/src/query/src/main/java/org/apache/kylin/query/engine/QueryRoutingEngine.java b/src/query/src/main/java/org/apache/kylin/query/engine/QueryRoutingEngine.java index cad6d87c86..e4ed2992d0 100644 --- a/src/query/src/main/java/org/apache/kylin/query/engine/QueryRoutingEngine.java +++ b/src/query/src/main/java/org/apache/kylin/query/engine/QueryRoutingEngine.java @@ -28,6 +28,8 @@ import java.util.HashMap; import java.util.LinkedList; import java.util.List; import java.util.Map; +import java.util.concurrent.Semaphore; +import java.util.concurrent.TimeUnit; import org.apache.calcite.avatica.ColumnMetaData; import org.apache.calcite.prepare.CalcitePrepareImpl; @@ -50,10 +52,13 @@ import org.apache.kylin.metadata.query.StructField; import org.apache.kylin.metadata.querymeta.SelectedColumnMeta; import org.apache.kylin.metadata.realization.NoStreamingRealizationFoundException; import org.apache.kylin.query.engine.data.QueryResult; +import org.apache.kylin.query.exception.BusyQueryException; import org.apache.kylin.query.exception.NotSupportedSQLException; import org.apache.kylin.query.mask.QueryResultMasks; import org.apache.kylin.query.relnode.OLAPContext; +import org.apache.kylin.query.util.PushDownQueryRequestLimits; import org.apache.kylin.query.util.PushDownUtil; +import org.apache.kylin.query.util.QueryInterruptChecker; import org.apache.kylin.query.util.QueryParams; import org.apache.kylin.query.util.QueryUtil; import org.apache.kylin.source.adhocquery.PushdownResult; @@ -261,24 +266,55 @@ public class QueryRoutingEngine { public PushdownResult tryPushDownSelectQuery(QueryParams queryParams, SQLException sqlException, boolean isPrepare) throws Exception { QueryContext.currentTrace().startSpan(QueryTrace.SQL_PUSHDOWN_TRANSFORMATION); - String sqlString = queryParams.getSql(); - if (isPrepareStatementWithParams(queryParams)) { - sqlString = queryParams.getPrepareSql(); - } + Semaphore semaphore = PushDownQueryRequestLimits.getSingletonInstance(); + logger.info("Query: {} Before the current push down counter {}.", QueryContext.current().getQueryId(), + semaphore.availablePermits()); + boolean acquired = false; + boolean asyncQuery = QueryContext.current().getQueryTagInfo().isAsyncQuery(); + KylinConfig projectConfig = NProjectManager.getProjectConfig(queryParams.getProject()); + try { + //SlowQueryDetector query timeout period is system-level + int queryTimeout = KylinConfig.getInstanceFromEnv().getQueryTimeoutSeconds(); + if (!asyncQuery && projectConfig.isPushDownEnabled()) { + acquired = semaphore.tryAcquire(queryTimeout, TimeUnit.SECONDS); + if (!acquired) { + logger.info("query: {} failed to get acquire.", QueryContext.current().getQueryId()); + throw new BusyQueryException("Query rejected. Caused by PushDown query server is too busy."); + } else { + logger.info("query: {} success to get acquire.", QueryContext.current().getQueryId()); + } + } + String sqlString = queryParams.getSql(); + if (isPrepareStatementWithParams(queryParams)) { + sqlString = queryParams.getPrepareSql(); + } - if (BackdoorToggles.getPrepareOnly()) { - sqlString = QueryUtil.addLimit(sqlString); - } + if (BackdoorToggles.getPrepareOnly()) { + sqlString = QueryUtil.addLimit(sqlString); + } - String massagedSql = QueryUtil.appendLimitOffset(queryParams.getProject(), sqlString, queryParams.getLimit(), - queryParams.getOffset()); - if (isPrepareStatementWithParams(queryParams)) { - QueryContext.current().getMetrics().setCorrectedSql(massagedSql); + String massagedSql = QueryUtil.appendLimitOffset(queryParams.getProject(), sqlString, + queryParams.getLimit(), queryParams.getOffset()); + if (isPrepareStatementWithParams(queryParams)) { + QueryContext.current().getMetrics().setCorrectedSql(massagedSql); + } + queryParams.setSql(massagedSql); + queryParams.setSqlException(sqlException); + queryParams.setPrepare(isPrepare); + return PushDownUtil.tryIterQuery(queryParams); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + QueryInterruptChecker.checkThreadInterrupted("Interrupted sql push down at the stage of QueryRoutingEngine", + "Current step: try push down select query"); + throw e; + } finally { + if (acquired) { + semaphore.release(); + logger.info("Query: {} success to release acquire", QueryContext.current().getQueryId()); + } + logger.info("Query: {} After the current push down counter {}.", QueryContext.current().getQueryId(), + semaphore.availablePermits()); } - queryParams.setSql(massagedSql); - queryParams.setSqlException(sqlException); - queryParams.setPrepare(isPrepare); - return PushDownUtil.tryIterQuery(queryParams); } private boolean isPrepareStatementWithParams(QueryParams queryParams) { diff --git a/src/query-common/src/main/java/org/apache/kylin/query/exception/UserStopQueryException.java b/src/query/src/main/java/org/apache/kylin/query/util/PushDownQueryRequestLimits.java similarity index 55% rename from src/query-common/src/main/java/org/apache/kylin/query/exception/UserStopQueryException.java rename to src/query/src/main/java/org/apache/kylin/query/util/PushDownQueryRequestLimits.java index 01d5c565ce..5234b99b43 100644 --- a/src/query-common/src/main/java/org/apache/kylin/query/exception/UserStopQueryException.java +++ b/src/query/src/main/java/org/apache/kylin/query/util/PushDownQueryRequestLimits.java @@ -16,20 +16,28 @@ * limitations under the License. */ -package org.apache.kylin.query.exception; +package org.apache.kylin.query.util; -import static org.apache.kylin.common.exception.QueryErrorCode.USER_STOP_QUERY; +import java.util.concurrent.Semaphore; -import org.apache.commons.lang3.exception.ExceptionUtils; -import org.apache.kylin.common.exception.KylinException; +import org.apache.kylin.common.KylinConfig; -public class UserStopQueryException extends KylinException { +public class PushDownQueryRequestLimits { - public UserStopQueryException(String message) { - super(USER_STOP_QUERY, message); + private PushDownQueryRequestLimits() { } - public static boolean causedByUserStop(Throwable e) { - return e instanceof UserStopQueryException || ExceptionUtils.getRootCause(e) instanceof UserStopQueryException; + private static volatile Semaphore instance; + + public static Semaphore getSingletonInstance() { + if (instance == null) { + synchronized (PushDownQueryRequestLimits.class) { + if (instance == null) { + instance = new Semaphore( + KylinConfig.getInstanceFromEnv().getQueryConcurrentRunningThresholdForPushDown(), true); + } + } + } + return instance; } -} \ No newline at end of file +} diff --git a/src/query/src/test/java/org/apache/kylin/query/SlowQueryDetectorTest.java b/src/query/src/test/java/org/apache/kylin/query/SlowQueryDetectorTest.java index 8c14acd1ed..61defcba39 100644 --- a/src/query/src/test/java/org/apache/kylin/query/SlowQueryDetectorTest.java +++ b/src/query/src/test/java/org/apache/kylin/query/SlowQueryDetectorTest.java @@ -20,6 +20,7 @@ package org.apache.kylin.query; import java.util.concurrent.TimeUnit; +import org.apache.kylin.query.util.SlowQueryDetector; import org.junit.After; import org.junit.Assert; import org.junit.Before; diff --git a/src/query/src/test/java/org/apache/kylin/query/engine/QueryRoutingEngineTest.java b/src/query/src/test/java/org/apache/kylin/query/engine/QueryRoutingEngineTest.java index 7599095bf6..44ab952e70 100644 --- a/src/query/src/test/java/org/apache/kylin/query/engine/QueryRoutingEngineTest.java +++ b/src/query/src/test/java/org/apache/kylin/query/engine/QueryRoutingEngineTest.java @@ -25,12 +25,18 @@ import java.sql.Date; import java.sql.SQLException; import java.sql.Time; import java.sql.Timestamp; +import java.util.UUID; +import java.util.concurrent.Semaphore; +import java.util.concurrent.TimeUnit; import org.apache.kylin.common.KylinConfig; import org.apache.kylin.common.QueryContext; +import org.apache.kylin.common.exception.CommonErrorCode; import org.apache.kylin.common.exception.KylinException; +import org.apache.kylin.common.exception.KylinTimeoutException; import org.apache.kylin.common.exception.NewQueryRefuseException; import org.apache.kylin.common.exception.QueryErrorCode; +import org.apache.kylin.common.exception.ServerErrorCode; import org.apache.kylin.common.exception.TargetSegmentNotFoundException; import org.apache.kylin.common.persistence.InMemResourceStore; import org.apache.kylin.common.persistence.ResourceStore; @@ -39,8 +45,12 @@ import org.apache.kylin.common.util.NLocalFileMetadataTestCase; import org.apache.kylin.metadata.realization.NoStreamingRealizationFoundException; import org.apache.kylin.query.QueryExtension; import org.apache.kylin.query.engine.data.QueryResult; +import org.apache.kylin.query.exception.BusyQueryException; import org.apache.kylin.query.exception.NotSupportedSQLException; +import org.apache.kylin.query.exception.UserStopQueryException; +import org.apache.kylin.query.util.PushDownQueryRequestLimits; import org.apache.kylin.query.util.QueryParams; +import org.apache.kylin.query.util.SlowQueryDetector; import org.apache.kylin.source.adhocquery.PushdownResult; import org.apache.spark.SparkException; import org.junit.After; @@ -48,6 +58,8 @@ import org.junit.Assert; import org.junit.Before; import org.junit.Test; import org.mockito.Mock; +import org.mockito.MockSettings; +import org.mockito.MockedStatic; import org.mockito.Mockito; public class QueryRoutingEngineTest extends NLocalFileMetadataTestCase { @@ -320,4 +332,140 @@ public class QueryRoutingEngineTest extends NLocalFileMetadataTestCase { Assert.assertThrows(NotSupportedSQLException.class, () -> queryRoutingEngine.queryWithSqlMassage(queryParams)); } + + @Test + public void testQueryPushDownFail() { + final String sql = "SELECT 1"; + final String project = "tpch"; + KylinConfig kylinconfig = KylinConfig.getInstanceFromEnv(); + kylinconfig.setProperty("kylin.query.timeout-seconds", "5"); + Semaphore semaphore = new Semaphore(0, true); + try (MockedStatic<PushDownQueryRequestLimits> pushRequest = Mockito + .mockStatic(PushDownQueryRequestLimits.class)) { + pushRequest.when((MockedStatic.Verification) PushDownQueryRequestLimits.getSingletonInstance()) + .thenReturn(semaphore); + QueryParams queryParams = new QueryParams(); + queryParams.setForcedToPushDown(true); + queryParams.setProject(project); + queryParams.setSql(sql); + queryParams.setKylinConfig(kylinconfig); + queryParams.setSelect(true); + QueryRoutingEngine queryRoutingEngine = Mockito.spy(QueryRoutingEngine.class); + try { + queryRoutingEngine.tryPushDownSelectQuery(queryParams, null, true); + Assert.fail("Query rejected. Caused by PushDown query server is too busy"); + } catch (Exception e) { + Assert.assertTrue(e instanceof BusyQueryException); + Assert.assertEquals(QueryErrorCode.BUSY_QUERY.toErrorCode(), ((BusyQueryException) e).getErrorCode()); + } + } + } + + @Test + public void testQueryPushDownSuccess() { + final String sql = "SELECT 1"; + final String project = "tpch"; + KylinConfig kylinconfig = KylinConfig.getInstanceFromEnv(); + QueryParams queryParams = new QueryParams(); + queryParams.setForcedToPushDown(true); + queryParams.setProject(project); + queryParams.setSql(sql); + queryParams.setKylinConfig(kylinconfig); + queryParams.setSelect(true); + try { + queryRoutingEngine.tryPushDownSelectQuery(queryParams, null, true); + Assert.fail("Can't complete the operation. Please check the Spark environment and try again."); + } catch (Exception e) { + Assert.assertTrue(e instanceof KylinException); + Assert.assertEquals(ServerErrorCode.SPARK_FAILURE.toErrorCode(), ((KylinException) e).getErrorCode()); + } + } + + @Test + public void testQueryAsync() { + final String sql = "SELECT 1"; + final String project = "tpch"; + KylinConfig kylinconfig = KylinConfig.getInstanceFromEnv(); + QueryParams queryParams = new QueryParams(); + queryParams.setForcedToPushDown(true); + queryParams.setProject(project); + queryParams.setSql(sql); + queryParams.setKylinConfig(kylinconfig); + queryParams.setSelect(true); + try { + QueryContext.current().getQueryTagInfo().setAsyncQuery(true); + queryRoutingEngine.tryPushDownSelectQuery(queryParams, null, true); + Assert.fail("Can't complete the operation. Please check the Spark environment and try again."); + } catch (Exception e) { + Assert.assertTrue(e instanceof KylinException); + Assert.assertEquals(ServerErrorCode.SPARK_FAILURE.toErrorCode(), ((KylinException) e).getErrorCode()); + } + } + + @Test + public void testQueryInterruptedTimeOut() { + final String sql = "SELECT 1"; + final String project = "tpch"; + KylinConfig kylinconfig = KylinConfig.getInstanceFromEnv(); + QueryParams queryParams = new QueryParams(); + queryParams.setForcedToPushDown(true); + queryParams.setProject(project); + queryParams.setSql(sql); + queryParams.setKylinConfig(kylinconfig); + queryParams.setSelect(true); + MockSettings mockSettings = Mockito.withSettings().defaultAnswer(Mockito.RETURNS_DEFAULTS); + Semaphore semaphore = Mockito.mock(Semaphore.class, mockSettings); + SlowQueryDetector slowQueryDetector = new SlowQueryDetector(); + try (MockedStatic<PushDownQueryRequestLimits> pushRequest = Mockito + .mockStatic(PushDownQueryRequestLimits.class)) { + pushRequest.when(PushDownQueryRequestLimits::getSingletonInstance).thenReturn(semaphore); + try { + UUID uuid = UUID.randomUUID(); + slowQueryDetector.queryStart(uuid.toString()); + Mockito.doThrow(new InterruptedException()).when(semaphore).tryAcquire(Mockito.anyLong(), + Mockito.any(TimeUnit.class)); + QueryRoutingEngine queryRoutingEngine = Mockito.spy(QueryRoutingEngine.class); + queryRoutingEngine.tryPushDownSelectQuery(queryParams, null, true); + } catch (Exception e) { + Assert.assertTrue(e instanceof KylinTimeoutException); + Assert.assertEquals(CommonErrorCode.TIMEOUT.toErrorCode(), ((KylinException) e).getErrorCode()); + } + } finally { + slowQueryDetector.queryEnd(); + } + } + + @Test + public void testQueryInterruptedUserStop() { + final String sql = "SELECT 1"; + final String project = "tpch"; + KylinConfig kylinconfig = KylinConfig.getInstanceFromEnv(); + QueryParams queryParams = new QueryParams(); + queryParams.setForcedToPushDown(true); + queryParams.setProject(project); + queryParams.setSql(sql); + queryParams.setKylinConfig(kylinconfig); + queryParams.setSelect(true); + MockSettings mockSettings = Mockito.withSettings().defaultAnswer(Mockito.RETURNS_DEFAULTS); + Semaphore semaphore = Mockito.mock(Semaphore.class, mockSettings); + SlowQueryDetector slowQueryDetector = new SlowQueryDetector(); + try (MockedStatic<PushDownQueryRequestLimits> pushRequest = Mockito + .mockStatic(PushDownQueryRequestLimits.class)) { + pushRequest.when(PushDownQueryRequestLimits::getSingletonInstance).thenReturn(semaphore); + try { + UUID uuid = UUID.randomUUID(); + slowQueryDetector.queryStart(uuid.toString()); + SlowQueryDetector.getRunningQueries().get(Thread.currentThread()).setStopByUser(true); + Mockito.doThrow(new InterruptedException()).when(semaphore).tryAcquire(Mockito.anyLong(), + Mockito.any(TimeUnit.class)); + QueryRoutingEngine queryRoutingEngine = Mockito.spy(QueryRoutingEngine.class); + queryRoutingEngine.tryPushDownSelectQuery(queryParams, null, true); + } catch (Exception e) { + Assert.assertTrue(e instanceof UserStopQueryException); + Assert.assertEquals(QueryErrorCode.USER_STOP_QUERY.toErrorCode(), ((KylinException) e).getErrorCode()); + } + } finally { + slowQueryDetector.queryEnd(); + } + } } diff --git a/src/spark-project/engine-spark/src/test/scala/org/apache/spark/sql/hive/utils/TestResourceDetectUtilsByMock.scala b/src/spark-project/engine-spark/src/test/scala/org/apache/spark/sql/hive/utils/TestResourceDetectUtilsByMock.scala index b93a98cb20..164022af61 100644 --- a/src/spark-project/engine-spark/src/test/scala/org/apache/spark/sql/hive/utils/TestResourceDetectUtilsByMock.scala +++ b/src/spark-project/engine-spark/src/test/scala/org/apache/spark/sql/hive/utils/TestResourceDetectUtilsByMock.scala @@ -21,7 +21,7 @@ package org.apache.spark.sql.hive.utils import org.apache.hadoop.fs.{FileStatus, Path} import org.apache.spark.sql.catalyst.TableIdentifier import org.apache.spark.sql.common.SharedSparkSession -import org.apache.spark.sql.execution.FileSourceScanExec +import org.apache.spark.sql.execution.{FileSourceScanExec, LayoutFileSourceScanExec} import org.apache.spark.sql.execution.datasources.{FileIndex, HadoopFsRelation, PartitionDirectory} import org.apache.spark.sql.types.{IntegerType, StructField, StructType} import org.scalamock.scalatest.MockFactory @@ -60,4 +60,36 @@ class TestResourceDetectUtilsByMock extends AnyWordSpec with MockFactory with Sh } } } + + "getPaths" when { + "LayoutFileSourceScanExec" should { + "get root paths" in { + val paths = Seq(new Path("test")) + val fileIndex = mock[FileIndex] + (fileIndex.rootPaths _).expects().returning(paths).anyNumberOfTimes() + (fileIndex.partitionSchema _).expects().returning(new StructType()).anyNumberOfTimes() + val relation = HadoopFsRelation(fileIndex, new StructType(), new StructType(), null, null, null)(spark) + val sparkPlan = LayoutFileSourceScanExec(relation, Nil, relation.schema, Nil, None, None, Nil, None) + assert(paths == ResourceDetectUtils.getPaths(sparkPlan)) + } + } + } + + "getPaths" when { + "LayoutFileSourceScanExec" should { + "get partition paths" in { + val path = new Path("test") + val paths = Seq(path) + val fileIndex = mock[FileIndex] + val relation = HadoopFsRelation(fileIndex, new StructType(), new StructType(), null, null, null)(spark) + val sparkPlan = LayoutFileSourceScanExec(relation, Nil, relation.schema, null, None, None, Nil, None) + val dataFilters = Seq.empty + val fileStatus = new FileStatus() + fileStatus.setPath(path) + (fileIndex.partitionSchema _).expects().returning(StructType(StructField("f1", IntegerType, true) :: Nil)).anyNumberOfTimes() + (fileIndex.listFiles _).expects(null, dataFilters).returning(Seq(PartitionDirectory(null, Seq(fileStatus)))).anyNumberOfTimes() + assert(paths == ResourceDetectUtils.getPaths(sparkPlan)) + } + } + } } diff --git a/src/spark-project/sparder/src/main/scala/org/apache/kylin/query/pushdown/SparkSqlClient.scala b/src/spark-project/sparder/src/main/scala/org/apache/kylin/query/pushdown/SparkSqlClient.scala index b5640d7c29..48ef13da3b 100644 --- a/src/spark-project/sparder/src/main/scala/org/apache/kylin/query/pushdown/SparkSqlClient.scala +++ b/src/spark-project/sparder/src/main/scala/org/apache/kylin/query/pushdown/SparkSqlClient.scala @@ -18,8 +18,10 @@ package org.apache.kylin.query.pushdown -import com.google.common.collect.ImmutableList -import io.kyligence.kap.guava20.shaded.common.collect.Lists +import java.sql.Timestamp +import java.util +import java.util.{UUID, List => JList} + import org.apache.commons.lang3.StringUtils import org.apache.kylin.common.util.{DateFormat, HadoopUtil, Pair} import org.apache.kylin.common.{KapConfig, KylinConfig, QueryContext} @@ -28,7 +30,7 @@ import org.apache.kylin.metadata.query.StructField import org.apache.kylin.query.mask.QueryResultMasks import org.apache.kylin.query.runtime.plan.QueryToExecutionIDCache import org.apache.kylin.query.runtime.plan.ResultPlan.saveAsyncQueryResult -import org.apache.kylin.query.util.{QueryUtil, SparkJobTrace} +import org.apache.kylin.query.util.{QueryInterruptChecker, SparkJobTrace} import org.apache.spark.network.util.JavaUtils import org.apache.spark.sql.hive.QueryMetricUtils import org.apache.spark.sql.hive.utils.ResourceDetectUtils @@ -36,12 +38,13 @@ import org.apache.spark.sql.util.SparderTypeUtil import org.apache.spark.sql.{DataFrame, Row, SparderEnv, SparkSession} import org.slf4j.{Logger, LoggerFactory} -import java.sql.Timestamp -import java.util -import java.util.{UUID, List => JList} import scala.collection.JavaConverters._ import scala.collection.{immutable, mutable} +import com.google.common.collect.ImmutableList + +import io.kyligence.kap.guava20.shaded.common.collect.Lists + object SparkSqlClient { val DEFAULT_DB: String = "spark.sql.default.database" @@ -138,7 +141,7 @@ object SparkSqlClient { if (e.isInstanceOf[InterruptedException]) { Thread.currentThread.interrupt() ss.sparkContext.cancelJobGroup(jobGroup) - QueryUtil.checkThreadInterrupted("Interrupted at the stage of collecting result in SparkSqlClient.", + QueryInterruptChecker.checkThreadInterrupted("Interrupted at the stage of collecting result in SparkSqlClient.", "Current step: Collecting dataset of push-down.") } throw e @@ -164,7 +167,7 @@ object SparkSqlClient { val row = resultRows.next() readRowSize += 1; if (checkInterrupt && readRowSize % checkInterruptSize == 0) { - QueryUtil.checkThreadInterrupted("Interrupted at the stage of collecting result in SparkSqlClient.", + QueryInterruptChecker.checkThreadInterrupted("Interrupted at the stage of collecting result in SparkSqlClient.", "Current step: Collecting dataset of push-down.") } row.toSeq.map(rawValueToString(_)).asJava diff --git a/src/spark-project/sparder/src/main/scala/org/apache/kylin/query/runtime/plan/ResultPlan.scala b/src/spark-project/sparder/src/main/scala/org/apache/kylin/query/runtime/plan/ResultPlan.scala index 8cc3a6ba25..7815e981b0 100644 --- a/src/spark-project/sparder/src/main/scala/org/apache/kylin/query/runtime/plan/ResultPlan.scala +++ b/src/spark-project/sparder/src/main/scala/org/apache/kylin/query/runtime/plan/ResultPlan.scala @@ -18,8 +18,11 @@ package org.apache.kylin.query.runtime.plan -import com.google.common.cache.{Cache, CacheBuilder} -import io.kyligence.kap.secondstorage.SecondStorageUtil +import java.io.{File, FileOutputStream, OutputStreamWriter} +import java.nio.charset.StandardCharsets +import java.util.concurrent.atomic.AtomicLong +import java.{lang, util} + import org.apache.calcite.rel.`type`.{RelDataType, RelDataTypeField} import org.apache.commons.io.IOUtils import org.apache.hadoop.fs.Path @@ -33,7 +36,7 @@ import org.apache.kylin.query.engine.RelColumnMetaDataExtractor import org.apache.kylin.query.engine.exec.ExecuteResult import org.apache.kylin.query.pushdown.SparkSqlClient.readPushDownResultRow import org.apache.kylin.query.relnode.OLAPContext -import org.apache.kylin.query.util.{AsyncQueryUtil, QueryUtil, SparkJobTrace, SparkQueryJobManager} +import org.apache.kylin.query.util.{AsyncQueryUtil, QueryInterruptChecker, SparkJobTrace, SparkQueryJobManager} import org.apache.poi.xssf.usermodel.{XSSFSheet, XSSFWorkbook} import org.apache.spark.SparkConf import org.apache.spark.sql.execution._ @@ -41,14 +44,14 @@ import org.apache.spark.sql.hive.QueryMetricUtils import org.apache.spark.sql.util.SparderTypeUtil import org.apache.spark.sql.{DataFrame, Row, SaveMode, SparderEnv} -import java.io.{File, FileOutputStream, OutputStreamWriter} -import java.nio.charset.StandardCharsets -import java.util.concurrent.atomic.AtomicLong -import java.{lang, util} import scala.collection.JavaConverters._ import scala.collection.convert.ImplicitConversions.`iterator asScala` import scala.collection.mutable +import com.google.common.cache.{Cache, CacheBuilder} + +import io.kyligence.kap.secondstorage.SecondStorageUtil + // scalastyle:off object ResultType extends Enumeration { type ResultType = Value @@ -153,7 +156,7 @@ object ResultPlan extends LogEx { if (e.isInstanceOf[InterruptedException]) { Thread.currentThread.interrupt() sparkContext.cancelJobGroup(jobGroup) - QueryUtil.checkThreadInterrupted("Interrupted at the stage of collecting result in ResultPlan.", + QueryInterruptChecker.checkThreadInterrupted("Interrupted at the stage of collecting result in ResultPlan.", "Current step: Collecting dataset for sparder.") } throw e diff --git a/src/spark-project/sparder/src/test/java/org/apache/kylin/query/pushdown/PushdownJobCancelTest.java b/src/spark-project/sparder/src/test/java/org/apache/kylin/query/pushdown/PushdownJobCancelTest.java index d5a2d5df51..cd1e9e9956 100644 --- a/src/spark-project/sparder/src/test/java/org/apache/kylin/query/pushdown/PushdownJobCancelTest.java +++ b/src/spark-project/sparder/src/test/java/org/apache/kylin/query/pushdown/PushdownJobCancelTest.java @@ -22,10 +22,10 @@ import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; +import org.apache.kylin.common.util.NLocalFileMetadataTestCase; import org.apache.kylin.common.util.RandomUtil; -import org.apache.kylin.query.SlowQueryDetector; import org.apache.kylin.query.exception.UserStopQueryException; -import org.apache.kylin.common.util.NLocalFileMetadataTestCase; +import org.apache.kylin.query.util.SlowQueryDetector; import org.apache.spark.scheduler.JobFailed; import org.apache.spark.scheduler.SparkListener; import org.apache.spark.scheduler.SparkListenerJobEnd; diff --git a/src/spark-project/sparder/src/test/java/org/apache/kylin/query/runtime/plan/TestResultPlan.java b/src/spark-project/sparder/src/test/java/org/apache/kylin/query/runtime/plan/TestResultPlan.java index 5dfcc6e66e..917bc3ecf9 100644 --- a/src/spark-project/sparder/src/test/java/org/apache/kylin/query/runtime/plan/TestResultPlan.java +++ b/src/spark-project/sparder/src/test/java/org/apache/kylin/query/runtime/plan/TestResultPlan.java @@ -27,14 +27,14 @@ import org.apache.kylin.common.KapConfig; import org.apache.kylin.common.KylinConfig; import org.apache.kylin.common.QueryContext; import org.apache.kylin.common.exception.NewQueryRefuseException; -import org.apache.kylin.query.SlowQueryDetector; -import org.apache.kylin.query.exception.UserStopQueryException; import org.apache.kylin.common.state.StateSwitchConstant; import org.apache.kylin.common.util.AddressUtil; import org.apache.kylin.common.util.NLocalFileMetadataTestCase; import org.apache.kylin.metadata.query.BigQueryThresholdUpdater; import org.apache.kylin.metadata.state.QueryShareStateManager; import org.apache.kylin.query.MockContext; +import org.apache.kylin.query.exception.UserStopQueryException; +import org.apache.kylin.query.util.SlowQueryDetector; import org.apache.spark.SparkConf; import org.apache.spark.scheduler.JobFailed; import org.apache.spark.scheduler.SparkListener; @@ -63,8 +63,7 @@ public class TestResultPlan extends NLocalFileMetadataTestCase { "test@jdbc,driverClassName=org.h2.Driver,url=jdbc:h2:mem:db_default;DB_CLOSE_DELAY=-1,username=sa,password="); getTestConfig().setProperty("kylin.query.share-state-switch-implement", "jdbc"); getTestConfig().setProperty("kylin.query.big-query-source-scan-rows-threshold", "100000000"); - ss = SparkSession.builder().appName("local").master("local[1]") - .getOrCreate(); + ss = SparkSession.builder().appName("local").master("local[1]").getOrCreate(); SparderEnv.setSparkSession(ss); StructType schema = new StructType(); schema = schema.add("TRANS_ID", DataTypes.LongType, false); @@ -134,8 +133,8 @@ public class TestResultPlan extends NLocalFileMetadataTestCase { QueryShareStateManager.getInstance().setState( Collections.singletonList(AddressUtil.concatInstanceName()), StateSwitchConstant.QUERY_LIMIT_STATE, "false"); - QueryContext.current().getMetrics() - .addAccumSourceScanRows(KapConfig.getInstanceFromEnv().getBigQuerySourceScanRowsThreshold() + 1); + QueryContext.current().getMetrics().addAccumSourceScanRows( + KapConfig.getInstanceFromEnv().getBigQuerySourceScanRowsThreshold() + 1); String sql = "select * from TEST_KYLIN_FACT"; ResultPlan.getResult(ss.sql(sql), null); } catch (Exception e) { @@ -148,7 +147,8 @@ public class TestResultPlan extends NLocalFileMetadataTestCase { queryThread.join(); isJobEnd.await(10, TimeUnit.SECONDS); Assert.assertTrue(sparkJobEnd.get().jobResult() instanceof JobFailed); - Assert.assertTrue(((JobFailed)sparkJobEnd.get().jobResult()).exception().getMessage().contains("cancelled part of cancelled job group")); + Assert.assertTrue(((JobFailed) sparkJobEnd.get().jobResult()).exception().getMessage() + .contains("cancelled part of cancelled job group")); Thread queryThread2 = new Thread(() -> { try { diff --git a/src/spark-project/spark-common/src/main/scala/org/apache/spark/sql/hive/utils/ResourceDetectUtils.scala b/src/spark-project/spark-common/src/main/scala/org/apache/spark/sql/hive/utils/ResourceDetectUtils.scala index e0bdefb303..2a27fc27ec 100644 --- a/src/spark-project/spark-common/src/main/scala/org/apache/spark/sql/hive/utils/ResourceDetectUtils.scala +++ b/src/spark-project/spark-common/src/main/scala/org/apache/spark/sql/hive/utils/ResourceDetectUtils.scala @@ -24,18 +24,18 @@ import java.util.concurrent.ForkJoinPool import java.util.concurrent.atomic.AtomicLong import java.util.{Map => JMap} -import com.google.common.collect.Maps -import com.google.gson.Gson -import com.google.gson.reflect.TypeToken import org.apache.hadoop.conf.Configuration -import org.apache.kylin.metadata.cube.model.{DimensionRangeInfo, LayoutEntity} import org.apache.hadoop.fs._ import org.apache.kylin.common.KylinConfig import org.apache.kylin.common.util.HadoopUtil +import org.apache.kylin.metadata.cube.model.{DimensionRangeInfo, LayoutEntity} +import org.apache.kylin.query.util.QueryInterruptChecker import org.apache.spark.internal.Logging +import org.apache.spark.sql.catalyst.expressions.Expression +import org.apache.spark.sql.execution._ import org.apache.spark.sql.execution.columnar.InMemoryTableScanExec +import org.apache.spark.sql.execution.datasources.FileIndex import org.apache.spark.sql.execution.exchange.{BroadcastExchangeExec, ReusedExchangeExec} -import org.apache.spark.sql.execution._ import org.apache.spark.sql.hive.execution.HiveTableScanExec import org.apache.spark.sql.sources.NBaseRelation @@ -43,36 +43,32 @@ import scala.collection.JavaConverters._ import scala.collection.mutable import scala.collection.parallel.ForkJoinTaskSupport +import com.google.common.collect.Maps +import com.google.gson.Gson +import com.google.gson.reflect.TypeToken + object ResourceDetectUtils extends Logging { private val json = new Gson() + private val errorMsgLog: String = "Interrupted at the stage of get paths in ResourceDetectUtils." + def getPaths(plan: SparkPlan): Seq[Path] = { var paths = Seq.empty[Path] plan.foreach { case plan: FileSourceScanExec => - if (plan.relation.location.partitionSchema.nonEmpty) { - val selectedPartitions = plan.relation.location.listFiles(plan.partitionFilters, plan.dataFilters) - selectedPartitions.flatMap(partition => partition.files).foreach(file => { - paths :+= file.getPath - }) - } else { - paths ++= plan.relation.location.rootPaths - } + val info = "Current step: get Partition file status of FileSourceScanExec." + paths ++= getFilePaths(plan.relation.location, plan.partitionFilters, plan.dataFilters, info) case plan: LayoutFileSourceScanExec => - if (plan.relation.location.partitionSchema.nonEmpty) { - val selectedPartitions = plan.relation.location.listFiles(plan.partitionFilters, plan.dataFilters) - selectedPartitions.flatMap(partition => partition.files).foreach(file => { - paths :+= file.getPath - }) - } else { - paths ++= plan.relation.location.rootPaths - } + val info = "Current step: get Partition file status of LayoutFileSourceScanExec." + paths ++= getFilePaths(plan.relation.location, plan.partitionFilters, plan.dataFilters, info) case plan: InMemoryTableScanExec => val _plan = plan.relation.cachedPlan paths ++= getPaths(_plan) case plan: HiveTableScanExec => if (plan.relation.isPartitioned) { plan.rawPartitions.foreach { partition => + QueryInterruptChecker.checkThreadInterrupted(errorMsgLog, + "Current step: get Partition file status of HiveTableScanExec.") paths ++= partition.getPath } } else { @@ -89,6 +85,22 @@ object ResourceDetectUtils extends Logging { paths } + def getFilePaths(fileIndex: FileIndex, partitionFilters: Seq[Expression], dataFilters: Seq[Expression], info: String): Seq[Path] = { + var paths = Seq.empty[Path] + if (fileIndex.partitionSchema.nonEmpty) { + val selectedPartitions = fileIndex.listFiles(partitionFilters, dataFilters) + selectedPartitions.flatMap(partition => { + QueryInterruptChecker.checkThreadInterrupted(errorMsgLog, info) + partition.files + }).foreach(file => { + paths :+= file.getPath + }) + } else { + paths ++= fileIndex.rootPaths + } + paths + } + def getPartitions(plan: SparkPlan): String = { val leafNodePartitionsLengthMap: mutable.Map[String, Int] = mutable.Map() var pNum = 0