This is an automated email from the ASF dual-hosted git repository.

xxyu pushed a commit to branch kylin-on-parquet-v2
in repository https://gitbox.apache.org/repos/asf/kylin.git


The following commit(s) were added to refs/heads/kylin-on-parquet-v2 by this push:
     new 3f0b2a8  KYLIN-4713 Support use diff spark schedule pool for diff query
3f0b2a8 is described below

commit 3f0b2a8eaa14e42a36df314b16263b1e90efc44d
Author: yaqian.zhang <598593...@qq.com>
AuthorDate: Mon Aug 24 17:02:34 2020 +0800

    KYLIN-4713 Support use diff spark schedule pool for diff query
---
 build/bin/check-migration-acl.sh                   |  2 +-
 build/bin/kylin.sh                                 | 44 ++++++++++++++++++++++
 .../org/apache/kylin/common/KylinConfigBase.java   |  4 ++
 .../java/org/apache/kylin/common/QueryContext.java | 18 +++++++++
 .../apache/kylin/common/debug/BackdoorToggles.java |  6 +++
 .../kylin/query/runtime/plans/ResultPlan.scala     | 19 ++++++++--
 .../apache/kylin/rest/response/SQLResponse.java    | 10 +++++
 .../apache/kylin/rest/service/QueryService.java    |  3 ++
 .../kylin/rest/response/SQLResponseTest.java       |  2 +-
 9 files changed, 103 insertions(+), 5 deletions(-)

diff --git a/build/bin/check-migration-acl.sh b/build/bin/check-migration-acl.sh
index 70f3337..27f8971 100755
--- a/build/bin/check-migration-acl.sh
+++ b/build/bin/check-migration-acl.sh
@@ -25,7 +25,7 @@ metadataUrl=`${dir}/get-properties.sh kylin.metadata.url`
 
 if [[ "${metadataUrl##*@}" != "hbase" ]]
 then
-    echo "Not HBase metadata ${metadataUrl}. Skip check."
+    echo "Not HBase metadata. Skip check."
     exit 0
 fi
 
diff --git a/build/bin/kylin.sh b/build/bin/kylin.sh
index 7f7e2e5..bb33eae 100755
--- a/build/bin/kylin.sh
+++ b/build/bin/kylin.sh
@@ -141,6 +141,48 @@ function checkBasicKylinProps() {
     fi
 }
 
