This is an automated email from the ASF dual-hosted git repository.
dbtsai pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.0 by this push:
new e892a01 [SPARK-31683][CORE] Make Prometheus output consistent with DropWizard 4.1 result
e892a01 is described below
commit e892a016699d996b959b4db01242cff934d62f76
Author: Dongjoon Hyun <[email protected]>
AuthorDate: Tue May 12 19:57:48 2020 +0000
[SPARK-31683][CORE] Make Prometheus output consistent with DropWizard 4.1 result
### What changes were proposed in this pull request?
This PR aims to update the Prometheus-related output format to be consistent
with the DropWizard 4.1 result:
- Add `Number` metrics for gauges.
- Add `type` labels.
### Why are the changes needed?
SPARK-29032 added Prometheus support. After that, SPARK-29674 upgraded
DropWizard for JDK9+ support, which changed the output labels and the number
of keys for Gauge metrics. The current output is different from Apache
Spark 2.4.5. Since we cannot change DropWizard, this PR aims to restore
consistency in Apache Spark 3.0.0 only.
**DropWizard 3.x**
```
metrics_master_aliveWorkers_Value 1.0
```
**DropWizard 4.1**
```
metrics_master_aliveWorkers_Value{type="gauges",} 1.0
metrics_master_aliveWorkers_Number{type="gauges",} 1.0
```
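For illustration, a minimal standalone sketch of the gauge handling this PR adopts; the registry contents and the key normalization here are simplified stand-ins, not the patched servlet itself:
```scala
import com.codahale.metrics.{Gauge, MetricRegistry}
import scala.collection.JavaConverters._

object GaugeFormatSketch {
  def main(args: Array[String]): Unit = {
    val registry = new MetricRegistry
    // Hypothetical gauge standing in for `master.aliveWorkers`.
    registry.register("master.aliveWorkers", new Gauge[Int] {
      override def getValue: Int = 1
    })

    val guagesLabel = """{type="gauges"}"""
    val sb = new StringBuilder
    registry.getGauges.asScala.foreach { case (k, v) =>
      // Simplified normalization; Spark's normalizeKey differs in detail.
      val key = "metrics_" + k.replace('.', '_') + "_"
      // Emit the 4.1-style `Number` key alongside the 3.x-style `Value` key.
      sb.append(s"${key}Number$guagesLabel ${v.getValue}\n")
      sb.append(s"${key}Value$guagesLabel ${v.getValue}\n")
    }
    print(sb)
    // metrics_master_aliveWorkers_Number{type="gauges"} 1
    // metrics_master_aliveWorkers_Value{type="gauges"} 1
  }
}
```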
### Does this PR introduce _any_ user-facing change?
Yes, but this is a new feature in 3.0.0.
### How was this patch tested?
Manually checked the output as shown below.
**JMXExporter Result**
```
$ curl -s http://localhost:8088/ | grep "^metrics_master" | sort
metrics_master_aliveWorkers_Number{type="gauges",} 1.0
metrics_master_aliveWorkers_Value{type="gauges",} 1.0
metrics_master_apps_Number{type="gauges",} 0.0
metrics_master_apps_Value{type="gauges",} 0.0
metrics_master_waitingApps_Number{type="gauges",} 0.0
metrics_master_waitingApps_Value{type="gauges",} 0.0
metrics_master_workers_Number{type="gauges",} 1.0
metrics_master_workers_Value{type="gauges",} 1.0
```
**This PR**
```
$ curl -s http://localhost:8080/metrics/master/prometheus/ | grep master
metrics_master_aliveWorkers_Number{type="gauges"} 1
metrics_master_aliveWorkers_Value{type="gauges"} 1
metrics_master_apps_Number{type="gauges"} 0
metrics_master_apps_Value{type="gauges"} 0
metrics_master_waitingApps_Number{type="gauges"} 0
metrics_master_waitingApps_Value{type="gauges"} 0
metrics_master_workers_Number{type="gauges"} 1
metrics_master_workers_Value{type="gauges"} 1
```
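If useful, a rough Scala sketch of that comparison; it assumes both local endpoints above are running, and it only normalizes the trailing comma the JMXExporter places inside its label set:
```scala
import scala.io.Source

object CompareMasterMetrics {
  // Keep `name{labels}` and drop the sample value; normalize `",}` to `"}`.
  def keys(url: String): Set[String] =
    Source.fromURL(url).getLines()
      .filter(_.startsWith("metrics_master"))
      .map(_.takeWhile(_ != ' ').replace("\",}", "\"}"))
      .toSet

  def main(args: Array[String]): Unit = {
    val jmx   = keys("http://localhost:8088/")
    val spark = keys("http://localhost:8080/metrics/master/prometheus/")
    println(s"only in JMXExporter: ${jmx -- spark}")
    println(s"only in Spark:       ${spark -- jmx}")
  }
}
```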
Closes #28510 from dongjoon-hyun/SPARK-31683.
Authored-by: Dongjoon Hyun <[email protected]>
Signed-off-by: DB Tsai <[email protected]>
(cherry picked from commit 07209f3e2deab824f04484fa6b8bab0ec0a635d6)
Signed-off-by: DB Tsai <[email protected]>
---
.../spark/metrics/sink/PrometheusServlet.scala | 73 ++++++++++++----------
.../spark/status/api/v1/PrometheusResource.scala | 52 +++++++--------
2 files changed, 67 insertions(+), 58 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/metrics/sink/PrometheusServlet.scala b/core/src/main/scala/org/apache/spark/metrics/sink/PrometheusServlet.scala
index 011c7bc..59b863b 100644
--- a/core/src/main/scala/org/apache/spark/metrics/sink/PrometheusServlet.scala
+++ b/core/src/main/scala/org/apache/spark/metrics/sink/PrometheusServlet.scala
@@ -56,58 +56,65 @@ private[spark] class PrometheusServlet(
def getMetricsSnapshot(request: HttpServletRequest): String = {
import scala.collection.JavaConverters._
+ val guagesLabel = """{type="gauges"}"""
+ val countersLabel = """{type="counters"}"""
+ val metersLabel = countersLabel
+ val histogramslabels = """{type="histograms"}"""
+ val timersLabels = """{type="timers"}"""
+
val sb = new StringBuilder()
registry.getGauges.asScala.foreach { case (k, v) =>
if (!v.getValue.isInstanceOf[String]) {
- sb.append(s"${normalizeKey(k)}Value ${v.getValue}\n")
+ sb.append(s"${normalizeKey(k)}Number$guagesLabel ${v.getValue}\n")
+ sb.append(s"${normalizeKey(k)}Value$guagesLabel ${v.getValue}\n")
}
}
registry.getCounters.asScala.foreach { case (k, v) =>
- sb.append(s"${normalizeKey(k)}Count ${v.getCount}\n")
+ sb.append(s"${normalizeKey(k)}Count$countersLabel ${v.getCount}\n")
}
registry.getHistograms.asScala.foreach { case (k, h) =>
val snapshot = h.getSnapshot
val prefix = normalizeKey(k)
- sb.append(s"${prefix}Count ${h.getCount}\n")
- sb.append(s"${prefix}Max ${snapshot.getMax}\n")
- sb.append(s"${prefix}Mean ${snapshot.getMean}\n")
- sb.append(s"${prefix}Min ${snapshot.getMin}\n")
- sb.append(s"${prefix}50thPercentile ${snapshot.getMedian}\n")
- sb.append(s"${prefix}75thPercentile ${snapshot.get75thPercentile}\n")
- sb.append(s"${prefix}95thPercentile ${snapshot.get95thPercentile}\n")
- sb.append(s"${prefix}98thPercentile ${snapshot.get98thPercentile}\n")
- sb.append(s"${prefix}99thPercentile ${snapshot.get99thPercentile}\n")
- sb.append(s"${prefix}999thPercentile ${snapshot.get999thPercentile}\n")
- sb.append(s"${prefix}StdDev ${snapshot.getStdDev}\n")
+ sb.append(s"${prefix}Count$histogramslabels ${h.getCount}\n")
+ sb.append(s"${prefix}Max$histogramslabels ${snapshot.getMax}\n")
+ sb.append(s"${prefix}Mean$histogramslabels ${snapshot.getMean}\n")
+ sb.append(s"${prefix}Min$histogramslabels ${snapshot.getMin}\n")
+ sb.append(s"${prefix}50thPercentile$histogramslabels
${snapshot.getMedian}\n")
+ sb.append(s"${prefix}75thPercentile$histogramslabels
${snapshot.get75thPercentile}\n")
+ sb.append(s"${prefix}95thPercentile$histogramslabels
${snapshot.get95thPercentile}\n")
+ sb.append(s"${prefix}98thPercentile$histogramslabels
${snapshot.get98thPercentile}\n")
+ sb.append(s"${prefix}99thPercentile$histogramslabels
${snapshot.get99thPercentile}\n")
+ sb.append(s"${prefix}999thPercentile$histogramslabels
${snapshot.get999thPercentile}\n")
+ sb.append(s"${prefix}StdDev$histogramslabels ${snapshot.getStdDev}\n")
}
registry.getMeters.entrySet.iterator.asScala.foreach { kv =>
val prefix = normalizeKey(kv.getKey)
val meter = kv.getValue
- sb.append(s"${prefix}Count ${meter.getCount}\n")
- sb.append(s"${prefix}MeanRate ${meter.getMeanRate}\n")
- sb.append(s"${prefix}OneMinuteRate ${meter.getOneMinuteRate}\n")
- sb.append(s"${prefix}FiveMinuteRate ${meter.getFiveMinuteRate}\n")
- sb.append(s"${prefix}FifteenMinuteRate ${meter.getFifteenMinuteRate}\n")
+ sb.append(s"${prefix}Count$metersLabel ${meter.getCount}\n")
+ sb.append(s"${prefix}MeanRate$metersLabel ${meter.getMeanRate}\n")
+ sb.append(s"${prefix}OneMinuteRate$metersLabel
${meter.getOneMinuteRate}\n")
+ sb.append(s"${prefix}FiveMinuteRate$metersLabel
${meter.getFiveMinuteRate}\n")
+ sb.append(s"${prefix}FifteenMinuteRate$metersLabel
${meter.getFifteenMinuteRate}\n")
}
registry.getTimers.entrySet.iterator.asScala.foreach { kv =>
val prefix = normalizeKey(kv.getKey)
val timer = kv.getValue
val snapshot = timer.getSnapshot
- sb.append(s"${prefix}Count ${timer.getCount}\n")
- sb.append(s"${prefix}Max ${snapshot.getMax}\n")
- sb.append(s"${prefix}Mean ${snapshot.getMax}\n")
- sb.append(s"${prefix}Min ${snapshot.getMin}\n")
- sb.append(s"${prefix}50thPercentile ${snapshot.getMedian}\n")
- sb.append(s"${prefix}75thPercentile ${snapshot.get75thPercentile}\n")
- sb.append(s"${prefix}95thPercentile ${snapshot.get95thPercentile}\n")
- sb.append(s"${prefix}98thPercentile ${snapshot.get98thPercentile}\n")
- sb.append(s"${prefix}99thPercentile ${snapshot.get99thPercentile}\n")
- sb.append(s"${prefix}999thPercentile ${snapshot.get999thPercentile}\n")
- sb.append(s"${prefix}StdDev ${snapshot.getStdDev}\n")
- sb.append(s"${prefix}FifteenMinuteRate ${timer.getFifteenMinuteRate}\n")
- sb.append(s"${prefix}FiveMinuteRate ${timer.getFiveMinuteRate}\n")
- sb.append(s"${prefix}OneMinuteRate ${timer.getOneMinuteRate}\n")
- sb.append(s"${prefix}MeanRate ${timer.getMeanRate}\n")
+ sb.append(s"${prefix}Count$timersLabels ${timer.getCount}\n")
+ sb.append(s"${prefix}Max$timersLabels ${snapshot.getMax}\n")
+ sb.append(s"${prefix}Mean$timersLabels ${snapshot.getMax}\n")
+ sb.append(s"${prefix}Min$timersLabels ${snapshot.getMin}\n")
+ sb.append(s"${prefix}50thPercentile$timersLabels
${snapshot.getMedian}\n")
+ sb.append(s"${prefix}75thPercentile$timersLabels
${snapshot.get75thPercentile}\n")
+ sb.append(s"${prefix}95thPercentile$timersLabels
${snapshot.get95thPercentile}\n")
+ sb.append(s"${prefix}98thPercentile$timersLabels
${snapshot.get98thPercentile}\n")
+ sb.append(s"${prefix}99thPercentile$timersLabels
${snapshot.get99thPercentile}\n")
+ sb.append(s"${prefix}999thPercentile$timersLabels
${snapshot.get999thPercentile}\n")
+ sb.append(s"${prefix}StdDev$timersLabels ${snapshot.getStdDev}\n")
+ sb.append(s"${prefix}FifteenMinuteRate$timersLabels
${timer.getFifteenMinuteRate}\n")
+ sb.append(s"${prefix}FiveMinuteRate$timersLabels
${timer.getFiveMinuteRate}\n")
+ sb.append(s"${prefix}OneMinuteRate$timersLabels
${timer.getOneMinuteRate}\n")
+ sb.append(s"${prefix}MeanRate$timersLabels ${timer.getMeanRate}\n")
}
sb.toString()
}
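Side note: in the hunk above, `metersLabel` simply aliases `countersLabel`, so meter keys are emitted with `type="counters"`. A condensed, hypothetical sketch of that per-type labeling, separate from the servlet code itself:
```scala
import com.codahale.metrics.MetricRegistry
import scala.collection.JavaConverters._

object PerTypeLabelSketch {
  def main(args: Array[String]): Unit = {
    val registry = new MetricRegistry
    // Made-up metrics purely for illustration.
    registry.counter("worker.coresUsed").inc(4)
    registry.meter("worker.requests").mark(10)

    val countersLabel = """{type="counters"}"""
    val metersLabel = countersLabel // meters share the counters label
    val sb = new StringBuilder
    registry.getCounters.asScala.foreach { case (k, c) =>
      sb.append(s"metrics_${k.replace('.', '_')}_Count$countersLabel ${c.getCount}\n")
    }
    registry.getMeters.asScala.foreach { case (k, m) =>
      sb.append(s"metrics_${k.replace('.', '_')}_Count$metersLabel ${m.getCount}\n")
    }
    print(sb)
    // metrics_worker_coresUsed_Count{type="counters"} 4
    // metrics_worker_requests_Count{type="counters"} 10
  }
}
```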
diff --git a/core/src/main/scala/org/apache/spark/status/api/v1/PrometheusResource.scala b/core/src/main/scala/org/apache/spark/status/api/v1/PrometheusResource.scala
index 2a5f151..4ed3d45 100644
--- a/core/src/main/scala/org/apache/spark/status/api/v1/PrometheusResource.scala
+++ b/core/src/main/scala/org/apache/spark/status/api/v1/PrometheusResource.scala
@@ -50,27 +50,27 @@ private[v1] class PrometheusResource extends ApiRequestContext {
"application_name" -> store.applicationInfo.name,
"executor_id" -> executor.id
).map { case (k, v) => s"""$k="$v"""" }.mkString("{", ", ", "}")
- sb.append(s"${prefix}rddBlocks_Count$labels ${executor.rddBlocks}\n")
- sb.append(s"${prefix}memoryUsed_Count$labels ${executor.memoryUsed}\n")
- sb.append(s"${prefix}diskUsed_Count$labels ${executor.diskUsed}\n")
- sb.append(s"${prefix}totalCores_Count$labels ${executor.totalCores}\n")
- sb.append(s"${prefix}maxTasks_Count$labels ${executor.maxTasks}\n")
- sb.append(s"${prefix}activeTasks_Count$labels ${executor.activeTasks}\n")
- sb.append(s"${prefix}failedTasks_Count$labels ${executor.failedTasks}\n")
- sb.append(s"${prefix}completedTasks_Count$labels
${executor.completedTasks}\n")
- sb.append(s"${prefix}totalTasks_Count$labels ${executor.totalTasks}\n")
- sb.append(s"${prefix}totalDuration_Value$labels
${executor.totalDuration}\n")
- sb.append(s"${prefix}totalGCTime_Value$labels ${executor.totalGCTime}\n")
- sb.append(s"${prefix}totalInputBytes_Count$labels
${executor.totalInputBytes}\n")
- sb.append(s"${prefix}totalShuffleRead_Count$labels
${executor.totalShuffleRead}\n")
- sb.append(s"${prefix}totalShuffleWrite_Count$labels
${executor.totalShuffleWrite}\n")
- sb.append(s"${prefix}maxMemory_Count$labels ${executor.maxMemory}\n")
+ sb.append(s"${prefix}rddBlocks$labels ${executor.rddBlocks}\n")
+ sb.append(s"${prefix}memoryUsed_bytes$labels ${executor.memoryUsed}\n")
+ sb.append(s"${prefix}diskUsed_bytes$labels ${executor.diskUsed}\n")
+ sb.append(s"${prefix}totalCores$labels ${executor.totalCores}\n")
+ sb.append(s"${prefix}maxTasks$labels ${executor.maxTasks}\n")
+ sb.append(s"${prefix}activeTasks$labels ${executor.activeTasks}\n")
+ sb.append(s"${prefix}failedTasks_total$labels ${executor.failedTasks}\n")
+ sb.append(s"${prefix}completedTasks_total$labels
${executor.completedTasks}\n")
+ sb.append(s"${prefix}totalTasks_total$labels ${executor.totalTasks}\n")
+ sb.append(s"${prefix}totalDuration_seconds_total$labels
${executor.totalDuration * 0.001}\n")
+ sb.append(s"${prefix}totalGCTime_seconds_total$labels
${executor.totalGCTime * 0.001}\n")
+ sb.append(s"${prefix}totalInputBytes_bytes_total$labels
${executor.totalInputBytes}\n")
+ sb.append(s"${prefix}totalShuffleRead_bytes_total$labels
${executor.totalShuffleRead}\n")
+ sb.append(s"${prefix}totalShuffleWrite_bytes_total$labels
${executor.totalShuffleWrite}\n")
+ sb.append(s"${prefix}maxMemory_bytes$labels ${executor.maxMemory}\n")
executor.executorLogs.foreach { case (k, v) => }
executor.memoryMetrics.foreach { m =>
- sb.append(s"${prefix}usedOnHeapStorageMemory_Count$labels
${m.usedOnHeapStorageMemory}\n")
- sb.append(s"${prefix}usedOffHeapStorageMemory_Count$labels
${m.usedOffHeapStorageMemory}\n")
- sb.append(s"${prefix}totalOnHeapStorageMemory_Count$labels
${m.totalOnHeapStorageMemory}\n")
- sb.append(s"${prefix}totalOffHeapStorageMemory_Count$labels " +
+ sb.append(s"${prefix}usedOnHeapStorageMemory_bytes$labels
${m.usedOnHeapStorageMemory}\n")
+ sb.append(s"${prefix}usedOffHeapStorageMemory_bytes$labels
${m.usedOffHeapStorageMemory}\n")
+ sb.append(s"${prefix}totalOnHeapStorageMemory_bytes$labels
${m.totalOnHeapStorageMemory}\n")
+ sb.append(s"${prefix}totalOffHeapStorageMemory_bytes$labels " +
s"${m.totalOffHeapStorageMemory}\n")
}
executor.peakMemoryMetrics.foreach { m =>
@@ -90,14 +90,16 @@ private[v1] class PrometheusResource extends ApiRequestContext {
"ProcessTreePythonVMemory",
"ProcessTreePythonRSSMemory",
"ProcessTreeOtherVMemory",
- "ProcessTreeOtherRSSMemory",
- "MinorGCCount",
- "MinorGCTime",
- "MajorGCCount",
- "MajorGCTime"
+ "ProcessTreeOtherRSSMemory"
)
names.foreach { name =>
- sb.append(s"$prefix${name}_Count$labels ${m.getMetricValue(name)}\n")
+ sb.append(s"$prefix${name}_bytes$labels ${m.getMetricValue(name)}\n")
+ }
+ Seq("MinorGCCount", "MajorGCCount").foreach { name =>
+ sb.append(s"$prefix${name}_total$labels ${m.getMetricValue(name)}\n")
+ }
+ Seq("MinorGCTime", "MajorGCTime").foreach { name =>
+ sb.append(s"$prefix${name}_seconds_total$labels
${m.getMetricValue(name) * 0.001}\n")
}
}
}
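For completeness, a tiny hypothetical sketch of the unit convention applied in this file: sizes get a `_bytes` suffix, monotonic counts get `_total`, and DropWizard's millisecond durations are rescaled by `* 0.001` so the `_seconds_total` suffix matches the exported unit (Prometheus base units):
```scala
object UnitSuffixSketch {
  def main(args: Array[String]): Unit = {
    // Hypothetical values; only the suffixing and the ms -> s rescale matter.
    val prefix = "metrics_executor_"
    val labels = """{application_id="app-1", executor_id="driver"}"""
    val memoryUsed = 4096L    // bytes
    val failedTasks = 2L      // monotonically increasing count
    val totalGCTimeMs = 1500L // milliseconds, as reported upstream
    val sb = new StringBuilder
    sb.append(s"${prefix}memoryUsed_bytes$labels $memoryUsed\n")
    sb.append(s"${prefix}failedTasks_total$labels $failedTasks\n")
    // 1500 ms * 0.001 = 1.5 s, so the value agrees with `_seconds_total`.
    sb.append(s"${prefix}totalGCTime_seconds_total$labels ${totalGCTimeMs * 0.001}\n")
    print(sb)
  }
}
```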