Repository: spark Updated Branches: refs/heads/master 864de3bf4 -> a16396df7
[SPARK-10772] [STREAMING] [SCALA] NullPointerException when transform function in DStream returns NULL Currently, the ```TransformedDStream``` uses ```Some(transformFunc(parentRDDs, validTime))``` as the return value of compute; when the ```transformFunc``` somehow returns null, the downstream operators will throw a NullPointerException. The initial version of this fix used ```Option()``` instead of ```Some()``` to deal with the possible null value: when ```transformFunc``` returns ```null```, the option converts null to ```None```, which the downstream can handle correctly. NOTE (2015-09-25): The latest fix checks the return value of the transform function; if it is ```null```, a SparkException is thrown. Author: Jacker Hu <[email protected]> Author: jhu-chang <[email protected]> Closes #8881 from jhu-chang/Fix_Transform. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/a16396df Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/a16396df Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/a16396df Branch: refs/heads/master Commit: a16396df76cc27099011bfb96b28cbdd7f964ca8 Parents: 864de3b Author: Jacker Hu <[email protected]> Authored: Sat Oct 10 11:36:18 2015 +0100 Committer: Sean Owen <[email protected]> Committed: Sat Oct 10 11:36:18 2015 +0100 ---------------------------------------------------------------------- .../spark/streaming/dstream/TransformedDStream.scala | 12 ++++++++++-- .../apache/spark/streaming/BasicOperationsSuite.scala | 13 +++++++++++++ 2 files changed, 23 insertions(+), 2 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/a16396df/streaming/src/main/scala/org/apache/spark/streaming/dstream/TransformedDStream.scala ---------------------------------------------------------------------- diff --git 
a/streaming/src/main/scala/org/apache/spark/streaming/dstream/TransformedDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/TransformedDStream.scala index 5d46ca0..ab01f47 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/TransformedDStream.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/TransformedDStream.scala @@ -17,9 +17,11 @@ package org.apache.spark.streaming.dstream +import scala.reflect.ClassTag + +import org.apache.spark.SparkException import org.apache.spark.rdd.{PairRDDFunctions, RDD} import org.apache.spark.streaming.{Duration, Time} -import scala.reflect.ClassTag private[streaming] class TransformedDStream[U: ClassTag] ( @@ -38,6 +40,12 @@ class TransformedDStream[U: ClassTag] ( override def compute(validTime: Time): Option[RDD[U]] = { val parentRDDs = parents.map(_.getOrCompute(validTime).orNull).toSeq - Some(transformFunc(parentRDDs, validTime)) + val transformedRDD = transformFunc(parentRDDs, validTime) + if (transformedRDD == null) { + throw new SparkException("Transform function must not return null. 
" + + "Return SparkContext.emptyRDD() instead to represent no element " + + "as the result of transformation.") + } + Some(transformedRDD) } } http://git-wip-us.apache.org/repos/asf/spark/blob/a16396df/streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala ---------------------------------------------------------------------- diff --git a/streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala index 2553768..9988f41 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala @@ -211,6 +211,19 @@ class BasicOperationsSuite extends TestSuiteBase { ) } + test("transform with NULL") { + val input = Seq(1 to 4) + intercept[SparkException] { + testOperation( + input, + (r: DStream[Int]) => r.transform(rdd => null.asInstanceOf[RDD[Int]]), + Seq(Seq()), + 1, + false + ) + } + } + test("transformWith") { val inputData1 = Seq( Seq("a", "b"), Seq("a", ""), Seq(""), Seq() ) val inputData2 = Seq( Seq("a", "b"), Seq("b", ""), Seq(), Seq("") ) --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
