Repository: spark
Updated Branches:
  refs/heads/master 9e785079b -> 8b44bd52f


[SPARK-6735][YARN] Add window-based executor failure tracking mechanism for long-running service

This work is based on twinkle-sachdeva's proposal. In parallel to the existing 
validity-interval mechanism for AM failures, this change adds a similar 
window-based mechanism for executor failure tracking. This is useful for 
long-running Spark services, since executor failures older than the configured 
window no longer count toward the failure limit.
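
At its core the mechanism is a sliding window: each executor failure is 
timestamped and enqueued, and stale entries are dropped whenever the failure 
count is read. A minimal standalone sketch of the idea (illustrative only; 
FailureTracker and the injectable now() parameter are not names from this 
patch):

    import scala.collection.mutable.Queue

    // Sliding-window failure counter. A non-positive interval means
    // "never expire", matching the patch's default of -1.
    class FailureTracker(
        validityIntervalMs: Long,
        now: () => Long = () => System.currentTimeMillis()) {

      private val failureTimestamps = new Queue[Long]()

      def recordFailure(): Unit = failureTimestamps.enqueue(now())

      // Drop failures that have aged out of the window, then count the rest.
      def numFailures: Int = {
        val cutoff = now() - validityIntervalMs
        while (validityIntervalMs > 0 &&
            failureTimestamps.nonEmpty &&
            failureTimestamps.head < cutoff) {
          failureTimestamps.dequeue()
        }
        failureTimestamps.size
      }
    }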

Please help review, tgravescs, sryza, and vanzin.

Author: jerryshao <[email protected]>

Closes #10241 from jerryshao/SPARK-6735.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/8b44bd52
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/8b44bd52
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/8b44bd52

Branch: refs/heads/master
Commit: 8b44bd52fa40c0fc7d34798c3654e31533fd3008
Parents: 9e78507
Author: jerryshao <[email protected]>
Authored: Thu Apr 28 12:38:19 2016 -0500
Committer: Tom Graves <[email protected]>
Committed: Thu Apr 28 12:38:19 2016 -0500

----------------------------------------------------------------------
 docs/running-on-yarn.md                         |  8 ++++
 .../org/apache/spark/deploy/yarn/Client.scala   |  4 +-
 .../spark/deploy/yarn/YarnAllocator.scala       | 34 ++++++++++++---
 .../org/apache/spark/deploy/yarn/config.scala   |  9 +++-
 .../spark/deploy/yarn/YarnAllocatorSuite.scala  | 46 ++++++++++++++++++++
 5 files changed, 93 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/8b44bd52/docs/running-on-yarn.md
----------------------------------------------------------------------
diff --git a/docs/running-on-yarn.md b/docs/running-on-yarn.md
index 09701ab..3bd16bf 100644
--- a/docs/running-on-yarn.md
+++ b/docs/running-on-yarn.md
@@ -372,6 +372,14 @@ If you need a reference to the proper location to put log files in the YARN so t
   </td>
 </tr>
 <tr>
+  <td><code>spark.yarn.executor.failuresValidityInterval</code></td>
+  <td>(none)</td>
+  <td>
+  Defines the validity interval for executor failure tracking.
+  Executor failures which are older than the validity interval will be ignored.
+  </td>
+</tr>
+<tr>
   <td><code>spark.yarn.submit.waitAppCompletion</code></td>
   <td><code>true</code></td>
   <td>

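For reference, this key takes a standard Spark time string such as "100s" or 
"1h". A minimal sketch of how a long-running service might set it (the 
one-hour value is purely illustrative):

    import org.apache.spark.SparkConf

    val conf = new SparkConf()
      // Hypothetical window: executor failures older than 1h no longer count.
      .set("spark.yarn.executor.failuresValidityInterval", "1h")
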
http://git-wip-us.apache.org/repos/asf/spark/blob/8b44bd52/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
----------------------------------------------------------------------
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
index 8b07dc3..6184ad5 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
@@ -231,14 +231,14 @@ private[spark] class Client(
           "Cluster's default value will be used.")
     }
 
-    sparkConf.get(ATTEMPT_FAILURE_VALIDITY_INTERVAL_MS).foreach { interval =>
+    sparkConf.get(AM_ATTEMPT_FAILURE_VALIDITY_INTERVAL_MS).foreach { interval =>
       try {
         val method = appContext.getClass().getMethod(
           "setAttemptFailuresValidityInterval", classOf[Long])
         method.invoke(appContext, interval: java.lang.Long)
       } catch {
         case e: NoSuchMethodException =>
-          logWarning(s"Ignoring ${ATTEMPT_FAILURE_VALIDITY_INTERVAL_MS.key} because " +
+          logWarning(s"Ignoring ${AM_ATTEMPT_FAILURE_VALIDITY_INTERVAL_MS.key} because " +
             "the version of YARN does not support it")
       }
     }

http://git-wip-us.apache.org/repos/asf/spark/blob/8b44bd52/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
----------------------------------------------------------------------
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
index b59e6cf..066c665 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
@@ -22,7 +22,7 @@ import java.util.concurrent._
 import java.util.regex.Pattern
 
 import scala.collection.mutable
-import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet}
+import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet, Queue}
 import scala.collection.JavaConverters._
 
 import org.apache.hadoop.conf.Configuration
@@ -41,7 +41,7 @@ import org.apache.spark.rpc.{RpcCallContext, RpcEndpointRef}
 import org.apache.spark.scheduler.{ExecutorExited, ExecutorLossReason}
 import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages.RemoveExecutor
 import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages.RetrieveLastAllocatedExecutorId
-import org.apache.spark.util.ThreadUtils
+import org.apache.spark.util.{Clock, SystemClock, ThreadUtils}
 
 /**
 * YarnAllocator is charged with requesting containers from the YARN ResourceManager and deciding
@@ -102,7 +102,13 @@ private[yarn] class YarnAllocator(
   private var executorIdCounter: Int =
     driverRef.askWithRetry[Int](RetrieveLastAllocatedExecutorId)
 
-  @volatile private var numExecutorsFailed = 0
+  // Queue to store the timestamp of failed executors
+  private val failedExecutorsTimeStamps = new Queue[Long]()
+
+  private var clock: Clock = new SystemClock
+
+  private val executorFailuresValidityInterval =
+    sparkConf.get(EXECUTOR_ATTEMPT_FAILURE_VALIDITY_INTERVAL_MS).getOrElse(-1L)
 
   @volatile private var targetNumExecutors =
     YarnSparkHadoopUtil.getInitialTargetExecutorNumber(sparkConf)
@@ -166,9 +172,26 @@ private[yarn] class YarnAllocator(
   private[yarn] val containerPlacementStrategy =
     new LocalityPreferredContainerPlacementStrategy(sparkConf, conf, resource)
 
+  /**
+   * Use a different clock for YarnAllocator. This is mainly used for testing.
+   */
+  def setClock(newClock: Clock): Unit = {
+    clock = newClock
+  }
+
   def getNumExecutorsRunning: Int = numExecutorsRunning
 
-  def getNumExecutorsFailed: Int = numExecutorsFailed
+  def getNumExecutorsFailed: Int = synchronized {
+    val endTime = clock.getTimeMillis()
+
+    while (executorFailuresValidityInterval > 0
+      && failedExecutorsTimeStamps.nonEmpty
+      && failedExecutorsTimeStamps.head < endTime - executorFailuresValidityInterval) {
+      failedExecutorsTimeStamps.dequeue()
+    }
+
+    failedExecutorsTimeStamps.size
+  }
 
   /**
    * A sequence of pending container requests that have not yet been fulfilled.
@@ -527,7 +550,8 @@ private[yarn] class YarnAllocator(
               completedContainer.getDiagnostics,
               PMEM_EXCEEDED_PATTERN))
           case _ =>
-            numExecutorsFailed += 1
+            // Enqueue the timestamp of failed executor
+            failedExecutorsTimeStamps.enqueue(clock.getTimeMillis())
             (true, "Container marked as failed: " + containerId + onHostStr +
               ". Exit status: " + completedContainer.getExitStatus +
               ". Diagnostics: " + completedContainer.getDiagnostics)

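Note that pruning is lazy: stale timestamps are only discarded when 
getNumExecutorsFailed is read, which the AM's reporter thread does when 
checking the count against spark.yarn.max.executor.failures. Replaying the new 
test's timeline against the FailureTracker sketch above (hypothetical names) 
shows the windowing behavior:

    // Manual clock so the timeline is deterministic.
    var nowMs = 0L
    val tracker = new FailureTracker(validityIntervalMs = 100 * 1000L, now = () => nowMs)

    nowMs += 100 * 1000L              // t = 100s
    tracker.recordFailure()
    assert(tracker.numFailures == 1)  // failure is inside the 100s window

    nowMs += 101 * 1000L              // t = 201s
    assert(tracker.numFailures == 0)  // the failure has aged out
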
http://git-wip-us.apache.org/repos/asf/spark/blob/8b44bd52/yarn/src/main/scala/org/apache/spark/deploy/yarn/config.scala
----------------------------------------------------------------------
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/config.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/config.scala
index 3816a84..c4dd320 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/config.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/config.scala
@@ -33,13 +33,20 @@ package object config {
     .toSequence
     .createOptional
 
-  private[spark] val ATTEMPT_FAILURE_VALIDITY_INTERVAL_MS =
+  private[spark] val AM_ATTEMPT_FAILURE_VALIDITY_INTERVAL_MS =
     ConfigBuilder("spark.yarn.am.attemptFailuresValidityInterval")
      .doc("Interval after which AM failures will be considered independent and " +
         "not accumulate towards the attempt count.")
       .timeConf(TimeUnit.MILLISECONDS)
       .createOptional
 
+  private[spark] val EXECUTOR_ATTEMPT_FAILURE_VALIDITY_INTERVAL_MS =
+    ConfigBuilder("spark.yarn.executor.failuresValidityInterval")
+      .doc("Interval after which Executor failures will be considered independent and not " +
+        "accumulate towards the attempt count.")
+      .timeConf(TimeUnit.MILLISECONDS)
+      .createOptional
+
  private[spark] val MAX_APP_ATTEMPTS = ConfigBuilder("spark.yarn.maxAppAttempts")
     .doc("Maximum number of AM attempts before failing the app.")
     .intConf

http://git-wip-us.apache.org/repos/asf/spark/blob/8b44bd52/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala
----------------------------------------------------------------------
diff --git a/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala b/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala
index 6a861d6..f4f8bd4 100644
--- a/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala
+++ b/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala
@@ -34,6 +34,7 @@ import org.apache.spark.deploy.yarn.YarnAllocator._
 import org.apache.spark.deploy.yarn.YarnSparkHadoopUtil._
 import org.apache.spark.rpc.RpcEndpointRef
 import org.apache.spark.scheduler.SplitInfo
+import org.apache.spark.util.ManualClock
 
 class MockResolver extends DNSToSwitchMapping {
 
@@ -275,4 +276,49 @@ class YarnAllocatorSuite extends SparkFunSuite with Matchers with BeforeAndAfter
     assert(vmemMsg.contains("5.8 GB of 4.2 GB virtual memory used."))
     assert(pmemMsg.contains("2.1 MB of 2 GB physical memory used."))
   }
+
+  test("window based failure executor counting") {
+    sparkConf.set("spark.yarn.executor.failuresValidityInterval", "100s")
+    val handler = createAllocator(4)
+    val clock = new ManualClock(0L)
+    handler.setClock(clock)
+
+    handler.updateResourceRequests()
+    handler.getNumExecutorsRunning should be (0)
+    handler.getPendingAllocate.size should be (4)
+
+    val containers = Seq(
+      createContainer("host1"),
+      createContainer("host2"),
+      createContainer("host3"),
+      createContainer("host4")
+    )
+    handler.handleAllocatedContainers(containers)
+
+    val failedStatuses = containers.map { c =>
+      ContainerStatus.newInstance(c.getId, ContainerState.COMPLETE, "Failed", -1)
+    }
+
+    handler.getNumExecutorsFailed should be (0)
+
+    clock.advance(100 * 1000L)
+    handler.processCompletedContainers(failedStatuses.slice(0, 1))
+    handler.getNumExecutorsFailed should be (1)
+
+    clock.advance(101 * 1000L)
+    handler.getNumExecutorsFailed should be (0)
+
+    handler.processCompletedContainers(failedStatuses.slice(1, 3))
+    handler.getNumExecutorsFailed should be (2)
+
+    clock.advance(50 * 1000L)
+    handler.processCompletedContainers(failedStatuses.slice(3, 4))
+    handler.getNumExecutorsFailed should be (3)
+
+    clock.advance(51 * 1000L)
+    handler.getNumExecutorsFailed should be (1)
+
+    clock.advance(50 * 1000L)
+    handler.getNumExecutorsFailed should be (0)
+  }
 }

