Repository: spark
Updated Branches:
  refs/heads/branch-1.1 eac740e9a -> babcafaa8


[SPARK-1010] Clean up uses of System.setProperty in unit tests

Several of our tests call System.setProperty (or call test code which implicitly 
sets system properties) and don't always reset/clear the modified properties, 
which can create ordering dependencies between tests and cause hard-to-diagnose 
failures.

This patch removes most uses of System.setProperty from our tests, since in 
most cases we can use SparkConf to set these configurations (there are a few 
exceptions, including the tests of SparkConf itself).

For the cases where we continue to use System.setProperty, this patch 
introduces a `ResetSystemProperties` ScalaTest mixin class which snapshots the 
system properties before individual tests and automatically restores them on 
test completion / failure.  See the block comment at the top of the 
ResetSystemProperties class for more details.

Author: Josh Rosen <[email protected]>

Closes #3739 from JoshRosen/cleanup-system-properties-in-tests and squashes the 
following commits:

0236d66 [Josh Rosen] Replace setProperty uses in two example programs / tools
3888fe3 [Josh Rosen] Remove setProperty use in LocalJavaStreamingContext
4f4031d [Josh Rosen] Add note on why SparkSubmitSuite needs 
ResetSystemProperties
4742a5b [Josh Rosen] Clarify ResetSystemProperties trait inheritance ordering.
0eaf0b6 [Josh Rosen] Remove setProperty call in TaskResultGetterSuite.
7a3d224 [Josh Rosen] Fix trait ordering
3fdb554 [Josh Rosen] Remove setProperty call in TaskSchedulerImplSuite
bee20df [Josh Rosen] Remove setProperty calls in 
SparkContextSchedulerCreationSuite
655587c [Josh Rosen] Remove setProperty calls in JobCancellationSuite
3f2f955 [Josh Rosen] Remove System.setProperty calls in DistributedSuite
cfe9cce [Josh Rosen] Remove use of system properties in SparkContextSuite
8783ab0 [Josh Rosen] Remove TestUtils.setSystemProperty, since it is subsumed 
by the ResetSystemProperties trait.
633a84a [Josh Rosen] Remove use of system properties in FileServerSuite
25bfce2 [Josh Rosen] Use ResetSystemProperties in UtilsSuite
1d1aa5a [Josh Rosen] Use ResetSystemProperties in SizeEstimatorSuite
dd9492b [Josh Rosen] Use ResetSystemProperties in AkkaUtilsSuite
b0daff2 [Josh Rosen] Use ResetSystemProperties in BlockManagerSuite
e9ded62 [Josh Rosen] Use ResetSystemProperties in TaskSchedulerImplSuite
5b3cb54 [Josh Rosen] Use ResetSystemProperties in SparkListenerSuite
0995c4b [Josh Rosen] Use ResetSystemProperties in 
SparkContextSchedulerCreationSuite
c83ded8 [Josh Rosen] Use ResetSystemProperties in SparkConfSuite
51aa870 [Josh Rosen] Use withSystemProperty in ShuffleSuite
60a63a1 [Josh Rosen] Use ResetSystemProperties in JobCancellationSuite
14a92e4 [Josh Rosen] Use withSystemProperty in FileServerSuite
628f46c [Josh Rosen] Use ResetSystemProperties in DistributedSuite
9e3e0dd [Josh Rosen] Add ResetSystemProperties test fixture mixin; use it in 
SparkSubmitSuite.
4dcea38 [Josh Rosen] Move withSystemProperty to TestUtils class.

(cherry picked from commit 352ed6bbe3c3b67e52e298e7c535ae414d96beca)
Signed-off-by: Josh Rosen <[email protected]>

Conflicts:
        core/src/test/scala/org/apache/spark/ShuffleSuite.scala
        core/src/test/scala/org/apache/spark/SparkConfSuite.scala
        
core/src/test/scala/org/apache/spark/SparkContextSchedulerCreationSuite.scala
        core/src/test/scala/org/apache/spark/SparkContextSuite.scala
        core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
        core/src/test/scala/org/apache/spark/util/UtilsSuite.scala
        
external/flume/src/test/java/org/apache/spark/streaming/LocalJavaStreamingContext.java
        
external/mqtt/src/test/java/org/apache/spark/streaming/LocalJavaStreamingContext.java
        
external/twitter/src/test/java/org/apache/spark/streaming/LocalJavaStreamingContext.java
        
external/zeromq/src/test/java/org/apache/spark/streaming/LocalJavaStreamingContext.java
        tools/src/main/scala/org/apache/spark/tools/StoragePerfTester.scala


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/babcafaa
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/babcafaa
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/babcafaa

Branch: refs/heads/branch-1.1
Commit: babcafaa8695a0599939c5844a0570489eb5f27d
Parents: eac740e
Author: Josh Rosen <[email protected]>
Authored: Tue Dec 30 18:12:20 2014 -0800
Committer: Josh Rosen <[email protected]>
Committed: Wed Dec 31 11:31:44 2014 -0800

----------------------------------------------------------------------
 .../org/apache/spark/DistributedSuite.scala     | 21 ++------
 .../org/apache/spark/FileServerSuite.scala      | 16 +++---
 .../org/apache/spark/JobCancellationSuite.scala | 21 ++++----
 .../scala/org/apache/spark/ShuffleSuite.scala   | 22 ++++----
 .../scala/org/apache/spark/SparkConfSuite.scala | 52 +++++++-----------
 .../SparkContextSchedulerCreationSuite.scala    | 31 +++++------
 .../apache/spark/deploy/SparkSubmitSuite.scala  |  6 ++-
 .../spark/scheduler/SparkListenerSuite.scala    |  9 ++--
 .../spark/scheduler/TaskResultGetterSuite.scala | 23 +++-----
 .../scheduler/TaskSchedulerImplSuite.scala      |  6 +--
 .../spark/storage/BlockManagerSuite.scala       | 24 +++------
 .../org/apache/spark/util/AkkaUtilsSuite.scala  |  2 +-
 .../spark/util/ResetSystemProperties.scala      | 57 ++++++++++++++++++++
 .../apache/spark/util/SizeEstimatorSuite.scala  | 38 +++----------
 .../org/apache/spark/util/UtilsSuite.scala      |  2 +-
 .../apache/spark/examples/BroadcastTest.scala   |  6 +--
 .../streaming/LocalJavaStreamingContext.java    |  8 ++-
 .../apache/spark/tools/StoragePerfTester.scala  |  9 ++--
 18 files changed, 170 insertions(+), 183 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/babcafaa/core/src/test/scala/org/apache/spark/DistributedSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/DistributedSuite.scala 
