Repository: spark Updated Branches: refs/heads/master d17e5f2f1 -> bad0f7dbb
[SPARK-16095][YARN] Yarn cluster mode should report correct state to SparkLauncher ## What changes were proposed in this pull request? Yarn cluster mode should return correct state for SparkLauncher ## How was this patch tested? unit test Author: peng.zhang <[email protected]> Closes #13962 from renozhang/SPARK-16095-spark-launcher-wrong-state. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/bad0f7db Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/bad0f7db Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/bad0f7db Branch: refs/heads/master Commit: bad0f7dbba2eda149ee4fc5810674d971d17874a Parents: d17e5f2 Author: peng.zhang <[email protected]> Authored: Fri Jul 1 15:51:21 2016 -0700 Committer: Marcelo Vanzin <[email protected]> Committed: Fri Jul 1 15:51:21 2016 -0700 ---------------------------------------------------------------------- .../org/apache/spark/deploy/yarn/Client.scala | 9 ++++- .../spark/deploy/yarn/YarnClusterSuite.scala | 37 ++++++++++++-------- 2 files changed, 31 insertions(+), 15 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/bad0f7db/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala ---------------------------------------------------------------------- diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala index d63579f..244d1a4 100644 --- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala +++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala @@ -1080,7 +1080,14 @@ private[spark] class Client( case YarnApplicationState.RUNNING => reportLauncherState(SparkAppHandle.State.RUNNING) case YarnApplicationState.FINISHED => - reportLauncherState(SparkAppHandle.State.FINISHED) + report.getFinalApplicationStatus match { + case FinalApplicationStatus.FAILED => + reportLauncherState(SparkAppHandle.State.FAILED) + case FinalApplicationStatus.KILLED => + reportLauncherState(SparkAppHandle.State.KILLED) + case _ => + reportLauncherState(SparkAppHandle.State.FINISHED) + } case YarnApplicationState.FAILED => reportLauncherState(SparkAppHandle.State.FAILED) case YarnApplicationState.KILLED => http://git-wip-us.apache.org/repos/asf/spark/blob/bad0f7db/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala ---------------------------------------------------------------------- diff --git a/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala b/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala index 6b20dea..9085fca 100644 --- a/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala +++ b/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala @@ -120,6 +120,11 @@ class YarnClusterSuite extends BaseYarnClusterSuite { finalState should be (SparkAppHandle.State.FAILED) } + test("run Spark in yarn-cluster mode failure after sc initialized") { + val finalState = runSpark(false, mainClassName(YarnClusterDriverWithFailure.getClass)) + finalState should be (SparkAppHandle.State.FAILED) + } + test("run Python application in yarn-client mode") { testPySpark(true) } @@ -259,6 +264,16 @@ private[spark] class SaveExecutorInfo extends SparkListener { } } +private object YarnClusterDriverWithFailure extends Logging with Matchers { + def main(args: Array[String]): Unit = { + val sc = new SparkContext(new SparkConf() + .set("spark.extraListeners", classOf[SaveExecutorInfo].getName) + .setAppName("yarn test with failure")) + + throw new Exception("exception after sc initialized") + } +} + private object YarnClusterDriver extends Logging with Matchers { val WAIT_TIMEOUT_MILLIS = 10000 @@ -287,19 +302,19 @@ private object YarnClusterDriver extends Logging with Matchers { sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS) data should be (Set(1, 2, 3, 4)) result = "success" + + // Verify that the config archive is correctly placed in the classpath of all containers. + val confFile = "/" + Client.SPARK_CONF_FILE + assert(getClass().getResource(confFile) != null) + val configFromExecutors = sc.parallelize(1 to 4, 4) + .map { _ => Option(getClass().getResource(confFile)).map(_.toString).orNull } + .collect() + assert(configFromExecutors.find(_ == null) === None) } finally { Files.write(result, status, StandardCharsets.UTF_8) sc.stop() } - // Verify that the config archive is correctly placed in the classpath of all containers. - val confFile = "/" + Client.SPARK_CONF_FILE - assert(getClass().getResource(confFile) != null) - val configFromExecutors = sc.parallelize(1 to 4, 4) - .map { _ => Option(getClass().getResource(confFile)).map(_.toString).orNull } - .collect() - assert(configFromExecutors.find(_ == null) === None) - // verify log urls are present val listeners = sc.listenerBus.findListenersByClass[SaveExecutorInfo] assert(listeners.size === 1) @@ -330,9 +345,6 @@ private object YarnClusterDriver extends Logging with Matchers { } private object YarnClasspathTest extends Logging { - - var exitCode = 0 - def error(m: String, ex: Throwable = null): Unit = { logError(m, ex) // scalastyle:off println @@ -361,7 +373,6 @@ private object YarnClasspathTest extends Logging { } finally { sc.stop() } - System.exit(exitCode) } private def readResource(resultPath: String): Unit = { @@ -374,8 +385,6 @@ private object YarnClasspathTest extends Logging { } catch { case t: Throwable => error(s"loading test.resource to $resultPath", t) - // set the exit code if not yet set - exitCode = 2 } finally { Files.write(result, new File(resultPath), StandardCharsets.UTF_8) } --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
