Repository: spark Updated Branches: refs/heads/master 57e9f29e1 -> 9f1f9b103
[SPARK-7007] [CORE] Add a metric source for ExecutorAllocationManager Add a metric source to expose the internal status of ExecutorAllocationManager to better monitor the resource usage of executors when dynamic allocation is enabled. Please help to review, thanks a lot. Author: jerryshao <[email protected]> Closes #5589 from jerryshao/dynamic-allocation-source and squashes the following commits: 104d155 [jerryshao] rebase and address the comments c501a2c [jerryshao] Address the comments d237ba5 [jerryshao] Address the comments 2c3540f [jerryshao] Add a metric source for ExecutorAllocationManager Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/9f1f9b10 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/9f1f9b10 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/9f1f9b10 Branch: refs/heads/master Commit: 9f1f9b1037ee003a07ff09d60bb360cf32c8a564 Parents: 57e9f29 Author: jerryshao <[email protected]> Authored: Tue May 5 09:43:49 2015 -0700 Committer: Andrew Or <[email protected]> Committed: Tue May 5 09:43:49 2015 -0700 ---------------------------------------------------------------------- .../spark/ExecutorAllocationManager.scala | 29 ++++++++++++++++++++ .../scala/org/apache/spark/SparkContext.scala | 3 ++ 2 files changed, 32 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/9f1f9b10/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala index 228d914..66bda68 100644 --- a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala +++ b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala @@ -21,7 +21,10 @@ import 
java.util.concurrent.TimeUnit import scala.collection.mutable +import com.codahale.metrics.{Gauge, MetricRegistry} + import org.apache.spark.scheduler._ +import org.apache.spark.metrics.source.Source import org.apache.spark.util.{ThreadUtils, Clock, SystemClock, Utils} /** @@ -144,6 +147,9 @@ private[spark] class ExecutorAllocationManager( private val executor = ThreadUtils.newDaemonSingleThreadScheduledExecutor("spark-dynamic-executor-allocation") + // Metric source for ExecutorAllocationManager to expose internal status to MetricsSystem. + val executorAllocationManagerSource = new ExecutorAllocationManagerSource + /** * Verify that the settings specified through the config are valid. * If not, throw an appropriate exception. @@ -579,6 +585,29 @@ private[spark] class ExecutorAllocationManager( } } + /** + * Metric source for ExecutorAllocationManager to expose its internal executor allocation + * status to MetricsSystem. + * Note: These metrics heavily rely on the internal implementation of + * ExecutorAllocationManager, metrics or value of metrics will be changed when internal + * implementation is changed, so these metrics are not stable across Spark version. 
+ */ + private[spark] class ExecutorAllocationManagerSource extends Source { + val sourceName = "ExecutorAllocationManager" + val metricRegistry = new MetricRegistry() + + private def registerGauge[T](name: String, value: => T, defaultValue: T): Unit = { + metricRegistry.register(MetricRegistry.name("executors", name), new Gauge[T] { + override def getValue: T = synchronized { Option(value).getOrElse(defaultValue) } + }) + } + + registerGauge("numberExecutorsToAdd", numExecutorsToAdd, 0) + registerGauge("numberExecutorsPendingToRemove", executorsPendingToRemove.size, 0) + registerGauge("numberAllExecutors", executorIds.size, 0) + registerGauge("numberTargetExecutors", numExecutorsTarget, 0) + registerGauge("numberMaxNeededExecutors", maxNumExecutorsNeeded(), 0) + } } private object ExecutorAllocationManager { http://git-wip-us.apache.org/repos/asf/spark/blob/9f1f9b10/core/src/main/scala/org/apache/spark/SparkContext.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala index 00eb432..2ca6882 100644 --- a/core/src/main/scala/org/apache/spark/SparkContext.scala +++ b/core/src/main/scala/org/apache/spark/SparkContext.scala @@ -537,6 +537,9 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli _taskScheduler.postStartHook() _env.metricsSystem.registerSource(new DAGSchedulerSource(dagScheduler)) _env.metricsSystem.registerSource(new BlockManagerSource(_env.blockManager)) + _executorAllocationManager.foreach { e => + _env.metricsSystem.registerSource(e.executorAllocationManagerSource) + } // Make sure the context is stopped if the user forgets about it. This avoids leaving // unfinished event logs around after the JVM exits cleanly. 
It doesn't help if the JVM --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
