Repository: spark
Updated Branches:
  refs/heads/branch-1.0 e43e31ded -> f0abf5f08


Fixing a race condition in event listener unit test

Author: Kan Zhang <[email protected]>

Closes #401 from kanzhang/fix-1475 and squashes the following commits:

c6058bd [Kan Zhang] Fixing a race condition in event listener unit test

(cherry picked from commit 38877ccf394a50bfd37c8433d4aafaa91683d3b8)
Signed-off-by: Reynold Xin <[email protected]>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/f0abf5f0
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/f0abf5f0
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/f0abf5f0

Branch: refs/heads/branch-1.0
Commit: f0abf5f08208f19c9ff966c43c68dbdebbd28a07
Parents: e43e31d
Author: Kan Zhang <[email protected]>
Authored: Wed Apr 16 17:39:11 2014 -0700
Committer: Reynold Xin <[email protected]>
Committed: Wed Apr 16 17:39:22 2014 -0700

----------------------------------------------------------------------
 .../spark/scheduler/LiveListenerBus.scala       |  4 ---
 .../spark/scheduler/SparkListenerSuite.scala    | 28 +++++++++++++-------
 2 files changed, 19 insertions(+), 13 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/f0abf5f0/core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala b/core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala
index 545fa45..cbac4c1 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala
@@ -50,9 +50,6 @@ private[spark] class LiveListenerBus extends SparkListenerBus with Logging {
     }
   }
 
-  // Exposed for testing
-  @volatile private[spark] var stopCalled = false
-
   /**
    * Start sending events to attached listeners.
    *
@@ -97,7 +94,6 @@ private[spark] class LiveListenerBus extends SparkListenerBus with Logging {
   }
 
   def stop() {
-    stopCalled = true
     if (!started) {
       throw new IllegalStateException("Attempted to stop a listener bus that has not yet started!")
     }

http://git-wip-us.apache.org/repos/asf/spark/blob/f0abf5f0/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala
index 4cdccdd..36511a9 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala
@@ -77,14 +77,21 @@ class SparkListenerSuite extends FunSuite with LocalSparkContext with ShouldMatc
   test("bus.stop() waits for the event queue to completely drain") {
     @volatile var drained = false
 
+    // When Listener has started
+    val listenerStarted = new Semaphore(0)
+
     // Tells the listener to stop blocking
-    val listenerWait = new Semaphore(1)
+    val listenerWait = new Semaphore(0)
+
+    // When stopper has started
+    val stopperStarted = new Semaphore(0)
 
-    // When stop has returned
-    val stopReturned = new Semaphore(1)
+    // When stopper has returned
+    val stopperReturned = new Semaphore(0)
 
     class BlockingListener extends SparkListener {
       override def onJobEnd(jobEnd: SparkListenerJobEnd) = {
+        listenerStarted.release()
         listenerWait.acquire()
         drained = true
       }
@@ -97,23 +104,26 @@ class SparkListenerSuite extends FunSuite with LocalSparkContext with ShouldMatc
     bus.start()
     bus.post(SparkListenerJobEnd(0, JobSucceeded))
 
-    // the queue should not drain immediately
+    listenerStarted.acquire()
+    // Listener should be blocked after start
     assert(!drained)
 
     new Thread("ListenerBusStopper") {
       override def run() {
+        stopperStarted.release()
         // stop() will block until notify() is called below
         bus.stop()
-        stopReturned.release(1)
+        stopperReturned.release()
       }
     }.start()
 
-    while (!bus.stopCalled) {
-      Thread.sleep(10)
-    }
+    stopperStarted.acquire()
+    // Listener should remain blocked after stopper started
+    assert(!drained)
 
+    // unblock Listener to let queue drain
     listenerWait.release()
-    stopReturned.acquire()
+    stopperReturned.acquire()
     assert(drained)
   }
 

Reply via email to