This is an automated email from the ASF dual-hosted git repository.
xxyu pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/kylin.git
The following commit(s) were added to refs/heads/main by this push:
new c9a3140729 KYLIN-5111 Refine query metrics
c9a3140729 is described below
commit c9a3140729da6d28ddb8359b035567d2c4d1eca3
Author: XiaoxiangYu <[email protected]>
AuthorDate: Wed Apr 20 08:52:11 2022 +0800
KYLIN-5111 Refine query metrics
---
.../org/apache/kylin/common/KylinConfigBase.java | 4 +
.../java/org/apache/kylin/common/QueryTrace.java | 1 +
.../kylin/query/runtime/plans/ResultPlan.scala | 12 +-
.../apache/kylin/query/util/SparkJobTrace.scala | 8 +-
.../apache/kylin/query/util/SparkJobTraceV2.scala | 128 +++++++++++++++++++++
.../org/apache/spark/sql/metrics/AppStatus.scala | 60 +++++++++-
6 files changed, 206 insertions(+), 7 deletions(-)
diff --git a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
index 53f0193423..18dd2ab4e0 100644
--- a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
+++ b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
@@ -3339,6 +3339,10 @@ public abstract class KylinConfigBase implements Serializable {
return Boolean.parseBoolean(this.getOptional("kylin.query.auto-sparder-context-enabled", "false"));
}
+ public Integer sparkQueryMetrics() {
+ return Integer.parseInt(this.getOptional("kylin.query.spark.metrics", "2"));
+ }
+
/**
* whether to enable sparder monitor function
*/
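For context, the getter added above reads a new configuration key with a default of "2". An illustrative kylin.properties entry follows (not part of this commit; the value semantics are taken from the ResultPlan change further below):

    # 2 (default) -> SparkJobTraceV2 with per-stage metrics, 1 -> legacy SparkJobTrace,
    # any other value -> no-op trace (AbstractSparkJobTrace)
    kylin.query.spark.metrics=2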
diff --git a/core-common/src/main/java/org/apache/kylin/common/QueryTrace.java b/core-common/src/main/java/org/apache/kylin/common/QueryTrace.java
index 4387e0a65d..b3e3b8a093 100644
--- a/core-common/src/main/java/org/apache/kylin/common/QueryTrace.java
+++ b/core-common/src/main/java/org/apache/kylin/common/QueryTrace.java
@@ -34,6 +34,7 @@ public class QueryTrace {
public static final String WAIT_FOR_EXECUTION = "WAIT_FOR_EXECUTION";
public static final String EXECUTION = "EXECUTION";
public static final String FETCH_RESULT = "FETCH_RESULT";
+ public static final String CALCULATE_STAT = "CALCULATE_STAT";
// group name
static final String PREPARATION = "PREPARATION";
diff --git a/kylin-spark-project/kylin-spark-query/src/main/scala/org/apache/kylin/query/runtime/plans/ResultPlan.scala b/kylin-spark-project/kylin-spark-query/src/main/scala/org/apache/kylin/query/runtime/plans/ResultPlan.scala
index c0558b58b4..7c95636e31 100644
--- a/kylin-spark-project/kylin-spark-query/src/main/scala/org/apache/kylin/query/runtime/plans/ResultPlan.scala
+++ b/kylin-spark-project/kylin-spark-query/src/main/scala/org/apache/kylin/query/runtime/plans/ResultPlan.scala
@@ -27,13 +27,14 @@ import org.apache.kylin.common.{KylinConfig, QueryContext, QueryContextFacade}
import org.apache.kylin.common.util.HadoopUtil
import org.apache.kylin.metadata.project.ProjectManager
import org.apache.kylin.query.runtime.plans.ResultType.ResultType
-import org.apache.kylin.query.util.SparkJobTrace
+import org.apache.kylin.query.util.{AbstractSparkJobTrace, SparkJobTrace, SparkJobTraceV2}
import org.apache.spark.internal.Logging
import org.apache.spark.sql.{DataFrame, SparderContext}
import org.apache.spark.sql.hive.utils.QueryMetricUtils
import org.apache.spark.sql.utils.SparkTypeUtil
import org.apache.spark.utils.SparderUtils
+import java.util.TimeZone
import scala.collection.JavaConverters._
// scalastyle:off
@@ -107,7 +108,14 @@ object ResultPlan extends Logging {
interruptOnCancel = true)
val currentTrace = QueryContextFacade.current().getQueryTrace
currentTrace.endLastSpan()
- val jobTrace = new SparkJobTrace(jobGroup, currentTrace, sparkContext)
+
+ val jobTrace = if(kylinConfig.sparkQueryMetrics == 2) {
+ new SparkJobTraceV2(jobGroup, currentTrace, sparkContext, TimeZone.getTimeZone(kylinConfig.getTimeZone).toZoneId)
+ } else if(kylinConfig.sparkQueryMetrics == 1) {
+ new SparkJobTrace(jobGroup, currentTrace, sparkContext)
+ } else {
+ new AbstractSparkJobTrace()
+ }
try {
val rows = df.collect()
jobTrace.jobFinished()
diff --git a/kylin-spark-project/kylin-spark-query/src/main/scala/org/apache/kylin/query/util/SparkJobTrace.scala b/kylin-spark-project/kylin-spark-query/src/main/scala/org/apache/kylin/query/util/SparkJobTrace.scala
index e6de31a695..99bad7f047 100644
--- a/kylin-spark-project/kylin-spark-query/src/main/scala/org/apache/kylin/query/util/SparkJobTrace.scala
+++ b/kylin-spark-project/kylin-spark-query/src/main/scala/org/apache/kylin/query/util/SparkJobTrace.scala
@@ -29,7 +29,7 @@ import org.apache.spark.utils.LogEx
class SparkJobTrace(jobGroup: String,
queryTrace: QueryTrace,
sparkContext: SparkContext,
- startAt: Long = System.currentTimeMillis()) extends LogEx {
+ startAt: Long = System.currentTimeMillis()) extends AbstractSparkJobTrace {
val appStatus = new AppStatus(sparkContext)
@@ -53,7 +53,7 @@ class SparkJobTrace(jobGroup: String,
* Long, it may imply the executor-core config is not sufficient for the number of tasks,
* or the cluster is in heavy work load
*/
- def jobFinished(): Unit = {
+ override def jobFinished(): Unit = {
try {
val jobDataSeq = appStatus.getJobData(jobGroup)
@@ -116,3 +116,7 @@ class SparkJobTrace(jobGroup: String,
}
}
+class AbstractSparkJobTrace extends LogEx {
+ def jobFinished(): Unit = {}
+}
+
diff --git a/kylin-spark-project/kylin-spark-query/src/main/scala/org/apache/kylin/query/util/SparkJobTraceV2.scala b/kylin-spark-project/kylin-spark-query/src/main/scala/org/apache/kylin/query/util/SparkJobTraceV2.scala
new file mode 100644
index 0000000000..396a73d650
--- /dev/null
+++ b/kylin-spark-project/kylin-spark-query/src/main/scala/org/apache/kylin/query/util/SparkJobTraceV2.scala
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.query.util
+
+import org.apache.kylin.common.QueryTrace
+import org.apache.spark.SparkContext
+
+import java.time.ZoneId
+import java.time.format.DateTimeFormatter
+import java.util.{Date, Locale}
+
+class SparkJobTraceV2(jobGroup: String,
+ queryTrace: QueryTrace,
+ sparkContext: SparkContext,
+ zoneId: ZoneId,
+ detailEnabled: Boolean = false,
+ startAt: Long = System.currentTimeMillis()) extends SparkJobTrace(jobGroup: String,
+ queryTrace: QueryTrace,
+ sparkContext: SparkContext,
+ startAt: Long) {
+
+ val PATTERN = "HH:mm:ss.SSS"
+ val bytesRead = "bytesRead"
+ val stageInfo = "stageInfo"
+ val runningTime = "runningTime"
+ val launchTime = "launchTime"
+
+
+ def dateStr(ts: Long): String = {
+ new Date(ts).toInstant.atZone(zoneId).toLocalDateTime.format(DateTimeFormatter.ofPattern(PATTERN, Locale.ROOT))
+ }
+
+ def dateStr(ts: java.util.Date): String = {
+ ts.toInstant.atZone(zoneId).toLocalDateTime.format(DateTimeFormatter.ofPattern(PATTERN, Locale.ROOT))
+ }
+
+ override def jobFinished(): Unit = {
+ logDebug("Query job finished.")
+ val finishedAt = System.currentTimeMillis()
+ try {
+ val jobDataSeq = appStatus.getJobData(jobGroup)
+ val firstSubmissionTime = jobDataSeq.map(_.submissionTime).min
+ val lastCompletionTime = jobDataSeq.map(_.completionTime).max
+ var firstLaunchTime = -1L
+
+ if (jobDataSeq.isEmpty) {
+ endAbnormalExecutionTrace()
+ return
+ }
+
+ jobDataSeq.foreach(
+ job => {
+ val submissionTime = dateStr(job.submissionTime.getOrElse(new java.util.Date(0)))
+ val completionTime = dateStr(job.completionTime.getOrElse(new java.util.Date(0)))
+ val killedDesc = if (job.killedTasksSummary.isEmpty) {
+ "EMPTY"
+ } else {
+ job.killedTasksSummary.mkString("|")
+ }
+ val stageMetrics = job.stageIds.flatMap(stageId => appStatus.calStageMetrics(stageId)).map(
+ stage => if (stage(bytesRead).nonEmpty && stage(runningTime).nonEmpty) {
+ if (firstLaunchTime == -1 || stage(launchTime).apply(0) < firstLaunchTime) firstLaunchTime = stage(launchTime).apply(0)
+ ("Stage:%d,%d, launch time of first and last task are %s and %s, " +
+ " bytesRead is [%s], runningTime is [%s]").format(
+ stage(stageInfo).apply(0),
+ stage(stageInfo).apply(1),
+ dateStr(stage(launchTime).apply(0)),
+ dateStr(stage(launchTime).apply(1)),
+ stage(bytesRead).mkString(","),
+ stage(runningTime).mkString(", ")
+ )
+ } else {
+ ("Stage:%d,%d").format(stage(stageInfo).apply(0), stage(stageInfo).apply(1))
+ }).mkString(";")
+
+ logInfo(
+ s"Job ${job.jobId} is submitted at ${submissionTime} and completed at ${completionTime}.It has ${job.numTasks} tasks, " +
+ s"succeed ${job.numCompletedTasks} tasks, ${job.numFailedTasks} failed tasks,${job.numKilledTasks} killed tasks. " +
+ s"Killed tasks info: ${killedDesc}. Stages ${stageMetrics}.")
+ }
+ )
+
+ var jobExecutionTimeByKylin = finishedAt - startAt
+ if (firstSubmissionTime.isDefined) {
+ queryTrace.amendLast(QueryTrace.PREPARE_AND_SUBMIT_JOB, firstSubmissionTime.get.getTime)
+ } else {
+ logInfo("No firstSubmissionTime")
+ }
+ if (firstSubmissionTime.isDefined && lastCompletionTime.isDefined) {
+ val jobExecutionTimeBySpark = lastCompletionTime.get.getTime - firstSubmissionTime.get.getTime
+ logInfo(s"jobExecutionTime will change from ${jobExecutionTimeByKylin} to ${jobExecutionTimeBySpark} .")
+ logInfo(s"Kylin submit query at ${dateStr(startAt)}" +
+ s", first spark job submitted at ${dateStr(firstSubmissionTime.get.getTime)}" +
+ s", last spark job finished at ${dateStr(lastCompletionTime.get.getTime)}" +
+ s", query finished at ${dateStr(finishedAt)} .")
+ jobExecutionTimeByKylin = jobExecutionTimeBySpark
+
+ queryTrace.appendSpan(QueryTrace.WAIT_FOR_EXECUTION, firstLaunchTime - firstSubmissionTime.get.getTime)
+ queryTrace.appendSpan(QueryTrace.EXECUTION, lastCompletionTime.get.getTime - firstLaunchTime)
+ queryTrace.appendSpan(QueryTrace.FETCH_RESULT, finishedAt.longValue - lastCompletionTime.get.getTime)
+ queryTrace.appendSpan(QueryTrace.CALCULATE_STAT, System.currentTimeMillis - finishedAt)
+ } else {
+ logInfo("No firstSubmissionTime or lastCompletionTime")
+ }
+ } catch {
+ case e =>
+ logWarning(s"Failed trace spark job execution for $jobGroup", e)
+ endAbnormalExecutionTrace()
+ }
+ }
+}
+
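To summarize the timing logic in SparkJobTraceV2.jobFinished() above, here is a minimal sketch (illustrative only, not part of the commit; the object and parameter names are made up) of how the collected timestamps are turned into QueryTrace spans:

    // submit     = first Spark job submission time
    // launch     = launch time of the first task across all stages
    // complete   = completion time of the last Spark job
    // finishedAt = the moment df.collect() returned on the driver
    object SpanSketch {
      def spans(submit: Long, launch: Long, complete: Long, finishedAt: Long): Map[String, Long] = Map(
        "WAIT_FOR_EXECUTION" -> (launch - submit),      // tasks queued but not yet running
        "EXECUTION" -> (complete - launch),             // Spark tasks actually running
        "FETCH_RESULT" -> (finishedAt - complete),      // driver collecting rows
        "CALCULATE_STAT" -> (System.currentTimeMillis() - finishedAt) // cost of the metrics bookkeeping itself
      )
    }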
diff --git a/kylin-spark-project/kylin-spark-query/src/main/scala/org/apache/spark/sql/metrics/AppStatus.scala b/kylin-spark-project/kylin-spark-query/src/main/scala/org/apache/spark/sql/metrics/AppStatus.scala
index e1eb6f82dc..084faead3c 100644
--- a/kylin-spark-project/kylin-spark-query/src/main/scala/org/apache/spark/sql/metrics/AppStatus.scala
+++ b/kylin-spark-project/kylin-spark-query/src/main/scala/org/apache/spark/sql/metrics/AppStatus.scala
@@ -18,12 +18,66 @@
package org.apache.spark.sql.metrics
-import org.apache.spark.{SparkContext, SparkStageInfo}
+import org.apache.spark.status.api.v1
import org.apache.spark.status.{TaskDataWrapper, TaskIndexNames}
import org.apache.spark.util.Utils
-import org.apache.spark.status.api.v1
+import org.apache.spark.utils.LogEx
+import org.apache.spark.{SparkContext, SparkStageInfo}
+
+class AppStatus(sparkContext: SparkContext) extends LogEx {
+
+ val defaultUnsortedQuantiles: Array[Double] = Array(0, 0.25, 0.5, 0.75, 1.0)
+
+ def calStageMetrics(stageId: Int, unsortedQuantiles: Array[Double] = defaultUnsortedQuantiles):
+ Seq[Map[String, IndexedSeq[Long]]] = {
+ val stageDataList: Seq[v1.StageData] = sparkContext.statusStore.stageData(stageId)
+ stageDataList.map(s => {
+
+ // 1. Get launch time of first and last task
+ val stageKey = Array(s.stageId, s.attemptId)
+ val taskIterator = sparkContext.statusStore.store.view(classOf[TaskDataWrapper])
+ .parent(stageKey)
+ .index(TaskIndexNames.LAUNCH_TIME)
+ .iterator()
+
+ var firstLaunchTime = -1L
+ var lastLaunchTime = -1L
+ while (taskIterator.hasNext) {
+ val lt = taskIterator.next().launchTime
+ if (firstLaunchTime == -1) {
+ firstLaunchTime = lt
+ } else if (lt < firstLaunchTime) {
+ firstLaunchTime = lt
+ }
+ if (lastLaunchTime == -1) {
+ lastLaunchTime = lt
+ } else if (lt > lastLaunchTime) {
+ lastLaunchTime = lt
+ }
+ }
+
+ // 2. Get distribution of bytesRead & runningTime
+ val metricsDistribution = sparkContext.statusStore.taskSummary(s.stageId, s.attemptId, unsortedQuantiles)
+ val bytesRead: IndexedSeq[Long] = if (metricsDistribution.nonEmpty) {
+ metricsDistribution.get.inputMetrics.bytesRead.map(x => x.toLong)
+ } else {
+ IndexedSeq.empty
+ }
-class AppStatus(sparkContext: SparkContext) {
+ val runningTime: IndexedSeq[Long] = if (metricsDistribution.nonEmpty) {
+ metricsDistribution.get.executorRunTime.map(x => x.toLong)
+ } else {
+ IndexedSeq.empty
+ }
+
+ Map[String, IndexedSeq[Long]](
+ "stageInfo" -> IndexedSeq(s.stageId, s.attemptId),
+ "launchTime" -> IndexedSeq(firstLaunchTime, lastLaunchTime),
+ "bytesRead" -> bytesRead,
+ "runningTime" -> runningTime
+ )
+ }).seq
+ }
def getTaskLaunchTime(stageId: Int, quantile: Double): Double = {
scanTasks(stageId, TaskIndexNames.LAUNCH_TIME, quantile) { t => t.launchTime }
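For readers of calStageMetrics above: each entry of the returned Seq is a Map keyed by "stageInfo", "launchTime", "bytesRead" and "runningTime", where the last two hold the task-level distribution at the quantiles (0, 0.25, 0.5, 0.75, 1.0). A small consumer sketch follows (illustrative only; the object and value names are hypothetical):

    object StageMetricsSketch {
      def describe(stage: Map[String, IndexedSeq[Long]]): String = {
        val Seq(stageId, attemptId) = stage("stageInfo")        // stage id and attempt id
        val Seq(firstLaunch, lastLaunch) = stage("launchTime")  // launch time of first and last task
        val medianBytesRead = stage("bytesRead").lift(2).getOrElse(-1L)   // index 2 == 0.5 quantile
        val medianRunTime = stage("runningTime").lift(2).getOrElse(-1L)
        s"stage $stageId.$attemptId: tasks launched $firstLaunch..$lastLaunch, " +
          s"median bytesRead=$medianBytesRead, median runTime=${medianRunTime}ms"
      }
    }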