Repository: spark Updated Branches: refs/heads/branch-1.0 08e86eec5 -> 99c0c33f0
[Hot Fix #469] Fix flaky test in SparkListenerSuite The two modified tests may fail if the race condition does not bid in our favor... Author: Andrew Or <[email protected]> Closes #516 from andrewor14/stage-info-test-fix and squashes the following commits: b4b6100 [Andrew Or] Add/replace missing waitUntilEmpty() calls to listener bus (cherry picked from commit 4b2bab1d08a6b790be94717bbdd643d896d85c16) Signed-off-by: Reynold Xin <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/99c0c33f Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/99c0c33f Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/99c0c33f Branch: refs/heads/branch-1.0 Commit: 99c0c33f0a2a364254fa6c0822246340c0d3db35 Parents: 08e86ee Author: Andrew Or <[email protected]> Authored: Wed Apr 23 21:59:33 2014 -0700 Committer: Reynold Xin <[email protected]> Committed: Wed Apr 23 21:59:43 2014 -0700 ---------------------------------------------------------------------- .../scala/org/apache/spark/scheduler/SparkListenerSuite.scala | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/99c0c33f/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala index ab13917..ba048ce 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala @@ -50,9 +50,9 @@ class SparkListenerSuite extends FunSuite with LocalSparkContext with ShouldMatc (1 to 5).foreach { _ => bus.post(SparkListenerJobEnd(0, JobSucceeded)) } assert(counter.count === 0) - // Starting listener bus should flush all buffered events (asynchronously, hence the sleep) + // Starting listener bus should flush all buffered events bus.start() - Thread.sleep(1000) + assert(bus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS)) assert(counter.count === 5) // After listener bus has stopped, posting events should not increment counter @@ -177,6 +177,7 @@ class SparkListenerSuite extends FunSuite with LocalSparkContext with ShouldMatc listener.stageInfos.clear() rdd3.count() + assert(sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS)) listener.stageInfos.size should be {2} // Shuffle map stage + result stage val stageInfo3 = listener.stageInfos.keys.find(_.stageId == 2).get stageInfo3.rddInfos.size should be {2} // ShuffledRDD, MapPartitionsRDD
