Repository: spark
Updated Branches:
  refs/heads/master fc26f32cf -> 16906ef23


[SPARK-11120] Allow sane default number of executor failures when dynamically allocating in YARN

I also added information to the container-failure error messages about which host 
each container failed on, which would have helped me identify the problem that led me 
to this JIRA and PR sooner.

Author: Ryan Williams <[email protected]>

Closes #9147 from ryan-williams/dyn-exec-failures.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/16906ef2
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/16906ef2
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/16906ef2

Branch: refs/heads/master
Commit: 16906ef23a7aa2854c8cdcaa3bb3808ab39e0eec
Parents: fc26f32
Author: Ryan Williams <[email protected]>
Authored: Mon Oct 19 16:34:15 2015 -0700
Committer: Andrew Or <[email protected]>
Committed: Mon Oct 19 16:34:15 2015 -0700

----------------------------------------------------------------------
 .../main/scala/org/apache/spark/SparkConf.scala  |  4 +++-
 .../spark/deploy/yarn/ApplicationMaster.scala    | 19 +++++++++++++++----
 .../apache/spark/deploy/yarn/YarnAllocator.scala | 19 +++++++++++--------
 3 files changed, 29 insertions(+), 13 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/16906ef2/core/src/main/scala/org/apache/spark/SparkConf.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/SparkConf.scala b/core/src/main/scala/org/apache/spark/SparkConf.scala
index 1a0ac3d..58d3b84 100644
--- a/core/src/main/scala/org/apache/spark/SparkConf.scala
+++ b/core/src/main/scala/org/apache/spark/SparkConf.scala
@@ -595,7 +595,9 @@ private[spark] object SparkConf extends Logging {
     "spark.rpc.lookupTimeout" -> Seq(
       AlternateConfig("spark.akka.lookupTimeout", "1.4")),
     "spark.streaming.fileStream.minRememberDuration" -> Seq(
-      AlternateConfig("spark.streaming.minRememberDuration", "1.5"))
+      AlternateConfig("spark.streaming.minRememberDuration", "1.5")),
+    "spark.yarn.max.executor.failures" -> Seq(
+      AlternateConfig("spark.yarn.max.worker.failures", "1.5"))
     )
 
   /**

http://git-wip-us.apache.org/repos/asf/spark/blob/16906ef2/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
----------------------------------------------------------------------
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
index d1d248b..4b4d999 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
@@ -62,10 +62,21 @@ private[spark] class ApplicationMaster(
     .asInstanceOf[YarnConfiguration]
   private val isClusterMode = args.userClass != null
 
-  // Default to numExecutors * 2, with minimum of 3
-  private val maxNumExecutorFailures = sparkConf.getInt("spark.yarn.max.executor.failures",
-    sparkConf.getInt("spark.yarn.max.worker.failures",
-      math.max(sparkConf.getInt("spark.executor.instances", 0) *  2, 3)))
+  // Default to twice the number of executors (twice the maximum number of executors if dynamic
+  // allocation is enabled), with a minimum of 3.
+
+  private val maxNumExecutorFailures = {
+    val defaultKey =
+      if (Utils.isDynamicAllocationEnabled(sparkConf)) {
+        "spark.dynamicAllocation.maxExecutors"
+      } else {
+        "spark.executor.instances"
+      }
+    val effectiveNumExecutors = sparkConf.getInt(defaultKey, 0)
+    val defaultMaxNumExecutorFailures = math.max(3, 2 * effectiveNumExecutors)
+
+    sparkConf.getInt("spark.yarn.max.executor.failures", defaultMaxNumExecutorFailures)
+  }
 
   @volatile private var exitCode = 0
   @volatile private var unregistered = false

http://git-wip-us.apache.org/repos/asf/spark/blob/16906ef2/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
----------------------------------------------------------------------
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
index 9e1ef1b..1deaa37 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
@@ -430,17 +430,20 @@ private[yarn] class YarnAllocator(
     for (completedContainer <- completedContainers) {
       val containerId = completedContainer.getContainerId
       val alreadyReleased = releasedContainers.remove(containerId)
+      val hostOpt = allocatedContainerToHostMap.get(containerId)
+      val onHostStr = hostOpt.map(host => s" on host: $host").getOrElse("")
       val exitReason = if (!alreadyReleased) {
         // Decrement the number of executors running. The next iteration of
         // the ApplicationMaster's reporting thread will take care of allocating.
         numExecutorsRunning -= 1
-        logInfo("Completed container %s (state: %s, exit status: %s)".format(
+        logInfo("Completed container %s%s (state: %s, exit status: %s)".format(
           containerId,
+          onHostStr,
           completedContainer.getState,
           completedContainer.getExitStatus))
         // Hadoop 2.2.X added a ContainerExitStatus we should switch to use
         // there are some exit status' we shouldn't necessarily count against us, but for
-        // now I think its ok as none of the containers are expected to exit
+        // now I think its ok as none of the containers are expected to exit.
         val exitStatus = completedContainer.getExitStatus
         val (isNormalExit, containerExitReason) = exitStatus match {
           case ContainerExitStatus.SUCCESS =>
@@ -449,7 +452,7 @@ private[yarn] class YarnAllocator(
             // Preemption should count as a normal exit, since YARN preempts containers merely
             // to do resource sharing, and tasks that fail due to preempted executors could
             // just as easily finish on any other executor. See SPARK-8167.
-            (true, s"Container $containerId was preempted.")
+            (true, s"Container ${containerId}${onHostStr} was preempted.")
           // Should probably still count memory exceeded exit codes towards task failures
           case VMEM_EXCEEDED_EXIT_CODE =>
             (false, memLimitExceededLogMessage(
@@ -461,7 +464,7 @@ private[yarn] class YarnAllocator(
               PMEM_EXCEEDED_PATTERN))
           case unknown =>
             numExecutorsFailed += 1
-            (false, "Container marked as failed: " + containerId +
+            (false, "Container marked as failed: " + containerId + onHostStr +
               ". Exit status: " + completedContainer.getExitStatus +
               ". Diagnostics: " + completedContainer.getDiagnostics)
 
@@ -479,10 +482,10 @@ private[yarn] class YarnAllocator(
           s"Container $containerId exited from explicit termination request.")
       }
 
-      if (allocatedContainerToHostMap.contains(containerId)) {
-        val host = allocatedContainerToHostMap.get(containerId).get
-        val containerSet = allocatedHostToContainersMap.get(host).get
-
+      for {
+        host <- hostOpt
+        containerSet <- allocatedHostToContainersMap.get(host)
+      } {
         containerSet.remove(containerId)
         if (containerSet.isEmpty) {
           allocatedHostToContainersMap.remove(host)


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to