Repository: spark
Updated Branches:
  refs/heads/branch-2.1 3c8861d92 -> 162bdb910


[SPARK-18031][TESTS] Fix flaky test ExecutorAllocationManagerSuite.basic 
functionality

## What changes were proposed in this pull request?

The test is flaky because `test("basic functionality")` does not block until 
`ExecutorAllocationManager.manageAllocation` has been called. This PR adds 
`StreamManualClock`, which lets the test wait until the clock is blocked at the 
expected time, making the test deterministic.

## How was this patch tested?

Jenkins

Author: Shixiong Zhu <[email protected]>

Closes #16321 from zsxwing/SPARK-18031.

(cherry picked from commit ccfe60a8304871779ff1b31b8c2d724f59d5b2af)
Signed-off-by: Tathagata Das <[email protected]>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/162bdb91
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/162bdb91
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/162bdb91

Branch: refs/heads/branch-2.1
Commit: 162bdb9103ecba99cd73004ddddede4d55ff8fc8
Parents: 3c8861d
Author: Shixiong Zhu <[email protected]>
Authored: Wed Dec 21 11:17:44 2016 -0800
Committer: Tathagata Das <[email protected]>
Committed: Wed Dec 21 11:17:53 2016 -0800

----------------------------------------------------------------------
 .../ExecutorAllocationManagerSuite.scala        | 36 +++++++++++++++++---
 1 file changed, 32 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/162bdb91/streaming/src/test/scala/org/apache/spark/streaming/scheduler/ExecutorAllocationManagerSuite.scala
----------------------------------------------------------------------
diff --git 
a/streaming/src/test/scala/org/apache/spark/streaming/scheduler/ExecutorAllocationManagerSuite.scala
 
b/streaming/src/test/scala/org/apache/spark/streaming/scheduler/ExecutorAllocationManagerSuite.scala
index b49e579..1d2bf35 100644
--- 
a/streaming/src/test/scala/org/apache/spark/streaming/scheduler/ExecutorAllocationManagerSuite.scala
+++ 
b/streaming/src/test/scala/org/apache/spark/streaming/scheduler/ExecutorAllocationManagerSuite.scala
@@ -36,11 +36,11 @@ class ExecutorAllocationManagerSuite extends SparkFunSuite
 
   private val batchDurationMillis = 1000L
   private var allocationClient: ExecutorAllocationClient = null
-  private var clock: ManualClock = null
+  private var clock: StreamManualClock = null
 
   before {
     allocationClient = mock[ExecutorAllocationClient]
-    clock = new ManualClock()
+    clock = new StreamManualClock()
   }
 
   test("basic functionality") {
@@ -57,10 +57,14 @@ class ExecutorAllocationManagerSuite extends SparkFunSuite
         reset(allocationClient)
         when(allocationClient.getExecutorIds()).thenReturn(Seq("1", "2"))
         addBatchProcTime(allocationManager, batchProcTimeMs.toLong)
-        clock.advance(SCALING_INTERVAL_DEFAULT_SECS * 1000 + 1)
+        val advancedTime = SCALING_INTERVAL_DEFAULT_SECS * 1000 + 1
+        val expectedWaitTime = clock.getTimeMillis() + advancedTime
+        clock.advance(advancedTime)
+        // Make sure ExecutorAllocationManager.manageAllocation is called
         eventually(timeout(10 seconds)) {
-          body
+          assert(clock.isStreamWaitingAt(expectedWaitTime))
         }
+        body
       }
 
       /** Verify that the expected number of total executor were requested */
@@ -394,3 +398,27 @@ class ExecutorAllocationManagerSuite extends SparkFunSuite
     }
   }
 }
+
+/**
+ * A special manual clock that provides `isStreamWaitingAt` to allow the user to check if the clock
+ * is blocking.
+ */
+class StreamManualClock(time: Long = 0L) extends ManualClock(time) with 
Serializable {
+  private var waitStartTime: Option[Long] = None
+
+  override def waitTillTime(targetTime: Long): Long = synchronized {
+    try {
+      waitStartTime = Some(getTimeMillis())
+      super.waitTillTime(targetTime)
+    } finally {
+      waitStartTime = None
+    }
+  }
+
+  /**
+   * Returns whether the clock is blocking and the time it started to block equals the parameter `time`.
+   */
+  def isStreamWaitingAt(time: Long): Boolean = synchronized {
+    waitStartTime == Some(time)
+  }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to