Repository: spark Updated Branches: refs/heads/branch-1.4 2337ccc15 -> 065d114c6
[SPARK-7430] [STREAMING] [TEST] General improvements to streaming tests to increase debuggability Author: Tathagata Das <[email protected]> Closes #5961 from tdas/SPARK-7430 and squashes the following commits: d654978 [Tathagata Das] Fix scala style fbf7174 [Tathagata Das] Added more verbose assert failure messages. 6aea07a [Tathagata Das] Ensure SynchronizedBuffer is used in every TestSuiteBase (cherry picked from commit cfdadcbd2b529cd9ac721509a7ebafe436afcd8d) Signed-off-by: Tathagata Das <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/065d114c Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/065d114c Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/065d114c Branch: refs/heads/branch-1.4 Commit: 065d114c6d0d0c51ee9f06b93c87dba196eb3626 Parents: 2337ccc1 Author: Tathagata Das <[email protected]> Authored: Thu May 7 00:21:10 2015 -0700 Committer: Tathagata Das <[email protected]> Committed: Thu May 7 00:21:27 2015 -0700 ---------------------------------------------------------------------- .../apache/spark/streaming/TestSuiteBase.scala | 33 +++++++++++++------- 1 file changed, 21 insertions(+), 12 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/065d114c/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala ---------------------------------------------------------------------- diff --git a/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala b/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala index 4d0cd75..4f70ae7 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala @@ -73,9 +73,11 @@ class TestInputStream[T: ClassTag](ssc_ : StreamingContext, input: Seq[Seq[T]], * * The buffer contains a sequence of RDD's, each containing a sequence of items */ -class TestOutputStream[T: ClassTag](parent: DStream[T], - val output: ArrayBuffer[Seq[T]] = ArrayBuffer[Seq[T]]()) - extends ForEachDStream[T](parent, (rdd: RDD[T], t: Time) => { +class TestOutputStream[T: ClassTag]( + parent: DStream[T], + val output: SynchronizedBuffer[Seq[T]] = + new ArrayBuffer[Seq[T]] with SynchronizedBuffer[Seq[T]] + ) extends ForEachDStream[T](parent, (rdd: RDD[T], t: Time) => { val collected = rdd.collect() output += collected }) { @@ -95,8 +97,10 @@ class TestOutputStream[T: ClassTag](parent: DStream[T], * The buffer contains a sequence of RDD's, each containing a sequence of partitions, each * containing a sequence of items. */ -class TestOutputStreamWithPartitions[T: ClassTag](parent: DStream[T], - val output: ArrayBuffer[Seq[Seq[T]]] = ArrayBuffer[Seq[Seq[T]]]()) +class TestOutputStreamWithPartitions[T: ClassTag]( + parent: DStream[T], + val output: SynchronizedBuffer[Seq[Seq[T]]] = + new ArrayBuffer[Seq[Seq[T]]] with SynchronizedBuffer[Seq[Seq[T]]]) extends ForEachDStream[T](parent, (rdd: RDD[T], t: Time) => { val collected = rdd.glom().collect().map(_.toSeq) output += collected @@ -108,10 +112,6 @@ class TestOutputStreamWithPartitions[T: ClassTag](parent: DStream[T], ois.defaultReadObject() output.clear() } - - def toTestOutputStream: TestOutputStream[T] = { - new TestOutputStream[T](this.parent, this.output.map(_.flatten)) - } } /** @@ -425,12 +425,21 @@ trait TestSuiteBase extends FunSuite with BeforeAndAfter with Logging { logInfo("--------------------------------") // Match the output with the expected output - assert(output.size === expectedOutput.size, "Number of outputs do not match") for (i <- 0 until output.size) { if (useSet) { - assert(output(i).toSet === expectedOutput(i).toSet) + assert( + output(i).toSet === expectedOutput(i).toSet, + s"Set comparison failed\n" + + s"Expected output (${expectedOutput.size} items):\n${expectedOutput.mkString("\n")}\n" + + s"Generated output (${output.size} items): ${output.mkString("\n")}" + ) } else { - assert(output(i).toList === expectedOutput(i).toList) + assert( + output(i).toList === expectedOutput(i).toList, + s"Ordered list comparison failed\n" + + s"Expected output (${expectedOutput.size} items):\n${expectedOutput.mkString("\n")}\n" + + s"Generated output (${output.size} items): ${output.mkString("\n")}" + ) } } logInfo("Output verified successfully") --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
