Repository: spark
Updated Branches:
  refs/heads/master c9e05a315 -> 37bf76a2d


[SPARK-8302] Support heterogeneous cluster install paths on YARN.

Some users have Hadoop installations on different paths across
their cluster. Currently, that makes it hard to set up some
configuration in Spark since that requires hardcoding paths to
jar files or native libraries, which wouldn't work on such a cluster.

This change introduces a couple of YARN-specific configurations
that instruct the backend to replace certain paths when launching
remote processes. That way, if the configuration says the Spark
jar is in "/spark/spark.jar", and also says that "/spark" should be
replaced with "{{SPARK_INSTALL_DIR}}", YARN will start containers
in the NMs with "{{SPARK_INSTALL_DIR}}/spark.jar" as the location
of the jar.

Coupled with YARN's environment whitelist (which allows certain
env variables to be exposed to containers), this allows users to
support such heterogeneous environments, as long as a single
replacement is enough. (Otherwise, this feature would need to be
extended to support multiple path replacements.)

Author: Marcelo Vanzin <[email protected]>

Closes #6752 from vanzin/SPARK-8302 and squashes the following commits:

4bff8d4 [Marcelo Vanzin] Add docs, rename configs.
0aa2a02 [Marcelo Vanzin] Only do replacement for paths that need it.
2e9cc9d [Marcelo Vanzin] Style.
a5e1f68 [Marcelo Vanzin] [SPARK-8302] Support heterogeneous cluster install 
paths on YARN.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/37bf76a2
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/37bf76a2
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/37bf76a2

Branch: refs/heads/master
Commit: 37bf76a2de2143ec6348a3d43b782227849520cc
Parents: c9e05a3
Author: Marcelo Vanzin <[email protected]>
Authored: Fri Jun 26 08:45:22 2015 -0500
Committer: Imran Rashid <[email protected]>
Committed: Fri Jun 26 08:45:22 2015 -0500

----------------------------------------------------------------------
 docs/running-on-yarn.md                         | 26 +++++++++++
 .../org/apache/spark/deploy/yarn/Client.scala   | 47 +++++++++++++++-----
 .../spark/deploy/yarn/ExecutorRunnable.scala    |  4 +-
 .../apache/spark/deploy/yarn/ClientSuite.scala  | 19 ++++++++
 4 files changed, 84 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/37bf76a2/docs/running-on-yarn.md
----------------------------------------------------------------------
diff --git a/docs/running-on-yarn.md b/docs/running-on-yarn.md
index 96cf612..3f8a093 100644
--- a/docs/running-on-yarn.md
+++ b/docs/running-on-yarn.md
@@ -258,6 +258,32 @@ Most of the configs are the same for Spark on YARN as for 
other deployment modes
   Principal to be used to login to KDC, while running on secure HDFS.
   </td>
 </tr>
+<tr>
+  <td><code>spark.yarn.config.gatewayPath</code></td>
+  <td>(none)</td>
+  <td>
+  A path that is valid on the gateway host (the host where a Spark application 
is started) but may
+  differ for paths for the same resource in other nodes in the cluster. 
Coupled with
+  <code>spark.yarn.config.replacementPath</code>, this is used to support 
clusters with
+  heterogeneous configurations, so that Spark can correctly launch remote 
processes.
+  <p/>
+  The replacement path normally will contain a reference to some environment 
variable exported by
+  YARN (and, thus, visible to Spark containers).
+  <p/>
+  For example, if the gateway node has Hadoop libraries installed on 
<code>/disk1/hadoop</code>, and
+  the location of the Hadoop install is exported by YARN as the  
<code>HADOOP_HOME</code>
+  environment variable, setting this value to <code>/disk1/hadoop</code> and 
the replacement path to
+  <code>$HADOOP_HOME</code> will make sure that paths used to launch remote 
processes properly
+  reference the local YARN configuration.
+  </td>
+</tr>
+<tr>
+  <td><code>spark.yarn.config.replacementPath</code></td>
+  <td>(none)</td>
+  <td>
+  See <code>spark.yarn.config.gatewayPath</code>.
+  </td>
+</tr>
 </table>
 
 # Launching Spark on YARN