b/core/src/test/scala/org/apache/spark/DistributedSuite.scala
index 41c294f..a36eef2 100644
--- a/core/src/test/scala/org/apache/spark/DistributedSuite.scala
+++ b/core/src/test/scala/org/apache/spark/DistributedSuite.scala
@@ -17,7 +17,6 @@
 
 package org.apache.spark
 
-import org.scalatest.BeforeAndAfter
 import org.scalatest.FunSuite
 import org.scalatest.concurrent.Timeouts._
 import org.scalatest.Matchers
@@ -31,16 +30,10 @@ class NotSerializableClass
 class NotSerializableExn(val notSer: NotSerializableClass) extends Throwable() 
{}
 
 
-class DistributedSuite extends FunSuite with Matchers with BeforeAndAfter
-  with LocalSparkContext {
+class DistributedSuite extends FunSuite with Matchers with LocalSparkContext {
 
   val clusterUrl = "local-cluster[2,1,512]"
 
-  after {
-    System.clearProperty("spark.reducer.maxMbInFlight")
-    System.clearProperty("spark.storage.memoryFraction")
-  }
-
   test("task throws not serializable exception") {
     // Ensures that executors do not crash when an exn is not serializable. If 
executors crash,
     // this test will hang. Correct behavior is that executors don't crash but 
fail tasks
@@ -86,15 +79,14 @@ class DistributedSuite extends FunSuite with Matchers with 
BeforeAndAfter
   }
 
   test("groupByKey where map output sizes exceed maxMbInFlight") {
-    System.setProperty("spark.reducer.maxMbInFlight", "1")
-    sc = new SparkContext(clusterUrl, "test")
+    val conf = new SparkConf().set("spark.reducer.maxMbInFlight", "1")
+    sc = new SparkContext(clusterUrl, "test", conf)
     // This data should be around 20 MB, so even with 4 mappers and 2 
reducers, each map output
     // file should be about 2.5 MB
     val pairs = sc.parallelize(1 to 2000, 4).map(x => (x % 16, new 
Array[Byte](10000)))
     val groups = pairs.groupByKey(2).map(x => (x._1, x._2.size)).collect()
     assert(groups.length === 16)
     assert(groups.map(_._2).sum === 2000)
-    // Note that spark.reducer.maxMbInFlight will be cleared in the test 
suite's after{} block
   }
 
   test("accumulators") {
@@ -211,7 +203,6 @@ class DistributedSuite extends FunSuite with Matchers with 
BeforeAndAfter
   }
 
   test("compute without caching when no partitions fit in memory") {
-    System.setProperty("spark.storage.memoryFraction", "0.0001")
     sc = new SparkContext(clusterUrl, "test")
     // data will be 4 million * 4 bytes = 16 MB in size, but our 
memoryFraction set the cache
     // to only 50 KB (0.0001 of 512 MB), so no partitions should fit in memory
@@ -219,12 +210,11 @@ class DistributedSuite extends FunSuite with Matchers 
with BeforeAndAfter
     assert(data.count() === 4000000)
     assert(data.count() === 4000000)
     assert(data.count() === 4000000)
-    System.clearProperty("spark.storage.memoryFraction")
   }
 
   test("compute when only some partitions fit in memory") {
-    System.setProperty("spark.storage.memoryFraction", "0.01")
-    sc = new SparkContext(clusterUrl, "test")
+    val conf = new SparkConf().set("spark.storage.memoryFraction", "0.01")
+    sc = new SparkContext(clusterUrl, "test", conf)
     // data will be 4 million * 4 bytes = 16 MB in size, but our 
memoryFraction set the cache
     // to only 5 MB (0.01 of 512 MB), so not all of it will fit in memory; we 
use 20 partitions
     // to make sure that *some* of them do fit though
@@ -232,7 +222,6 @@ class DistributedSuite extends FunSuite with Matchers with 
BeforeAndAfter
     assert(data.count() === 4000000)
     assert(data.count() === 4000000)
     assert(data.count() === 4000000)
-    System.clearProperty("spark.storage.memoryFraction")
   }
 
   test("passing environment variables to cluster") {

http://git-wip-us.apache.org/repos/asf/spark/blob/babcafaa/core/src/test/scala/org/apache/spark/FileServerSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/FileServerSuite.scala 
b/core/src/test/scala/org/apache/spark/FileServerSuite.scala
index 5997e01..9ffe7df 100644
--- a/core/src/test/scala/org/apache/spark/FileServerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/FileServerSuite.scala
@@ -33,10 +33,11 @@ class FileServerSuite extends FunSuite with 
LocalSparkContext {
   @transient var tmpFile: File = _
   @transient var tmpJarUrl: String = _
 
+  def newConf: SparkConf = new SparkConf(loadDefaults = 
false).set("spark.authenticate", "false")
+
   override def beforeEach() {
     super.beforeEach()
     resetSparkContext()
-    System.setProperty("spark.authenticate", "false")
   }
 
   override def beforeAll() {
@@ -55,7 +56,6 @@ class FileServerSuite extends FunSuite with LocalSparkContext 
{
     val jarFile = new File(testTempDir, "test.jar")
     val jarStream = new FileOutputStream(jarFile)
     val jar = new JarOutputStream(jarStream, new java.util.jar.Manifest())
-    System.setProperty("spark.authenticate", "false")
 
     val jarEntry = new JarEntry(textFile.getName)
     jar.putNextEntry(jarEntry)
@@ -77,7 +77,7 @@ class FileServerSuite extends FunSuite with LocalSparkContext 
{
   }
 
   test("Distributing files locally") {
-    sc = new SparkContext("local[4]", "test")
+    sc = new SparkContext("local[4]", "test", newConf)
     sc.addFile(tmpFile.toString)
     val testData = Array((1,1), (1,1), (2,1), (3,5), (2,2), (3,0))
     val result = sc.parallelize(testData).reduceByKey {
@@ -111,7 +111,7 @@ class FileServerSuite extends FunSuite with 
LocalSparkContext {
 
   test("Distributing files locally using URL as input") {
     // addFile("file:///....")
-    sc = new SparkContext("local[4]", "test")
+    sc = new SparkContext("local[4]", "test", newConf)
     sc.addFile(new File(tmpFile.toString).toURI.toString)
     val testData = Array((1,1), (1,1), (2,1), (3,5), (2,2), (3,0))
     val result = sc.parallelize(testData).reduceByKey {
@@ -125,7 +125,7 @@ class FileServerSuite extends FunSuite with 
LocalSparkContext {
   }
 
   test ("Dynamically adding JARS locally") {
-    sc = new SparkContext("local[4]", "test")
+    sc = new SparkContext("local[4]", "test", newConf)
     sc.addJar(tmpJarUrl)
     val testData = Array((1, 1))
     sc.parallelize(testData).foreach { x =>
@@ -136,7 +136,7 @@ class FileServerSuite extends FunSuite with 
LocalSparkContext {
   }
 
   test("Distributing files on a standalone cluster") {
-    sc = new SparkContext("local-cluster[1,1,512]", "test")
+    sc = new SparkContext("local-cluster[1,1,512]", "test", newConf)
     sc.addFile(tmpFile.toString)
     val testData = Array((1,1), (1,1), (2,1), (3,5), (2,2), (3,0))
     val result = sc.parallelize(testData).reduceByKey {
@@ -150,7 +150,7 @@ class FileServerSuite extends FunSuite with 
LocalSparkContext {
   }
 
   test ("Dynamically adding JARS on a standalone cluster") {
-    sc = new SparkContext("local-cluster[1,1,512]", "test")
+    sc = new SparkContext("local-cluster[1,1,512]", "test", newConf)
     sc.addJar(tmpJarUrl)
     val testData = Array((1,1))
     sc.parallelize(testData).foreach { x =>
@@ -161,7 +161,7 @@ class FileServerSuite extends FunSuite with 
LocalSparkContext {
   }
 
   test ("Dynamically adding JARS on a standalone cluster using local: URL") {
-    sc = new SparkContext("local-cluster[1,1,512]", "test")
+    sc = new SparkContext("local-cluster[1,1,512]", "test", newConf)
     sc.addJar(tmpJarUrl.replace("file", "local"))
     val testData = Array((1,1))
     sc.parallelize(testData).foreach { x =>

http://git-wip-us.apache.org/repos/asf/spark/blob/babcafaa/core/src/test/scala/org/apache/spark/JobCancellationSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/JobCancellationSuite.scala 
b/core/src/test/scala/org/apache/spark/JobCancellationSuite.scala
index a57430e..a5d2b42 100644
--- a/core/src/test/scala/org/apache/spark/JobCancellationSuite.scala
+++ b/core/src/test/scala/org/apache/spark/JobCancellationSuite.scala
@@ -41,12 +41,11 @@ class JobCancellationSuite extends FunSuite with Matchers 
with BeforeAndAfter
   override def afterEach() {
     super.afterEach()
     resetSparkContext()
-    System.clearProperty("spark.scheduler.mode")
   }
 
   test("local mode, FIFO scheduler") {
-    System.setProperty("spark.scheduler.mode", "FIFO")
-    sc = new SparkContext("local[2]", "test")
+    val conf = new SparkConf().set("spark.scheduler.mode", "FIFO")
+    sc = new SparkContext("local[2]", "test", conf)
     testCount()
     testTake()
     // Make sure we can still launch tasks.
@@ -54,10 +53,10 @@ class JobCancellationSuite extends FunSuite with Matchers 
with BeforeAndAfter
   }
 
   test("local mode, fair scheduler") {
-    System.setProperty("spark.scheduler.mode", "FAIR")
+    val conf = new SparkConf().set("spark.scheduler.mode", "FAIR")
     val xmlPath = 
getClass.getClassLoader.getResource("fairscheduler.xml").getFile()
-    System.setProperty("spark.scheduler.allocation.file", xmlPath)
-    sc = new SparkContext("local[2]", "test")
+    conf.set("spark.scheduler.allocation.file", xmlPath)
+    sc = new SparkContext("local[2]", "test", conf)
     testCount()
     testTake()
     // Make sure we can still launch tasks.
@@ -65,8 +64,8 @@ class JobCancellationSuite extends FunSuite with Matchers 
with BeforeAndAfter
   }
 
   test("cluster mode, FIFO scheduler") {
-    System.setProperty("spark.scheduler.mode", "FIFO")
-    sc = new SparkContext("local-cluster[2,1,512]", "test")
+    val conf = new SparkConf().set("spark.scheduler.mode", "FIFO")
+    sc = new SparkContext("local-cluster[2,1,512]", "test", conf)
     testCount()
     testTake()
     // Make sure we can still launch tasks.
@@ -74,10 +73,10 @@ class JobCancellationSuite extends FunSuite with Matchers 
with BeforeAndAfter
   }
 
   test("cluster mode, fair scheduler") {
-    System.setProperty("spark.scheduler.mode", "FAIR")
+    val conf = new SparkConf().set("spark.scheduler.mode", "FAIR")
     val xmlPath = 
getClass.getClassLoader.getResource("fairscheduler.xml").getFile()
-    System.setProperty("spark.scheduler.allocation.file", xmlPath)
-    sc = new SparkContext("local-cluster[2,1,512]", "test")
+    conf.set("spark.scheduler.allocation.file", xmlPath)
+    sc = new SparkContext("local-cluster[2,1,512]", "test", conf)
     testCount()
     testTake()
     // Make sure we can still launch tasks.

http://git-wip-us.apache.org/repos/asf/spark/blob/babcafaa/core/src/test/scala/org/apache/spark/ShuffleSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/ShuffleSuite.scala 
b/core/src/test/scala/org/apache/spark/ShuffleSuite.scala
index 704b382..625702b 100644
--- a/core/src/test/scala/org/apache/spark/ShuffleSuite.scala
+++ b/core/src/test/scala/org/apache/spark/ShuffleSuite.scala
@@ -31,19 +31,15 @@ class ShuffleSuite extends FunSuite with Matchers with 
LocalSparkContext {
   val conf = new SparkConf(loadDefaults = false)
 
   test("groupByKey without compression") {
-    try {
-      System.setProperty("spark.shuffle.compress", "false")
-      sc = new SparkContext("local", "test")
-      val pairs = sc.parallelize(Array((1, 1), (1, 2), (1, 3), (2, 1)), 4)
-      val groups = pairs.groupByKey(4).collect()
-      assert(groups.size === 2)
-      val valuesFor1 = groups.find(_._1 == 1).get._2
-      assert(valuesFor1.toList.sorted === List(1, 2, 3))
-      val valuesFor2 = groups.find(_._1 == 2).get._2
-      assert(valuesFor2.toList.sorted === List(1))
-    } finally {
-      System.setProperty("spark.shuffle.compress", "true")
-    }
+    val myConf = conf.clone().set("spark.shuffle.compress", "false")
+    sc = new SparkContext("local", "test", myConf)
+    val pairs = sc.parallelize(Array((1, 1), (1, 2), (1, 3), (2, 1)), 4)
+    val groups = pairs.groupByKey(4).collect()
+    assert(groups.size === 2)
+    val valuesFor1 = groups.find(_._1 == 1).get._2
+    assert(valuesFor1.toList.sorted === List(1, 2, 3))
+    val valuesFor2 = groups.find(_._1 == 2).get._2
+    assert(valuesFor2.toList.sorted === List(1))
   }
 
   test("shuffle non-zero block size") {

http://git-wip-us.apache.org/repos/asf/spark/blob/babcafaa/core/src/test/scala/org/apache/spark/SparkConfSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/SparkConfSuite.scala 
b/core/src/test/scala/org/apache/spark/SparkConfSuite.scala
index 87e9012..ae8a3c7 100644
--- a/core/src/test/scala/org/apache/spark/SparkConfSuite.scala
+++ b/core/src/test/scala/org/apache/spark/SparkConfSuite.scala
@@ -19,25 +19,19 @@ package org.apache.spark
 
 import org.scalatest.FunSuite
 
-class SparkConfSuite extends FunSuite with LocalSparkContext {
+import org.apache.spark.util.ResetSystemProperties
+
+class SparkConfSuite extends FunSuite with LocalSparkContext with 
ResetSystemProperties {
   test("loading from system properties") {
-    try {
-      System.setProperty("spark.test.testProperty", "2")
-      val conf = new SparkConf()
-      assert(conf.get("spark.test.testProperty") === "2")
-    } finally {
-      System.clearProperty("spark.test.testProperty")
-    }
+    System.setProperty("spark.test.testProperty", "2")
+    val conf = new SparkConf()
+    assert(conf.get("spark.test.testProperty") === "2")
   }
 
   test("initializing without loading defaults") {
-    try {
-      System.setProperty("spark.test.testProperty", "2")
-      val conf = new SparkConf(false)
-      assert(!conf.contains("spark.test.testProperty"))
-    } finally {
-      System.clearProperty("spark.test.testProperty")
-    }
+    System.setProperty("spark.test.testProperty", "2")
+    val conf = new SparkConf(false)
+    assert(!conf.contains("spark.test.testProperty"))
   }
 
   test("named set methods") {
@@ -115,22 +109,16 @@ class SparkConfSuite extends FunSuite with 
LocalSparkContext {
 
   test("nested property names") {
     // This wasn't supported by some external conf parsing libraries
-    try {
-      System.setProperty("spark.test.a", "a")
-      System.setProperty("spark.test.a.b", "a.b")
-      System.setProperty("spark.test.a.b.c", "a.b.c")
-      val conf = new SparkConf()
-      assert(conf.get("spark.test.a") === "a")
-      assert(conf.get("spark.test.a.b") === "a.b")
-      assert(conf.get("spark.test.a.b.c") === "a.b.c")
-      conf.set("spark.test.a.b", "A.B")
-      assert(conf.get("spark.test.a") === "a")
-      assert(conf.get("spark.test.a.b") === "A.B")
-      assert(conf.get("spark.test.a.b.c") === "a.b.c")
-    } finally {
-      System.clearProperty("spark.test.a")
-      System.clearProperty("spark.test.a.b")
-      System.clearProperty("spark.test.a.b.c")
-    }
+    System.setProperty("spark.test.a", "a")
+    System.setProperty("spark.test.a.b", "a.b")
+    System.setProperty("spark.test.a.b.c", "a.b.c")
+    val conf = new SparkConf()
+    assert(conf.get("spark.test.a") === "a")
+    assert(conf.get("spark.test.a.b") === "a.b")
+    assert(conf.get("spark.test.a.b.c") === "a.b.c")
+    conf.set("spark.test.a.b", "A.B")
+    assert(conf.get("spark.test.a") === "a")
+    assert(conf.get("spark.test.a.b") === "A.B")
+    assert(conf.get("spark.test.a.b.c") === "a.b.c")
   }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/babcafaa/core/src/test/scala/org/apache/spark/SparkContextSchedulerCreationSuite.scala
----------------------------------------------------------------------
diff --git 
a/core/src/test/scala/org/apache/spark/SparkContextSchedulerCreationSuite.scala 
b/core/src/test/scala/org/apache/spark/SparkContextSchedulerCreationSuite.scala
index 495a0d4..161b715 100644
--- 
a/core/src/test/scala/org/apache/spark/SparkContextSchedulerCreationSuite.scala
+++ 
b/core/src/test/scala/org/apache/spark/SparkContextSchedulerCreationSuite.scala
@@ -27,10 +27,13 @@ import org.apache.spark.scheduler.local.LocalBackend
 class SparkContextSchedulerCreationSuite
   extends FunSuite with PrivateMethodTester with Logging with 
BeforeAndAfterEach {
 
-  def createTaskScheduler(master: String): TaskSchedulerImpl = {
+  def createTaskScheduler(master: String): TaskSchedulerImpl =
+    createTaskScheduler(master, new SparkConf())
+
+  def createTaskScheduler(master: String, conf: SparkConf): TaskSchedulerImpl 
= {
     // Create local SparkContext to setup a SparkEnv. We don't actually want 
to start() the
     // real schedulers, so we don't want to create a full SparkContext with 
the desired scheduler.
-    val sc = new SparkContext("local", "test")
+    val sc = new SparkContext("local", "test", conf)
     val createTaskSchedulerMethod = 
PrivateMethod[TaskScheduler]('createTaskScheduler)
     val sched = SparkContext invokePrivate createTaskSchedulerMethod(sc, 
master)
     sched.asInstanceOf[TaskSchedulerImpl]
@@ -101,19 +104,13 @@ class SparkContextSchedulerCreationSuite
   }
 
   test("local-default-parallelism") {
-    val defaultParallelism = System.getProperty("spark.default.parallelism")
-    System.setProperty("spark.default.parallelism", "16")
-    val sched = createTaskScheduler("local")
+    val conf = new SparkConf().set("spark.default.parallelism", "16")
+    val sched = createTaskScheduler("local", conf)
 
     sched.backend match {
       case s: LocalBackend => assert(s.defaultParallelism() === 16)
       case _ => fail()
     }
-
-    Option(defaultParallelism) match {
-      case Some(v) => System.setProperty("spark.default.parallelism", v)
-      case _ => System.clearProperty("spark.default.parallelism")
-    }
   }
 
   test("simr") {
@@ -154,9 +151,10 @@ class SparkContextSchedulerCreationSuite
     testYarn("yarn-client", 
"org.apache.spark.scheduler.cluster.YarnClientClusterScheduler")
   }
 
-  def testMesos(master: String, expectedClass: Class[_]) {
+  def testMesos(master: String, expectedClass: Class[_], coarse: Boolean) {
+    val conf = new SparkConf().set("spark.mesos.coarse", coarse.toString)
     try {
-      val sched = createTaskScheduler(master)
+      val sched = createTaskScheduler(master, conf)
       assert(sched.backend.getClass === expectedClass)
     } catch {
       case e: UnsatisfiedLinkError =>
@@ -167,17 +165,14 @@ class SparkContextSchedulerCreationSuite
   }
 
   test("mesos fine-grained") {
-    System.setProperty("spark.mesos.coarse", "false")
-    testMesos("mesos://localhost:1234", classOf[MesosSchedulerBackend])
+    testMesos("mesos://localhost:1234", classOf[MesosSchedulerBackend], coarse 
= false)
   }
 
   test("mesos coarse-grained") {
-    System.setProperty("spark.mesos.coarse", "true")
-    testMesos("mesos://localhost:1234", classOf[CoarseMesosSchedulerBackend])
+    testMesos("mesos://localhost:1234", classOf[CoarseMesosSchedulerBackend], 
coarse = true)
   }
 
   test("mesos with zookeeper") {
-    System.setProperty("spark.mesos.coarse", "false")
-    testMesos("zk://localhost:1234,localhost:2345", 
classOf[MesosSchedulerBackend])
+    testMesos("zk://localhost:1234,localhost:2345", 
classOf[MesosSchedulerBackend], coarse = false)
   }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/babcafaa/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala 
b/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala
index 4cba90e..dba00ed 100644
--- a/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala
@@ -23,12 +23,14 @@ import scala.collection.mutable.ArrayBuffer
 
 import org.apache.spark.{SparkConf, SparkContext, SparkEnv, SparkException, 
TestUtils}
 import org.apache.spark.deploy.SparkSubmit._
-import org.apache.spark.util.Utils
+import org.apache.spark.util.{ResetSystemProperties, Utils}
 import org.scalatest.FunSuite
 import org.scalatest.Matchers
 import com.google.common.io.Files
 
-class SparkSubmitSuite extends FunSuite with Matchers {
+// Note: this suite mixes in ResetSystemProperties because SparkSubmit.main() 
sets a bunch
+// of properties that neeed to be cleared after tests.
+class SparkSubmitSuite extends FunSuite with Matchers with 
ResetSystemProperties {
   def beforeAll() {
     System.setProperty("spark.testing", "true")
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/babcafaa/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala
----------------------------------------------------------------------
diff --git 
a/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala 
b/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala
index 3b0b8e2..9ba4978 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala
@@ -27,9 +27,10 @@ import org.scalatest.Matchers
 import org.apache.spark.{LocalSparkContext, SparkContext}
 import org.apache.spark.SparkContext._
 import org.apache.spark.executor.TaskMetrics
+import org.apache.spark.util.ResetSystemProperties
 
-class SparkListenerSuite extends FunSuite with LocalSparkContext with Matchers
-  with BeforeAndAfter with BeforeAndAfterAll {
+class SparkListenerSuite extends FunSuite  with LocalSparkContext with 
Matchers with BeforeAndAfter
+  with BeforeAndAfterAll with ResetSystemProperties {
 
   /** Length of time to wait while draining listener events. */
   val WAIT_TIMEOUT_MILLIS = 10000
@@ -38,10 +39,6 @@ class SparkListenerSuite extends FunSuite with 
LocalSparkContext with Matchers
     sc = new SparkContext("local", "SparkListenerSuite")
   }
 
-  override def afterAll() {
-    System.clearProperty("spark.akka.frameSize")
-  }
-
   test("basic creation and shutdown of LiveListenerBus") {
     val counter = new BasicJobCounter
     val bus = new LiveListenerBus

http://git-wip-us.apache.org/repos/asf/spark/blob/babcafaa/core/src/test/scala/org/apache/spark/scheduler/TaskResultGetterSuite.scala
----------------------------------------------------------------------
diff --git 
a/core/src/test/scala/org/apache/spark/scheduler/TaskResultGetterSuite.scala 
b/core/src/test/scala/org/apache/spark/scheduler/TaskResultGetterSuite.scala
index c4e7a4b..8b224ab 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/TaskResultGetterSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/TaskResultGetterSuite.scala
@@ -21,7 +21,7 @@ import java.nio.ByteBuffer
 
 import org.scalatest.{BeforeAndAfter, BeforeAndAfterAll, FunSuite}
 
-import org.apache.spark.{LocalSparkContext, SparkContext, SparkEnv}
+import org.apache.spark.{LocalSparkContext, SparkConf, SparkContext, SparkEnv}
 import org.apache.spark.storage.TaskResultBlockId
 
 /**
@@ -55,27 +55,20 @@ class ResultDeletingTaskResultGetter(sparkEnv: SparkEnv, 
scheduler: TaskSchedule
 /**
  * Tests related to handling task results (both direct and indirect).
  */
-class TaskResultGetterSuite extends FunSuite with BeforeAndAfter with 
BeforeAndAfterAll
-  with LocalSparkContext {
+class TaskResultGetterSuite extends FunSuite with BeforeAndAfter with 
LocalSparkContext {
 
-  override def beforeAll {
-    // Set the Akka frame size to be as small as possible (it must be an 
integer, so 1 is as small
-    // as we can make it) so the tests don't take too long.
-    System.setProperty("spark.akka.frameSize", "1")
-  }
-
-  override def afterAll {
-    System.clearProperty("spark.akka.frameSize")
-  }
+  // Set the Akka frame size to be as small as possible (it must be an 
integer, so 1 is as small
+  // as we can make it) so the tests don't take too long.
+  def conf: SparkConf = new SparkConf().set("spark.akka.frameSize", "1")
 
   test("handling results smaller than Akka frame size") {
-    sc = new SparkContext("local", "test")
+    sc = new SparkContext("local", "test", conf)
     val result = sc.parallelize(Seq(1), 1).map(x => 2 * x).reduce((x, y) => x)
     assert(result === 2)
   }
 
   test("handling results larger than Akka frame size") {
-    sc = new SparkContext("local", "test")
+    sc = new SparkContext("local", "test", conf)
     val akkaFrameSize =
       
sc.env.actorSystem.settings.config.getBytes("akka.remote.netty.tcp.maximum-frame-size").toInt
     val result = sc.parallelize(Seq(1), 1).map(x => 
1.to(akkaFrameSize).toArray).reduce((x, y) => x)
@@ -89,7 +82,7 @@ class TaskResultGetterSuite extends FunSuite with 
BeforeAndAfter with BeforeAndA
   test("task retried if result missing from block manager") {
     // Set the maximum number of task failures to > 0, so that the task set 
isn't aborted
     // after the result is missing.
-    sc = new SparkContext("local[1,2]", "test")
+    sc = new SparkContext("local[1,2]", "test", conf)
     // If this test hangs, it's probably because no resource offers were made 
after the task
     // failed.
     val scheduler: TaskSchedulerImpl = sc.taskScheduler match {

http://git-wip-us.apache.org/repos/asf/spark/blob/babcafaa/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala
----------------------------------------------------------------------
diff --git 
a/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala 
b/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala
index 7532da8..40aaf9d 100644
--- 
a/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala
+++ 
b/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala
@@ -162,12 +162,12 @@ class TaskSchedulerImplSuite extends FunSuite with 
LocalSparkContext with Loggin
   }
 
   test("Fair Scheduler Test") {
-    sc = new SparkContext("local", "TaskSchedulerImplSuite")
+    val xmlPath = 
getClass.getClassLoader.getResource("fairscheduler.xml").getFile()
+    val conf = new SparkConf().set("spark.scheduler.allocation.file", xmlPath)
+    sc = new SparkContext("local", "TaskSchedulerImplSuite", conf)
     val taskScheduler = new TaskSchedulerImpl(sc)
     val taskSet = FakeTask.createTaskSet(1)
 
-    val xmlPath = 
getClass.getClassLoader.getResource("fairscheduler.xml").getFile()
-    System.setProperty("spark.scheduler.allocation.file", xmlPath)
     val rootPool = new Pool("", SchedulingMode.FAIR, 0, 0)
     val schedulableBuilder = new FairSchedulableBuilder(rootPool, sc.conf)
     schedulableBuilder.buildPools()

http://git-wip-us.apache.org/repos/asf/spark/blob/babcafaa/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
----------------------------------------------------------------------
diff --git 
a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala 
b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
index 48c45bf..b212fa7 100644
--- a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
@@ -31,10 +31,9 @@ import org.mockito.Matchers.any
 import org.mockito.Mockito.{doAnswer, mock, spy, when}
 import org.mockito.stubbing.Answer
 
-import org.scalatest.{BeforeAndAfter, FunSuite, PrivateMethodTester}
+import org.scalatest._
 import org.scalatest.concurrent.Eventually._
 import org.scalatest.concurrent.Timeouts._
-import org.scalatest.Matchers
 
 import org.apache.spark.{MapOutputTrackerMaster, SecurityManager, SparkConf}
 import org.apache.spark.executor.DataReadMethod
@@ -42,7 +41,7 @@ import org.apache.spark.network.{Message, ConnectionManagerId}
 import org.apache.spark.scheduler.LiveListenerBus
 import org.apache.spark.serializer.{JavaSerializer, KryoSerializer}
 import org.apache.spark.storage.BlockManagerMessages.BlockManagerHeartbeat
-import org.apache.spark.util.{AkkaUtils, ByteBufferInputStream, SizeEstimator, 
Utils}
+import org.apache.spark.util._
 
 import scala.collection.mutable.ArrayBuffer
 import scala.concurrent.Await
@@ -50,15 +49,14 @@ import scala.concurrent.duration._
 import scala.language.implicitConversions
 import scala.language.postfixOps
 
-class BlockManagerSuite extends FunSuite with Matchers with BeforeAndAfter
-  with PrivateMethodTester {
+class BlockManagerSuite extends FunSuite with Matchers with BeforeAndAfterEach
+  with PrivateMethodTester with ResetSystemProperties {
 
   private val conf = new SparkConf(false)
   var store: BlockManager = null
   var store2: BlockManager = null
   var actorSystem: ActorSystem = null
   var master: BlockManagerMaster = null
-  var oldArch: String = null
   conf.set("spark.authenticate", "false")
   val securityMgr = new SecurityManager(conf)
   val mapOutputTracker = new MapOutputTrackerMaster(conf)
@@ -77,13 +75,13 @@ class BlockManagerSuite extends FunSuite with Matchers with 
BeforeAndAfter
       mapOutputTracker, shuffleManager)
   }
 
-  before {
+  override def beforeEach(): Unit = {
     val (actorSystem, boundPort) = AkkaUtils.createActorSystem(
       "test", "localhost", 0, conf = conf, securityManager = securityMgr)
     this.actorSystem = actorSystem
 
     // Set the arch to 64-bit and compressedOops to true to get a 
deterministic test-case
-    oldArch = System.setProperty("os.arch", "amd64")
+    System.setProperty("os.arch", "amd64")
     conf.set("os.arch", "amd64")
     conf.set("spark.test.useCompressedOops", "true")
     conf.set("spark.driver.port", boundPort.toString)
@@ -98,7 +96,7 @@ class BlockManagerSuite extends FunSuite with Matchers with 
BeforeAndAfter
     SizeEstimator invokePrivate initialize()
   }
 
-  after {
+  override def afterEach(): Unit = {
     if (store != null) {
       store.stop()
       store = null
@@ -111,14 +109,6 @@ class BlockManagerSuite extends FunSuite with Matchers 
with BeforeAndAfter
     actorSystem.awaitTermination()
     actorSystem = null
     master = null
-
-    if (oldArch != null) {
-      conf.set("os.arch", oldArch)
-    } else {
-      System.clearProperty("os.arch")
-    }
-
-    System.clearProperty("spark.test.useCompressedOops")
   }
 
   test("StorageLevel object caching") {

http://git-wip-us.apache.org/repos/asf/spark/blob/babcafaa/core/src/test/scala/org/apache/spark/util/AkkaUtilsSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/util/AkkaUtilsSuite.scala 
b/core/src/test/scala/org/apache/spark/util/AkkaUtilsSuite.scala
index c4765e5..640e45f 100644
--- a/core/src/test/scala/org/apache/spark/util/AkkaUtilsSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/AkkaUtilsSuite.scala
@@ -28,7 +28,7 @@ import scala.concurrent.Await
 /**
   * Test the AkkaUtils with various security settings.
   */
-class AkkaUtilsSuite extends FunSuite with LocalSparkContext {
+class AkkaUtilsSuite extends FunSuite with LocalSparkContext with 
ResetSystemProperties {
 
   test("remote fetch security bad password") {
     val conf = new SparkConf

http://git-wip-us.apache.org/repos/asf/spark/blob/babcafaa/core/src/test/scala/org/apache/spark/util/ResetSystemProperties.scala
----------------------------------------------------------------------
diff --git 
a/core/src/test/scala/org/apache/spark/util/ResetSystemProperties.scala 
b/core/src/test/scala/org/apache/spark/util/ResetSystemProperties.scala
new file mode 100644
index 0000000..d4b92f3
--- /dev/null
+++ b/core/src/test/scala/org/apache/spark/util/ResetSystemProperties.scala
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.util
+
+import java.util.Properties
+
+import org.scalatest.{BeforeAndAfterEach, Suite}
+
+/**
+ * Mixin for automatically resetting system properties that are modified in 
ScalaTest tests.
+ * This resets the properties after each individual test.
+ *
+ * The order in which fixtures are mixed in affects the order in which they 
are invoked by tests.
+ * If we have a suite `MySuite extends FunSuite with Foo with Bar`, then
+ * Bar's `super` is Foo, so Bar's beforeEach() and afterEach() methods 
will be invoked first
+ * by the test runner.
+ *
+ * This means that ResetSystemProperties should appear as the last trait in 
test suites that it's
+ * mixed into in order to ensure that the system properties snapshot occurs as 
early as possible.
+ * ResetSystemProperties calls super.afterEach() before performing its own 
cleanup, ensuring that
+ * the old properties are restored as late as possible.
+ *
+ * See the "Composing fixtures by stacking traits" section at
+ * http://www.scalatest.org/user_guide/sharing_fixtures for more details about 
this pattern.
+ */
+private[spark] trait ResetSystemProperties extends BeforeAndAfterEach { this: 
Suite =>
+  // Independent copy of the system properties taken at the start of the
+  // current test; null while no test is running.
+  var oldProperties: Properties = null
+
+  override def beforeEach(): Unit = {
+    // `new Properties(defaults)` creates an EMPTY property list that merely
+    // delegates to `defaults`; it would keep pointing at the live system
+    // properties and therefore observe every later modification. Copy the
+    // entries one by one so the snapshot is really frozen.
+    val live = System.getProperties
+    val snapshot = new Properties()
+    val names = live.stringPropertyNames().iterator()
+    while (names.hasNext) {
+      val name = names.next()
+      // getProperty returns null if the key disappeared in the meantime;
+      // Properties cannot store null values, so skip such keys.
+      Option(live.getProperty(name)).foreach(snapshot.setProperty(name, _))
+    }
+    oldProperties = snapshot
+    super.beforeEach()
+  }
+
+  override def afterEach(): Unit = {
+    try {
+      super.afterEach()
+    } finally {
+      // Restore last, even if another fixture's cleanup threw.
+      System.setProperties(oldProperties)
+      oldProperties = null
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/babcafaa/core/src/test/scala/org/apache/spark/util/SizeEstimatorSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/util/SizeEstimatorSuite.scala 
b/core/src/test/scala/org/apache/spark/util/SizeEstimatorSuite.scala
index f9d1af8..c60f434 100644
--- a/core/src/test/scala/org/apache/spark/util/SizeEstimatorSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/SizeEstimatorSuite.scala
@@ -17,9 +17,7 @@
 
 package org.apache.spark.util
 
-import org.scalatest.BeforeAndAfterAll
-import org.scalatest.FunSuite
-import org.scalatest.PrivateMethodTester
+import org.scalatest.{BeforeAndAfterEach, BeforeAndAfterAll, FunSuite, 
PrivateMethodTester}
 
 class DummyClass1 {}
 
@@ -46,20 +44,12 @@ class DummyString(val arr: Array[Char]) {
 }
 
 class SizeEstimatorSuite
-  extends FunSuite with BeforeAndAfterAll with PrivateMethodTester {
+  extends FunSuite with BeforeAndAfterEach with PrivateMethodTester with 
ResetSystemProperties {
 
-  var oldArch: String = _
-  var oldOops: String = _
-
-  override def beforeAll() {
+  override def beforeEach() {
+    // Run the mixed-in fixtures first so that ResetSystemProperties snapshots
+    // the system properties before this suite overwrites them. This override
+    // replaces the trait's beforeEach(), so it only runs via this call.
+    super.beforeEach()
     // Set the arch to 64-bit and compressedOops to true to get a 
deterministic test-case
-    oldArch = System.setProperty("os.arch", "amd64")
-    oldOops = System.setProperty("spark.test.useCompressedOops", "true")
-  }
-
-  override def afterAll() {
-    resetOrClear("os.arch", oldArch)
-    resetOrClear("spark.test.useCompressedOops", oldOops)
+    System.setProperty("os.arch", "amd64")
+    System.setProperty("spark.test.useCompressedOops", "true")
   }
 
   test("simple classes") {
@@ -122,7 +112,7 @@ class SizeEstimatorSuite
   }
 
   test("32-bit arch") {
-    val arch = System.setProperty("os.arch", "x86")
+    System.setProperty("os.arch", "x86")
 
     val initialize = PrivateMethod[Unit]('initialize)
     SizeEstimator invokePrivate initialize()
@@ -131,14 +121,13 @@ class SizeEstimatorSuite
     assertResult(48)(SizeEstimator.estimate(DummyString("a")))
     assertResult(48)(SizeEstimator.estimate(DummyString("ab")))
     assertResult(56)(SizeEstimator.estimate(DummyString("abcdefgh")))
-    resetOrClear("os.arch", arch)
   }
 
   // NOTE: The String class definition varies across JDK versions (1.6 vs. 
1.7) and vendors
   // (Sun vs IBM). Use a DummyString class to make tests deterministic.
   test("64-bit arch with no compressed oops") {
-    val arch = System.setProperty("os.arch", "amd64")
-    val oops = System.setProperty("spark.test.useCompressedOops", "false")
+    System.setProperty("os.arch", "amd64")
+    System.setProperty("spark.test.useCompressedOops", "false")
     val initialize = PrivateMethod[Unit]('initialize)
     SizeEstimator invokePrivate initialize()
 
@@ -146,16 +135,5 @@ class SizeEstimatorSuite
     assertResult(64)(SizeEstimator.estimate(DummyString("a")))
     assertResult(64)(SizeEstimator.estimate(DummyString("ab")))
     assertResult(72)(SizeEstimator.estimate(DummyString("abcdefgh")))
-
-    resetOrClear("os.arch", arch)
-    resetOrClear("spark.test.useCompressedOops", oops)
-  }
-
-  def resetOrClear(prop: String, oldValue: String) {
-    if (oldValue != null) {
-      System.setProperty(prop, oldValue)
-    } else {
-      System.clearProperty(prop)
-    }
   }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/babcafaa/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala 
b/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala
index a530e0b..5ae3da2 100644
--- a/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala
@@ -29,7 +29,7 @@ import com.google.common.base.Charsets
 import com.google.common.io.Files
 import org.scalatest.FunSuite
 
-class UtilsSuite extends FunSuite {
+class UtilsSuite extends FunSuite with ResetSystemProperties {
 
   test("bytesToString") {
     assert(Utils.bytesToString(10) === "10.0 B")

http://git-wip-us.apache.org/repos/asf/spark/blob/babcafaa/examples/src/main/scala/org/apache/spark/examples/BroadcastTest.scala
----------------------------------------------------------------------
diff --git 
a/examples/src/main/scala/org/apache/spark/examples/BroadcastTest.scala 
b/examples/src/main/scala/org/apache/spark/examples/BroadcastTest.scala
index 973049b..f49a96e 100644
--- a/examples/src/main/scala/org/apache/spark/examples/BroadcastTest.scala
+++ b/examples/src/main/scala/org/apache/spark/examples/BroadcastTest.scala
@@ -28,11 +28,9 @@ object BroadcastTest {
     val bcName = if (args.length > 2) args(2) else "Http"
     val blockSize = if (args.length > 3) args(3) else "4096"
 
-    System.setProperty("spark.broadcast.factory", 
"org.apache.spark.broadcast." + bcName +
-      "BroadcastFactory")
-    System.setProperty("spark.broadcast.blockSize", blockSize)
     val sparkConf = new SparkConf().setAppName("Broadcast Test")
-
+      .set("spark.broadcast.factory", 
s"org.apache.spark.broadcast.${bcName}BroadcastFactory")
+      .set("spark.broadcast.blockSize", blockSize)
     val sc = new SparkContext(sparkConf)
 
     val slices = if (args.length > 0) args(0).toInt else 2

http://git-wip-us.apache.org/repos/asf/spark/blob/babcafaa/streaming/src/test/java/org/apache/spark/streaming/LocalJavaStreamingContext.java
----------------------------------------------------------------------
diff --git 
a/streaming/src/test/java/org/apache/spark/streaming/LocalJavaStreamingContext.java
 
b/streaming/src/test/java/org/apache/spark/streaming/LocalJavaStreamingContext.java
index 6e1f019..1e24da7 100644
--- 
a/streaming/src/test/java/org/apache/spark/streaming/LocalJavaStreamingContext.java
+++ 
b/streaming/src/test/java/org/apache/spark/streaming/LocalJavaStreamingContext.java
@@ -17,6 +17,7 @@
 
 package org.apache.spark.streaming;
 
+import org.apache.spark.SparkConf;
 import org.apache.spark.streaming.api.java.JavaStreamingContext;
 import org.junit.After;
 import org.junit.Before;
@@ -27,8 +28,11 @@ public abstract class LocalJavaStreamingContext {
 
     @Before
     public void setUp() {
-        System.setProperty("spark.streaming.clock", 
"org.apache.spark.streaming.util.ManualClock");
-        ssc = new JavaStreamingContext("local[2]", "test", new Duration(1000));
+        SparkConf conf = new SparkConf()
+            .setMaster("local[2]")
+            .setAppName("test")
+            .set("spark.streaming.clock", 
"org.apache.spark.streaming.util.ManualClock");
+        ssc = new JavaStreamingContext(conf, new Duration(1000));
         ssc.checkpoint("checkpoint");
     }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/babcafaa/tools/src/main/scala/org/apache/spark/tools/StoragePerfTester.scala
----------------------------------------------------------------------
diff --git 
a/tools/src/main/scala/org/apache/spark/tools/StoragePerfTester.scala 
b/tools/src/main/scala/org/apache/spark/tools/StoragePerfTester.scala
index 17bf7c2..efe2a4f 100644
--- a/tools/src/main/scala/org/apache/spark/tools/StoragePerfTester.scala
+++ b/tools/src/main/scala/org/apache/spark/tools/StoragePerfTester.scala
@@ -20,7 +20,7 @@ package org.apache.spark.tools
 import java.util.concurrent.{CountDownLatch, Executors}
 import java.util.concurrent.atomic.AtomicLong
 
-import org.apache.spark.SparkContext
+import org.apache.spark.{SparkConf, SparkContext}
 import org.apache.spark.serializer.KryoSerializer
 import org.apache.spark.util.Utils
 import org.apache.spark.executor.ShuffleWriteMetrics
@@ -48,11 +48,12 @@ object StoragePerfTester {
     val writeData = "1" * recordLength
     val executor = Executors.newFixedThreadPool(numMaps)
 
-    System.setProperty("spark.shuffle.compress", "false")
-    System.setProperty("spark.shuffle.sync", "true")
+    val conf = new SparkConf()
+      .set("spark.shuffle.compress", "false")
+      .set("spark.shuffle.sync", "true")
 
     // This is only used to instantiate a BlockManager. All thread scheduling 
is done manually.
-    val sc = new SparkContext("local[4]", "Write Tester")
+    val sc = new SparkContext("local[4]", "Write Tester", conf)
     val blockManager = sc.env.blockManager
 
     def writeOutputBytes(mapId: Int, total: AtomicLong) = {


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to