Repository: spark
Updated Branches:
  refs/heads/master bfda53f63 -> eca58755f


[SPARK-16927][SPARK-16923] Override task properties at dispatcher.

## What changes were proposed in this pull request?

- enable setting default properties for all jobs submitted through the dispatcher [SPARK-16927]; the sketch below illustrates the resulting precedence
- remove duplication of conf vars on cluster-submitted jobs [SPARK-16923] (this is a small fix, so I'm including it in the same PR)
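
In effect, properties set on the dispatcher under the `spark.mesos.dispatcher.driverDefault.` prefix are merged beneath each submitted driver's own conf, so a job-level setting still wins. A minimal sketch of that precedence, using plain Scala maps rather than the scheduler's actual types (all values here are illustrative):

```scala
// Dispatcher-wide defaults (from spark.mesos.dispatcher.driverDefault.*).
val defaultConf = Map("spark.executor.memory" -> "32g")
// Conf of one submitted driver; values are made up for illustration.
val driverConf = Map("spark.executor.memory" -> "4g", "spark.app.name" -> "my-job")

// Defaults are merged first, so the driver's own setting takes precedence.
val merged = defaultConf ++ driverConf
assert(merged("spark.executor.memory") == "4g")
assert(merged("spark.app.name") == "my-job")
```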

## How was this patch tested?

mesos/spark integration test suite
manual testing

Author: Timothy Chen <[email protected]>

Closes #14511 from mgummelt/override-props.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/eca58755
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/eca58755
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/eca58755

Branch: refs/heads/master
Commit: eca58755fbbc11937b335ad953a3caff89b818e6
Parents: bfda53f
Author: Timothy Chen <[email protected]>
Authored: Wed Aug 10 10:11:03 2016 +0100
Committer: Sean Owen <[email protected]>
Committed: Wed Aug 10 10:11:03 2016 +0100

----------------------------------------------------------------------
 .../cluster/mesos/MesosClusterScheduler.scala   | 44 ++++++++++----------
 docs/running-on-mesos.md                        | 11 +++++
 2 files changed, 33 insertions(+), 22 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/eca58755/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala
index 2189fca..bb6f6b3 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala
@@ -363,26 +363,21 @@ private[spark] class MesosClusterScheduler(
       .orElse(desc.command.environment.get("SPARK_EXECUTOR_URI"))
   }
 
-  private def adjust[A, B](m: collection.Map[A, B], k: A, default: B)(f: B => B) = {
-    m.updated(k, f(m.getOrElse(k, default)))
-  }
-
   private def getDriverFrameworkID(desc: MesosDriverDescription): String = {
     s"${frameworkId}-${desc.submissionId}"
   }
 
-  private def getDriverEnvironment(desc: MesosDriverDescription): Environment = {
-    val env = {
-      val executorOpts = desc.conf.getAll.map { case (k, v) => s"-D$k=$v" }.mkString(" ")
-      val executorEnv = Map("SPARK_EXECUTOR_OPTS" -> executorOpts)
-      val driverEnv = desc.conf.getAllWithPrefix("spark.mesos.driverEnv.")
+  private def adjust[A, B](m: collection.Map[A, B], k: A, default: B)(f: B => B) = {
+    m.updated(k, f(m.getOrElse(k, default)))
+  }
 
-      var commandEnv = adjust(desc.command.environment, "SPARK_SUBMIT_OPTS", "")(
-        v => s"$v -Dspark.mesos.driver.frameworkId=${getDriverFrameworkID(desc)}"
-      )
+  private def getDriverEnvironment(desc: MesosDriverDescription): Environment = {
+    // TODO(mgummelt): Don't do this here.  This should be passed as a --conf
+    val commandEnv = adjust(desc.command.environment, "SPARK_SUBMIT_OPTS", "")(
+      v => s"$v -Dspark.mesos.driver.frameworkId=${getDriverFrameworkID(desc)}"
+    )
 
-      driverEnv ++ executorEnv ++ commandEnv
-    }
+    val env = desc.conf.getAllWithPrefix("spark.mesos.driverEnv.") ++ commandEnv
 
     val envBuilder = Environment.newBuilder()
     env.foreach { case (k, v) =>
@@ -457,12 +452,6 @@ private[spark] class MesosClusterScheduler(
       "--driver-cores", desc.cores.toString,
       "--driver-memory", s"${desc.mem}M")
 
-    val replicatedOptionsBlacklist = Set(
-      "spark.jars", // Avoids duplicate classes in classpath
-      "spark.submit.deployMode", // this would be set to `cluster`, but we 
need client
-      "spark.master" // this contains the address of the dispatcher, not master
-    )
-
     // Assume empty main class means we're running python
     if (!desc.command.mainClass.equals("")) {
       options ++= Seq("--class", desc.command.mainClass)
@@ -480,9 +469,20 @@ private[spark] class MesosClusterScheduler(
         .mkString(",")
       options ++= Seq("--py-files", formattedFiles)
     }
-    desc.conf.getAll
+
+    // --conf
+    val replicatedOptionsBlacklist = Set(
+      "spark.jars", // Avoids duplicate classes in classpath
+      "spark.submit.deployMode", // this would be set to `cluster`, but we 
need client
+      "spark.master" // this contains the address of the dispatcher, not master
+    )
+    val defaultConf = conf.getAllWithPrefix("spark.mesos.dispatcher.driverDefault.").toMap
+    val driverConf = desc.conf.getAll
       .filter { case (key, _) => !replicatedOptionsBlacklist.contains(key) }
-      .foreach { case (key, value) => options ++= Seq("--conf", s"$key=${shellEscape(value)}") }
+      .toMap
+    (defaultConf ++ driverConf).foreach { case (key, value) =>
+      options ++= Seq("--conf", s"$key=${shellEscape(value)}") }
+
     options
   }
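
For reference, the relocated `adjust` helper in the hunk above updates a single key of an immutable map by applying `f` to its current value, or to `default` when the key is absent. A standalone illustration, with the helper copied from the diff and exercised using a made-up framework id:

```scala
// The adjust helper, verbatim from the diff above: update key k by
// applying f to its current value, or to `default` when k is absent.
def adjust[A, B](m: collection.Map[A, B], k: A, default: B)(f: B => B) =
  m.updated(k, f(m.getOrElse(k, default)))

val env = Map("SPARK_SUBMIT_OPTS" -> "-Xmx1g")
// Appends the framework-id flag the way getDriverEnvironment does;
// "fw-123" is a placeholder for the real framework id.
val updated = adjust(env, "SPARK_SUBMIT_OPTS", "")(v => s"$v -Dspark.mesos.driver.frameworkId=fw-123")
// updated("SPARK_SUBMIT_OPTS") == "-Xmx1g -Dspark.mesos.driver.frameworkId=fw-123"

// A missing key is seeded from the default (note the leading space):
val fresh = adjust(Map.empty[String, String], "SPARK_SUBMIT_OPTS", "")(v => s"$v -Dfoo=bar")
// fresh("SPARK_SUBMIT_OPTS") == " -Dfoo=bar"
```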
 

http://git-wip-us.apache.org/repos/asf/spark/blob/eca58755/docs/running-on-mesos.md
----------------------------------------------------------------------
diff --git a/docs/running-on-mesos.md b/docs/running-on-mesos.md
index 613da68..a6ce34c 100644
--- a/docs/running-on-mesos.md
+++ b/docs/running-on-mesos.md
@@ -467,6 +467,17 @@ See the [configuration page](configuration.html) for information on Spark config
     Set the Spark Mesos dispatcher webui_url for interacting with the framework.
     If unset it will point to Spark's internal web UI.
   </td>
+  </tr>
+<tr>
+  <td><code>spark.mesos.dispatcher.driverDefault.[PropertyName]</code></td>
+  <td><code>(none)</code></td>
+  <td>
+    Set default properties for drivers submitted through the
+    dispatcher.  For example,
+    spark.mesos.dispatcher.driverDefault.spark.executor.memory=32g
+    causes the executors for all drivers submitted in cluster mode
+    to run in 32g containers.
+  </td>
 </tr>
 <tr>
   <td><code>spark.mesos.dispatcher.historyServer.url</code></td>
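
Reading the docs entry together with the scheduler change above: a dispatcher-level default becomes a `--conf` flag for every driver, while blacklisted keys from the submission are dropped. A minimal sketch of that flow, assuming plain Scala maps and illustrative values (shell escaping omitted):

```scala
// Keys that must never be replicated from the submission to the driver.
val replicatedOptionsBlacklist = Set("spark.jars", "spark.submit.deployMode", "spark.master")

// From spark.mesos.dispatcher.driverDefault.* on the dispatcher:
val defaultConf = Map("spark.executor.memory" -> "32g")
// From a submitted driver description; spark.master holds the
// dispatcher's address, so it is filtered out below.
val submitted = Map("spark.master" -> "mesos://dispatcher:7077", "spark.app.name" -> "my-job")

val driverConf = submitted.filter { case (k, _) => !replicatedOptionsBlacklist.contains(k) }
val options = (defaultConf ++ driverConf).toSeq.flatMap { case (k, v) => Seq("--conf", s"$k=$v") }
// e.g. Seq("--conf", "spark.executor.memory=32g", "--conf", "spark.app.name=my-job")
// (map ordering is not guaranteed)
```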

