This is an automated email from the ASF dual-hosted git repository.

xxyu pushed a commit to branch kylin-on-parquet-v2
in repository https://gitbox.apache.org/repos/asf/kylin.git


The following commit(s) were added to refs/heads/kylin-on-parquet-v2 by this push:
     new 39086e3  KYLIN-4846 Set the related query id to sparder job description
39086e3 is described below

commit 39086e3a279cd92447c9f919147edec6db058685
Author: Zhichao Zhang <441586...@qq.com>
AuthorDate: Tue Dec 22 08:45:47 2020 +0800

    KYLIN-4846 Set the related query id to sparder job description
---
 .../org/apache/kylin/query/pushdown/SparkSubmitter.java   |  6 ++----
 .../org/apache/kylin/query/pushdown/SparkSqlClient.scala  | 15 ++++++---------
 .../org/apache/kylin/query/runtime/plans/ResultPlan.scala |  3 +--
 3 files changed, 9 insertions(+), 15 deletions(-)

diff --git a/kylin-spark-project/kylin-spark-query/src/main/java/org/apache/kylin/query/pushdown/SparkSubmitter.java b/kylin-spark-project/kylin-spark-query/src/main/java/org/apache/kylin/query/pushdown/SparkSubmitter.java
index 2d31822..29259aa 100644
--- a/kylin-spark-project/kylin-spark-query/src/main/java/org/apache/kylin/query/pushdown/SparkSubmitter.java
+++ b/kylin-spark-project/kylin-spark-query/src/main/java/org/apache/kylin/query/pushdown/SparkSubmitter.java
@@ -21,19 +21,17 @@ package org.apache.kylin.query.pushdown;
 import org.apache.kylin.common.util.Pair;
 import org.apache.kylin.engine.spark.metadata.cube.StructField;
 import org.apache.spark.sql.SparderContext;
-import org.apache.spark.sql.SparkSession;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.util.List;
-import java.util.UUID;
 
 public class SparkSubmitter {
     public static final Logger logger = LoggerFactory.getLogger(SparkSubmitter.class);
 
     public static PushdownResponse submitPushDownTask(String sql) {
-        SparkSession ss = SparderContext.getSparkSession();
-        Pair<List<List<String>>, List<StructField>> pair = SparkSqlClient.executeSql(ss, sql, UUID.randomUUID());
+        Pair<List<List<String>>, List<StructField>> pair =
+                SparkSqlClient.executeSql(SparderContext.getSparkSession(), sql);
         SparderContext.closeThreadSparkSession();
         return new PushdownResponse(pair.getSecond(), pair.getFirst());
     }
diff --git a/kylin-spark-project/kylin-spark-query/src/main/scala/org/apache/kylin/query/pushdown/SparkSqlClient.scala b/kylin-spark-project/kylin-spark-query/src/main/scala/org/apache/kylin/query/pushdown/SparkSqlClient.scala
index 0d8b769..a4064fe 100644
--- a/kylin-spark-project/kylin-spark-query/src/main/scala/org/apache/kylin/query/pushdown/SparkSqlClient.scala
+++ b/kylin-spark-project/kylin-spark-query/src/main/scala/org/apache/kylin/query/pushdown/SparkSqlClient.scala
@@ -38,22 +38,18 @@ import scala.collection.JavaConverters._
 object SparkSqlClient {
        val logger: Logger = LoggerFactory.getLogger(classOf[SparkSqlClient])
 
-       def executeSql(ss: SparkSession, sql: String, uuid: UUID): Pair[JList[JList[String]], JList[StructField]] = {
+       def executeSql(ss: SparkSession, sql: String): Pair[JList[JList[String]], JList[StructField]] = {
               ss.sparkContext.setLocalProperty("spark.scheduler.pool", "query_pushdown")
               HadoopUtil.setCurrentConfiguration(ss.sparkContext.hadoopConfiguration)
-               val s = "Start to run sql with SparkSQL..."
               val queryId = QueryContextFacade.current().getQueryId
               ss.sparkContext.setLocalProperty(QueryToExecutionIDCache.KYLIN_QUERY_ID_KEY, queryId)
-               logger.info(s)
+               logger.info("Start to run sql with SparkSQL...")
 
                val df = ss.sql(sql)
 
                autoSetShufflePartitions(ss, df)
 
-               val msg = "SparkSQL returned result DataFrame"
-               logger.info(msg)
-
-               DFToList(ss, sql, uuid, df)
+               DFToList(ss, sql, df)
        }
 
       private def autoSetShufflePartitions(ss: SparkSession, df: DataFrame) = {
@@ -74,9 +70,10 @@ object SparkSqlClient {
                }
        }
 
-       private def DFToList(ss: SparkSession, sql: String, uuid: UUID, df: DataFrame): Pair[JList[JList[String]], JList[StructField]] = {
+       private def DFToList(ss: SparkSession, sql: String, df: DataFrame): Pair[JList[JList[String]], JList[StructField]] = {
               val jobGroup = Thread.currentThread.getName
-               ss.sparkContext.setJobGroup(jobGroup, s"Push down: $sql", interruptOnCancel = true)
+               ss.sparkContext.setJobGroup(jobGroup,
+                       "Pushdown Query Id: " + QueryContextFacade.current().getQueryId, interruptOnCancel = true)
                try {
                       val temporarySchema = df.schema.fields.zipWithIndex.map {
                                case (_, index) => s"temporary_$index"
diff --git a/kylin-spark-project/kylin-spark-query/src/main/scala/org/apache/kylin/query/runtime/plans/ResultPlan.scala b/kylin-spark-project/kylin-spark-query/src/main/scala/org/apache/kylin/query/runtime/plans/ResultPlan.scala
index d840207..991a7f2 100644
--- a/kylin-spark-project/kylin-spark-query/src/main/scala/org/apache/kylin/query/runtime/plans/ResultPlan.scala
+++ b/kylin-spark-project/kylin-spark-query/src/main/scala/org/apache/kylin/query/runtime/plans/ResultPlan.scala
@@ -102,8 +102,7 @@ object ResultPlan extends Logging {
     QueryContextFacade.current().setDataset(df)
 
     sparkContext.setJobGroup(jobGroup,
-      //      QueryContextFacade.current().getSql,
-      "sparder",
+      "Query Id: " + QueryContextFacade.current().getQueryId,
       interruptOnCancel = true)
     try {
       val rows = df.collect()
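
For context, the net effect of this commit is that the Spark job group description now carries the Kylin query id ("Query Id: ..." for sparder queries, "Pushdown Query Id: ..." for pushdown) instead of a generic label, so a query can be traced to its Spark jobs in the Spark UI. Below is a minimal, self-contained sketch of that pattern, not the Kylin code itself; the queryId value is a hypothetical placeholder standing in for QueryContextFacade.current().getQueryId:

    // Sketch only: tags Spark jobs with a query id via setJobGroup,
    // assuming a local SparkSession; "queryId" is a placeholder.
    import org.apache.spark.sql.SparkSession

    object JobDescriptionSketch {
      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder()
          .master("local[*]")
          .appName("job-description-sketch")
          .getOrCreate()

        // Placeholder query id; Kylin would take this from its query context.
        val queryId = java.util.UUID.randomUUID().toString

        // Group all jobs triggered by this query and use the query id as the
        // description shown in the Spark UI; interruptOnCancel lets
        // cancelJobGroup() interrupt the running tasks.
        spark.sparkContext.setJobGroup(Thread.currentThread.getName,
          "Query Id: " + queryId, interruptOnCancel = true)

        // Any action here appears in the Spark UI with "Query Id: <uuid>" as
        // its job description instead of a generic label.
        spark.range(0, 1000).selectExpr("sum(id)").collect()

        spark.sparkContext.clearJobGroup()
        spark.stop()
      }
    }

With a description like this in place, operators can search the Spark UI (or the event log) for the query id reported by Kylin and find the corresponding Spark jobs directly.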
