Repository: spark
Updated Branches:
  refs/heads/branch-1.0 f47e162b9 -> 9f911c93d


[SPARK-1010] Clean up uses of System.setProperty in unit tests

Several of our tests call System.setProperty (or test code which implicitly 
sets system properties) and don't always reset/clear the modified properties, 
which can create ordering dependencies between tests and cause hard-to-diagnose 
failures.

This patch removes most uses of System.setProperty from our tests, since in 
most cases we can use SparkConf to set these configurations (there are a few 
exceptions, including the tests of SparkConf itself).

For the cases where we continue to use System.setProperty, this patch 
introduces a `ResetSystemProperties` ScalaTest mixin class which snapshots the 
system properties before individual tests and automatically restores them on 
test completion / failure.  See the block comment at the top of the 
ResetSystemProperties class for more details.

Author: Josh Rosen <[email protected]>

Closes #3739 from JoshRosen/cleanup-system-properties-in-tests and squashes the 
following commits:

0236d66 [Josh Rosen] Replace setProperty uses in two example programs / tools
3888fe3 [Josh Rosen] Remove setProperty use in LocalJavaStreamingContext
4f4031d [Josh Rosen] Add note on why SparkSubmitSuite needs 
ResetSystemProperties
4742a5b [Josh Rosen] Clarify ResetSystemProperties trait inheritance ordering.
0eaf0b6 [Josh Rosen] Remove setProperty call in TaskResultGetterSuite.
7a3d224 [Josh Rosen] Fix trait ordering
3fdb554 [Josh Rosen] Remove setProperty call in TaskSchedulerImplSuite
bee20df [Josh Rosen] Remove setProperty calls in 
SparkContextSchedulerCreationSuite
655587c [Josh Rosen] Remove setProperty calls in JobCancellationSuite
3f2f955 [Josh Rosen] Remove System.setProperty calls in DistributedSuite
cfe9cce [Josh Rosen] Remove use of system properties in SparkContextSuite
8783ab0 [Josh Rosen] Remove TestUtils.setSystemProperty, since it is subsumed 
by the ResetSystemProperties trait.
633a84a [Josh Rosen] Remove use of system properties in FileServerSuite
25bfce2 [Josh Rosen] Use ResetSystemProperties in UtilsSuite
1d1aa5a [Josh Rosen] Use ResetSystemProperties in SizeEstimatorSuite
dd9492b [Josh Rosen] Use ResetSystemProperties in AkkaUtilsSuite
b0daff2 [Josh Rosen] Use ResetSystemProperties in BlockManagerSuite
e9ded62 [Josh Rosen] Use ResetSystemProperties in TaskSchedulerImplSuite
5b3cb54 [Josh Rosen] Use ResetSystemProperties in SparkListenerSuite
0995c4b [Josh Rosen] Use ResetSystemProperties in 
SparkContextSchedulerCreationSuite
c83ded8 [Josh Rosen] Use ResetSystemProperties in SparkConfSuite
51aa870 [Josh Rosen] Use withSystemProperty in ShuffleSuite
60a63a1 [Josh Rosen] Use ResetSystemProperties in JobCancellationSuite
14a92e4 [Josh Rosen] Use withSystemProperty in FileServerSuite
628f46c [Josh Rosen] Use ResetSystemProperties in DistributedSuite
9e3e0dd [Josh Rosen] Add ResetSystemProperties test fixture mixin; use it in 
SparkSubmitSuite.
4dcea38 [Josh Rosen] Move withSystemProperty to TestUtils class.

(cherry picked from commit 352ed6bbe3c3b67e52e298e7c535ae414d96beca)
Signed-off-by: Josh Rosen <[email protected]>

Conflicts:
        core/src/test/scala/org/apache/spark/ShuffleSuite.scala
        core/src/test/scala/org/apache/spark/SparkConfSuite.scala
        
core/src/test/scala/org/apache/spark/SparkContextSchedulerCreationSuite.scala
        core/src/test/scala/org/apache/spark/SparkContextSuite.scala
        core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
        core/src/test/scala/org/apache/spark/util/UtilsSuite.scala
        
external/flume/src/test/java/org/apache/spark/streaming/LocalJavaStreamingContext.java
        
external/mqtt/src/test/java/org/apache/spark/streaming/LocalJavaStreamingContext.java
        
external/twitter/src/test/java/org/apache/spark/streaming/LocalJavaStreamingContext.java
        
external/zeromq/src/test/java/org/apache/spark/streaming/LocalJavaStreamingContext.java
        tools/src/main/scala/org/apache/spark/tools/StoragePerfTester.scala

Conflicts:
        core/src/test/scala/org/apache/spark/DistributedSuite.scala
        
core/src/test/scala/org/apache/spark/SparkContextSchedulerCreationSuite.scala
        core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala
        core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala
        core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
        
streaming/src/test/java/org/apache/spark/streaming/LocalJavaStreamingContext.java


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/9f911c93
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/9f911c93
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/9f911c93

Branch: refs/heads/branch-1.0
Commit: 9f911c93de62fef6ee152f24c5fd394310cb5306
Parents: f47e162
Author: Josh Rosen <[email protected]>
Authored: Tue Dec 30 18:12:20 2014 -0800
Committer: Josh Rosen <[email protected]>
Committed: Wed Dec 31 11:50:31 2014 -0800

----------------------------------------------------------------------
 .../scala/org/apache/spark/AkkaUtilsSuite.scala |  4 +-
 .../org/apache/spark/DistributedSuite.scala     | 21 ++------
 .../org/apache/spark/FileServerSuite.scala      | 16 +++---
 .../org/apache/spark/JobCancellationSuite.scala | 21 ++++----
 .../scala/org/apache/spark/ShuffleSuite.scala   | 22 ++++----
 .../scala/org/apache/spark/SparkConfSuite.scala | 52 +++++++-----------
 .../SparkContextSchedulerCreationSuite.scala    | 21 ++++----
 .../apache/spark/deploy/SparkSubmitSuite.scala  |  6 ++-
 .../spark/scheduler/SparkListenerSuite.scala    |  9 ++--
 .../spark/scheduler/TaskResultGetterSuite.scala | 23 +++-----
 .../scheduler/TaskSchedulerImplSuite.scala      |  6 +--
 .../spark/storage/BlockManagerSuite.scala       | 26 +++------
 .../spark/util/ResetSystemProperties.scala      | 57 ++++++++++++++++++++
 .../apache/spark/util/SizeEstimatorSuite.scala  | 38 +++----------
 .../org/apache/spark/util/UtilsSuite.scala      |  2 +-
 .../apache/spark/examples/BroadcastTest.scala   |  6 +--
 .../streaming/LocalJavaStreamingContext.java    |  8 ++-
 .../apache/spark/tools/StoragePerfTester.scala  |  9 ++--
 18 files changed, 170 insertions(+), 177 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/9f911c93/core/src/test/scala/org/apache/spark/AkkaUtilsSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/AkkaUtilsSuite.scala 
