Repository: spark
Updated Branches:
  refs/heads/master 44daec5ab -> f95ac686b


[SPARK-1516] Throw exceptions in the YARN client instead of calling System.exit directly.

All the changes are in the package "org.apache.spark.deploy.yarn":
    1) Throw exceptions in ClientArguments and ClientBase instead of exiting 
directly.
    2) In Client's main method, if an exception is caught, exit with code 1; 
otherwise exit with code 0.

After the fix, if users integrate the Spark YARN client into their own 
applications, the host application will no longer be terminated when the 
arguments are invalid or when the run finishes.

Author: John Zhao <[email protected]>

Closes #490 from codeboyyong/jira_1516_systemexit_inyarnclient and squashes the 
following commits:

138cb48 [John Zhao] [SPARK-1516] Throw exception in yarn client instead of 
calling System.exit directly. All the changes are in the package 
"org.apache.spark.deploy.yarn": 1) Add a ClientException with an exitCode 2) 
Throw exceptions in ClientArguments and ClientBase instead of exiting directly 
3) In Client's main method, catch the exception and exit with the exitCode.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/f95ac686
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/f95ac686
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/f95ac686

Branch: refs/heads/master
Commit: f95ac686bcba4e677254120735b0eb7a29f20d63
Parents: 44daec5
Author: John Zhao <[email protected]>
Authored: Thu Jun 12 21:39:00 2014 -0700
Committer: Xiangrui Meng <[email protected]>
Committed: Thu Jun 12 21:39:00 2014 -0700

----------------------------------------------------------------------
 .../org/apache/spark/deploy/yarn/Client.scala   | 14 ++++++++---
 .../spark/deploy/yarn/ClientArguments.scala     | 16 +++++-------
 .../apache/spark/deploy/yarn/ClientBase.scala   | 26 ++++++++++++--------
 .../org/apache/spark/deploy/yarn/Client.scala   | 14 ++++++++---
 4 files changed, 44 insertions(+), 26 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/f95ac686/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
