Repository: spark Updated Branches: refs/heads/master c4979f6ea -> 0a811210f
[SPARK-18617][SPARK-18560][TEST] Fix flaky test: StreamingContextSuite. Receiver data should be deserialized properly ## What changes were proposed in this pull request? Fixed the potential SparkContext leak in `StreamingContextSuite.SPARK-18560 Receiver data should be deserialized properly` which was added in #16052. I also removed FakeByteArrayReceiver and used TestReceiver directly. ## How was this patch tested? Jenkins Author: Shixiong Zhu <[email protected]> Closes #16091 from zsxwing/SPARK-18617-follow-up. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/0a811210 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/0a811210 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/0a811210 Branch: refs/heads/master Commit: 0a811210f809eb5b80eae14694d484d45b48b3f6 Parents: c4979f6 Author: Shixiong Zhu <[email protected]> Authored: Wed Nov 30 17:41:43 2016 -0800 Committer: Reynold Xin <[email protected]> Committed: Wed Nov 30 17:41:43 2016 -0800 ---------------------------------------------------------------------- .../spark/streaming/StreamingContextSuite.scala | 34 +++++--------------- 1 file changed, 8 insertions(+), 26 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/0a811210/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala ---------------------------------------------------------------------- diff --git a/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala index 45d8f50..35eeb9d 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala @@ -18,6 +18,7 @@ package org.apache.spark.streaming import java.io.{File, NotSerializableException} +import 
java.util.concurrent.{CountDownLatch, TimeUnit} import java.util.concurrent.atomic.AtomicInteger import scala.collection.mutable.ArrayBuffer @@ -811,7 +812,8 @@ class StreamingContextSuite extends SparkFunSuite with BeforeAndAfter with Timeo // other one. Then Spark jobs need to fetch remote blocks and it will trigger SPARK-18560. val conf = new SparkConf().setMaster("local-cluster[2,1,1024]").setAppName(appName) ssc = new StreamingContext(conf, Milliseconds(100)) - val input = ssc.receiverStream(new FakeByteArrayReceiver) + val input = ssc.receiverStream(new TestReceiver) + val latch = new CountDownLatch(1) input.count().foreachRDD { rdd => // Make sure we can read from BlockRDD if (rdd.collect().headOption.getOrElse(0L) > 0) { @@ -820,12 +822,17 @@ class StreamingContextSuite extends SparkFunSuite with BeforeAndAfter with Timeo setDaemon(true) override def run(): Unit = { ssc.stop(stopSparkContext = true, stopGracefully = false) + latch.countDown() } }.start() } } ssc.start() ssc.awaitTerminationOrTimeout(60000) + // Wait until `ssc.stop` returns. Otherwise, we may finish this test too fast and leak an active + // SparkContext. Note: the stop codes in `after` will just do nothing if `ssc.stop` in this test + // is running. 
+ assert(latch.await(60, TimeUnit.SECONDS)) } def addInputStream(s: StreamingContext): DStream[Int] = { @@ -891,31 +898,6 @@ object TestReceiver { val counter = new AtomicInteger(1) } -class FakeByteArrayReceiver extends Receiver[Array[Byte]](StorageLevel.MEMORY_ONLY) with Logging { - - val data: Array[Byte] = "test".getBytes - var receivingThreadOption: Option[Thread] = None - - override def onStart(): Unit = { - val thread = new Thread() { - override def run() { - logInfo("Receiving started") - while (!isStopped) { - store(data) - } - logInfo("Receiving stopped") - } - } - receivingThreadOption = Some(thread) - thread.start() - } - - override def onStop(): Unit = { - // no clean to be done, the receiving thread should stop on it own, so just wait for it. - receivingThreadOption.foreach(_.join()) - } -} - /** Custom receiver for testing whether a slow receiver can be shutdown gracefully or not */ class SlowTestReceiver(totalRecords: Int, recordsPerSecond: Int) extends Receiver[Int](StorageLevel.MEMORY_ONLY) with Logging { --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