+function prepareFairScheduler() {
+    cat > ${KYLIN_HOME}/conf/fairscheduler.xml <<EOL
+<?xml version="1.0"?>
+<!--
+   Licensed to the Apache Software Foundation (ASF) under one or more
+   contributor license agreements.  See the NOTICE file distributed with
+   this work for additional information regarding copyright ownership.
+   The ASF licenses this file to You under the Apache License, Version 2.0
+   (the "License"); you may not use this file except in compliance with
+   the License.  You may obtain a copy of the License at
+       http://www.apache.org/licenses/LICENSE-2.0
+   Unless required by applicable law or agreed to in writing, software
+   distributed under the License is distributed on an "AS IS" BASIS,
+   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+   See the License for the specific language governing permissions and
+   limitations under the License.
+-->
+<allocations>
+  <pool name="query_pushdown">
+    <schedulingMode>FAIR</schedulingMode>
+    <weight>1</weight>
+    <minShare>1</minShare>
+  </pool>
+  <pool name="heavy_tasks">
+    <schedulingMode>FAIR</schedulingMode>
+    <weight>5</weight>
+    <minShare>1</minShare>
+  </pool>
+  <pool name="lightweight_tasks">
+    <schedulingMode>FAIR</schedulingMode>
+    <weight>10</weight>
+    <minShare>1</minShare>
+  </pool>
+  <pool name="vip_tasks">
+    <schedulingMode>FAIR</schedulingMode>
+    <weight>15</weight>
+    <minShare>1</minShare>
+  </pool>
+</allocations>
+EOL
+}
+
 function checkRestPort() {
     kylin_rest_address_arr=(${KYLIN_REST_ADDRESS//:/ })
     inuse=`netstat -tlpn | grep "\b${kylin_rest_address_arr[1]}\b"`
@@ -196,6 +238,8 @@ then
 
     checkRestPort
 
+    prepareFairScheduler
+
     ${KYLIN_HOME}/bin/check-migration-acl.sh || { exit 1; }
 
     # get KYLIN_EXTRA_START_OPTS
diff --git a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
index f317ba2..ac93fd4 100644
--- a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
+++ b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
@@ -1800,6 +1800,10 @@ public abstract class KylinConfigBase implements Serializable {
         return Boolean.parseBoolean(getOptional("kylin.query.stream-aggregate-enabled", TRUE));
     }
 
+    public String getProjectQuerySparkPool() {
+        return getOptional("kylin.query.spark.pool", null);
+    }
+
     public boolean isProjectIsolationEnabled() {
         return Boolean.parseBoolean(getOptional("kylin.storage.project-isolation-enable", TRUE));
     }
diff --git a/core-common/src/main/java/org/apache/kylin/common/QueryContext.java b/core-common/src/main/java/org/apache/kylin/common/QueryContext.java
index 6e71e6a..801eb52 100644
--- a/core-common/src/main/java/org/apache/kylin/common/QueryContext.java
+++ b/core-common/src/main/java/org/apache/kylin/common/QueryContext.java
@@ -55,6 +55,8 @@ public class QueryContext {
     private final String queryId;
     private String username;
     private Set<String> groups;
+    private String project;
+    private String sparkPool;
     private AtomicLong scannedRows = new AtomicLong();
     private AtomicLong returnedRows = new AtomicLong();
     private AtomicLong scannedBytes = new AtomicLong();
@@ -120,6 +122,22 @@ public class QueryContext {
         this.groups = groups;
     }
 
+    public void setProject(String project) {
+        this.project = project;
+    }
+
+    public String getProject() {
+        return project;
+    }
+
+    public void setSparkPool(String sparkPool) {
+        this.sparkPool = sparkPool;
+    }
+
+    public String getSparkPool() {
+        return this.sparkPool;
+    }
+
     public Object getCalcitePlan() {
         return calcitePlan;
     }
diff --git a/core-common/src/main/java/org/apache/kylin/common/debug/BackdoorToggles.java b/core-common/src/main/java/org/apache/kylin/common/debug/BackdoorToggles.java
index d6674a2..088adc2 100644
--- a/core-common/src/main/java/org/apache/kylin/common/debug/BackdoorToggles.java
+++ b/core-common/src/main/java/org/apache/kylin/common/debug/BackdoorToggles.java
@@ -75,6 +75,10 @@ public class BackdoorToggles {
         return getString(DEBUG_TOGGLE_HIT_CUBE);
     }
 
+    public static String getDebugToggleSparkPool(){
+        return getString(DEBUG_TOGGLE_SPARK_POOL);
+    }
+
     public static String getCoprocessorBehavior() {
         return getString(DEBUG_TOGGLE_COPROCESSOR_BEHAVIOR);
     }
@@ -395,4 +399,6 @@ public class BackdoorToggles {
      *  check https://issues.apache.org/jira/browse/KYLIN-4312 for information.
      */
     public final static String DEBUG_TOGGLE_HIT_CUBE = "DEBUG_TOGGLE_HIT_CUBE";
+
+    public final static String DEBUG_TOGGLE_SPARK_POOL = "DEBUG_TOGGLE_SPARK_POOL";
 }
diff --git a/kylin-spark-project/kylin-spark-query/src/main/scala/org/apache/kylin/query/runtime/plans/ResultPlan.scala b/kylin-spark-project/kylin-spark-query/src/main/scala/org/apache/kylin/query/runtime/plans/ResultPlan.scala
index f12ac2f..5430723 100644
--- a/kylin-spark-project/kylin-spark-query/src/main/scala/org/apache/kylin/query/runtime/plans/ResultPlan.scala
+++ b/kylin-spark-project/kylin-spark-query/src/main/scala/org/apache/kylin/query/runtime/plans/ResultPlan.scala
@@ -21,9 +21,11 @@ import com.google.common.cache.{Cache, CacheBuilder}
 import com.google.common.collect.Lists
 import org.apache.calcite.linq4j.{Enumerable, Linq4j}
 import org.apache.calcite.rel.`type`.RelDataType
+import org.apache.kylin.common.debug.BackdoorToggles
 import org.apache.kylin.common.exceptions.KylinTimeoutException
 import org.apache.kylin.common.{KylinConfig, QueryContext, QueryContextFacade}
 import org.apache.kylin.common.util.HadoopUtil
+import org.apache.kylin.metadata.project.ProjectManager
 import org.apache.kylin.query.runtime.plans.ResultType.ResultType
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.{DataFrame, SparderContext}
@@ -62,7 +64,11 @@ object ResultPlan extends Logging {
     val resultTypes = rowType.getFieldList.asScala
     val jobGroup = Thread.currentThread().getName
     val sparkContext = SparderContext.getSparkSession.sparkContext
-    val kylinConfig = KylinConfig.getInstanceFromEnv
+    val projectName = QueryContextFacade.current().getProject
+    var kylinConfig = KylinConfig.getInstanceFromEnv
+    if (projectName != null) {
+      kylinConfig = ProjectManager.getInstance(kylinConfig).getProject(projectName).getConfig
+    }
     var pool = "heavy_tasks"
     val partitionsNum =
       if (kylinConfig.getSparkSqlShufflePartitions != -1) {
@@ -71,16 +77,23 @@ object ResultPlan extends Logging {
         Math.min(QueryContextFacade.current().getSourceScanBytes / PARTITION_SPLIT_BYTES + 1,
           SparderContext.getTotalCore).toInt
       }
+
     if (QueryContextFacade.current().isHighPriorityQuery) {
       pool = "vip_tasks"
-    } else if (QueryContextFacade.current().isTableIndex) {
-      pool = "extreme_heavy_tasks"
     } else if (partitionsNum <= SparderContext.getTotalCore) {
       pool = "lightweight_tasks"
     }
 
+    if (kylinConfig.getProjectQuerySparkPool != null) {
+      pool = kylinConfig.getProjectQuerySparkPool
+    }
+    if (BackdoorToggles.getDebugToggleSparkPool != null) {
+      pool = BackdoorToggles.getDebugToggleSparkPool
+    }
+
     // set priority
     sparkContext.setLocalProperty("spark.scheduler.pool", pool)
+    QueryContextFacade.current().setSparkPool(pool)
     val queryId = QueryContextFacade.current().getQueryId
     sparkContext.setLocalProperty(QueryToExecutionIDCache.KYLIN_QUERY_ID_KEY, queryId)
     //df.sparkSession.sessionState.conf.setLocalProperty("spark.sql.shuffle.partitions",
diff --git a/server-base/src/main/java/org/apache/kylin/rest/response/SQLResponse.java b/server-base/src/main/java/org/apache/kylin/rest/response/SQLResponse.java
index 1721efe..8fa57c6 100644
--- a/server-base/src/main/java/org/apache/kylin/rest/response/SQLResponse.java
+++ b/server-base/src/main/java/org/apache/kylin/rest/response/SQLResponse.java
@@ -72,6 +72,8 @@ public class SQLResponse implements Serializable {
 
     protected boolean queryPushDown = false;
 
+    protected String querySparkPool;
+
     protected byte[] queryStatistics;
     
     protected String traceUrl = null;
@@ -139,6 +141,14 @@ public class SQLResponse implements Serializable {
         isException = v;
     }
 
+    public String getSparkPool() {
+        return querySparkPool;
+    }
+
+    public void setSparkPool(String sparkPool) {
+        querySparkPool = sparkPool;
+    }
+
     public String getExceptionMessage() {
         return exceptionMessage;
     }
diff --git a/server-base/src/main/java/org/apache/kylin/rest/service/QueryService.java b/server-base/src/main/java/org/apache/kylin/rest/service/QueryService.java
index e8cd931..fd686b5 100644
--- a/server-base/src/main/java/org/apache/kylin/rest/service/QueryService.java
+++ b/server-base/src/main/java/org/apache/kylin/rest/service/QueryService.java
@@ -356,6 +356,7 @@ public class QueryService extends BasicService {
         stringBuilder.append("Storage cache used: ").append(storageCacheUsed).append(newLine);
         stringBuilder.append("Is Query Push-Down: ").append(isPushDown).append(newLine);
         stringBuilder.append("Is Prepare: ").append(BackdoorToggles.getPrepareOnly()).append(newLine);
+        stringBuilder.append("Used Spark pool: ").append(response.getSparkPool()).append(newLine);
         stringBuilder.append("Trace URL: ").append(response.getTraceUrl()).append(newLine);
         stringBuilder.append("Message: ").append(response.getExceptionMessage()).append(newLine);
         stringBuilder.append("==========================[QUERY]===============================").append(newLine);
@@ -632,6 +633,7 @@ public class QueryService extends BasicService {
             QueryContext context = QueryContextFacade.current();
             context.setUsername(userInfo);
             context.setGroups(AclPermissionUtil.getCurrentUserGroups());
+            context.setProject(sqlRequest.getProject());
             final Collection<? extends GrantedAuthority> grantedAuthorities = SecurityContextHolder.getContext()
                     .getAuthentication().getAuthorities();
             for (GrantedAuthority grantedAuthority : grantedAuthorities) {
@@ -1163,6 +1165,7 @@ public class QueryService extends BasicService {
         response.setTotalScanCount(queryContext.getScannedRows());
         response.setTotalScanBytes(queryContext.getScannedBytes());
         response.setCubeSegmentStatisticsList(queryContext.getCubeSegmentStatisticsResultList());
+        response.setSparkPool(queryContext.getSparkPool());
         if (getConfig().isQueryCacheSignatureEnabled()) {
             response.setSignature(SQLResponseSignatureUtil.createSignature(getConfig(), response, projectName));
         }
diff --git a/server-base/src/test/java/org/apache/kylin/rest/response/SQLResponseTest.java b/server-base/src/test/java/org/apache/kylin/rest/response/SQLResponseTest.java
index 4055a27..f939eb0 100644
--- a/server-base/src/test/java/org/apache/kylin/rest/response/SQLResponseTest.java
+++ b/server-base/src/test/java/org/apache/kylin/rest/response/SQLResponseTest.java
@@ -34,7 +34,7 @@ public class SQLResponseTest {
     public void testInterfaceConsistency() throws IOException {
         String[] attrArray = new String[] { "columnMetas", "results", "cube", "affectedRowCount", "isException",
                 "exceptionMessage", "duration", "partial", "totalScanCount", "hitExceptionCache", "storageCacheUsed",
-                "pushDown", "traceUrl", "totalScanBytes" };
+                "sparkPool", "pushDown", "traceUrl", "totalScanBytes" };
 
         SQLResponse sqlResponse = new SQLResponse(null, null, "learn_cube", 100, false, null, false, false);
         String jsonStr = JsonUtil.writeValueAsString(sqlResponse);

Reply via email to