Repository: spark Updated Branches: refs/heads/master 9e785079b -> 8b44bd52f
[SPARK-6735][YARN] Add window based executor failure tracking mechanism for long running service This work is based on twinkle-sachdeva 's proposal. In parallel to such mechanism for AM failures, here add similar mechanism for executor failure tracking, this is useful for long running Spark service to mitigate the executor failure problems. Please help to review, tgravescs sryza and vanzin Author: jerryshao <[email protected]> Closes #10241 from jerryshao/SPARK-6735. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/8b44bd52 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/8b44bd52 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/8b44bd52 Branch: refs/heads/master Commit: 8b44bd52fa40c0fc7d34798c3654e31533fd3008 Parents: 9e78507 Author: jerryshao <[email protected]> Authored: Thu Apr 28 12:38:19 2016 -0500 Committer: Tom Graves <[email protected]> Committed: Thu Apr 28 12:38:19 2016 -0500 ---------------------------------------------------------------------- docs/running-on-yarn.md | 8 ++++ .../org/apache/spark/deploy/yarn/Client.scala | 4 +- .../spark/deploy/yarn/YarnAllocator.scala | 34 ++++++++++++--- .../org/apache/spark/deploy/yarn/config.scala | 9 +++- .../spark/deploy/yarn/YarnAllocatorSuite.scala | 46 ++++++++++++++++++++ 5 files changed, 93 insertions(+), 8 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/8b44bd52/docs/running-on-yarn.md ---------------------------------------------------------------------- diff --git a/docs/running-on-yarn.md b/docs/running-on-yarn.md index 09701ab..3bd16bf 100644 --- a/docs/running-on-yarn.md +++ b/docs/running-on-yarn.md @@ -372,6 +372,14 @@ If you need a reference to the proper location to put log files in the YARN so t </td> </tr> <tr> + <td><code>spark.yarn.executor.failuresValidityInterval</code></td> + <td>(none)</td> + <td> + Defines the validity interval for executor failure tracking. + Executor failures which are older than the validity interval will be ignored. + </td> +</tr> +<tr> <td><code>spark.yarn.submit.waitAppCompletion</code></td> <td><code>true</code></td> <td> http://git-wip-us.apache.org/repos/asf/spark/blob/8b44bd52/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala ---------------------------------------------------------------------- diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala index 8b07dc3..6184ad5 100644 --- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala +++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala @@ -231,14 +231,14 @@ private[spark] class Client( "Cluster's default value will be used.") } - sparkConf.get(ATTEMPT_FAILURE_VALIDITY_INTERVAL_MS).foreach { interval => + sparkConf.get(AM_ATTEMPT_FAILURE_VALIDITY_INTERVAL_MS).foreach { interval => try { val method = appContext.getClass().getMethod( "setAttemptFailuresValidityInterval", classOf[Long]) method.invoke(appContext, interval: java.lang.Long) } catch { case e: NoSuchMethodException => - logWarning(s"Ignoring ${ATTEMPT_FAILURE_VALIDITY_INTERVAL_MS.key} because " + + logWarning(s"Ignoring ${AM_ATTEMPT_FAILURE_VALIDITY_INTERVAL_MS.key} because " + "the version of YARN does not support it") } } http://git-wip-us.apache.org/repos/asf/spark/blob/8b44bd52/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala ---------------------------------------------------------------------- diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala index b59e6cf..066c665 100644 --- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala +++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala @@ -22,7 +22,7 @@ import java.util.concurrent._ import java.util.regex.Pattern import scala.collection.mutable -import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet} +import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet, Queue} import scala.collection.JavaConverters._ import org.apache.hadoop.conf.Configuration @@ -41,7 +41,7 @@ import org.apache.spark.rpc.{RpcCallContext, RpcEndpointRef} import org.apache.spark.scheduler.{ExecutorExited, ExecutorLossReason} import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages.RemoveExecutor import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages.RetrieveLastAllocatedExecutorId -import org.apache.spark.util.ThreadUtils +import org.apache.spark.util.{Clock, SystemClock, ThreadUtils} /** * YarnAllocator is charged with requesting containers from the YARN ResourceManager and deciding @@ -102,7 +102,13 @@ private[yarn] class YarnAllocator( private var executorIdCounter: Int = driverRef.askWithRetry[Int](RetrieveLastAllocatedExecutorId) - @volatile private var numExecutorsFailed = 0 + // Queue to store the timestamp of failed executors + private val failedExecutorsTimeStamps = new Queue[Long]() + + private var clock: Clock = new SystemClock + + private val executorFailuresValidityInterval = + sparkConf.get(EXECUTOR_ATTEMPT_FAILURE_VALIDITY_INTERVAL_MS).getOrElse(-1L) @volatile private var targetNumExecutors = YarnSparkHadoopUtil.getInitialTargetExecutorNumber(sparkConf) @@ -166,9 +172,26 @@ private[yarn] class YarnAllocator( private[yarn] val containerPlacementStrategy = new LocalityPreferredContainerPlacementStrategy(sparkConf, conf, resource) + /** + * Use a different clock for YarnAllocator. This is mainly used for testing. + */ + def setClock(newClock: Clock): Unit = { + clock = newClock + } + def getNumExecutorsRunning: Int = numExecutorsRunning - def getNumExecutorsFailed: Int = numExecutorsFailed + def getNumExecutorsFailed: Int = synchronized { + val endTime = clock.getTimeMillis() + + while (executorFailuresValidityInterval > 0 + && failedExecutorsTimeStamps.nonEmpty + && failedExecutorsTimeStamps.head < endTime - executorFailuresValidityInterval) { + failedExecutorsTimeStamps.dequeue() + } + + failedExecutorsTimeStamps.size + } /** * A sequence of pending container requests that have not yet been fulfilled. @@ -527,7 +550,8 @@ private[yarn] class YarnAllocator( completedContainer.getDiagnostics, PMEM_EXCEEDED_PATTERN)) case _ => - numExecutorsFailed += 1 + // Enqueue the timestamp of failed executor + failedExecutorsTimeStamps.enqueue(clock.getTimeMillis()) (true, "Container marked as failed: " + containerId + onHostStr + ". Exit status: " + completedContainer.getExitStatus + ". Diagnostics: " + completedContainer.getDiagnostics) http://git-wip-us.apache.org/repos/asf/spark/blob/8b44bd52/yarn/src/main/scala/org/apache/spark/deploy/yarn/config.scala ---------------------------------------------------------------------- diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/config.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/config.scala index 3816a84..c4dd320 100644 --- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/config.scala +++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/config.scala @@ -33,13 +33,20 @@ package object config { .toSequence .createOptional - private[spark] val ATTEMPT_FAILURE_VALIDITY_INTERVAL_MS = + private[spark] val AM_ATTEMPT_FAILURE_VALIDITY_INTERVAL_MS = ConfigBuilder("spark.yarn.am.attemptFailuresValidityInterval") .doc("Interval after which AM failures will be considered independent and " + "not accumulate towards the attempt count.") .timeConf(TimeUnit.MILLISECONDS) .createOptional + private[spark] val EXECUTOR_ATTEMPT_FAILURE_VALIDITY_INTERVAL_MS = + ConfigBuilder("spark.yarn.executor.failuresValidityInterval") + .doc("Interval after which Executor failures will be considered independent and not " + + "accumulate towards the attempt count.") + .timeConf(TimeUnit.MILLISECONDS) + .createOptional + private[spark] val MAX_APP_ATTEMPTS = ConfigBuilder("spark.yarn.maxAppAttempts") .doc("Maximum number of AM attempts before failing the app.") .intConf http://git-wip-us.apache.org/repos/asf/spark/blob/8b44bd52/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala ---------------------------------------------------------------------- diff --git a/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala b/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala index 6a861d6..f4f8bd4 100644 --- a/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala +++ b/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala @@ -34,6 +34,7 @@ import org.apache.spark.deploy.yarn.YarnAllocator._ import org.apache.spark.deploy.yarn.YarnSparkHadoopUtil._ import org.apache.spark.rpc.RpcEndpointRef import org.apache.spark.scheduler.SplitInfo +import org.apache.spark.util.ManualClock class MockResolver extends DNSToSwitchMapping { @@ -275,4 +276,49 @@ class YarnAllocatorSuite extends SparkFunSuite with Matchers with BeforeAndAfter assert(vmemMsg.contains("5.8 GB of 4.2 GB virtual memory used.")) assert(pmemMsg.contains("2.1 MB of 2 GB physical memory used.")) } + + test("window based failure executor counting") { + sparkConf.set("spark.yarn.executor.failuresValidityInterval", "100s") + val handler = createAllocator(4) + val clock = new ManualClock(0L) + handler.setClock(clock) + + handler.updateResourceRequests() + handler.getNumExecutorsRunning should be (0) + handler.getPendingAllocate.size should be (4) + + val containers = Seq( + createContainer("host1"), + createContainer("host2"), + createContainer("host3"), + createContainer("host4") + ) + handler.handleAllocatedContainers(containers) + + val failedStatuses = containers.map { c => + ContainerStatus.newInstance(c.getId, ContainerState.COMPLETE, "Failed", -1) + } + + handler.getNumExecutorsFailed should be (0) + + clock.advance(100 * 1000L) + handler.processCompletedContainers(failedStatuses.slice(0, 1)) + handler.getNumExecutorsFailed should be (1) + + clock.advance(101 * 1000L) + handler.getNumExecutorsFailed should be (0) + + handler.processCompletedContainers(failedStatuses.slice(1, 3)) + handler.getNumExecutorsFailed should be (2) + + clock.advance(50 * 1000L) + handler.processCompletedContainers(failedStatuses.slice(3, 4)) + handler.getNumExecutorsFailed should be (3) + + clock.advance(51 * 1000L) + handler.getNumExecutorsFailed should be (1) + + clock.advance(50 * 1000L) + handler.getNumExecutorsFailed should be (0) + } } --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
