This is an automated email from the ASF dual-hosted git repository.

xxyu pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/kylin.git


The following commit(s) were added to refs/heads/main by this push:
     new c9a3140729 KYLIN-5111 Refine query metrics
c9a3140729 is described below

commit c9a3140729da6d28ddb8359b035567d2c4d1eca3
Author: XiaoxiangYu <[email protected]>
AuthorDate: Wed Apr 20 08:52:11 2022 +0800

    KYLIN-5111 Refine query metrics
---
 .../org/apache/kylin/common/KylinConfigBase.java   |   4 +
 .../java/org/apache/kylin/common/QueryTrace.java   |   1 +
 .../kylin/query/runtime/plans/ResultPlan.scala     |  12 +-
 .../apache/kylin/query/util/SparkJobTrace.scala    |   8 +-
 .../apache/kylin/query/util/SparkJobTraceV2.scala  | 128 +++++++++++++++++++++
 .../org/apache/spark/sql/metrics/AppStatus.scala   |  60 +++++++++-
 6 files changed, 206 insertions(+), 7 deletions(-)

diff --git a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
index 53f0193423..18dd2ab4e0 100644
--- a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
+++ b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
@@ -3339,6 +3339,10 @@ public abstract class KylinConfigBase implements Serializable {
         return Boolean.parseBoolean(this.getOptional("kylin.query.auto-sparder-context-enabled", "false"));
     }
 
+    public Integer sparkQueryMetrics() {
+        return Integer.parseInt(this.getOptional("kylin.query.spark.metrics", "2"));
+    }
+
     /**
      * whether to enable sparder monitor function
      */
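
The getter added above reads the new property "kylin.query.spark.metrics" with a default of "2". The meaning of each value is not documented in this hunk, but it can be inferred from the ResultPlan.scala change further below; a minimal, self-contained Scala sketch of that mapping (the object name SparkQueryMetricsLevel is only illustrative and is not part of this commit):

    // Sketch: mirrors KylinConfigBase.sparkQueryMetrics() and the way
    // ResultPlan.scala interprets the value in this commit.
    object SparkQueryMetricsLevel {
      // "kylin.query.spark.metrics" defaults to "2" when not set.
      def parse(raw: Option[String]): Int = Integer.parseInt(raw.getOrElse("2"))

      def describe(level: Int): String = level match {
        case 2 => "SparkJobTraceV2: detailed per-job and per-stage query metrics"
        case 1 => "SparkJobTrace: the pre-existing trace behaviour"
        case _ => "AbstractSparkJobTrace: query metrics collection disabled (no-op)"
      }
    }
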
diff --git a/core-common/src/main/java/org/apache/kylin/common/QueryTrace.java b/core-common/src/main/java/org/apache/kylin/common/QueryTrace.java
index 4387e0a65d..b3e3b8a093 100644
--- a/core-common/src/main/java/org/apache/kylin/common/QueryTrace.java
+++ b/core-common/src/main/java/org/apache/kylin/common/QueryTrace.java
@@ -34,6 +34,7 @@ public class QueryTrace {
     public static final String WAIT_FOR_EXECUTION = "WAIT_FOR_EXECUTION";
     public static final String EXECUTION = "EXECUTION";
     public static final String FETCH_RESULT = "FETCH_RESULT";
+    public static final String CALCULATE_STAT = "CALCULATE_STAT";
 
     // group name
     static final String PREPARATION = "PREPARATION";
diff --git a/kylin-spark-project/kylin-spark-query/src/main/scala/org/apache/kylin/query/runtime/plans/ResultPlan.scala b/kylin-spark-project/kylin-spark-query/src/main/scala/org/apache/kylin/query/runtime/plans/ResultPlan.scala
index c0558b58b4..7c95636e31 100644
--- a/kylin-spark-project/kylin-spark-query/src/main/scala/org/apache/kylin/query/runtime/plans/ResultPlan.scala
+++ b/kylin-spark-project/kylin-spark-query/src/main/scala/org/apache/kylin/query/runtime/plans/ResultPlan.scala
@@ -27,13 +27,14 @@ import org.apache.kylin.common.{KylinConfig, QueryContext, QueryContextFacade}
 import org.apache.kylin.common.util.HadoopUtil
 import org.apache.kylin.metadata.project.ProjectManager
 import org.apache.kylin.query.runtime.plans.ResultType.ResultType
-import org.apache.kylin.query.util.SparkJobTrace
+import org.apache.kylin.query.util.{AbstractSparkJobTrace, SparkJobTrace, SparkJobTraceV2}
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.{DataFrame, SparderContext}
 import org.apache.spark.sql.hive.utils.QueryMetricUtils
 import org.apache.spark.sql.utils.SparkTypeUtil
 import org.apache.spark.utils.SparderUtils
 
+import java.util.TimeZone
 import scala.collection.JavaConverters._
 
 // scalastyle:off
@@ -107,7 +108,14 @@ object ResultPlan extends Logging {
       interruptOnCancel = true)
     val currentTrace = QueryContextFacade.current().getQueryTrace
     currentTrace.endLastSpan()
-    val jobTrace = new SparkJobTrace(jobGroup, currentTrace, sparkContext)
+
+    val jobTrace = if(kylinConfig.sparkQueryMetrics == 2) {
+      new SparkJobTraceV2(jobGroup, currentTrace, sparkContext, TimeZone.getTimeZone(kylinConfig.getTimeZone).toZoneId)
+    } else if(kylinConfig.sparkQueryMetrics == 1)  {
+      new SparkJobTrace(jobGroup, currentTrace, sparkContext)
+    } else {
+      new AbstractSparkJobTrace()
+    }
     try {
       val rows = df.collect()
       jobTrace.jobFinished()
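
When the V2 trace is selected, the server time zone string from kylinConfig.getTimeZone is converted to a java.time.ZoneId before being handed to SparkJobTraceV2, which uses it to format timestamps. A standalone sketch of that conversion and formatting, with "GMT+8" as a purely illustrative stand-in for the configured value:

    import java.time.Instant
    import java.time.format.DateTimeFormatter
    import java.util.{Locale, TimeZone}

    object ZoneIdSketch {
      def main(args: Array[String]): Unit = {
        // Stand-in for TimeZone.getTimeZone(kylinConfig.getTimeZone).toZoneId
        val zoneId = TimeZone.getTimeZone("GMT+8").toZoneId
        // Same pattern as SparkJobTraceV2.dateStr: "HH:mm:ss.SSS" in the configured zone.
        val formatted = Instant.ofEpochMilli(System.currentTimeMillis())
          .atZone(zoneId)
          .toLocalDateTime
          .format(DateTimeFormatter.ofPattern("HH:mm:ss.SSS", Locale.ROOT))
        println(formatted)
      }
    }
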
diff --git a/kylin-spark-project/kylin-spark-query/src/main/scala/org/apache/kylin/query/util/SparkJobTrace.scala b/kylin-spark-project/kylin-spark-query/src/main/scala/org/apache/kylin/query/util/SparkJobTrace.scala
index e6de31a695..99bad7f047 100644
--- a/kylin-spark-project/kylin-spark-query/src/main/scala/org/apache/kylin/query/util/SparkJobTrace.scala
+++ b/kylin-spark-project/kylin-spark-query/src/main/scala/org/apache/kylin/query/util/SparkJobTrace.scala
@@ -29,7 +29,7 @@ import org.apache.spark.utils.LogEx
 class SparkJobTrace(jobGroup: String,
                     queryTrace: QueryTrace,
                     sparkContext: SparkContext,
-                    startAt: Long = System.currentTimeMillis()) extends LogEx {
+                    startAt: Long = System.currentTimeMillis()) extends AbstractSparkJobTrace {
 
   val appStatus = new AppStatus(sparkContext)
 
@@ -53,7 +53,7 @@ class SparkJobTrace(jobGroup: String,
    * Long, it may imply the executor-core config is not insufficient for the number of tasks,
    * or the cluster is in heavy work load
    */
-  def jobFinished(): Unit = {
+  override def jobFinished(): Unit = {
     try {
       val jobDataSeq = appStatus.getJobData(jobGroup)
 
@@ -116,3 +116,7 @@ class SparkJobTrace(jobGroup: String,
   }
 }
 
+class AbstractSparkJobTrace extends LogEx {
+  def jobFinished(): Unit = {}
+}
+
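
The new AbstractSparkJobTrace base class gives callers a no-op implementation, so ResultPlan can call jobFinished() unconditionally, whatever metrics level is configured. A minimal sketch of that pattern (the class and object names below are illustrative stand-ins, not the commit's own identifiers):

    // Null-object style: the caller never branches on the metrics level
    // after constructing the trace.
    class NoOpTrace {                        // plays the role of AbstractSparkJobTrace
      def jobFinished(): Unit = {}           // does nothing when tracing is off
    }

    class DetailedTrace extends NoOpTrace {  // plays the role of SparkJobTraceV2
      override def jobFinished(): Unit = println("collect job and stage metrics here")
    }

    object CallerSketch {
      def main(args: Array[String]): Unit = {
        val level = 2                                            // illustrative config value
        val trace: NoOpTrace = if (level == 2) new DetailedTrace else new NoOpTrace
        // ... run the Spark job and collect the result ...
        trace.jobFinished()                                      // safe for every level
      }
    }
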
diff --git a/kylin-spark-project/kylin-spark-query/src/main/scala/org/apache/kylin/query/util/SparkJobTraceV2.scala b/kylin-spark-project/kylin-spark-query/src/main/scala/org/apache/kylin/query/util/SparkJobTraceV2.scala
new file mode 100644
index 0000000000..396a73d650
--- /dev/null
+++ b/kylin-spark-project/kylin-spark-query/src/main/scala/org/apache/kylin/query/util/SparkJobTraceV2.scala
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.query.util
+
+import org.apache.kylin.common.QueryTrace
+import org.apache.spark.SparkContext
+
+import java.time.ZoneId
+import java.time.format.DateTimeFormatter
+import java.util.{Date, Locale}
+
+class SparkJobTraceV2(jobGroup: String,
+                      queryTrace: QueryTrace,
+                      sparkContext: SparkContext,
+                      zoneId: ZoneId,
+                      detailEnabled: Boolean = false,
+                      startAt: Long = System.currentTimeMillis()) extends SparkJobTrace(jobGroup: String,
+  queryTrace: QueryTrace,
+  sparkContext: SparkContext,
+  startAt: Long) {
+
+  val PATTERN = "HH:mm:ss.SSS"
+  val bytesRead = "bytesRead"
+  val stageInfo = "stageInfo"
+  val runningTime = "runningTime"
+  val launchTime = "launchTime"
+
+
+  def dateStr(ts: Long): String = {
+    new Date(ts).toInstant.atZone(zoneId).toLocalDateTime.format(DateTimeFormatter.ofPattern(PATTERN, Locale.ROOT))
+  }
+
+  def dateStr(ts: java.util.Date): String = {
+    ts.toInstant.atZone(zoneId).toLocalDateTime.format(DateTimeFormatter.ofPattern(PATTERN, Locale.ROOT))
+  }
+
+  override def jobFinished(): Unit = {
+    logDebug("Query job finished.")
+    val finishedAt = System.currentTimeMillis()
+    try {
+      val jobDataSeq = appStatus.getJobData(jobGroup)
+      val firstSubmissionTime = jobDataSeq.map(_.submissionTime).min
+      val lastCompletionTime = jobDataSeq.map(_.completionTime).max
+      var firstLaunchTime = -1L
+
+      if (jobDataSeq.isEmpty) {
+        endAbnormalExecutionTrace()
+        return
+      }
+
+      jobDataSeq.foreach(
+        job => {
+          val submissionTime = dateStr(job.submissionTime.getOrElse(new java.util.Date(0)))
+          val completionTime = dateStr(job.completionTime.getOrElse(new java.util.Date(0)))
+          val killedDesc = if (job.killedTasksSummary.isEmpty) {
+            "EMPTY"
+          } else {
+            job.killedTasksSummary.mkString("|")
+          }
+          val stageMetrics = job.stageIds.flatMap(stageId => appStatus.calStageMetrics(stageId)).map(
+            stage => if (stage(bytesRead).nonEmpty && stage(runningTime).nonEmpty) {
+              if (firstLaunchTime == -1 || stage(launchTime).apply(0) < firstLaunchTime) firstLaunchTime = stage(launchTime).apply(0)
+              ("Stage:%d,%d, launch time of first and last task are %s and %s, " +
+                " bytesRead is [%s], runningTime is [%s]").format(
+                stage(stageInfo).apply(0),
+                stage(stageInfo).apply(1),
+                dateStr(stage(launchTime).apply(0)),
+                dateStr(stage(launchTime).apply(1)),
+                stage(bytesRead).mkString(","),
+                stage(runningTime).mkString(", ")
+              )
+            } else {
+              ("Stage:%d,%d").format(stage(stageInfo).apply(0), stage(stageInfo).apply(1))
+            }).mkString(";")
+
+          logInfo(
+            s"Job ${job.jobId} is submitted at ${submissionTime} and completed at ${completionTime}.It has ${job.numTasks} tasks, " +
+              s"succeed ${job.numCompletedTasks} tasks, ${job.numFailedTasks} failed tasks,${job.numKilledTasks} killed tasks. " +
+              s"Killed tasks info: ${killedDesc}. Stages ${stageMetrics}.")
+        }
+      )
+
+      var jobExecutionTimeByKylin = finishedAt - startAt
+      if (firstSubmissionTime.isDefined) {
+        queryTrace.amendLast(QueryTrace.PREPARE_AND_SUBMIT_JOB, firstSubmissionTime.get.getTime)
+      } else {
+        logInfo("No firstSubmissionTime")
+      }
+      if (firstSubmissionTime.isDefined && lastCompletionTime.isDefined) {
+        val jobExecutionTimeBySpark = lastCompletionTime.get.getTime - firstSubmissionTime.get.getTime
+        logInfo(s"jobExecutionTime will change from ${jobExecutionTimeByKylin} to ${jobExecutionTimeBySpark} .")
+        logInfo(s"Kylin submit query at ${dateStr(startAt)}" +
+          s", first spark job submitted at ${dateStr(firstSubmissionTime.get.getTime)}" +
+          s", last spark job finished at ${dateStr(lastCompletionTime.get.getTime)}" +
+          s", query finished at ${dateStr(finishedAt)} .")
+        jobExecutionTimeByKylin = jobExecutionTimeBySpark
+
+        queryTrace.appendSpan(QueryTrace.WAIT_FOR_EXECUTION, firstLaunchTime - firstSubmissionTime.get.getTime)
+        queryTrace.appendSpan(QueryTrace.EXECUTION, lastCompletionTime.get.getTime - firstLaunchTime)
+        queryTrace.appendSpan(QueryTrace.FETCH_RESULT, finishedAt.longValue - lastCompletionTime.get.getTime)
+        queryTrace.appendSpan(QueryTrace.CALCULATE_STAT, System.currentTimeMillis - finishedAt)
+      } else {
+        logInfo("No firstSubmissionTime or lastCompletionTime")
+      }
+    } catch {
+      case e =>
+        logWarning(s"Failed trace spark job execution for $jobGroup", e)
+        endAbnormalExecutionTrace()
+    }
+  }
+}
+
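
The spans appended in jobFinished() above partition the time between query submission and the end of statistics collection: WAIT_FOR_EXECUTION runs from the first Spark job submission to the first task launch, EXECUTION until the last job completes, FETCH_RESULT until df.collect() returns, and the new CALCULATE_STAT covers the metrics bookkeeping itself. A small worked sketch with invented epoch-millisecond values:

    object SpanArithmeticSketch {
      def main(args: Array[String]): Unit = {
        // Purely illustrative timestamps (epoch millis).
        val startAt             = 1000L // Kylin submits the query
        val firstSubmissionTime = 1100L // first Spark job submitted
        val firstLaunchTime     = 1250L // earliest task launch across stages
        val lastCompletionTime  = 4000L // last Spark job completed
        val finishedAt          = 4200L // df.collect() returned

        // Same arithmetic as SparkJobTraceV2.jobFinished():
        val jobExecutionTimeByKylin = finishedAt - startAt                     // 3200 ms
        val jobExecutionTimeBySpark = lastCompletionTime - firstSubmissionTime // 2900 ms
        val waitForExecution = firstLaunchTime - firstSubmissionTime           // 150 ms
        val execution        = lastCompletionTime - firstLaunchTime            // 2750 ms
        val fetchResult      = finishedAt - lastCompletionTime                 // 200 ms
        println(s"WAIT_FOR_EXECUTION=$waitForExecution EXECUTION=$execution FETCH_RESULT=$fetchResult")
        // CALCULATE_STAT is then measured from finishedAt to the current time.
      }
    }
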
diff --git a/kylin-spark-project/kylin-spark-query/src/main/scala/org/apache/spark/sql/metrics/AppStatus.scala b/kylin-spark-project/kylin-spark-query/src/main/scala/org/apache/spark/sql/metrics/AppStatus.scala
index e1eb6f82dc..084faead3c 100644
--- a/kylin-spark-project/kylin-spark-query/src/main/scala/org/apache/spark/sql/metrics/AppStatus.scala
+++ b/kylin-spark-project/kylin-spark-query/src/main/scala/org/apache/spark/sql/metrics/AppStatus.scala
@@ -18,12 +18,66 @@
 
 package org.apache.spark.sql.metrics
 
-import org.apache.spark.{SparkContext, SparkStageInfo}
+import org.apache.spark.status.api.v1
 import org.apache.spark.status.{TaskDataWrapper, TaskIndexNames}
 import org.apache.spark.util.Utils
-import org.apache.spark.status.api.v1
+import org.apache.spark.utils.LogEx
+import org.apache.spark.{SparkContext, SparkStageInfo}
+
+class AppStatus(sparkContext: SparkContext) extends LogEx {
+
+  val defaultUnsortedQuantiles: Array[Double] = Array(0, 0.25, 0.5, 0.75, 1.0)
+
+  def calStageMetrics(stageId: Int, unsortedQuantiles: Array[Double] = defaultUnsortedQuantiles):
+  Seq[Map[String, IndexedSeq[Long]]] = {
+    val stageDataList: Seq[v1.StageData] = sparkContext.statusStore.stageData(stageId)
+    stageDataList.map(s => {
+
+      // 1. Get launch time of first and last task
+      val stageKey = Array(s.stageId, s.attemptId)
+      val taskIterator = sparkContext.statusStore.store.view(classOf[TaskDataWrapper])
+        .parent(stageKey)
+        .index(TaskIndexNames.LAUNCH_TIME)
+        .iterator()
+
+      var firstLaunchTime = -1L
+      var lastLaunchTime = -1L
+      while (taskIterator.hasNext) {
+        val lt = taskIterator.next().launchTime
+        if (firstLaunchTime == -1) {
+          firstLaunchTime = lt
+        } else if (lt < firstLaunchTime) {
+          firstLaunchTime = lt
+        }
+        if (lastLaunchTime == -1) {
+          lastLaunchTime = lt
+        } else if (lt > lastLaunchTime) {
+          lastLaunchTime = lt
+        }
+      }
+
+      // 2. Get distribution of bytesRead & runningTime
+      val metricsDistribution = sparkContext.statusStore.taskSummary(s.stageId, s.attemptId, unsortedQuantiles)
+      val bytesRead: IndexedSeq[Long] = if (metricsDistribution.nonEmpty) {
+        metricsDistribution.get.inputMetrics.bytesRead.map(x => x.toLong)
+      } else {
+        IndexedSeq.empty
+      }
 
-class AppStatus(sparkContext: SparkContext) {
+      val runningTime: IndexedSeq[Long] = if (metricsDistribution.nonEmpty) {
+        metricsDistribution.get.executorRunTime.map(x => x.toLong)
+      } else {
+        IndexedSeq.empty
+      }
+
+      Map[String, IndexedSeq[Long]](
+        "stageInfo" -> IndexedSeq(s.stageId, s.attemptId),
+        "launchTime" -> IndexedSeq(firstLaunchTime, lastLaunchTime),
+        "bytesRead" -> bytesRead,
+        "runningTime" -> runningTime
+      )
+    }).seq
+  }
 
   def getTaskLaunchTime(stageId: Int, quantile: Double): Double = {
     scanTasks(stageId, TaskIndexNames.LAUNCH_TIME, quantile) { t => t.launchTime }

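
For each stage attempt, calStageMetrics above returns a map keyed by "stageInfo", "launchTime", "bytesRead" and "runningTime"; the last two hold the task-level distribution at the quantiles 0, 0.25, 0.5, 0.75 and 1.0 (defaultUnsortedQuantiles). A small sketch of reading one such map, with invented values:

    object StageMetricsSketch {
      def main(args: Array[String]): Unit = {
        // Shape of one element of the Seq returned by AppStatus.calStageMetrics;
        // the numbers are made up for illustration.
        val stage: Map[String, IndexedSeq[Long]] = Map(
          "stageInfo"   -> IndexedSeq(3L, 0L),                          // stageId, attemptId
          "launchTime"  -> IndexedSeq(1650400000100L, 1650400000900L),  // first and last task launch
          "bytesRead"   -> IndexedSeq(0L, 1024L, 4096L, 8192L, 65536L), // quantiles 0 .. 1.0
          "runningTime" -> IndexedSeq(5L, 12L, 20L, 35L, 80L)           // quantiles 0 .. 1.0
        )
        val stageId = stage("stageInfo")(0)
        val attemptId = stage("stageInfo")(1)
        val medianBytesRead = stage("bytesRead")(2) // the 0.5 quantile
        println(s"stage $stageId.$attemptId: median bytesRead per task = $medianBytesRead")
      }
    }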