Repository: spark
Updated Branches:
  refs/heads/branch-1.4 acc877a98 -> 29350eef3


[SPARK-7007] [CORE] Add a metric source for ExecutorAllocationManager

Add a metric source to expose the internal status of ExecutorAllocationManager 
to better monitor the resource usage of executors when dynamic allocation is 
enabled. Please help to review, thanks a lot.

Author: jerryshao <[email protected]>

Closes #5589 from jerryshao/dynamic-allocation-source and squashes the 
following commits:

104d155 [jerryshao] rebase and address the comments
c501a2c [jerryshao] Address the comments
d237ba5 [jerryshao] Address the comments
2c3540f [jerryshao] Add a metric source for ExecutorAllocationManager

(cherry picked from commit 9f1f9b1037ee003a07ff09d60bb360cf32c8a564)
Signed-off-by: Andrew Or <[email protected]>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/29350eef
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/29350eef
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/29350eef

Branch: refs/heads/branch-1.4
Commit: 29350eef30b0ac11e9ce436c3c7c6e27bb7eb6ce
Parents: acc877a
Author: jerryshao <[email protected]>
Authored: Tue May 5 09:43:49 2015 -0700
Committer: Andrew Or <[email protected]>
Committed: Tue May 5 09:43:55 2015 -0700

----------------------------------------------------------------------
 .../spark/ExecutorAllocationManager.scala       | 29 ++++++++++++++++++++
 .../scala/org/apache/spark/SparkContext.scala   |  3 ++
 2 files changed, 32 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/29350eef/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
----------------------------------------------------------------------
diff --git 
a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala 
b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
index 228d914..66bda68 100644
--- a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
+++ b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
@@ -21,7 +21,10 @@ import java.util.concurrent.TimeUnit
 
 import scala.collection.mutable
 
+import com.codahale.metrics.{Gauge, MetricRegistry}
+
 import org.apache.spark.scheduler._
+import org.apache.spark.metrics.source.Source
 import org.apache.spark.util.{ThreadUtils, Clock, SystemClock, Utils}
 
 /**
@@ -144,6 +147,9 @@ private[spark] class ExecutorAllocationManager(
   private val executor =
     
ThreadUtils.newDaemonSingleThreadScheduledExecutor("spark-dynamic-executor-allocation")
 
+  // Metric source for ExecutorAllocationManager to expose internal status to 
MetricsSystem.
+  val executorAllocationManagerSource = new ExecutorAllocationManagerSource
+
   /**
    * Verify that the settings specified through the config are valid.
    * If not, throw an appropriate exception.
@@ -579,6 +585,29 @@ private[spark] class ExecutorAllocationManager(
     }
   }
 
+  /**
+   * Metric source for ExecutorAllocationManager to expose its internal 
executor allocation
+   * status to MetricsSystem.
+   * Note: These metrics heavily rely on the internal implementation of
+   * ExecutorAllocationManager, metrics or value of metrics will be changed 
when internal
+   * implementation is changed, so these metrics are not stable across Spark 
version.
+   */
+  private[spark] class ExecutorAllocationManagerSource extends Source {
+    val sourceName = "ExecutorAllocationManager"
+    val metricRegistry = new MetricRegistry()
+
+    private def registerGauge[T](name: String, value: => T, defaultValue: T): 
Unit = {
+      metricRegistry.register(MetricRegistry.name("executors", name), new 
Gauge[T] {
+        override def getValue: T = synchronized { 
Option(value).getOrElse(defaultValue) }
+      })
+    }
+
+    registerGauge("numberExecutorsToAdd", numExecutorsToAdd, 0)
+    registerGauge("numberExecutorsPendingToRemove", 
executorsPendingToRemove.size, 0)
+    registerGauge("numberAllExecutors", executorIds.size, 0)
+    registerGauge("numberTargetExecutors", numExecutorsTarget, 0)
+    registerGauge("numberMaxNeededExecutors", maxNumExecutorsNeeded(), 0)
+  }
 }
 
 private object ExecutorAllocationManager {

http://git-wip-us.apache.org/repos/asf/spark/blob/29350eef/core/src/main/scala/org/apache/spark/SparkContext.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala 
b/core/src/main/scala/org/apache/spark/SparkContext.scala
index 00eb432..2ca6882 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -537,6 +537,9 @@ class SparkContext(config: SparkConf) extends Logging with 
ExecutorAllocationCli
     _taskScheduler.postStartHook()
     _env.metricsSystem.registerSource(new DAGSchedulerSource(dagScheduler))
     _env.metricsSystem.registerSource(new 
BlockManagerSource(_env.blockManager))
+    _executorAllocationManager.foreach { e =>
+      _env.metricsSystem.registerSource(e.executorAllocationManagerSource)
+    }
 
     // Make sure the context is stopped if the user forgets about it. This 
avoids leaving
     // unfinished event logs around after the JVM exits cleanly. It doesn't 
help if the JVM


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to