This is an automated email from the ASF dual-hosted git repository.

xxyu pushed a commit to branch kylin-on-parquet-v2
in repository https://gitbox.apache.org/repos/asf/kylin.git

The following commit(s) were added to refs/heads/kylin-on-parquet-v2 by this push:
     new 3f0b2a8  KYLIN-4713 Support use diff spark schedule pool for diff query
3f0b2a8 is described below

commit 3f0b2a8eaa14e42a36df314b16263b1e90efc44d
Author: yaqian.zhang <598593...@qq.com>
AuthorDate: Mon Aug 24 17:02:34 2020 +0800

    KYLIN-4713 Support use diff spark schedule pool for diff query
---
 build/bin/check-migration-acl.sh                   |  2 +-
 build/bin/kylin.sh                                 | 44 ++++++++++++++++++++++
 .../org/apache/kylin/common/KylinConfigBase.java   |  4 ++
 .../java/org/apache/kylin/common/QueryContext.java | 18 +++++++++
 .../apache/kylin/common/debug/BackdoorToggles.java |  6 +++
 .../kylin/query/runtime/plans/ResultPlan.scala     | 19 ++++++++--
 .../apache/kylin/rest/response/SQLResponse.java    | 10 +++++
 .../apache/kylin/rest/service/QueryService.java    |  3 ++
 .../kylin/rest/response/SQLResponseTest.java       |  2 +-
 9 files changed, 103 insertions(+), 5 deletions(-)

diff --git a/build/bin/check-migration-acl.sh b/build/bin/check-migration-acl.sh
index 70f3337..27f8971 100755
--- a/build/bin/check-migration-acl.sh
+++ b/build/bin/check-migration-acl.sh
@@ -25,7 +25,7 @@ metadataUrl=`${dir}/get-properties.sh kylin.metadata.url`
 
 if [[ "${metadataUrl##*@}" != "hbase" ]]
 then
-    echo "Not HBase metadata ${metadataUrl}. Skip check."
+    echo "Not HBase metadata. Skip check."
     exit 0
 fi

diff --git a/build/bin/kylin.sh b/build/bin/kylin.sh
index 7f7e2e5..bb33eae 100755
--- a/build/bin/kylin.sh
+++ b/build/bin/kylin.sh
@@ -141,6 +141,48 @@ function checkBasicKylinProps() {
     fi
 }
 
+function prepareFairScheduler() {
+    cat > ${KYLIN_HOME}/conf/fairscheduler.xml <<EOL
+<?xml version="1.0"?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements. See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License. You may obtain a copy of the License at
+      http://www.apache.org/licenses/LICENSE-2.0
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+<allocations>
+  <pool name="query_pushdown">
+    <schedulingMode>FAIR</schedulingMode>
+    <weight>1</weight>
+    <minShare>1</minShare>
+  </pool>
+  <pool name="heavy_tasks">
+    <schedulingMode>FAIR</schedulingMode>
+    <weight>5</weight>
+    <minShare>1</minShare>
+  </pool>
+  <pool name="lightweight_tasks">
+    <schedulingMode>FAIR</schedulingMode>
+    <weight>10</weight>
+    <minShare>1</minShare>
+  </pool>
+  <pool name="vip_tasks">
+    <schedulingMode>FAIR</schedulingMode>
+    <weight>15</weight>
+    <minShare>1</minShare>
+  </pool>
+</allocations>
+EOL
+}
+
 function checkRestPort() {
     kylin_rest_address_arr=(${KYLIN_REST_ADDRESS//:/ })
     inuse=`netstat -tlpn | grep "\b${kylin_rest_address_arr[1]}\b"`
@@ -196,6 +238,8 @@ then
 
     checkRestPort
 
+    prepareFairScheduler
+
     ${KYLIN_HOME}/bin/check-migration-acl.sh || { exit 1; }
 
     # get KYLIN_EXTRA_START_OPTS
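
Note: the pools generated above only take effect if the Spark application that
serves Kylin queries runs its scheduler in FAIR mode and is pointed at this
allocation file. That wiring is not shown in this diff; the lines below are a
minimal sketch using standard Spark properties, assuming they are set in the
Spark conf used by the query engine:

    # assumed Spark-side settings (standard Spark fair-scheduler configs,
    # not added by this commit)
    spark.scheduler.mode=FAIR
    spark.scheduler.allocation.file=${KYLIN_HOME}/conf/fairscheduler.xml

A pool name not declared in the file is still accepted by Spark; it is created
on the fly with default settings (FIFO, weight 1, minShare 0).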
diff --git a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
index f317ba2..ac93fd4 100644
--- a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
+++ b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
@@ -1800,6 +1800,10 @@ public abstract class KylinConfigBase implements Serializable {
         return Boolean.parseBoolean(getOptional("kylin.query.stream-aggregate-enabled", TRUE));
     }
 
+    public String getProjectQuerySparkPool() {
+        return getOptional("kylin.query.spark.pool", null);
+    }
+
     public boolean isProjectIsolationEnabled() {
         return Boolean.parseBoolean(getOptional("kylin.storage.project-isolation-enable", TRUE));
     }

diff --git a/core-common/src/main/java/org/apache/kylin/common/QueryContext.java b/core-common/src/main/java/org/apache/kylin/common/QueryContext.java
index 6e71e6a..801eb52 100644
--- a/core-common/src/main/java/org/apache/kylin/common/QueryContext.java
+++ b/core-common/src/main/java/org/apache/kylin/common/QueryContext.java
@@ -55,6 +55,8 @@ public class QueryContext {
     private final String queryId;
     private String username;
     private Set<String> groups;
+    private String project;
+    private String sparkPool;
     private AtomicLong scannedRows = new AtomicLong();
     private AtomicLong returnedRows = new AtomicLong();
     private AtomicLong scannedBytes = new AtomicLong();
@@ -120,6 +122,22 @@ public class QueryContext {
         this.groups = groups;
     }
 
+    public void setProject(String project) {
+        this.project = project;
+    }
+
+    public String getProject() {
+        return project;
+    }
+
+    public void setSparkPool(String sparkPool) {
+        this.sparkPool = sparkPool;
+    }
+
+    public String getSparkPool() {
+        return this.sparkPool;
+    }
+
     public Object getCalcitePlan() {
         return calcitePlan;
     }

diff --git a/core-common/src/main/java/org/apache/kylin/common/debug/BackdoorToggles.java b/core-common/src/main/java/org/apache/kylin/common/debug/BackdoorToggles.java
index d6674a2..088adc2 100644
--- a/core-common/src/main/java/org/apache/kylin/common/debug/BackdoorToggles.java
+++ b/core-common/src/main/java/org/apache/kylin/common/debug/BackdoorToggles.java
@@ -75,6 +75,10 @@ public class BackdoorToggles {
         return getString(DEBUG_TOGGLE_HIT_CUBE);
     }
 
+    public static String getDebugToggleSparkPool(){
+        return getString(DEBUG_TOGGLE_SPARK_POOL);
+    }
+
     public static String getCoprocessorBehavior() {
         return getString(DEBUG_TOGGLE_COPROCESSOR_BEHAVIOR);
     }
@@ -395,4 +399,6 @@ public class BackdoorToggles {
      * check https://issues.apache.org/jira/browse/KYLIN-4312 for information.
      */
    public final static String DEBUG_TOGGLE_HIT_CUBE = "DEBUG_TOGGLE_HIT_CUBE";
+
+    public final static String DEBUG_TOGGLE_SPARK_POOL = "DEBUG_TOGGLE_SPARK_POOL";
 }
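
Note: the three changes above are the plumbing for pool selection.
kylin.query.spark.pool is a new config key with no default, QueryContext now
records the project and the pool actually used, and DEBUG_TOGGLE_SPARK_POOL
lets a single request name a pool explicitly. A hedged sketch of driving the
toggle through the query REST API follows; the endpoint and the
backdoorToggles request field follow Kylin's usual SQLRequest shape and are
assumptions here, only the toggle key comes from this commit:

    # assumed request shape for Kylin's query endpoint; adjust host/credentials
    curl -X POST http://localhost:7070/kylin/api/query \
      -H "Content-Type: application/json" -u ADMIN:KYLIN \
      -d '{"sql": "select count(*) from KYLIN_SALES",
           "project": "learn_kylin",
           "backdoorToggles": {"DEBUG_TOGGLE_SPARK_POOL": "vip_tasks"}}'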
diff --git a/kylin-spark-project/kylin-spark-query/src/main/scala/org/apache/kylin/query/runtime/plans/ResultPlan.scala b/kylin-spark-project/kylin-spark-query/src/main/scala/org/apache/kylin/query/runtime/plans/ResultPlan.scala
index f12ac2f..5430723 100644
--- a/kylin-spark-project/kylin-spark-query/src/main/scala/org/apache/kylin/query/runtime/plans/ResultPlan.scala
+++ b/kylin-spark-project/kylin-spark-query/src/main/scala/org/apache/kylin/query/runtime/plans/ResultPlan.scala
@@ -21,9 +21,11 @@ import com.google.common.cache.{Cache, CacheBuilder}
 import com.google.common.collect.Lists
 import org.apache.calcite.linq4j.{Enumerable, Linq4j}
 import org.apache.calcite.rel.`type`.RelDataType
+import org.apache.kylin.common.debug.BackdoorToggles
 import org.apache.kylin.common.exceptions.KylinTimeoutException
 import org.apache.kylin.common.{KylinConfig, QueryContext, QueryContextFacade}
 import org.apache.kylin.common.util.HadoopUtil
+import org.apache.kylin.metadata.project.ProjectManager
 import org.apache.kylin.query.runtime.plans.ResultType.ResultType
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.{DataFrame, SparderContext}
@@ -62,7 +64,11 @@ object ResultPlan extends Logging {
     val resultTypes = rowType.getFieldList.asScala
     val jobGroup = Thread.currentThread().getName
     val sparkContext = SparderContext.getSparkSession.sparkContext
-    val kylinConfig = KylinConfig.getInstanceFromEnv
+    val projectName = QueryContextFacade.current().getProject
+    var kylinConfig = KylinConfig.getInstanceFromEnv
+    if (projectName != null) {
+      kylinConfig = ProjectManager.getInstance(kylinConfig).getProject(projectName).getConfig
+    }
     var pool = "heavy_tasks"
 
     val partitionsNum = if (kylinConfig.getSparkSqlShufflePartitions != -1) {
@@ -71,16 +77,23 @@
       Math.min(QueryContextFacade.current().getSourceScanBytes / PARTITION_SPLIT_BYTES + 1,
         SparderContext.getTotalCore).toInt
     }
+
     if (QueryContextFacade.current().isHighPriorityQuery) {
       pool = "vip_tasks"
-    } else if (QueryContextFacade.current().isTableIndex) {
-      pool = "extreme_heavy_tasks"
     } else if (partitionsNum <= SparderContext.getTotalCore) {
       pool = "lightweight_tasks"
     }
+    if (kylinConfig.getProjectQuerySparkPool != null) {
+      pool = kylinConfig.getProjectQuerySparkPool
+    }
+    if (BackdoorToggles.getDebugToggleSparkPool != null) {
+      pool = BackdoorToggles.getDebugToggleSparkPool
+    }
+
     // set priority
     sparkContext.setLocalProperty("spark.scheduler.pool", pool)
+    QueryContextFacade.current().setSparkPool(pool)
     val queryId = QueryContextFacade.current().getQueryId
     sparkContext.setLocalProperty(QueryToExecutionIDCache.KYLIN_QUERY_ID_KEY, queryId)
     //df.sparkSession.sessionState.conf.setLocalProperty("spark.sql.shuffle.partitions",
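
Note: ResultPlan now resolves the pool in three steps, with later steps
winning: the built-in heuristic (vip_tasks for high-priority queries,
lightweight_tasks when the partition count fits the available cores,
heavy_tasks otherwise; the old extreme_heavy_tasks branch for table-index
queries is dropped), then the project-level kylin.query.spark.pool override,
then the per-request backdoor toggle. Because kylinConfig is re-read through
ProjectManager when a project is set on the QueryContext, the override can
differ per project. A sketch of the project-level setting; whether it is
placed in kylin.properties or in a project's configuration overrides is an
assumption, only the key itself comes from this commit:

    # route all of this project's queries to the pushdown pool
    kylin.query.spark.pool=query_pushdown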
diff --git a/server-base/src/main/java/org/apache/kylin/rest/response/SQLResponse.java b/server-base/src/main/java/org/apache/kylin/rest/response/SQLResponse.java
index 1721efe..8fa57c6 100644
--- a/server-base/src/main/java/org/apache/kylin/rest/response/SQLResponse.java
+++ b/server-base/src/main/java/org/apache/kylin/rest/response/SQLResponse.java
@@ -72,6 +72,8 @@ public class SQLResponse implements Serializable {
 
     protected boolean queryPushDown = false;
 
+    protected String querySparkPool;
+
     protected byte[] queryStatistics;
 
     protected String traceUrl = null;
@@ -139,6 +141,14 @@ public class SQLResponse implements Serializable {
         isException = v;
     }
 
+    public String getSparkPool() {
+        return querySparkPool;
+    }
+
+    public void setSparkPool(String sparkPool) {
+        querySparkPool = sparkPool;
+    }
+
     public String getExceptionMessage() {
         return exceptionMessage;
     }

diff --git a/server-base/src/main/java/org/apache/kylin/rest/service/QueryService.java b/server-base/src/main/java/org/apache/kylin/rest/service/QueryService.java
index e8cd931..fd686b5 100644
--- a/server-base/src/main/java/org/apache/kylin/rest/service/QueryService.java
+++ b/server-base/src/main/java/org/apache/kylin/rest/service/QueryService.java
@@ -356,6 +356,7 @@ public class QueryService extends BasicService {
         stringBuilder.append("Storage cache used: ").append(storageCacheUsed).append(newLine);
         stringBuilder.append("Is Query Push-Down: ").append(isPushDown).append(newLine);
         stringBuilder.append("Is Prepare: ").append(BackdoorToggles.getPrepareOnly()).append(newLine);
+        stringBuilder.append("Used Spark pool: ").append(response.getSparkPool()).append(newLine);
         stringBuilder.append("Trace URL: ").append(response.getTraceUrl()).append(newLine);
         stringBuilder.append("Message: ").append(response.getExceptionMessage()).append(newLine);
         stringBuilder.append("==========================[QUERY]===============================").append(newLine);
@@ -632,6 +633,7 @@ public class QueryService extends BasicService {
         QueryContext context = QueryContextFacade.current();
         context.setUsername(userInfo);
         context.setGroups(AclPermissionUtil.getCurrentUserGroups());
+        context.setProject(sqlRequest.getProject());
         final Collection<? extends GrantedAuthority> grantedAuthorities = SecurityContextHolder.getContext()
                 .getAuthentication().getAuthorities();
         for (GrantedAuthority grantedAuthority : grantedAuthorities) {
@@ -1163,6 +1165,7 @@ public class QueryService extends BasicService {
         response.setTotalScanCount(queryContext.getScannedRows());
         response.setTotalScanBytes(queryContext.getScannedBytes());
         response.setCubeSegmentStatisticsList(queryContext.getCubeSegmentStatisticsResultList());
+        response.setSparkPool(queryContext.getSparkPool());
         if (getConfig().isQueryCacheSignatureEnabled()) {
             response.setSignature(SQLResponseSignatureUtil.createSignature(getConfig(), response, projectName));
         }

diff --git a/server-base/src/test/java/org/apache/kylin/rest/response/SQLResponseTest.java b/server-base/src/test/java/org/apache/kylin/rest/response/SQLResponseTest.java
index 4055a27..f939eb0 100644
--- a/server-base/src/test/java/org/apache/kylin/rest/response/SQLResponseTest.java
+++ b/server-base/src/test/java/org/apache/kylin/rest/response/SQLResponseTest.java
@@ -34,7 +34,7 @@ public class SQLResponseTest {
     public void testInterfaceConsistency() throws IOException {
         String[] attrArray = new String[] { "columnMetas", "results", "cube", "affectedRowCount", "isException",
                 "exceptionMessage", "duration", "partial", "totalScanCount", "hitExceptionCache", "storageCacheUsed",
-                "pushDown", "traceUrl", "totalScanBytes" };
+                "sparkPool", "pushDown", "traceUrl", "totalScanBytes" };
 
         SQLResponse sqlResponse = new SQLResponse(null, null, "learn_cube", 100, false, null, false, false);
         String jsonStr = JsonUtil.writeValueAsString(sqlResponse);
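
Note: SQLResponse now exposes the chosen pool through
getSparkPool()/setSparkPool(), QueryService copies it from the QueryContext
and logs it as "Used Spark pool", and the updated SQLResponseTest pins the
serialized attribute name to "sparkPool". A hedged sketch of the slice of a
query response a client would see; the neighboring fields are illustrative,
taken from the attribute list in the test:

    {"sparkPool": "lightweight_tasks", "pushDown": false,
     "duration": 245, "totalScanBytes": 1024, ...}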