This is an automated email from the ASF dual-hosted git repository.
wenchen pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.0 by this push:
new a7c58b1 [SPARK-31275][WEBUI] Improve the metrics format in
ExecutionPage for StageId
a7c58b1 is described below
commit a7c58b1ae5a05f509a034ca410a50c41ce94cf5f
Author: Kousuke Saruta <[email protected]>
AuthorDate: Fri Mar 27 13:35:28 2020 +0800
[SPARK-31275][WEBUI] Improve the metrics format in ExecutionPage for StageId
### What changes were proposed in this pull request?
In ExecutionPage, metrics format for stageId, attemptId and taskId are
displayed like `(stageId (attemptId): taskId)` for now.
I changed this format like `(stageId.attemptId taskId)`.
### Why are the changes needed?
As cloud-fan suggested
[here](https://github.com/apache/spark/pull/27927#discussion_r398591519),
`stageId.attemptId` is more standard in Spark.
### Does this PR introduce any user-facing change?
Yes. Before applying this change, we can see the UI like as follows.

And after this change applied, we can like as follows.

### How was this patch tested?
Modified `SQLMetricsSuite` and manual test.
Closes #28039 from sarutak/improve-metrics-format.
Authored-by: Kousuke Saruta <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
(cherry picked from commit bc37fdc77130ce4f60806db0bb2b1b8914452040)
Signed-off-by: Wenchen Fan <[email protected]>
---
.../spark/sql/execution/ui/static/spark-sql-viz.js | 2 +-
.../spark/sql/execution/metric/SQLMetrics.scala | 12 ++++++------
.../spark/sql/execution/ui/ExecutionPage.scala | 2 +-
.../spark/sql/execution/metric/SQLMetricsSuite.scala | 20 ++++++++++----------
.../sql/execution/metric/SQLMetricsTestUtils.scala | 18 +++++++++---------
5 files changed, 27 insertions(+), 27 deletions(-)
diff --git
a/sql/core/src/main/resources/org/apache/spark/sql/execution/ui/static/spark-sql-viz.js
b/sql/core/src/main/resources/org/apache/spark/sql/execution/ui/static/spark-sql-viz.js
index b23ae9a..bded921 100644
---
a/sql/core/src/main/resources/org/apache/spark/sql/execution/ui/static/spark-sql-viz.js
+++
b/sql/core/src/main/resources/org/apache/spark/sql/execution/ui/static/spark-sql-viz.js
@@ -78,7 +78,7 @@ function setupTooltipForSparkPlanNode(nodeId) {
// labelSeparator should be a non-graphical character in order not to affect
the width of boxes.
var labelSeparator = "\x01";
-var stageAndTaskMetricsPattern = "^(.*)(\\(stage.*attempt.*task[^)]*\\))(.*)$";
+var stageAndTaskMetricsPattern = "^(.*)(\\(stage.*task[^)]*\\))(.*)$";
/*
* Helper function to pre-process the graph layout.
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLMetrics.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLMetrics.scala
index 65aabe0..1394e0f 100644
---
a/sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLMetrics.scala
+++
b/sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLMetrics.scala
@@ -116,7 +116,7 @@ object SQLMetrics {
// data size total (min, med, max):
// 100GB (100MB, 1GB, 10GB)
val acc = new SQLMetric(SIZE_METRIC, -1)
- acc.register(sc, name = Some(s"$name total (min, med, max (stageId
(attemptId): taskId))"),
+ acc.register(sc, name = Some(s"$name total (min, med, max (stageId:
taskId))"),
countFailedValues = false)
acc
}
@@ -126,7 +126,7 @@ object SQLMetrics {
// duration(min, med, max):
// 5s (800ms, 1s, 2s)
val acc = new SQLMetric(TIMING_METRIC, -1)
- acc.register(sc, name = Some(s"$name total (min, med, max (stageId
(attemptId): taskId))"),
+ acc.register(sc, name = Some(s"$name total (min, med, max (stageId:
taskId))"),
countFailedValues = false)
acc
}
@@ -134,7 +134,7 @@ object SQLMetrics {
def createNanoTimingMetric(sc: SparkContext, name: String): SQLMetric = {
// Same with createTimingMetric, just normalize the unit of time to
millisecond.
val acc = new SQLMetric(NS_TIMING_METRIC, -1)
- acc.register(sc, name = Some(s"$name total (min, med, max (stageId
(attemptId): taskId))"),
+ acc.register(sc, name = Some(s"$name total (min, med, max (stageId:
taskId))"),
countFailedValues = false)
acc
}
@@ -150,7 +150,7 @@ object SQLMetrics {
// probe avg (min, med, max):
// (1.2, 2.2, 6.3)
val acc = new SQLMetric(AVERAGE_METRIC)
- acc.register(sc, name = Some(s"$name (min, med, max (stageId (attemptId):
taskId))"),
+ acc.register(sc, name = Some(s"$name (min, med, max (stageId: taskId))"),
countFailedValues = false)
acc
}
@@ -169,11 +169,11 @@ object SQLMetrics {
* and represent it in string for a SQL physical operator.
*/
def stringValue(metricsType: String, values: Array[Long], maxMetrics:
Array[Long]): String = {
- // stringMetric = "(driver)" OR (stage $stageId (attempt $attemptId): task
$taskId))
+ // stringMetric = "(driver)" OR (stage ${stageId}.${attemptId}: task
$taskId)
val stringMetric = if (maxMetrics.isEmpty) {
"(driver)"
} else {
- s"(stage ${maxMetrics(1)} (attempt ${maxMetrics(2)}): task
${maxMetrics(3)})"
+ s"(stage ${maxMetrics(1)}.${maxMetrics(2)}: task ${maxMetrics(3)})"
}
if (metricsType == SUM_METRIC) {
val numberFormat = NumberFormat.getIntegerInstance(Locale.US)
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/ExecutionPage.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/ExecutionPage.scala
index ca146e7..e3e2278 100644
---
a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/ExecutionPage.scala
+++
b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/ExecutionPage.scala
@@ -73,7 +73,7 @@ class ExecutionPage(parent: SQLTab) extends
WebUIPage("execution") with Logging
</div>
<div>
<input type="checkbox" id="stageId-and-taskId-checkbox"></input>
- <span>Show the Stage (Stage Attempt): Task ID that corresponds to
the max metric</span>
+ <span>Show the Stage ID and Task ID that corresponds to the max
metric</span>
</div>
val metrics = sqlStore.executionMetrics(executionId)
diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala
index 7d09577..11f93c8 100644
---
a/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala
+++
b/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala
@@ -98,7 +98,7 @@ class SQLMetricsSuite extends SharedSparkSession with
SQLMetricsTestUtils {
val ds = spark.range(10).filter('id < 5)
testSparkPlanMetricsWithPredicates(ds.toDF(), 1, Map(
0L -> (("WholeStageCodegen (1)", Map(
- "duration total (min, med, max (stageId (attemptId): taskId))" -> {
+ "duration total (min, med, max (stageId: taskId))" -> {
_.toString.matches(timingMetricPattern)
})))), true)
}
@@ -110,10 +110,10 @@ class SQLMetricsSuite extends SharedSparkSession with
SQLMetricsTestUtils {
val df = testData2.groupBy().count() // 2 partitions
val expected1 = Seq(
Map("number of output rows" -> 2L,
- "avg hash probe bucket list iters (min, med, max (stageId (attemptId):
taskId))" ->
+ "avg hash probe bucket list iters (min, med, max (stageId: taskId))" ->
aggregateMetricsPattern),
Map("number of output rows" -> 1L,
- "avg hash probe bucket list iters (min, med, max (stageId (attemptId):
taskId))" ->
+ "avg hash probe bucket list iters (min, med, max (stageId: taskId))" ->
aggregateMetricsPattern))
val shuffleExpected1 = Map(
"records read" -> 2L,
@@ -130,10 +130,10 @@ class SQLMetricsSuite extends SharedSparkSession with
SQLMetricsTestUtils {
val df2 = testData2.groupBy('a).count()
val expected2 = Seq(
Map("number of output rows" -> 4L,
- "avg hash probe bucket list iters (min, med, max (stageId (attemptId):
taskId))" ->
+ "avg hash probe bucket list iters (min, med, max (stageId: taskId))" ->
aggregateMetricsPattern),
Map("number of output rows" -> 3L,
- "avg hash probe bucket list iters (min, med, max (stageId (attemptId):
taskId))" ->
+ "avg hash probe bucket list iters (min, med, max (stageId: taskId))" ->
aggregateMetricsPattern))
val shuffleExpected2 = Map(
@@ -181,8 +181,8 @@ class SQLMetricsSuite extends SharedSparkSession with
SQLMetricsTestUtils {
}
val metrics = getSparkPlanMetrics(df, 1, nodeIds, enableWholeStage).get
nodeIds.foreach { nodeId =>
- val probes = metrics(nodeId)._2("avg hash probe bucket list iters
(min, med, max (stageId" +
- " (attemptId): taskId))")
+ val probes =
+ metrics(nodeId)._2("avg hash probe bucket list iters (min, med, max
(stageId: taskId))")
// Extract min, med, max from the string and strip off everthing else.
val index =
probes.toString.stripPrefix("\n(").stripSuffix(")").indexOf(" (", 0)
probes.toString.stripPrefix("\n(").stripSuffix(")").slice(0,
index).split(", ").foreach {
@@ -231,13 +231,13 @@ class SQLMetricsSuite extends SharedSparkSession with
SQLMetricsTestUtils {
val df = Seq(1, 3, 2).toDF("id").sort('id)
testSparkPlanMetricsWithPredicates(df, 2, Map(
0L -> (("Sort", Map(
- "sort time total (min, med, max (stageId (attemptId): taskId))" -> {
+ "sort time total (min, med, max (stageId: taskId))" -> {
_.toString.matches(timingMetricPattern)
},
- "peak memory total (min, med, max (stageId (attemptId): taskId))" -> {
+ "peak memory total (min, med, max (stageId: taskId))" -> {
_.toString.matches(sizeMetricPattern)
},
- "spill size total (min, med, max (stageId (attemptId): taskId))" -> {
+ "spill size total (min, med, max (stageId: taskId))" -> {
_.toString.matches(sizeMetricPattern)
})))
))
diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsTestUtils.scala
b/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsTestUtils.scala
index 0c1148f..766e7a9 100644
---
a/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsTestUtils.scala
+++
b/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsTestUtils.scala
@@ -41,27 +41,27 @@ trait SQLMetricsTestUtils extends SQLTestUtils {
protected def statusStore: SQLAppStatusStore = spark.sharedState.statusStore
- // Pattern of size SQLMetric value, e.g. "\n96.2 MiB (32.1 MiB, 32.1 MiB,
32.1 MiB (stage 0
- // (attempt 0): task 4))" OR "\n96.2 MiB (32.1 MiB, 32.1 MiB, 32.1 MiB)"
+ // Pattern of size SQLMetric value, e.g. "\n96.2 MiB (32.1 MiB, 32.1 MiB,
32.1 MiB (stage 0.0:
+ // task 4))" OR "\n96.2 MiB (32.1 MiB, 32.1 MiB, 32.1 MiB)"
protected val sizeMetricPattern = {
val bytes = "([0-9]+(\\.[0-9]+)?) (EiB|PiB|TiB|GiB|MiB|KiB|B)"
- val maxMetrics = "\\(stage ([0-9])+ \\(attempt ([0-9])+\\)\\: task
([0-9])+\\)"
+ val maxMetrics = "\\(stage ([0-9])+\\.([0-9])+\\: task ([0-9])+\\)"
s"\\n$bytes \\($bytes, $bytes, $bytes( $maxMetrics)?\\)"
}
- // Pattern of timing SQLMetric value, e.g. "\n2.0 ms (1.0 ms, 1.0 ms, 1.0 ms
(stage 3 (attempt
- // 0): task 217))" OR "\n2.0 ms (1.0 ms, 1.0 ms, 1.0 ms)"
+ // Pattern of timing SQLMetric value, e.g. "\n2.0 ms (1.0 ms, 1.0 ms, 1.0 ms
(stage 3.0):
+ // task 217))" OR "\n2.0 ms (1.0 ms, 1.0 ms, 1.0 ms)"
protected val timingMetricPattern = {
val duration = "([0-9]+(\\.[0-9]+)?) (ms|s|m|h)"
- val maxMetrics = "\\(stage ([0-9])+ \\(attempt ([0-9])+\\)\\: task
([0-9])+\\)"
+ val maxMetrics = "\\(stage ([0-9])+\\.([0-9])+\\: task ([0-9])+\\)"
s"\\n$duration \\($duration, $duration, $duration( $maxMetrics)?\\)"
}
// Pattern of size SQLMetric value for Aggregate tests.
- // e.g "\n(1, 1, 0.9 (stage 1 (attempt 0): task 8)) OR "\n(1, 1, 0.9 )"
+ // e.g "\n(1, 1, 0.9 (stage 1.0: task 8)) OR "\n(1, 1, 0.9 )"
protected val aggregateMetricsPattern = {
val iters = "([0-9]+(\\.[0-9]+)?)"
- val maxMetrics = "\\(stage ([0-9])+ \\(attempt ([0-9])+\\)\\: task
([0-9])+\\)"
+ val maxMetrics = "\\(stage ([0-9])+\\.([0-9])+\\: task ([0-9])+\\)"
s"\\n\\($iters, $iters, $iters( $maxMetrics)?\\)"
}
@@ -98,7 +98,7 @@ trait SQLMetricsTestUtils extends SQLTestUtils {
}
val totalNumBytesMetric = executedNode.metrics.find(
- _.name == "written output total (min, med, max (stageId (attemptId):
taskId))").get
+ _.name == "written output total (min, med, max (stageId: taskId))").get
val totalNumBytes =
metrics(totalNumBytesMetric.accumulatorId).replaceAll(",", "")
.split(" ").head.trim.toDouble
assert(totalNumBytes > 0)
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]