Repository: spark
Updated Branches:
  refs/heads/master 24c512925 -> 26f092d4e


[SPARK-4138][SPARK-4139] Improve dynamic allocation settings

This should be merged after #2746 (SPARK-3795).

**SPARK-4138**. If the user sets both the number of executors and 
`spark.dynamicAllocation.enabled`, we should throw an exception.

**SPARK-4139**. If the user sets `spark.dynamicAllocation.enabled`, we should 
use the max number of executors as the starting number of executors because the 
first job is likely to run immediately after application startup. If 
`spark.dynamicAllocation.maxExecutors` is not set, throw an exception.
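
A minimal standalone sketch of the combined behavior (the helper
resolveNumExecutors is illustrative only; the config keys and error
messages mirror the patch below):

    // Illustrative sketch of the two rules; not the patched code itself.
    def resolveNumExecutors(conf: Map[String, String], explicitNumExecutors: Option[Int]): Int = {
      val dynamicEnabled = conf.get("spark.dynamicAllocation.enabled").exists(_.toBoolean)
      if (dynamicEnabled) {
        // SPARK-4138: an explicit executor count contradicts dynamic allocation
        if (explicitNumExecutors.isDefined) {
          throw new IllegalArgumentException("Explicitly setting the number " +
            "of executors is not compatible with spark.dynamicAllocation.enabled!")
        }
        // SPARK-4139: start at the max so the first job gets resources immediately
        conf.get("spark.dynamicAllocation.maxExecutors").map(_.toInt).getOrElse {
          throw new IllegalArgumentException(
            "spark.dynamicAllocation.maxExecutors must be set if dynamic allocation is enabled!")
        }
      } else {
        explicitNumExecutors.getOrElse(2) // DEFAULT_NUMBER_EXECUTORS
      }
    }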

Author: Andrew Or <[email protected]>

Closes #3002 from andrewor14/yarn-set-executors and squashes the following 
commits:

c528fce [Andrew Or] Merge branch 'master' of github.com:apache/spark into yarn-set-executors
55d4699 [Andrew Or] Bug fix: `isDynamicAllocationEnabled` was always false
2b0ccec [Andrew Or] Start the number of executors at the max
022bfde [Andrew Or] Guard against incompatible settings of number of executors


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/26f092d4
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/26f092d4
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/26f092d4

Branch: refs/heads/master
Commit: 26f092d4e32cc1f7e279646075eaf1e495395923
Parents: 24c5129
Author: Andrew Or <[email protected]>
Authored: Thu Oct 30 15:31:23 2014 -0700
Committer: Andrew Or <[email protected]>
Committed: Thu Oct 30 15:31:23 2014 -0700

----------------------------------------------------------------------
 .../yarn/ApplicationMasterArguments.scala       |  3 +-
 .../spark/deploy/yarn/ClientArguments.scala     | 30 +++++++++++++++-----
 .../spark/deploy/yarn/YarnSparkHadoopUtil.scala |  2 ++
 .../cluster/YarnClusterSchedulerBackend.scala   |  4 +--
 4 files changed, 29 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/26f092d4/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMasterArguments.scala
----------------------------------------------------------------------
diff --git a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMasterArguments.scala b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMasterArguments.scala
index 5c54e34..104db4f 100644
--- a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMasterArguments.scala
+++ b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMasterArguments.scala
@@ -18,6 +18,7 @@
 package org.apache.spark.deploy.yarn
 
 import org.apache.spark.util.{MemoryParam, IntParam}
+import org.apache.spark.deploy.yarn.YarnSparkHadoopUtil._
 import collection.mutable.ArrayBuffer
 
 class ApplicationMasterArguments(val args: Array[String]) {
@@ -26,7 +27,7 @@ class ApplicationMasterArguments(val args: Array[String]) {
   var userArgs: Seq[String] = Seq[String]()
   var executorMemory = 1024
   var executorCores = 1
-  var numExecutors = ApplicationMasterArguments.DEFAULT_NUMBER_EXECUTORS
+  var numExecutors = DEFAULT_NUMBER_EXECUTORS
 
   parseArgs(args.toList)
 

http://git-wip-us.apache.org/repos/asf/spark/blob/26f092d4/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala
----------------------------------------------------------------------
diff --git a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala
index a12f82d..4d85945 100644
--- a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala
+++ b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala
@@ -20,8 +20,8 @@ package org.apache.spark.deploy.yarn
 import scala.collection.mutable.ArrayBuffer
 
 import org.apache.spark.SparkConf
-import org.apache.spark.util.{Utils, IntParam, MemoryParam}
 import org.apache.spark.deploy.yarn.YarnSparkHadoopUtil._
+import org.apache.spark.util.{Utils, IntParam, MemoryParam}
 
 // TODO: Add code and support for ensuring that yarn resource 'tasks' are location aware !
 private[spark] class ClientArguments(args: Array[String], sparkConf: SparkConf) {
@@ -33,23 +33,25 @@ private[spark] class ClientArguments(args: Array[String], sparkConf: SparkConf)
   var userArgs: Seq[String] = Seq[String]()
   var executorMemory = 1024 // MB
   var executorCores = 1
-  var numExecutors = 2
+  var numExecutors = DEFAULT_NUMBER_EXECUTORS
   var amQueue = sparkConf.get("spark.yarn.queue", "default")
   var amMemory: Int = 512 // MB
   var appName: String = "Spark"
   var priority = 0
 
-  parseArgs(args.toList)
-  loadEnvironmentArgs()
-
   // Additional memory to allocate to containers
   // For now, use driver's memory overhead as our AM container's memory overhead
-  val amMemoryOverhead = sparkConf.getInt("spark.yarn.driver.memoryOverhead", 
+  val amMemoryOverhead = sparkConf.getInt("spark.yarn.driver.memoryOverhead",
     math.max((MEMORY_OVERHEAD_FACTOR * amMemory).toInt, MEMORY_OVERHEAD_MIN))
 
-  val executorMemoryOverhead = sparkConf.getInt("spark.yarn.executor.memoryOverhead", 
+  val executorMemoryOverhead = sparkConf.getInt("spark.yarn.executor.memoryOverhead",
     math.max((MEMORY_OVERHEAD_FACTOR * executorMemory).toInt, MEMORY_OVERHEAD_MIN))
 
+  private val isDynamicAllocationEnabled =
+    sparkConf.getBoolean("spark.dynamicAllocation.enabled", false)
+
+  parseArgs(args.toList)
+  loadEnvironmentArgs()
   validateArgs()
 
   /** Load any default arguments provided through environment variables and Spark properties. */
@@ -64,6 +66,15 @@ private[spark] class ClientArguments(args: Array[String], sparkConf: SparkConf)
       .orElse(sys.env.get("SPARK_YARN_DIST_ARCHIVES"))
       .orElse(sparkConf.getOption("spark.yarn.dist.archives").map(p => Utils.resolveURIs(p)))
       .orNull
+    // If dynamic allocation is enabled, start at the max number of executors
+    if (isDynamicAllocationEnabled) {
+      val maxExecutorsConf = "spark.dynamicAllocation.maxExecutors"
+      if (!sparkConf.contains(maxExecutorsConf)) {
+        throw new IllegalArgumentException(
+          s"$maxExecutorsConf must be set if dynamic allocation is enabled!")
+      }
+      numExecutors = sparkConf.get(maxExecutorsConf).toInt
+    }
   }
 
   /**
@@ -113,6 +124,11 @@ private[spark] class ClientArguments(args: Array[String], sparkConf: SparkConf)
           if (args(0) == "--num-workers") {
             println("--num-workers is deprecated. Use --num-executors instead.")
           }
+          // Dynamic allocation is not compatible with this option
+          if (isDynamicAllocationEnabled) {
+            throw new IllegalArgumentException("Explicitly setting the number " +
+              "of executors is not compatible with spark.dynamicAllocation.enabled!")
+          }
           numExecutors = value
           args = tail
 

http://git-wip-us.apache.org/repos/asf/spark/blob/26f092d4/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala
----------------------------------------------------------------------
diff --git a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala
index e1e0144..7d453ec 100644
--- a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala
+++ b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala
@@ -93,6 +93,8 @@ object YarnSparkHadoopUtil {
 
   val ANY_HOST = "*"
 
+  val DEFAULT_NUMBER_EXECUTORS = 2
+
   // All RM requests are issued with same priority : we do not (yet) have any distinction between
   // request types (like map/reduce in hadoop for example)
   val RM_REQUEST_PRIORITY = 1

http://git-wip-us.apache.org/repos/asf/spark/blob/26f092d4/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterSchedulerBackend.scala
----------------------------------------------------------------------
diff --git a/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterSchedulerBackend.scala b/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterSchedulerBackend.scala
index a96a54f..b1de81e 100644
--- a/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterSchedulerBackend.scala
+++ b/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterSchedulerBackend.scala
@@ -18,7 +18,7 @@
 package org.apache.spark.scheduler.cluster
 
 import org.apache.spark.SparkContext
-import org.apache.spark.deploy.yarn.ApplicationMasterArguments
+import org.apache.spark.deploy.yarn.YarnSparkHadoopUtil._
 import org.apache.spark.scheduler.TaskSchedulerImpl
 import org.apache.spark.util.IntParam
 
@@ -29,7 +29,7 @@ private[spark] class YarnClusterSchedulerBackend(
 
   override def start() {
     super.start()
-    totalExpectedExecutors = ApplicationMasterArguments.DEFAULT_NUMBER_EXECUTORS
+    totalExpectedExecutors = DEFAULT_NUMBER_EXECUTORS
     if (System.getenv("SPARK_EXECUTOR_INSTANCES") != null) {
       totalExpectedExecutors = IntParam.unapply(System.getenv("SPARK_EXECUTOR_INSTANCES"))
         .getOrElse(totalExpectedExecutors)
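
IntParam.unapply above is Spark's internal extractor for integer strings; a
standalone equivalent of the fallback logic (illustrative: the real IntParam
lives in org.apache.spark.util and is private to Spark):

    // Stand-in for org.apache.spark.util.IntParam, for illustration only.
    object IntParam {
      def unapply(str: String): Option[Int] =
        try Some(str.toInt) catch { case _: NumberFormatException => None }
    }

    // Keep the default when the env var is missing or not a number.
    val totalExpectedExecutors =
      Option(System.getenv("SPARK_EXECUTOR_INSTANCES"))
        .flatMap(IntParam.unapply)
        .getOrElse(2) // DEFAULT_NUMBER_EXECUTORS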

