This is an automated email from the ASF dual-hosted git repository.

xxyu pushed a commit to branch kylin-on-parquet-v2
in repository https://gitbox.apache.org/repos/asf/kylin.git
The following commit(s) were added to refs/heads/kylin-on-parquet-v2 by this push:
     new 39086e3  KYLIN-4846 Set the related query id to sparder job description
39086e3 is described below

commit 39086e3a279cd92447c9f919147edec6db058685
Author: Zhichao Zhang <441586...@qq.com>
AuthorDate: Tue Dec 22 08:45:47 2020 +0800

    KYLIN-4846 Set the related query id to sparder job description
---
 .../org/apache/kylin/query/pushdown/SparkSubmitter.java   |  6 ++----
 .../org/apache/kylin/query/pushdown/SparkSqlClient.scala  | 15 ++++++---------
 .../org/apache/kylin/query/runtime/plans/ResultPlan.scala |  3 +--
 3 files changed, 9 insertions(+), 15 deletions(-)

diff --git a/kylin-spark-project/kylin-spark-query/src/main/java/org/apache/kylin/query/pushdown/SparkSubmitter.java b/kylin-spark-project/kylin-spark-query/src/main/java/org/apache/kylin/query/pushdown/SparkSubmitter.java
index 2d31822..29259aa 100644
--- a/kylin-spark-project/kylin-spark-query/src/main/java/org/apache/kylin/query/pushdown/SparkSubmitter.java
+++ b/kylin-spark-project/kylin-spark-query/src/main/java/org/apache/kylin/query/pushdown/SparkSubmitter.java
@@ -21,19 +21,17 @@ package org.apache.kylin.query.pushdown;
 import org.apache.kylin.common.util.Pair;
 import org.apache.kylin.engine.spark.metadata.cube.StructField;
 import org.apache.spark.sql.SparderContext;
-import org.apache.spark.sql.SparkSession;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.util.List;
-import java.util.UUID;
 
 public class SparkSubmitter {
     public static final Logger logger = LoggerFactory.getLogger(SparkSubmitter.class);
 
     public static PushdownResponse submitPushDownTask(String sql) {
-        SparkSession ss = SparderContext.getSparkSession();
-        Pair<List<List<String>>, List<StructField>> pair = SparkSqlClient.executeSql(ss, sql, UUID.randomUUID());
+        Pair<List<List<String>>, List<StructField>> pair =
+                SparkSqlClient.executeSql(SparderContext.getSparkSession(), sql);
         SparderContext.closeThreadSparkSession();
         return new PushdownResponse(pair.getSecond(), pair.getFirst());
     }
diff --git a/kylin-spark-project/kylin-spark-query/src/main/scala/org/apache/kylin/query/pushdown/SparkSqlClient.scala b/kylin-spark-project/kylin-spark-query/src/main/scala/org/apache/kylin/query/pushdown/SparkSqlClient.scala
index 0d8b769..a4064fe 100644
--- a/kylin-spark-project/kylin-spark-query/src/main/scala/org/apache/kylin/query/pushdown/SparkSqlClient.scala
+++ b/kylin-spark-project/kylin-spark-query/src/main/scala/org/apache/kylin/query/pushdown/SparkSqlClient.scala
@@ -38,22 +38,18 @@ import scala.collection.JavaConverters._
 object SparkSqlClient {
   val logger: Logger = LoggerFactory.getLogger(classOf[SparkSqlClient])
 
-  def executeSql(ss: SparkSession, sql: String, uuid: UUID): Pair[JList[JList[String]], JList[StructField]] = {
+  def executeSql(ss: SparkSession, sql: String): Pair[JList[JList[String]], JList[StructField]] = {
     ss.sparkContext.setLocalProperty("spark.scheduler.pool", "query_pushdown")
     HadoopUtil.setCurrentConfiguration(ss.sparkContext.hadoopConfiguration)
-    val s = "Start to run sql with SparkSQL..."
     val queryId = QueryContextFacade.current().getQueryId
     ss.sparkContext.setLocalProperty(QueryToExecutionIDCache.KYLIN_QUERY_ID_KEY, queryId)
-    logger.info(s)
+    logger.info("Start to run sql with SparkSQL...")
 
     val df = ss.sql(sql)
 
     autoSetShufflePartitions(ss, df)
 
-    val msg = "SparkSQL returned result DataFrame"
-    logger.info(msg)
-
-    DFToList(ss, sql, uuid, df)
+    DFToList(ss, sql, df)
   }
 
   private def autoSetShufflePartitions(ss: SparkSession, df: DataFrame) = {
@@ -74,9 +70,10 @@ object SparkSqlClient {
     }
   }
 
-  private def DFToList(ss: SparkSession, sql: String, uuid: UUID, df: DataFrame): Pair[JList[JList[String]], JList[StructField]] = {
+  private def DFToList(ss: SparkSession, sql: String, df: DataFrame): Pair[JList[JList[String]], JList[StructField]] = {
     val jobGroup = Thread.currentThread.getName
-    ss.sparkContext.setJobGroup(jobGroup, s"Push down: $sql", interruptOnCancel = true)
+    ss.sparkContext.setJobGroup(jobGroup,
+      "Pushdown Query Id: " + QueryContextFacade.current().getQueryId, interruptOnCancel = true)
     try {
       val temporarySchema = df.schema.fields.zipWithIndex.map {
         case (_, index) => s"temporary_$index"
diff --git a/kylin-spark-project/kylin-spark-query/src/main/scala/org/apache/kylin/query/runtime/plans/ResultPlan.scala b/kylin-spark-project/kylin-spark-query/src/main/scala/org/apache/kylin/query/runtime/plans/ResultPlan.scala
index d840207..991a7f2 100644
--- a/kylin-spark-project/kylin-spark-query/src/main/scala/org/apache/kylin/query/runtime/plans/ResultPlan.scala
+++ b/kylin-spark-project/kylin-spark-query/src/main/scala/org/apache/kylin/query/runtime/plans/ResultPlan.scala
@@ -102,8 +102,7 @@ object ResultPlan extends Logging {
     QueryContextFacade.current().setDataset(df)
 
     sparkContext.setJobGroup(jobGroup,
-      // QueryContextFacade.current().getSql,
-      "sparder",
+      "Query Id: " + QueryContextFacade.current().getQueryId,
       interruptOnCancel = true)
     try {
       val rows = df.collect()
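In short, the commit tags every Spark job that serves a Kylin query (both the sparder path and SQL pushdown) with a job group whose description carries the query id, instead of a fixed "sparder" label or the raw SQL, so the Spark UI shows which query a job belongs to and the whole group can still be cancelled. The following is a minimal, standalone sketch of that pattern in plain Spark, illustrative only: the local master, the random UUID standing in for QueryContextFacade.current().getQueryId, and the example job are assumptions, not Kylin code.

    import java.util.UUID
    import org.apache.spark.sql.SparkSession

    object JobGroupDescriptionSketch {
      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder()
          .master("local[*]")                       // assumption for a runnable sketch
          .appName("job-group-description-sketch")
          .getOrCreate()

        val queryId = UUID.randomUUID().toString    // stand-in for the Kylin query id
        val jobGroup = Thread.currentThread.getName // same grouping key the diff uses

        // Every job triggered on this thread now shows "Query Id: <id>" as its
        // description in the Spark UI and REST API.
        spark.sparkContext.setJobGroup(jobGroup, "Query Id: " + queryId, interruptOnCancel = true)
        try {
          spark.range(0L, 1000000L).selectExpr("sum(id)").collect()
          // A timeout watchdog or a user-initiated stop could abort this query's jobs:
          // spark.sparkContext.cancelJobGroup(jobGroup)
        } finally {
          spark.sparkContext.clearJobGroup()
          spark.stop()
        }
      }
    }

With interruptOnCancel = true, cancelJobGroup(jobGroup) also interrupts the running task threads of that group, which is what makes the per-query grouping useful beyond labelling in the UI.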