Repository: spark Updated Branches: refs/heads/branch-1.0 80721fb45 -> 1696a4470
[SPARK-1901] worker should make sure executor has exited before updating executor's info https://issues.apache.org/jira/browse/SPARK-1901 Author: Zhen Peng <[email protected]> Closes #854 from zhpengg/bugfix-worker-kills-executor and squashes the following commits: 21d380b [Zhen Peng] add some error messages 506cea6 [Zhen Peng] add some docs for killProcess() a0b9860 [Zhen Peng] [SPARK-1901] worker should make sure executor has exited before updating executor's info Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/1696a447 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/1696a447 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/1696a447 Branch: refs/heads/branch-1.0 Commit: 1696a44704b8efa6515da9fa311f8acd9dda970e Parents: 80721fb Author: Zhen Peng <[email protected]> Authored: Fri May 30 10:11:02 2014 -0700 Committer: Aaron Davidson <[email protected]> Committed: Fri May 30 10:11:02 2014 -0700 ---------------------------------------------------------------------- .../spark/deploy/worker/ExecutorRunner.scala | 20 ++++++++++++-------- 1 file changed, 12 insertions(+), 8 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/1696a447/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala b/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala index 2051403..d27e0e1 100644 --- a/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala +++ b/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala @@ -61,17 +61,23 @@ private[spark] class ExecutorRunner( // Shutdown hook that kills actors on shutdown. shutdownHook = new Thread() { override def run() { - killProcess() + killProcess(Some("Worker shutting down")) } } Runtime.getRuntime.addShutdownHook(shutdownHook) } - private def killProcess() { + /** + * kill executor process, wait for exit and notify worker to update resource status + * + * @param message the exception message which caused the executor's death + */ + private def killProcess(message: Option[String]) { if (process != null) { logInfo("Killing process!") process.destroy() - process.waitFor() + val exitCode = process.waitFor() + worker ! ExecutorStateChanged(appId, execId, state, message, Some(exitCode)) } } @@ -82,7 +88,6 @@ private[spark] class ExecutorRunner( workerThread.interrupt() workerThread = null state = ExecutorState.KILLED - worker ! ExecutorStateChanged(appId, execId, state, None, None) Runtime.getRuntime.removeShutdownHook(shutdownHook) } } @@ -148,14 +153,13 @@ private[spark] class ExecutorRunner( } catch { case interrupted: InterruptedException => { logInfo("Runner thread for executor " + fullId + " interrupted") - killProcess() + state = ExecutorState.KILLED + killProcess(None) } case e: Exception => { logError("Error running executor", e) - killProcess() state = ExecutorState.FAILED - val message = e.getClass + ": " + e.getMessage - worker ! ExecutorStateChanged(appId, execId, state, Some(message), None) + killProcess(Some(e.toString)) } } }
