This is an automated email from the ASF dual-hosted git repository.
xxyu pushed a commit to branch kylin-on-parquet-v2
in repository https://gitbox.apache.org/repos/asf/kylin.git
The following commit(s) were added to refs/heads/kylin-on-parquet-v2 by this
push:
new 3f0b2a8 KYLIN-4713 Support use diff spark schedule pool for diff query
3f0b2a8 is described below
commit 3f0b2a8eaa14e42a36df314b16263b1e90efc44d
Author: yaqian.zhang <[email protected]>
AuthorDate: Mon Aug 24 17:02:34 2020 +0800
KYLIN-4713 Support use diff spark schedule pool for diff query
---
build/bin/check-migration-acl.sh | 2 +-
build/bin/kylin.sh | 44 ++++++++++++++++++++++
.../org/apache/kylin/common/KylinConfigBase.java | 4 ++
.../java/org/apache/kylin/common/QueryContext.java | 18 +++++++++
.../apache/kylin/common/debug/BackdoorToggles.java | 6 +++
.../kylin/query/runtime/plans/ResultPlan.scala | 19 ++++++++--
.../apache/kylin/rest/response/SQLResponse.java | 10 +++++
.../apache/kylin/rest/service/QueryService.java | 3 ++
.../kylin/rest/response/SQLResponseTest.java | 2 +-
9 files changed, 103 insertions(+), 5 deletions(-)
diff --git a/build/bin/check-migration-acl.sh b/build/bin/check-migration-acl.sh
index 70f3337..27f8971 100755
--- a/build/bin/check-migration-acl.sh
+++ b/build/bin/check-migration-acl.sh
@@ -25,7 +25,7 @@ metadataUrl=`${dir}/get-properties.sh kylin.metadata.url`
if [[ "${metadataUrl##*@}" != "hbase" ]]
then
- echo "Not HBase metadata ${metadataUrl}. Skip check."
+ echo "Not HBase metadata. Skip check."
exit 0
fi
diff --git a/build/bin/kylin.sh b/build/bin/kylin.sh
index 7f7e2e5..bb33eae 100755
--- a/build/bin/kylin.sh
+++ b/build/bin/kylin.sh
@@ -141,6 +141,48 @@ function checkBasicKylinProps() {
fi
}
+function prepareFairScheduler() { # Writes conf/fairscheduler.xml under KYLIN_HOME, defining four FAIR pools: query_pushdown (weight 1), heavy_tasks (5), lightweight_tasks (10), vip_tasks (15). NOTE(review): the > redirection replaces any existing file at that path each time this runs; confirm local edits are not expected to survive.
+ cat > ${KYLIN_HOME}/conf/fairscheduler.xml <<EOL
+<?xml version="1.0"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+ http://www.apache.org/licenses/LICENSE-2.0
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+<allocations>
+ <pool name="query_pushdown">
+ <schedulingMode>FAIR</schedulingMode>
+ <weight>1</weight>
+ <minShare>1</minShare>
+ </pool>
+ <pool name="heavy_tasks">
+ <schedulingMode>FAIR</schedulingMode>
+ <weight>5</weight>
+ <minShare>1</minShare>
+ </pool>
+ <pool name="lightweight_tasks">
+ <schedulingMode>FAIR</schedulingMode>
+ <weight>10</weight>
+ <minShare>1</minShare>
+ </pool>
+ <pool name="vip_tasks">
+ <schedulingMode>FAIR</schedulingMode>
+ <weight>15</weight>
+ <minShare>1</minShare>
+ </pool>
+</allocations>
+EOL
+}
+
function checkRestPort() {
kylin_rest_address_arr=(${KYLIN_REST_ADDRESS//:/ })
inuse=`netstat -tlpn | grep "\b${kylin_rest_address_arr[1]}\b"`
@@ -196,6 +238,8 @@ then
checkRestPort
+ prepareFairScheduler
+
${KYLIN_HOME}/bin/check-migration-acl.sh || { exit 1; }
# get KYLIN_EXTRA_START_OPTS
diff --git
a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
index f317ba2..ac93fd4 100644
--- a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
+++ b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
@@ -1800,6 +1800,10 @@ public abstract class KylinConfigBase implements
Serializable {
return
Boolean.parseBoolean(getOptional("kylin.query.stream-aggregate-enabled", TRUE));
}
+ public String getProjectQuerySparkPool() { // Looks up kylin.query.spark.pool with null as the fallback argument; presumably yields null when the property is unset (confirm against getOptional). ResultPlan.scala treats a non-null value as the pool to use.
+ return getOptional("kylin.query.spark.pool", null);
+ }
+
public boolean isProjectIsolationEnabled() {
return
Boolean.parseBoolean(getOptional("kylin.storage.project-isolation-enable",
TRUE));
}
diff --git
a/core-common/src/main/java/org/apache/kylin/common/QueryContext.java
b/core-common/src/main/java/org/apache/kylin/common/QueryContext.java
index 6e71e6a..801eb52 100644
--- a/core-common/src/main/java/org/apache/kylin/common/QueryContext.java
+++ b/core-common/src/main/java/org/apache/kylin/common/QueryContext.java
@@ -55,6 +55,8 @@ public class QueryContext {
private final String queryId;
private String username;
private Set<String> groups;
+ private String project;
+ private String sparkPool;
private AtomicLong scannedRows = new AtomicLong();
private AtomicLong returnedRows = new AtomicLong();
private AtomicLong scannedBytes = new AtomicLong();
@@ -120,6 +122,22 @@ public class QueryContext {
this.groups = groups;
}
+ public void setProject(String project) { // Stores the project name on this query context; QueryService passes sqlRequest.getProject() here.
+ this.project = project;
+ }
+
+ public String getProject() { // Returns the stored project name; no default is assigned in the visible code, so it may be null if setProject was never called (ResultPlan.scala null-checks it).
+ return project;
+ }
+
+ public void setSparkPool(String sparkPool) { // Records the Spark scheduler pool name chosen for this query; ResultPlan.scala calls this right after setting spark.scheduler.pool.
+ this.sparkPool = sparkPool;
+ }
+
+ public String getSparkPool() { // Returns the recorded pool name; QueryService copies it into the SQLResponse.
+ return this.sparkPool;
+ }
+
public Object getCalcitePlan() {
return calcitePlan;
}
diff --git
a/core-common/src/main/java/org/apache/kylin/common/debug/BackdoorToggles.java
b/core-common/src/main/java/org/apache/kylin/common/debug/BackdoorToggles.java
index d6674a2..088adc2 100644
---
a/core-common/src/main/java/org/apache/kylin/common/debug/BackdoorToggles.java
+++
b/core-common/src/main/java/org/apache/kylin/common/debug/BackdoorToggles.java
@@ -75,6 +75,10 @@ public class BackdoorToggles {
return getString(DEBUG_TOGGLE_HIT_CUBE);
}
+ public static String getDebugToggleSparkPool(){ // Returns the value of the DEBUG_TOGGLE_SPARK_POOL toggle via getString; ResultPlan.scala uses a non-null result as the final pool choice, applied after the project-level setting.
+ return getString(DEBUG_TOGGLE_SPARK_POOL);
+ }
+
public static String getCoprocessorBehavior() {
return getString(DEBUG_TOGGLE_COPROCESSOR_BEHAVIOR);
}
@@ -395,4 +399,6 @@ public class BackdoorToggles {
* check https://issues.apache.org/jira/browse/KYLIN-4312 for information.
*/
public final static String DEBUG_TOGGLE_HIT_CUBE = "DEBUG_TOGGLE_HIT_CUBE";
+
+ public final static String DEBUG_TOGGLE_SPARK_POOL =
"DEBUG_TOGGLE_SPARK_POOL"; // Toggle key read by getDebugToggleSparkPool().
}
diff --git
a/kylin-spark-project/kylin-spark-query/src/main/scala/org/apache/kylin/query/runtime/plans/ResultPlan.scala
b/kylin-spark-project/kylin-spark-query/src/main/scala/org/apache/kylin/query/runtime/plans/ResultPlan.scala
index f12ac2f..5430723 100644
---
a/kylin-spark-project/kylin-spark-query/src/main/scala/org/apache/kylin/query/runtime/plans/ResultPlan.scala
+++
b/kylin-spark-project/kylin-spark-query/src/main/scala/org/apache/kylin/query/runtime/plans/ResultPlan.scala
@@ -21,9 +21,11 @@ import com.google.common.cache.{Cache, CacheBuilder}
import com.google.common.collect.Lists
import org.apache.calcite.linq4j.{Enumerable, Linq4j}
import org.apache.calcite.rel.`type`.RelDataType
+import org.apache.kylin.common.debug.BackdoorToggles
import org.apache.kylin.common.exceptions.KylinTimeoutException
import org.apache.kylin.common.{KylinConfig, QueryContext, QueryContextFacade}
import org.apache.kylin.common.util.HadoopUtil
+import org.apache.kylin.metadata.project.ProjectManager
import org.apache.kylin.query.runtime.plans.ResultType.ResultType
import org.apache.spark.internal.Logging
import org.apache.spark.sql.{DataFrame, SparderContext}
@@ -62,7 +64,11 @@ object ResultPlan extends Logging {
val resultTypes = rowType.getFieldList.asScala
val jobGroup = Thread.currentThread().getName
val sparkContext = SparderContext.getSparkSession.sparkContext
- val kylinConfig = KylinConfig.getInstanceFromEnv
+ val projectName = QueryContextFacade.current().getProject
+ var kylinConfig = KylinConfig.getInstanceFromEnv
+ if (projectName != null) {
+ kylinConfig =
ProjectManager.getInstance(kylinConfig).getProject(projectName).getConfig
+ }
var pool = "heavy_tasks"
val partitionsNum =
if (kylinConfig.getSparkSqlShufflePartitions != -1) {
@@ -71,16 +77,23 @@ object ResultPlan extends Logging {
Math.min(QueryContextFacade.current().getSourceScanBytes /
PARTITION_SPLIT_BYTES + 1,
SparderContext.getTotalCore).toInt
}
+
if (QueryContextFacade.current().isHighPriorityQuery) {
pool = "vip_tasks"
- } else if (QueryContextFacade.current().isTableIndex) {
- pool = "extreme_heavy_tasks"
} else if (partitionsNum <= SparderContext.getTotalCore) {
pool = "lightweight_tasks"
}
+ if (kylinConfig.getProjectQuerySparkPool != null) {
+ pool = kylinConfig.getProjectQuerySparkPool
+ }
+ if (BackdoorToggles.getDebugToggleSparkPool != null) {
+ pool = BackdoorToggles.getDebugToggleSparkPool
+ }
+
// set priority
sparkContext.setLocalProperty("spark.scheduler.pool", pool)
+ QueryContextFacade.current().setSparkPool(pool)
val queryId = QueryContextFacade.current().getQueryId
sparkContext.setLocalProperty(QueryToExecutionIDCache.KYLIN_QUERY_ID_KEY,
queryId)
//df.sparkSession.sessionState.conf.setLocalProperty("spark.sql.shuffle.partitions",
diff --git
a/server-base/src/main/java/org/apache/kylin/rest/response/SQLResponse.java
b/server-base/src/main/java/org/apache/kylin/rest/response/SQLResponse.java
index 1721efe..8fa57c6 100644
--- a/server-base/src/main/java/org/apache/kylin/rest/response/SQLResponse.java
+++ b/server-base/src/main/java/org/apache/kylin/rest/response/SQLResponse.java
@@ -72,6 +72,8 @@ public class SQLResponse implements Serializable {
protected boolean queryPushDown = false;
+ protected String querySparkPool;
+
protected byte[] queryStatistics;
protected String traceUrl = null;
@@ -139,6 +141,14 @@ public class SQLResponse implements Serializable {
isException = v;
}
+ public String getSparkPool() { // Accessor for the querySparkPool field; SQLResponseTest lists the attribute as sparkPool, matching this accessor name rather than the field name.
+ return querySparkPool;
+ }
+
+ public void setSparkPool(String sparkPool) { // Stores the pool name in querySparkPool; QueryService sets it from queryContext.getSparkPool().
+ querySparkPool = sparkPool;
+ }
+
public String getExceptionMessage() {
return exceptionMessage;
}
diff --git
a/server-base/src/main/java/org/apache/kylin/rest/service/QueryService.java
b/server-base/src/main/java/org/apache/kylin/rest/service/QueryService.java
index e8cd931..fd686b5 100644
--- a/server-base/src/main/java/org/apache/kylin/rest/service/QueryService.java
+++ b/server-base/src/main/java/org/apache/kylin/rest/service/QueryService.java
@@ -356,6 +356,7 @@ public class QueryService extends BasicService {
stringBuilder.append("Storage cache used:
").append(storageCacheUsed).append(newLine);
stringBuilder.append("Is Query Push-Down:
").append(isPushDown).append(newLine);
stringBuilder.append("Is Prepare:
").append(BackdoorToggles.getPrepareOnly()).append(newLine);
+ stringBuilder.append("Used Spark pool:
").append(response.getSparkPool()).append(newLine);
stringBuilder.append("Trace URL:
").append(response.getTraceUrl()).append(newLine);
stringBuilder.append("Message:
").append(response.getExceptionMessage()).append(newLine);
stringBuilder.append("==========================[QUERY]===============================").append(newLine);
@@ -632,6 +633,7 @@ public class QueryService extends BasicService {
QueryContext context = QueryContextFacade.current();
context.setUsername(userInfo);
context.setGroups(AclPermissionUtil.getCurrentUserGroups());
+ context.setProject(sqlRequest.getProject());
final Collection<? extends GrantedAuthority> grantedAuthorities =
SecurityContextHolder.getContext()
.getAuthentication().getAuthorities();
for (GrantedAuthority grantedAuthority : grantedAuthorities) {
@@ -1163,6 +1165,7 @@ public class QueryService extends BasicService {
response.setTotalScanCount(queryContext.getScannedRows());
response.setTotalScanBytes(queryContext.getScannedBytes());
response.setCubeSegmentStatisticsList(queryContext.getCubeSegmentStatisticsResultList());
+ response.setSparkPool(queryContext.getSparkPool());
if (getConfig().isQueryCacheSignatureEnabled()) {
response.setSignature(SQLResponseSignatureUtil.createSignature(getConfig(),
response, projectName));
}
diff --git
a/server-base/src/test/java/org/apache/kylin/rest/response/SQLResponseTest.java
b/server-base/src/test/java/org/apache/kylin/rest/response/SQLResponseTest.java
index 4055a27..f939eb0 100644
---
a/server-base/src/test/java/org/apache/kylin/rest/response/SQLResponseTest.java
+++
b/server-base/src/test/java/org/apache/kylin/rest/response/SQLResponseTest.java
@@ -34,7 +34,7 @@ public class SQLResponseTest {
public void testInterfaceConsistency() throws IOException {
String[] attrArray = new String[] { "columnMetas", "results", "cube",
"affectedRowCount", "isException",
"exceptionMessage", "duration", "partial", "totalScanCount",
"hitExceptionCache", "storageCacheUsed",
- "pushDown", "traceUrl", "totalScanBytes" };
+ "sparkPool", "pushDown", "traceUrl", "totalScanBytes" };
SQLResponse sqlResponse = new SQLResponse(null, null, "learn_cube",
100, false, null, false, false);
String jsonStr = JsonUtil.writeValueAsString(sqlResponse);