http://git-wip-us.apache.org/repos/asf/spark/blob/37bf76a2/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
----------------------------------------------------------------------
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala 
b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
index da1ec2a..67a5c95 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
@@ -676,7 +676,7 @@ private[spark] class Client(
       val libraryPaths = Seq(sys.props.get("spark.driver.extraLibraryPath"),
         sys.props.get("spark.driver.libraryPath")).flatten
       if (libraryPaths.nonEmpty) {
-        prefixEnv = Some(Utils.libraryPathEnvPrefix(libraryPaths))
+        prefixEnv = Some(getClusterPath(sparkConf, 
Utils.libraryPathEnvPrefix(libraryPaths)))
       }
       if (sparkConf.getOption("spark.yarn.am.extraJavaOptions").isDefined) {
         logWarning("spark.yarn.am.extraJavaOptions will not take effect in 
cluster mode")
@@ -698,7 +698,7 @@ private[spark] class Client(
       }
 
       sparkConf.getOption("spark.yarn.am.extraLibraryPath").foreach { paths =>
-        prefixEnv = Some(Utils.libraryPathEnvPrefix(Seq(paths)))
+        prefixEnv = Some(getClusterPath(sparkConf, 
Utils.libraryPathEnvPrefix(Seq(paths))))
       }
     }
 
@@ -1106,10 +1106,10 @@ object Client extends Logging {
       env: HashMap[String, String],
       isAM: Boolean,
       extraClassPath: Option[String] = None): Unit = {
-    extraClassPath.foreach(addClasspathEntry(_, env))
-    addClasspathEntry(
-      YarnSparkHadoopUtil.expandEnvironment(Environment.PWD), env
-    )
+    extraClassPath.foreach { cp =>
+      addClasspathEntry(getClusterPath(sparkConf, cp), env)
+    }
+    addClasspathEntry(YarnSparkHadoopUtil.expandEnvironment(Environment.PWD), 
env)
 
     if (isAM) {
       addClasspathEntry(
@@ -1125,12 +1125,14 @@ object Client extends Logging {
           getUserClasspath(sparkConf)
         }
       userClassPath.foreach { x =>
-        addFileToClasspath(x, null, env)
+        addFileToClasspath(sparkConf, x, null, env)
       }
     }
-    addFileToClasspath(new URI(sparkJar(sparkConf)), SPARK_JAR, env)
+    addFileToClasspath(sparkConf, new URI(sparkJar(sparkConf)), SPARK_JAR, env)
     populateHadoopClasspath(conf, env)
-    sys.env.get(ENV_DIST_CLASSPATH).foreach(addClasspathEntry(_, env))
+    sys.env.get(ENV_DIST_CLASSPATH).foreach { cp =>
+      addClasspathEntry(getClusterPath(sparkConf, cp), env)
+    }
   }
 
   /**
@@ -1159,16 +1161,18 @@ object Client extends Logging {
    *
    * If not a "local:" file and no alternate name, the environment is not 
modified.
    *
+   * @parma conf      Spark configuration.
    * @param uri       URI to add to classpath (optional).
    * @param fileName  Alternate name for the file (optional).
    * @param env       Map holding the environment variables.
    */
   private def addFileToClasspath(
+      conf: SparkConf,
       uri: URI,
       fileName: String,
       env: HashMap[String, String]): Unit = {
     if (uri != null && uri.getScheme == LOCAL_SCHEME) {
-      addClasspathEntry(uri.getPath, env)
+      addClasspathEntry(getClusterPath(conf, uri.getPath), env)
     } else if (fileName != null) {
       addClasspathEntry(buildPath(
         YarnSparkHadoopUtil.expandEnvironment(Environment.PWD), fileName), env)
@@ -1183,6 +1187,29 @@ object Client extends Logging {
     YarnSparkHadoopUtil.addPathToEnvironment(env, Environment.CLASSPATH.name, 
path)
 
   /**
+   * Returns the path to be sent to the NM for a path that is valid on the 
gateway.
+   *
+   * This method uses two configuration values:
+   *
+   * - spark.yarn.config.gatewayPath: a string that identifies a portion of 
the input path that may
+   *   only be valid in the gateway node.
+   * - spark.yarn.config.replacementPath: a string with which to replace the 
gateway path. This may
+   *   contain, for example, env variable references, which will be expanded 
by the NMs when
+   *   starting containers.
+   *
+   * If either config is not available, the input path is returned.
+   */
+  def getClusterPath(conf: SparkConf, path: String): String = {
+    val localPath = conf.get("spark.yarn.config.gatewayPath", null)
+    val clusterPath = conf.get("spark.yarn.config.replacementPath", null)
+    if (localPath != null && clusterPath != null) {
+      path.replace(localPath, clusterPath)
+    } else {
+      path
+    }
+  }
+
+  /**
    * Obtains token for the Hive metastore and adds them to the credentials.
    */
   private def obtainTokenForHiveMetastore(conf: Configuration, credentials: 
Credentials) {

http://git-wip-us.apache.org/repos/asf/spark/blob/37bf76a2/yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala
----------------------------------------------------------------------
diff --git 
a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala 
b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala
index b093708..78e27fb 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala
@@ -146,7 +146,7 @@ class ExecutorRunnable(
       javaOpts ++= 
Utils.splitCommandString(opts).map(YarnSparkHadoopUtil.escapeForShell)
     }
     sys.props.get("spark.executor.extraLibraryPath").foreach { p =>
-      prefixEnv = Some(Utils.libraryPathEnvPrefix(Seq(p)))
+      prefixEnv = Some(Client.getClusterPath(sparkConf, 
Utils.libraryPathEnvPrefix(Seq(p))))
     }
 
     javaOpts += "-Djava.io.tmpdir=" +
@@ -195,7 +195,7 @@ class ExecutorRunnable(
     val userClassPath = Client.getUserClasspath(sparkConf).flatMap { uri =>
       val absPath =
         if (new File(uri.getPath()).isAbsolute()) {
-          uri.getPath()
+          Client.getClusterPath(sparkConf, uri.getPath())
         } else {
           Client.buildPath(Environment.PWD.$(), uri.getPath())
         }

http://git-wip-us.apache.org/repos/asf/spark/blob/37bf76a2/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala
----------------------------------------------------------------------
diff --git a/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala 
b/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala
index 4ec976a..837f8d3 100644
--- a/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala
+++ b/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala
@@ -151,6 +151,25 @@ class ClientSuite extends SparkFunSuite with Matchers with 
BeforeAndAfterAll {
     }
   }
 
+  test("Cluster path translation") {
+    val conf = new Configuration()
+    val sparkConf = new SparkConf()
+      .set(Client.CONF_SPARK_JAR, "local:/localPath/spark.jar")
+      .set("spark.yarn.config.gatewayPath", "/localPath")
+      .set("spark.yarn.config.replacementPath", "/remotePath")
+
+    Client.getClusterPath(sparkConf, "/localPath") should be ("/remotePath")
+    Client.getClusterPath(sparkConf, "/localPath/1:/localPath/2") should be (
+      "/remotePath/1:/remotePath/2")
+
+    val env = new MutableHashMap[String, String]()
+    Client.populateClasspath(null, conf, sparkConf, env, false,
+      extraClassPath = Some("/localPath/my1.jar"))
+    val cp = classpath(env)
+    cp should contain ("/remotePath/spark.jar")
+    cp should contain ("/remotePath/my1.jar")
+  }
+
   object Fixtures {
 
     val knownDefYarnAppCP: Seq[String] =


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to