----------------------------------------------------------------------
diff --git 
a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/Client.scala 
b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
index 8226207..4ccddc2 100644
--- a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
+++ b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
@@ -85,7 +85,6 @@ class Client(clientArgs: ClientArguments, hadoopConf: 
Configuration, spConf: Spa
   def run() {
     val appId = runApp()
     monitorApplication(appId)
-    System.exit(0)
   }
 
   def logClusterResourceDetails() {
@@ -179,8 +178,17 @@ object Client {
     System.setProperty("SPARK_YARN_MODE", "true")
 
     val sparkConf = new SparkConf
-    val args = new ClientArguments(argStrings, sparkConf)
 
-    new Client(args, sparkConf).run
+    try {
+      val args = new ClientArguments(argStrings, sparkConf)
+      new Client(args, sparkConf).run()
+    } catch {
+      case e: Exception => {
+        Console.err.println(e.getMessage)
+        System.exit(1)
+      }
+    }
+
+    System.exit(0)
   }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/f95ac686/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala
----------------------------------------------------------------------
diff --git 
a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala 
b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala
index b2c413b..fd3ef9e 100644
--- 
a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala
+++ 
b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala
@@ -125,11 +125,11 @@ class ClientArguments(val args: Array[String], val 
sparkConf: SparkConf) {
 
         case Nil =>
           if (userClass == null) {
-            printUsageAndExit(1)
+            throw new IllegalArgumentException(getUsageMessage())
           }
 
         case _ =>
-          printUsageAndExit(1, args)
+          throw new IllegalArgumentException(getUsageMessage(args))
       }
     }
 
@@ -138,11 +138,10 @@ class ClientArguments(val args: Array[String], val 
sparkConf: SparkConf) {
   }
 
 
-  def printUsageAndExit(exitCode: Int, unknownParam: Any = null) {
-    if (unknownParam != null) {
-      System.err.println("Unknown/unsupported param " + unknownParam)
-    }
-    System.err.println(
+  def getUsageMessage(unknownParam: Any = null): String = {
+    val message = if (unknownParam != null) s"Unknown/unsupported param 
$unknownParam\n" else ""
+
+    message +
       "Usage: org.apache.spark.deploy.yarn.Client [options] \n" +
       "Options:\n" +
       "  --jar JAR_PATH             Path to your application's JAR file 
(required in yarn-cluster mode)\n" +
@@ -158,8 +157,5 @@ class ClientArguments(val args: Array[String], val 
sparkConf: SparkConf) {
       "  --addJars jars             Comma separated list of local jars that 
want SparkContext.addJar to work with.\n" +
       "  --files files              Comma separated list of files to be 
distributed with the job.\n" +
       "  --archives archives        Comma separated list of archives to be 
distributed with the job."
-      )
-    System.exit(exitCode)
   }
-
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/f95ac686/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientBase.scala
----------------------------------------------------------------------
diff --git 
a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientBase.scala 
b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientBase.scala
index 29a3568..6861b50 100644
--- a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientBase.scala
+++ b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientBase.scala
@@ -37,7 +37,7 @@ import org.apache.hadoop.yarn.api.protocolrecords._
 import org.apache.hadoop.yarn.api.records._
 import org.apache.hadoop.yarn.conf.YarnConfiguration
 import org.apache.hadoop.yarn.util.Records
-import org.apache.spark.{Logging, SparkConf, SparkContext}
+import org.apache.spark.{SparkException, Logging, SparkConf, SparkContext}
 
 /**
  * The entry point (starting in Client#main() and Client#run()) for launching 
Spark on YARN. The
@@ -79,7 +79,7 @@ trait ClientBase extends Logging {
     ).foreach { case(cond, errStr) =>
       if (cond) {
         logError(errStr)
-        args.printUsageAndExit(1)
+        throw new IllegalArgumentException(args.getUsageMessage())
       }
     }
   }
@@ -94,15 +94,20 @@ trait ClientBase extends Logging {
 
     // If we have requested more then the clusters max for a single resource 
then exit.
     if (args.executorMemory > maxMem) {
-      logError("Required executor memory (%d MB), is above the max threshold 
(%d MB) of this cluster.".
-        format(args.executorMemory, maxMem))
-      System.exit(1)
+      val errorMessage =
+        "Required executor memory (%d MB), is above the max threshold (%d MB) 
of this cluster."
+          .format(args.executorMemory, maxMem)
+
+      logError(errorMessage)
+      throw new IllegalArgumentException(errorMessage)
     }
     val amMem = args.amMemory + YarnAllocationHandler.MEMORY_OVERHEAD
     if (amMem > maxMem) {
-      logError("Required AM memory (%d) is above the max threshold (%d) of 
this cluster".
-        format(args.amMemory, maxMem))
-      System.exit(1)
+
+      val errorMessage = "Required AM memory (%d) is above the max threshold 
(%d) of this cluster."
+        .format(args.amMemory, maxMem)
+      logError(errorMessage)
+      throw new IllegalArgumentException(errorMessage)
     }
 
     // We could add checks to make sure the entire cluster has enough 
resources but that involves
@@ -186,8 +191,9 @@ trait ClientBase extends Logging {
     val delegTokenRenewer = Master.getMasterPrincipal(conf)
     if (UserGroupInformation.isSecurityEnabled()) {
       if (delegTokenRenewer == null || delegTokenRenewer.length() == 0) {
-        logError("Can't get Master Kerberos principal for use as renewer")
-        System.exit(1)
+        val errorMessage = "Can't get Master Kerberos principal for use as 
renewer"
+        logError(errorMessage)
+        throw new SparkException(errorMessage)
       }
     }
     val dst = new Path(fs.getHomeDirectory(), appStagingDir)

http://git-wip-us.apache.org/repos/asf/spark/blob/f95ac686/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
----------------------------------------------------------------------
diff --git 
a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/Client.scala 
b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
index 2402761..80a8bce 100644
--- a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
+++ b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
@@ -95,7 +95,6 @@ class Client(clientArgs: ClientArguments, hadoopConf: 
Configuration, spConf: Spa
   def run() {
     val appId = runApp()
     monitorApplication(appId)
-    System.exit(0)
   }
 
   def logClusterResourceDetails() {
@@ -186,9 +185,18 @@ object Client {
     // see Client#setupLaunchEnv().
     System.setProperty("SPARK_YARN_MODE", "true")
     val sparkConf = new SparkConf()
-    val args = new ClientArguments(argStrings, sparkConf)
 
-    new Client(args, sparkConf).run()
+    try {
+      val args = new ClientArguments(argStrings, sparkConf)
+      new Client(args, sparkConf).run()
+    } catch {
+      case e: Exception => {
+        Console.err.println(e.getMessage)
+        System.exit(1)
+      }
+    }
+
+    System.exit(0)
   }
 
 }

Reply via email to