b/core/src/test/scala/org/apache/spark/AkkaUtilsSuite.scala
index c645e4c..b697193 100644
--- a/core/src/test/scala/org/apache/spark/AkkaUtilsSuite.scala
+++ b/core/src/test/scala/org/apache/spark/AkkaUtilsSuite.scala
@@ -22,13 +22,13 @@ import org.scalatest.FunSuite
 import akka.actor._
 import org.apache.spark.scheduler.MapStatus
 import org.apache.spark.storage.BlockManagerId
-import org.apache.spark.util.AkkaUtils
+import org.apache.spark.util.{ResetSystemProperties, AkkaUtils}
 import scala.concurrent.Await
 
 /**
   * Test the AkkaUtils with various security settings.
   */
-class AkkaUtilsSuite extends FunSuite with LocalSparkContext {
+class AkkaUtilsSuite extends FunSuite with LocalSparkContext with 
ResetSystemProperties {
 
   test("remote fetch security bad password") {
     val conf = new SparkConf

http://git-wip-us.apache.org/repos/asf/spark/blob/9f911c93/core/src/test/scala/org/apache/spark/DistributedSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/DistributedSuite.scala 
b/core/src/test/scala/org/apache/spark/DistributedSuite.scala
index 14ddd6f..04cc69e 100644
--- a/core/src/test/scala/org/apache/spark/DistributedSuite.scala
+++ b/core/src/test/scala/org/apache/spark/DistributedSuite.scala
@@ -17,7 +17,6 @@
 
 package org.apache.spark
 
-import org.scalatest.BeforeAndAfter
 import org.scalatest.FunSuite
 import org.scalatest.concurrent.Timeouts._
 import org.scalatest.matchers.ShouldMatchers
@@ -31,16 +30,10 @@ class NotSerializableClass
 class NotSerializableExn(val notSer: NotSerializableClass) extends Throwable() 
{}
 
 
-class DistributedSuite extends FunSuite with ShouldMatchers with BeforeAndAfter
-  with LocalSparkContext {
+class DistributedSuite extends FunSuite with ShouldMatchers with 
LocalSparkContext {
 
   val clusterUrl = "local-cluster[2,1,512]"
 
-  after {
-    System.clearProperty("spark.reducer.maxMbInFlight")
-    System.clearProperty("spark.storage.memoryFraction")
-  }
-
   test("task throws not serializable exception") {
     // Ensures that executors do not crash when an exn is not serializable. If 
executors crash,
     // this test will hang. Correct behavior is that executors don't crash but 
fail tasks
@@ -86,15 +79,14 @@ class DistributedSuite extends FunSuite with ShouldMatchers 
with BeforeAndAfter
   }
 
   test("groupByKey where map output sizes exceed maxMbInFlight") {
-    System.setProperty("spark.reducer.maxMbInFlight", "1")
-    sc = new SparkContext(clusterUrl, "test")
+    val conf = new SparkConf().set("spark.reducer.maxMbInFlight", "1")
+    sc = new SparkContext(clusterUrl, "test", conf)
     // This data should be around 20 MB, so even with 4 mappers and 2 
reducers, each map output
     // file should be about 2.5 MB
     val pairs = sc.parallelize(1 to 2000, 4).map(x => (x % 16, new 
Array[Byte](10000)))
     val groups = pairs.groupByKey(2).map(x => (x._1, x._2.size)).collect()
     assert(groups.length === 16)
     assert(groups.map(_._2).sum === 2000)
-    // Note that spark.reducer.maxMbInFlight will be cleared in the test 
suite's after{} block
   }
 
   test("accumulators") {
@@ -211,7 +203,6 @@ class DistributedSuite extends FunSuite with ShouldMatchers 
with BeforeAndAfter
   }
 
   test("compute without caching when no partitions fit in memory") {
-    System.setProperty("spark.storage.memoryFraction", "0.0001")
     sc = new SparkContext(clusterUrl, "test")
     // data will be 4 million * 4 bytes = 16 MB in size, but our 
memoryFraction set the cache
     // to only 50 KB (0.0001 of 512 MB), so no partitions should fit in memory
@@ -219,12 +210,11 @@ class DistributedSuite extends FunSuite with 
ShouldMatchers with BeforeAndAfter
     assert(data.count() === 4000000)
     assert(data.count() === 4000000)
     assert(data.count() === 4000000)
-    System.clearProperty("spark.storage.memoryFraction")
   }
 
   test("compute when only some partitions fit in memory") {
-    System.setProperty("spark.storage.memoryFraction", "0.01")
-    sc = new SparkContext(clusterUrl, "test")
+    val conf = new SparkConf().set("spark.storage.memoryFraction", "0.01")
+    sc = new SparkContext(clusterUrl, "test", conf)
     // data will be 4 million * 4 bytes = 16 MB in size, but our 
memoryFraction set the cache
     // to only 5 MB (0.01 of 512 MB), so not all of it will fit in memory; we 
use 20 partitions
     // to make sure that *some* of them do fit though
@@ -232,7 +222,6 @@ class DistributedSuite extends FunSuite with ShouldMatchers 
with BeforeAndAfter
     assert(data.count() === 4000000)
     assert(data.count() === 4000000)
     assert(data.count() === 4000000)
-    System.clearProperty("spark.storage.memoryFraction")
   }
 
   test("passing environment variables to cluster") {

http://git-wip-us.apache.org/repos/asf/spark/blob/9f911c93/core/src/test/scala/org/apache/spark/FileServerSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/FileServerSuite.scala 
b/core/src/test/scala/org/apache/spark/FileServerSuite.scala
index 7e18f45..6187bd7 100644
--- a/core/src/test/scala/org/apache/spark/FileServerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/FileServerSuite.scala
@@ -32,10 +32,11 @@ class FileServerSuite extends FunSuite with 
LocalSparkContext {
   @transient var tmpFile: File = _
   @transient var tmpJarUrl: String = _
 
+  def newConf: SparkConf = new SparkConf(loadDefaults = 
false).set("spark.authenticate", "false")
+
   override def beforeEach() {
     super.beforeEach()
     resetSparkContext()
-    System.setProperty("spark.authenticate", "false")
   }
 
   override def beforeAll() {
@@ -54,7 +55,6 @@ class FileServerSuite extends FunSuite with LocalSparkContext 
{
     val jarFile = new File(testTempDir, "test.jar")
     val jarStream = new FileOutputStream(jarFile)
     val jar = new JarOutputStream(jarStream, new java.util.jar.Manifest())
-    System.setProperty("spark.authenticate", "false")
 
     val jarEntry = new JarEntry(textFile.getName)
     jar.putNextEntry(jarEntry)
@@ -81,7 +81,7 @@ class FileServerSuite extends FunSuite with LocalSparkContext 
{
   }
 
   test("Distributing files locally") {
-    sc = new SparkContext("local[4]", "test")
+    sc = new SparkContext("local[4]", "test", newConf)
     sc.addFile(tmpFile.toString)
     val testData = Array((1,1), (1,1), (2,1), (3,5), (2,2), (3,0))
     val result = sc.parallelize(testData).reduceByKey {
@@ -115,7 +115,7 @@ class FileServerSuite extends FunSuite with 
LocalSparkContext {
 
   test("Distributing files locally using URL as input") {
     // addFile("file:///....")
-    sc = new SparkContext("local[4]", "test")
+    sc = new SparkContext("local[4]", "test", newConf)
     sc.addFile(new File(tmpFile.toString).toURI.toString)
     val testData = Array((1,1), (1,1), (2,1), (3,5), (2,2), (3,0))
     val result = sc.parallelize(testData).reduceByKey {
@@ -129,7 +129,7 @@ class FileServerSuite extends FunSuite with 
LocalSparkContext {
   }
 
   test ("Dynamically adding JARS locally") {
-    sc = new SparkContext("local[4]", "test")
+    sc = new SparkContext("local[4]", "test", newConf)
     sc.addJar(tmpJarUrl)
     val testData = Array((1, 1))
     sc.parallelize(testData).foreach { x =>
@@ -140,7 +140,7 @@ class FileServerSuite extends FunSuite with 
LocalSparkContext {
   }
 
   test("Distributing files on a standalone cluster") {
-    sc = new SparkContext("local-cluster[1,1,512]", "test")
+    sc = new SparkContext("local-cluster[1,1,512]", "test", newConf)
     sc.addFile(tmpFile.toString)
     val testData = Array((1,1), (1,1), (2,1), (3,5), (2,2), (3,0))
     val result = sc.parallelize(testData).reduceByKey {
@@ -154,7 +154,7 @@ class FileServerSuite extends FunSuite with 
LocalSparkContext {
   }
 
   test ("Dynamically adding JARS on a standalone cluster") {
-    sc = new SparkContext("local-cluster[1,1,512]", "test")
+    sc = new SparkContext("local-cluster[1,1,512]", "test", newConf)
     sc.addJar(tmpJarUrl)
     val testData = Array((1,1))
     sc.parallelize(testData).foreach { x =>
@@ -165,7 +165,7 @@ class FileServerSuite extends FunSuite with 
LocalSparkContext {
   }
 
   test ("Dynamically adding JARS on a standalone cluster using local: URL") {
-    sc = new SparkContext("local-cluster[1,1,512]", "test")
+    sc = new SparkContext("local-cluster[1,1,512]", "test", newConf)
     sc.addJar(tmpJarUrl.replace("file", "local"))
     val testData = Array((1,1))
     sc.parallelize(testData).foreach { x =>

http://git-wip-us.apache.org/repos/asf/spark/blob/9f911c93/core/src/test/scala/org/apache/spark/JobCancellationSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/JobCancellationSuite.scala 
b/core/src/test/scala/org/apache/spark/JobCancellationSuite.scala
index 2c8ef40..b154259 100644
--- a/core/src/test/scala/org/apache/spark/JobCancellationSuite.scala
+++ b/core/src/test/scala/org/apache/spark/JobCancellationSuite.scala
@@ -41,12 +41,11 @@ class JobCancellationSuite extends FunSuite with 
ShouldMatchers with BeforeAndAf
   override def afterEach() {
     super.afterEach()
     resetSparkContext()
-    System.clearProperty("spark.scheduler.mode")
   }
 
   test("local mode, FIFO scheduler") {
-    System.setProperty("spark.scheduler.mode", "FIFO")
-    sc = new SparkContext("local[2]", "test")
+    val conf = new SparkConf().set("spark.scheduler.mode", "FIFO")
+    sc = new SparkContext("local[2]", "test", conf)
     testCount()
     testTake()
     // Make sure we can still launch tasks.
@@ -54,10 +53,10 @@ class JobCancellationSuite extends FunSuite with 
ShouldMatchers with BeforeAndAf
   }
 
   test("local mode, fair scheduler") {
-    System.setProperty("spark.scheduler.mode", "FAIR")
+    val conf = new SparkConf().set("spark.scheduler.mode", "FAIR")
     val xmlPath = 
getClass.getClassLoader.getResource("fairscheduler.xml").getFile()
-    System.setProperty("spark.scheduler.allocation.file", xmlPath)
-    sc = new SparkContext("local[2]", "test")
+    conf.set("spark.scheduler.allocation.file", xmlPath)
+    sc = new SparkContext("local[2]", "test", conf)
     testCount()
     testTake()
     // Make sure we can still launch tasks.
@@ -65,8 +64,8 @@ class JobCancellationSuite extends FunSuite with 
ShouldMatchers with BeforeAndAf
   }
 
   test("cluster mode, FIFO scheduler") {
-    System.setProperty("spark.scheduler.mode", "FIFO")
-    sc = new SparkContext("local-cluster[2,1,512]", "test")
+    val conf = new SparkConf().set("spark.scheduler.mode", "FIFO")
+    sc = new SparkContext("local-cluster[2,1,512]", "test", conf)
     testCount()
     testTake()
     // Make sure we can still launch tasks.
@@ -74,10 +73,10 @@ class JobCancellationSuite extends FunSuite with 
ShouldMatchers with BeforeAndAf
   }
 
   test("cluster mode, fair scheduler") {
-    System.setProperty("spark.scheduler.mode", "FAIR")
+    val conf = new SparkConf().set("spark.scheduler.mode", "FAIR")
     val xmlPath = 
getClass.getClassLoader.getResource("fairscheduler.xml").getFile()
-    System.setProperty("spark.scheduler.allocation.file", xmlPath)
-    sc = new SparkContext("local-cluster[2,1,512]", "test")
+    conf.set("spark.scheduler.allocation.file", xmlPath)
+    sc = new SparkContext("local-cluster[2,1,512]", "test", conf)
     testCount()
     testTake()
     // Make sure we can still launch tasks.

http://git-wip-us.apache.org/repos/asf/spark/blob/9f911c93/core/src/test/scala/org/apache/spark/ShuffleSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/ShuffleSuite.scala 
b/core/src/test/scala/org/apache/spark/ShuffleSuite.scala
index be6508a..a877c71 100644
--- a/core/src/test/scala/org/apache/spark/ShuffleSuite.scala
+++ b/core/src/test/scala/org/apache/spark/ShuffleSuite.scala
@@ -31,19 +31,15 @@ class ShuffleSuite extends FunSuite with ShouldMatchers 
with LocalSparkContext {
   val conf = new SparkConf(loadDefaults = false)
 
   test("groupByKey without compression") {
-    try {
-      System.setProperty("spark.shuffle.compress", "false")
-      sc = new SparkContext("local", "test")
-      val pairs = sc.parallelize(Array((1, 1), (1, 2), (1, 3), (2, 1)), 4)
-      val groups = pairs.groupByKey(4).collect()
-      assert(groups.size === 2)
-      val valuesFor1 = groups.find(_._1 == 1).get._2
-      assert(valuesFor1.toList.sorted === List(1, 2, 3))
-      val valuesFor2 = groups.find(_._1 == 2).get._2
-      assert(valuesFor2.toList.sorted === List(1))
-    } finally {
-      System.setProperty("spark.shuffle.compress", "true")
-    }
+    val myConf = conf.clone().set("spark.shuffle.compress", "false")
+    sc = new SparkContext("local", "test", myConf)
+    val pairs = sc.parallelize(Array((1, 1), (1, 2), (1, 3), (2, 1)), 4)
+    val groups = pairs.groupByKey(4).collect()
+    assert(groups.size === 2)
+    val valuesFor1 = groups.find(_._1 == 1).get._2
+    assert(valuesFor1.toList.sorted === List(1, 2, 3))
+    val valuesFor2 = groups.find(_._1 == 2).get._2
+    assert(valuesFor2.toList.sorted === List(1))
   }
 
   test("shuffle non-zero block size") {

http://git-wip-us.apache.org/repos/asf/spark/blob/9f911c93/core/src/test/scala/org/apache/spark/SparkConfSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/SparkConfSuite.scala 
b/core/src/test/scala/org/apache/spark/SparkConfSuite.scala
index 87e9012..ae8a3c7 100644
--- a/core/src/test/scala/org/apache/spark/SparkConfSuite.scala
+++ b/core/src/test/scala/org/apache/spark/SparkConfSuite.scala
@@ -19,25 +19,19 @@ package org.apache.spark
 
 import org.scalatest.FunSuite
 
-class SparkConfSuite extends FunSuite with LocalSparkContext {
+import org.apache.spark.util.ResetSystemProperties
+
+class SparkConfSuite extends FunSuite with LocalSparkContext with 
ResetSystemProperties {
   test("loading from system properties") {
-    try {
-      System.setProperty("spark.test.testProperty", "2")
-      val conf = new SparkConf()
-      assert(conf.get("spark.test.testProperty") === "2")
-    } finally {
-      System.clearProperty("spark.test.testProperty")
-    }
+    System.setProperty("spark.test.testProperty", "2")
+    val conf = new SparkConf()
+    assert(conf.get("spark.test.testProperty") === "2")
   }
 
   test("initializing without loading defaults") {
-    try {
-      System.setProperty("spark.test.testProperty", "2")
-      val conf = new SparkConf(false)
-      assert(!conf.contains("spark.test.testProperty"))
-    } finally {
-      System.clearProperty("spark.test.testProperty")
-    }
+    System.setProperty("spark.test.testProperty", "2")
+    val conf = new SparkConf(false)
+    assert(!conf.contains("spark.test.testProperty"))
   }
 
   test("named set methods") {
@@ -115,22 +109,16 @@ class SparkConfSuite extends FunSuite with 
LocalSparkContext {
 
   test("nested property names") {
     // This wasn't supported by some external conf parsing libraries
-    try {
-      System.setProperty("spark.test.a", "a")
-      System.setProperty("spark.test.a.b", "a.b")
-      System.setProperty("spark.test.a.b.c", "a.b.c")
-      val conf = new SparkConf()
-      assert(conf.get("spark.test.a") === "a")
-      assert(conf.get("spark.test.a.b") === "a.b")
-      assert(conf.get("spark.test.a.b.c") === "a.b.c")
-      conf.set("spark.test.a.b", "A.B")
-      assert(conf.get("spark.test.a") === "a")
-      assert(conf.get("spark.test.a.b") === "A.B")
-      assert(conf.get("spark.test.a.b.c") === "a.b.c")
-    } finally {
-      System.clearProperty("spark.test.a")
-      System.clearProperty("spark.test.a.b")
-      System.clearProperty("spark.test.a.b.c")
-    }
+    System.setProperty("spark.test.a", "a")
+    System.setProperty("spark.test.a.b", "a.b")
+    System.setProperty("spark.test.a.b.c", "a.b.c")
+    val conf = new SparkConf()
+    assert(conf.get("spark.test.a") === "a")
+    assert(conf.get("spark.test.a.b") === "a.b")
+    assert(conf.get("spark.test.a.b.c") === "a.b.c")
+    conf.set("spark.test.a.b", "A.B")
+    assert(conf.get("spark.test.a") === "a")
+    assert(conf.get("spark.test.a.b") === "A.B")
+    assert(conf.get("spark.test.a.b.c") === "a.b.c")
   }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/9f911c93/core/src/test/scala/org/apache/spark/SparkContextSchedulerCreationSuite.scala
----------------------------------------------------------------------
diff --git 
a/core/src/test/scala/org/apache/spark/SparkContextSchedulerCreationSuite.scala 
b/core/src/test/scala/org/apache/spark/SparkContextSchedulerCreationSuite.scala
index 94fba10..12c65f0 100644
--- 
a/core/src/test/scala/org/apache/spark/SparkContextSchedulerCreationSuite.scala
+++ 
b/core/src/test/scala/org/apache/spark/SparkContextSchedulerCreationSuite.scala
@@ -27,10 +27,13 @@ import org.apache.spark.scheduler.local.LocalBackend
 class SparkContextSchedulerCreationSuite
   extends FunSuite with PrivateMethodTester with LocalSparkContext with 
Logging {
 
-  def createTaskScheduler(master: String): TaskSchedulerImpl = {
+  def createTaskScheduler(master: String): TaskSchedulerImpl =
+    createTaskScheduler(master, new SparkConf())
+
+  def createTaskScheduler(master: String, conf: SparkConf): TaskSchedulerImpl 
= {
     // Create local SparkContext to setup a SparkEnv. We don't actually want 
to start() the
     // real schedulers, so we don't want to create a full SparkContext with 
the desired scheduler.
-    sc = new SparkContext("local", "test")
+    sc = new SparkContext("local", "test", conf)
     val createTaskSchedulerMethod = 
PrivateMethod[TaskScheduler]('createTaskScheduler)
     val sched = SparkContext invokePrivate createTaskSchedulerMethod(sc, 
master)
     sched.asInstanceOf[TaskSchedulerImpl]
@@ -115,9 +118,10 @@ class SparkContextSchedulerCreationSuite
     testYarn("yarn-client", 
"org.apache.spark.scheduler.cluster.YarnClientClusterScheduler")
   }
 
-  def testMesos(master: String, expectedClass: Class[_]) {
+  def testMesos(master: String, expectedClass: Class[_], coarse: Boolean) {
+    val conf = new SparkConf().set("spark.mesos.coarse", coarse.toString)
     try {
-      val sched = createTaskScheduler(master)
+      val sched = createTaskScheduler(master, conf)
       assert(sched.backend.getClass === expectedClass)
     } catch {
       case e: UnsatisfiedLinkError =>
@@ -128,17 +132,14 @@ class SparkContextSchedulerCreationSuite
   }
 
   test("mesos fine-grained") {
-    System.setProperty("spark.mesos.coarse", "false")
-    testMesos("mesos://localhost:1234", classOf[MesosSchedulerBackend])
+    testMesos("mesos://localhost:1234", classOf[MesosSchedulerBackend], coarse 
= false)
   }
 
   test("mesos coarse-grained") {
-    System.setProperty("spark.mesos.coarse", "true")
-    testMesos("mesos://localhost:1234", classOf[CoarseMesosSchedulerBackend])
+    testMesos("mesos://localhost:1234", classOf[CoarseMesosSchedulerBackend], 
coarse = true)
   }
 
   test("mesos with zookeeper") {
-    System.setProperty("spark.mesos.coarse", "false")
-    testMesos("zk://localhost:1234,localhost:2345", 
classOf[MesosSchedulerBackend])
+    testMesos("zk://localhost:1234,localhost:2345", 
classOf[MesosSchedulerBackend], coarse = false)
   }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/9f911c93/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala 
b/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala
index 961f9a7..fe817e0 100644
--- a/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala
@@ -23,11 +23,13 @@ import scala.collection.mutable.ArrayBuffer
 
 import org.apache.spark.{SparkConf, SparkContext, SparkEnv, SparkException, 
TestUtils}
 import org.apache.spark.deploy.SparkSubmit._
-import org.apache.spark.util.Utils
+import org.apache.spark.util.{ResetSystemProperties, Utils}
 import org.scalatest.FunSuite
 import org.scalatest.matchers.ShouldMatchers
 
-class SparkSubmitSuite extends FunSuite with ShouldMatchers {
+// Note: this suite mixes in ResetSystemProperties because SparkSubmit.main() 
sets a bunch
+// of properties that need to be cleared after tests.
+class SparkSubmitSuite extends FunSuite with ShouldMatchers with 
ResetSystemProperties {
   def beforeAll() {
     System.setProperty("spark.testing", "true")
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/9f911c93/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala
----------------------------------------------------------------------
diff --git 
a/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala 
b/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala
index 5426e57..f2f79d8 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala
@@ -27,9 +27,10 @@ import org.scalatest.matchers.ShouldMatchers
 import org.apache.spark.{LocalSparkContext, SparkContext}
 import org.apache.spark.SparkContext._
 import org.apache.spark.executor.TaskMetrics
+import org.apache.spark.util.ResetSystemProperties
 
-class SparkListenerSuite extends FunSuite with LocalSparkContext with 
ShouldMatchers
-  with BeforeAndAfter with BeforeAndAfterAll {
+class SparkListenerSuite extends FunSuite  with LocalSparkContext with 
ShouldMatchers
+  with BeforeAndAfter with BeforeAndAfterAll with ResetSystemProperties {
 
   /** Length of time to wait while draining listener events. */
   val WAIT_TIMEOUT_MILLIS = 10000
@@ -38,10 +39,6 @@ class SparkListenerSuite extends FunSuite with 
LocalSparkContext with ShouldMatc
     sc = new SparkContext("local", "SparkListenerSuite")
   }
 
-  override def afterAll() {
-    System.clearProperty("spark.akka.frameSize")
-  }
-
   test("basic creation and shutdown of LiveListenerBus") {
     val counter = new BasicJobCounter
     val bus = new LiveListenerBus

http://git-wip-us.apache.org/repos/asf/spark/blob/9f911c93/core/src/test/scala/org/apache/spark/scheduler/TaskResultGetterSuite.scala
----------------------------------------------------------------------
diff --git 
a/core/src/test/scala/org/apache/spark/scheduler/TaskResultGetterSuite.scala 
b/core/src/test/scala/org/apache/spark/scheduler/TaskResultGetterSuite.scala
index c4e7a4b..8b224ab 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/TaskResultGetterSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/TaskResultGetterSuite.scala
@@ -21,7 +21,7 @@ import java.nio.ByteBuffer
 
 import org.scalatest.{BeforeAndAfter, BeforeAndAfterAll, FunSuite}
 
-import org.apache.spark.{LocalSparkContext, SparkContext, SparkEnv}
+import org.apache.spark.{LocalSparkContext, SparkConf, SparkContext, SparkEnv}
 import org.apache.spark.storage.TaskResultBlockId
 
 /**
@@ -55,27 +55,20 @@ class ResultDeletingTaskResultGetter(sparkEnv: SparkEnv, 
scheduler: TaskSchedule
 /**
  * Tests related to handling task results (both direct and indirect).
  */
-class TaskResultGetterSuite extends FunSuite with BeforeAndAfter with 
BeforeAndAfterAll
-  with LocalSparkContext {
+class TaskResultGetterSuite extends FunSuite with BeforeAndAfter with 
LocalSparkContext {
 
-  override def beforeAll {
-    // Set the Akka frame size to be as small as possible (it must be an 
integer, so 1 is as small
-    // as we can make it) so the tests don't take too long.
-    System.setProperty("spark.akka.frameSize", "1")
-  }
-
-  override def afterAll {
-    System.clearProperty("spark.akka.frameSize")
-  }
+  // Set the Akka frame size to be as small as possible (it must be an 
integer, so 1 is as small
+  // as we can make it) so the tests don't take too long.
+  def conf: SparkConf = new SparkConf().set("spark.akka.frameSize", "1")
 
   test("handling results smaller than Akka frame size") {
-    sc = new SparkContext("local", "test")
+    sc = new SparkContext("local", "test", conf)
     val result = sc.parallelize(Seq(1), 1).map(x => 2 * x).reduce((x, y) => x)
     assert(result === 2)
   }
 
   test("handling results larger than Akka frame size") {
-    sc = new SparkContext("local", "test")
+    sc = new SparkContext("local", "test", conf)
     val akkaFrameSize =
       
sc.env.actorSystem.settings.config.getBytes("akka.remote.netty.tcp.maximum-frame-size").toInt
     val result = sc.parallelize(Seq(1), 1).map(x => 
1.to(akkaFrameSize).toArray).reduce((x, y) => x)
@@ -89,7 +82,7 @@ class TaskResultGetterSuite extends FunSuite with 
BeforeAndAfter with BeforeAndA
   test("task retried if result missing from block manager") {
     // Set the maximum number of task failures to > 0, so that the task set 
isn't aborted
     // after the result is missing.
-    sc = new SparkContext("local[1,2]", "test")
+    sc = new SparkContext("local[1,2]", "test", conf)
     // If this test hangs, it's probably because no resource offers were made 
after the task
     // failed.
     val scheduler: TaskSchedulerImpl = sc.taskScheduler match {

http://git-wip-us.apache.org/repos/asf/spark/blob/9f911c93/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala
----------------------------------------------------------------------
diff --git 
a/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala 
b/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala
index 7532da8..40aaf9d 100644
--- 
a/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala
+++ 
b/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala
@@ -162,12 +162,12 @@ class TaskSchedulerImplSuite extends FunSuite with 
LocalSparkContext with Loggin
   }
 
   test("Fair Scheduler Test") {
-    sc = new SparkContext("local", "TaskSchedulerImplSuite")
+    val xmlPath = 
getClass.getClassLoader.getResource("fairscheduler.xml").getFile()
+    val conf = new SparkConf().set("spark.scheduler.allocation.file", xmlPath)
+    sc = new SparkContext("local", "TaskSchedulerImplSuite", conf)
     val taskScheduler = new TaskSchedulerImpl(sc)
     val taskSet = FakeTask.createTaskSet(1)
 
-    val xmlPath = 
getClass.getClassLoader.getResource("fairscheduler.xml").getFile()
-    System.setProperty("spark.scheduler.allocation.file", xmlPath)
     val rootPool = new Pool("", SchedulingMode.FAIR, 0, 0)
     val schedulableBuilder = new FairSchedulableBuilder(rootPool, sc.conf)
     schedulableBuilder.buildPools()

http://git-wip-us.apache.org/repos/asf/spark/blob/9f911c93/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
----------------------------------------------------------------------
diff --git 
a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala 
b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
index 00deecc..ee7293d 100644
--- a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
@@ -23,9 +23,9 @@ import java.util.Arrays
 import akka.actor._
 import org.apache.spark.SparkConf
 import org.apache.spark.serializer.{JavaSerializer, KryoSerializer}
-import org.apache.spark.util.{AkkaUtils, ByteBufferInputStream, SizeEstimator, 
Utils}
+import org.apache.spark.util._
 import org.mockito.Mockito.{mock, when}
-import org.scalatest.{BeforeAndAfter, FunSuite, PrivateMethodTester}
+import org.scalatest.{BeforeAndAfterEach, BeforeAndAfter, FunSuite, 
PrivateMethodTester}
 import org.scalatest.concurrent.Eventually._
 import org.scalatest.concurrent.Timeouts._
 import org.scalatest.matchers.ShouldMatchers._
@@ -34,18 +34,18 @@ import org.scalatest.time.SpanSugar._
 import org.apache.spark.{MapOutputTrackerMaster, SecurityManager, SparkConf}
 import org.apache.spark.scheduler.LiveListenerBus
 import org.apache.spark.serializer.{JavaSerializer, KryoSerializer}
-import org.apache.spark.util.{AkkaUtils, ByteBufferInputStream, SizeEstimator, 
Utils}
 
 import scala.language.implicitConversions
 import scala.language.postfixOps
 
-class BlockManagerSuite extends FunSuite with BeforeAndAfter with 
PrivateMethodTester {
+class BlockManagerSuite extends FunSuite with BeforeAndAfterEach with 
PrivateMethodTester
+  with ResetSystemProperties {
+
   private val conf = new SparkConf(false)
   var store: BlockManager = null
   var store2: BlockManager = null
   var actorSystem: ActorSystem = null
   var master: BlockManagerMaster = null
-  var oldArch: String = null
   conf.set("spark.authenticate", "false")
   val securityMgr = new SecurityManager(conf)
   val mapOutputTracker = new MapOutputTrackerMaster(conf)
@@ -58,7 +58,7 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter 
with PrivateMethodT
   implicit def StringToBlockId(value: String): BlockId = new TestBlockId(value)
   def rdd(rddId: Int, splitId: Int) = RDDBlockId(rddId, splitId)
 
-  before {
+  override def beforeEach(): Unit = {
     val (actorSystem, boundPort) = AkkaUtils.createActorSystem("test", 
"localhost", 0, conf = conf,
       securityManager = securityMgr)
     this.actorSystem = actorSystem
@@ -69,7 +69,7 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter 
with PrivateMethodT
       conf)
 
     // Set the arch to 64-bit and compressedOops to true to get a 
deterministic test-case
-    oldArch = System.setProperty("os.arch", "amd64")
+    System.setProperty("os.arch", "amd64")
     conf.set("os.arch", "amd64")
     conf.set("spark.test.useCompressedOops", "true")
     conf.set("spark.storage.disableBlockManagerHeartBeat", "true")
@@ -77,9 +77,7 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter 
with PrivateMethodT
     SizeEstimator invokePrivate initialize()
   }
 
-  after {
-    System.clearProperty("spark.driver.port")
-
+  override def afterEach(): Unit = {
     if (store != null) {
       store.stop()
       store = null
@@ -92,14 +90,6 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter 
with PrivateMethodT
     actorSystem.awaitTermination()
     actorSystem = null
     master = null
-
-    if (oldArch != null) {
-      conf.set("os.arch", oldArch)
-    } else {
-      System.clearProperty("os.arch")
-    }
-
-    System.clearProperty("spark.test.useCompressedOops")
   }
 
   test("StorageLevel object caching") {

http://git-wip-us.apache.org/repos/asf/spark/blob/9f911c93/core/src/test/scala/org/apache/spark/util/ResetSystemProperties.scala
----------------------------------------------------------------------
diff --git 
a/core/src/test/scala/org/apache/spark/util/ResetSystemProperties.scala 
b/core/src/test/scala/org/apache/spark/util/ResetSystemProperties.scala
new file mode 100644
index 0000000..d4b92f3
--- /dev/null
+++ b/core/src/test/scala/org/apache/spark/util/ResetSystemProperties.scala
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.util
+
+import java.util.Properties
+
+import org.scalatest.{BeforeAndAfterEach, Suite}
+
+/**
+ * Mixin for automatically resetting system properties that are modified in 
ScalaTest tests.
+ * This resets the properties after each individual test.
+ *
+ * The order in which fixtures are mixed in affects the order in which they 
are invoked by tests.
+ * If we have a suite `MySuite extends FunSuite with Foo with Bar`, then
+ * Bar's `super` is Foo, so Bar's beforeEach() and afterEach() methods 
will be invoked first
+ * by the test runner.
+ *
+ * This means that ResetSystemProperties should appear as the last trait in 
test suites that it's
+ * mixed into in order to ensure that the system properties snapshot occurs as 
early as possible.
+ * ResetSystemProperties calls super.afterEach() before performing its own 
cleanup, ensuring that
+ * the old properties are restored as late as possible.
+ *
+ * See the "Composing fixtures by stacking traits" section at
+ * http://www.scalatest.org/user_guide/sharing_fixtures for more details about 
this pattern.
+ */
+private[spark] trait ResetSystemProperties extends BeforeAndAfterEach { this: 
Suite =>
+  var oldProperties: Properties = null
+
+  override def beforeEach(): Unit = {
+    oldProperties = System.getProperties.clone().asInstanceOf[Properties] // copy, not a live view
+    super.beforeEach()
+  }
+
+  override def afterEach(): Unit = {
+    try {
+      super.afterEach()
+    } finally {
+      System.setProperties(oldProperties)
+      oldProperties = null
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/9f911c93/core/src/test/scala/org/apache/spark/util/SizeEstimatorSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/util/SizeEstimatorSuite.scala 
b/core/src/test/scala/org/apache/spark/util/SizeEstimatorSuite.scala
index f9d1af8..c60f434 100644
--- a/core/src/test/scala/org/apache/spark/util/SizeEstimatorSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/SizeEstimatorSuite.scala
@@ -17,9 +17,7 @@
 
 package org.apache.spark.util
 
-import org.scalatest.BeforeAndAfterAll
-import org.scalatest.FunSuite
-import org.scalatest.PrivateMethodTester
+import org.scalatest.{BeforeAndAfterEach, BeforeAndAfterAll, FunSuite, 
PrivateMethodTester}
 
 class DummyClass1 {}
 
@@ -46,20 +44,14 @@ class DummyString(val arr: Array[Char]) {
 }
 
 class SizeEstimatorSuite
-  extends FunSuite with BeforeAndAfterAll with PrivateMethodTester {
+  extends FunSuite with BeforeAndAfterEach with PrivateMethodTester with 
ResetSystemProperties {
 
-  var oldArch: String = _
-  var oldOops: String = _
-
-  override def beforeAll() {
+  override def beforeEach() {
+    // Let ResetSystemProperties snapshot the system properties before this suite modifies them.
+    super.beforeEach()
     // Set the arch to 64-bit and compressedOops to true to get a 
deterministic test-case
-    oldArch = System.setProperty("os.arch", "amd64")
-    oldOops = System.setProperty("spark.test.useCompressedOops", "true")
-  }
-
-  override def afterAll() {
-    resetOrClear("os.arch", oldArch)
-    resetOrClear("spark.test.useCompressedOops", oldOops)
+    System.setProperty("os.arch", "amd64")
+    System.setProperty("spark.test.useCompressedOops", "true")
   }
 
   test("simple classes") {
@@ -122,7 +112,7 @@ class SizeEstimatorSuite
   }
 
   test("32-bit arch") {
-    val arch = System.setProperty("os.arch", "x86")
+    System.setProperty("os.arch", "x86")
 
     val initialize = PrivateMethod[Unit]('initialize)
     SizeEstimator invokePrivate initialize()
@@ -131,14 +121,13 @@ class SizeEstimatorSuite
     assertResult(48)(SizeEstimator.estimate(DummyString("a")))
     assertResult(48)(SizeEstimator.estimate(DummyString("ab")))
     assertResult(56)(SizeEstimator.estimate(DummyString("abcdefgh")))
-    resetOrClear("os.arch", arch)
   }
 
   // NOTE: The String class definition varies across JDK versions (1.6 vs. 
1.7) and vendors
   // (Sun vs IBM). Use a DummyString class to make tests deterministic.
   test("64-bit arch with no compressed oops") {
-    val arch = System.setProperty("os.arch", "amd64")
-    val oops = System.setProperty("spark.test.useCompressedOops", "false")
+    System.setProperty("os.arch", "amd64")
+    System.setProperty("spark.test.useCompressedOops", "false")
     val initialize = PrivateMethod[Unit]('initialize)
     SizeEstimator invokePrivate initialize()
 
@@ -146,16 +135,5 @@ class SizeEstimatorSuite
     assertResult(64)(SizeEstimator.estimate(DummyString("a")))
     assertResult(64)(SizeEstimator.estimate(DummyString("ab")))
     assertResult(72)(SizeEstimator.estimate(DummyString("abcdefgh")))
-
-    resetOrClear("os.arch", arch)
-    resetOrClear("spark.test.useCompressedOops", oops)
-  }
-
-  def resetOrClear(prop: String, oldValue: String) {
-    if (oldValue != null) {
-      System.setProperty(prop, oldValue)
-    } else {
-      System.clearProperty(prop)
-    }
   }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/9f911c93/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala 
b/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala
index 0aad882..99694e6 100644
--- a/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala
@@ -27,7 +27,7 @@ import com.google.common.base.Charsets
 import com.google.common.io.Files
 import org.scalatest.FunSuite
 
-class UtilsSuite extends FunSuite {
+class UtilsSuite extends FunSuite with ResetSystemProperties {
 
   test("bytesToString") {
     assert(Utils.bytesToString(10) === "10.0 B")

http://git-wip-us.apache.org/repos/asf/spark/blob/9f911c93/examples/src/main/scala/org/apache/spark/examples/BroadcastTest.scala
----------------------------------------------------------------------
diff --git 
a/examples/src/main/scala/org/apache/spark/examples/BroadcastTest.scala 
b/examples/src/main/scala/org/apache/spark/examples/BroadcastTest.scala
index 973049b..f49a96e 100644
--- a/examples/src/main/scala/org/apache/spark/examples/BroadcastTest.scala
+++ b/examples/src/main/scala/org/apache/spark/examples/BroadcastTest.scala
@@ -28,11 +28,9 @@ object BroadcastTest {
     val bcName = if (args.length > 2) args(2) else "Http"
     val blockSize = if (args.length > 3) args(3) else "4096"
 
-    System.setProperty("spark.broadcast.factory", 
"org.apache.spark.broadcast." + bcName +
-      "BroadcastFactory")
-    System.setProperty("spark.broadcast.blockSize", blockSize)
     val sparkConf = new SparkConf().setAppName("Broadcast Test")
-
+      .set("spark.broadcast.factory", 
s"org.apache.spark.broadcast.${bcName}BroadcastFactory")
+      .set("spark.broadcast.blockSize", blockSize)
     val sc = new SparkContext(sparkConf)
 
     val slices = if (args.length > 0) args(0).toInt else 2

http://git-wip-us.apache.org/repos/asf/spark/blob/9f911c93/streaming/src/test/java/org/apache/spark/streaming/LocalJavaStreamingContext.java
----------------------------------------------------------------------
diff --git 
a/streaming/src/test/java/org/apache/spark/streaming/LocalJavaStreamingContext.java
 
b/streaming/src/test/java/org/apache/spark/streaming/LocalJavaStreamingContext.java
index 849bbf1..7422340 100644
--- 
a/streaming/src/test/java/org/apache/spark/streaming/LocalJavaStreamingContext.java
+++ 
b/streaming/src/test/java/org/apache/spark/streaming/LocalJavaStreamingContext.java
@@ -17,6 +17,7 @@
 
 package org.apache.spark.streaming;
 
+import org.apache.spark.SparkConf;
 import org.apache.spark.streaming.api.java.JavaStreamingContext;
 import org.junit.After;
 import org.junit.Before;
@@ -28,8 +29,11 @@ public abstract class LocalJavaStreamingContext {
     @Before
     public void setUp() {
         System.clearProperty("spark.driver.port");
-        System.setProperty("spark.streaming.clock", 
"org.apache.spark.streaming.util.ManualClock");
-        ssc = new JavaStreamingContext("local[2]", "test", new Duration(1000));
+        SparkConf conf = new SparkConf()
+            .setMaster("local[2]")
+            .setAppName("test")
+            .set("spark.streaming.clock", 
"org.apache.spark.streaming.util.ManualClock");
+        ssc = new JavaStreamingContext(conf, new Duration(1000));
         ssc.checkpoint("checkpoint");
     }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/9f911c93/tools/src/main/scala/org/apache/spark/tools/StoragePerfTester.scala
----------------------------------------------------------------------
diff --git 
a/tools/src/main/scala/org/apache/spark/tools/StoragePerfTester.scala 
b/tools/src/main/scala/org/apache/spark/tools/StoragePerfTester.scala
index 8e8c356..6369c39 100644
--- a/tools/src/main/scala/org/apache/spark/tools/StoragePerfTester.scala
+++ b/tools/src/main/scala/org/apache/spark/tools/StoragePerfTester.scala
@@ -20,7 +20,7 @@ package org.apache.spark.tools
 import java.util.concurrent.{CountDownLatch, Executors}
 import java.util.concurrent.atomic.AtomicLong
 
-import org.apache.spark.SparkContext
+import org.apache.spark.{SparkConf, SparkContext}
 import org.apache.spark.serializer.KryoSerializer
 import org.apache.spark.util.Utils
 
@@ -47,11 +47,12 @@ object StoragePerfTester {
     val writeData = "1" * recordLength
     val executor = Executors.newFixedThreadPool(numMaps)
 
-    System.setProperty("spark.shuffle.compress", "false")
-    System.setProperty("spark.shuffle.sync", "true")
+    val conf = new SparkConf()
+      .set("spark.shuffle.compress", "false")
+      .set("spark.shuffle.sync", "true")
 
     // This is only used to instantiate a BlockManager. All thread scheduling 
is done manually.
-    val sc = new SparkContext("local[4]", "Write Tester")
+    val sc = new SparkContext("local[4]", "Write Tester", conf)
     val blockManager = sc.env.blockManager
 
     def writeOutputBytes(mapId: Int, total: AtomicLong) = {


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to