Repository: spark
Updated Branches:
  refs/heads/master 186b497c9 -> 9b6de6fbc


SPARK-3178  setting SPARK_WORKER_MEMORY to a value without a label (m or g) 
sets the worker memory limit to zero

Validate the memory is greater than zero when set from the SPARK_WORKER_MEMORY 
environment variable or command line without a g or m label.  Added unit tests. 
If memory is 0 an IllegalStateException is thrown. Updated unit tests to mock 
environment variables by subclassing SparkConf (tip provided by Josh Rosen).   
Updated WorkerArguments to use SparkConf.getenv instead of System.getenv for 
reading the SPARK_WORKER_MEMORY environment variable.

Author: Bill Bejeck <[email protected]>

Closes #2309 from bbejeck/spark-memory-worker and squashes the following 
commits:

51cf915 [Bill Bejeck] SPARK-3178 - Validate the memory is greater than zero 
when set from the SPARK_WORKER_MEMORY environment variable or command line 
without a g or m label.  Added unit tests. If memory is 0 an 
IllegalStateException is thrown. Updated unit tests to mock environment 
variables by subclassing SparkConf (tip provided by Josh Rosen).   Updated 
WorkerArguments to use SparkConf.getenv instead of System.getenv for reading 
the SPARK_WORKER_MEMORY environment variable.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/9b6de6fb
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/9b6de6fb
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/9b6de6fb

Branch: refs/heads/master
Commit: 9b6de6fbc00b184d81fc28ac160d03451fad80ec
Parents: 186b497
Author: Bill Bejeck <[email protected]>
Authored: Tue Oct 14 12:12:38 2014 -0700
Committer: Josh Rosen <[email protected]>
Committed: Tue Oct 14 12:12:38 2014 -0700

----------------------------------------------------------------------
 .../spark/deploy/worker/WorkerArguments.scala   | 13 +++-
 .../deploy/worker/WorkerArgumentsTest.scala     | 82 ++++++++++++++++++++
 2 files changed, 93 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/9b6de6fb/core/src/main/scala/org/apache/spark/deploy/worker/WorkerArguments.scala
----------------------------------------------------------------------
diff --git 
a/core/src/main/scala/org/apache/spark/deploy/worker/WorkerArguments.scala 
b/core/src/main/scala/org/apache/spark/deploy/worker/WorkerArguments.scala
index 1e295aa..54e3937 100644
--- a/core/src/main/scala/org/apache/spark/deploy/worker/WorkerArguments.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/WorkerArguments.scala
@@ -41,8 +41,8 @@ private[spark] class WorkerArguments(args: Array[String], 
conf: SparkConf) {
   if (System.getenv("SPARK_WORKER_CORES") != null) {
     cores = System.getenv("SPARK_WORKER_CORES").toInt
   }
-  if (System.getenv("SPARK_WORKER_MEMORY") != null) {
-    memory = Utils.memoryStringToMb(System.getenv("SPARK_WORKER_MEMORY"))
+  if (conf.getenv("SPARK_WORKER_MEMORY") != null) {
+    memory = Utils.memoryStringToMb(conf.getenv("SPARK_WORKER_MEMORY"))
   }
   if (System.getenv("SPARK_WORKER_WEBUI_PORT") != null) {
     webUiPort = System.getenv("SPARK_WORKER_WEBUI_PORT").toInt
@@ -56,6 +56,8 @@ private[spark] class WorkerArguments(args: Array[String], 
conf: SparkConf) {
 
   parse(args.toList)
 
+  checkWorkerMemory()
+
   def parse(args: List[String]): Unit = args match {
     case ("--ip" | "-i") :: value :: tail =>
       Utils.checkHost(value, "ip no longer supported, please use hostname " + 
value)
@@ -153,4 +155,11 @@ private[spark] class WorkerArguments(args: Array[String], 
conf: SparkConf) {
     // Leave out 1 GB for the operating system, but don't return a negative 
memory size
     math.max(totalMb - 1024, 512)
   }
+
+  def checkWorkerMemory(): Unit = {
+    if (memory <= 0) {
+      val message = "Memory can't be 0, missing a M or G on the end of the 
memory specification?"
+      throw new IllegalStateException(message)
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/9b6de6fb/core/src/test/scala/org/apache/spark/deploy/worker/WorkerArgumentsTest.scala
----------------------------------------------------------------------
diff --git 
a/core/src/test/scala/org/apache/spark/deploy/worker/WorkerArgumentsTest.scala 
b/core/src/test/scala/org/apache/spark/deploy/worker/WorkerArgumentsTest.scala
new file mode 100644
index 0000000..1a28a9a
--- /dev/null
+++ 
b/core/src/test/scala/org/apache/spark/deploy/worker/WorkerArgumentsTest.scala
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.spark.deploy.worker
+
+import org.apache.spark.SparkConf
+import org.scalatest.FunSuite
+
+
+class WorkerArgumentsTest extends FunSuite {
+
+  test("Memory can't be set to 0 when cmd line args leave off M or G") {
+    val conf = new SparkConf
+    val args = Array("-m", "10000", "spark://localhost:0000  ")
+    intercept[IllegalStateException] {
+      new WorkerArguments(args, conf)
+    }
+  }
+
+
+  test("Memory can't be set to 0 when SPARK_WORKER_MEMORY env property leaves 
off M or G") {
+    val args = Array("spark://localhost:0000  ")
+
+    class MySparkConf extends SparkConf(false) {
+      override def getenv(name: String) = {
+        if (name == "SPARK_WORKER_MEMORY") "50000"
+        else super.getenv(name)
+      }
+
+      override def clone: SparkConf = {
+        new MySparkConf().setAll(settings)
+      }
+    }
+    val conf = new MySparkConf()
+    intercept[IllegalStateException] {
+      new WorkerArguments(args, conf)
+    }
+  }
+
+  test("Memory correctly set when SPARK_WORKER_MEMORY env property appends G") 
{
+    val args = Array("spark://localhost:0000  ")
+
+    class MySparkConf extends SparkConf(false) {
+      override def getenv(name: String) = {
+        if (name == "SPARK_WORKER_MEMORY") "5G"
+        else super.getenv(name)
+      }
+
+      override def clone: SparkConf = {
+        new MySparkConf().setAll(settings)
+      }
+    }
+    val conf = new MySparkConf()
+    val workerArgs =  new WorkerArguments(args, conf)
+    assert(workerArgs.memory === 5120)
+  }
+
+  test("Memory correctly set from args with M appended to memory value") {
+    val conf = new SparkConf
+    val args = Array("-m", "10000M", "spark://localhost:0000  ")
+
+    val workerArgs = new WorkerArguments(args, conf)
+    assert(workerArgs.memory === 10000)
+
+  }
+
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to