Repository: spark
Updated Branches:
  refs/heads/master 6930e965e -> 94598653b


[SPARK-6428][Streaming] Added explicit types for all public methods.

Author: Reynold Xin <[email protected]>

Closes #5110 from rxin/streaming-explicit-type and squashes the following 
commits:

2c2db32 [Reynold Xin] [SPARK-6428][Streaming] Added explicit types for all 
public methods.
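
The pattern applied throughout: these members previously relied on type
inference, so an innocuous edit to a method body could silently change the
inferred (and therefore public) type. A minimal sketch of the idea, using a
hypothetical object that is not part of this patch:

    object Example {
      // Before: the return type is inferred from the body, so rewriting
      // the body can change the public API without any warning.
      def windowMillis(seconds: Long) = seconds * 1000

      // After: the return type is a declared part of the contract, and
      // the compiler rejects a body that no longer conforms to it.
      def windowMillisTyped(seconds: Long): Long = seconds * 1000
    }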


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/94598653
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/94598653
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/94598653

Branch: refs/heads/master
Commit: 94598653bc772e71709163db3fed4048aa7f5f75
Parents: 6930e96
Author: Reynold Xin <[email protected]>
Authored: Tue Mar 24 17:08:25 2015 -0700
Committer: Reynold Xin <[email protected]>
Committed: Tue Mar 24 17:08:25 2015 -0700

----------------------------------------------------------------------
 .../org/apache/spark/streaming/Checkpoint.scala |  6 ++--
 .../apache/spark/streaming/DStreamGraph.scala   |  6 ++--
 .../org/apache/spark/streaming/Duration.scala   | 15 +++++----
 .../org/apache/spark/streaming/Interval.scala   | 10 +++---
 .../spark/streaming/StreamingContext.scala      |  5 +--
 .../streaming/api/java/JavaDStreamLike.scala    |  3 +-
 .../streaming/api/python/PythonDStream.scala    | 15 +++++----
 .../spark/streaming/dstream/DStream.scala       | 20 ++++++------
 .../dstream/DStreamCheckpointData.scala         |  2 +-
 .../streaming/dstream/FileInputDStream.scala    |  4 +--
 .../streaming/dstream/FilteredDStream.scala     |  2 +-
 .../dstream/FlatMapValuedDStream.scala          |  2 +-
 .../streaming/dstream/FlatMappedDStream.scala   |  2 +-
 .../streaming/dstream/ForEachDStream.scala      |  2 +-
 .../streaming/dstream/GlommedDStream.scala      |  2 +-
 .../spark/streaming/dstream/InputDStream.scala  |  2 +-
 .../dstream/MapPartitionedDStream.scala         |  2 +-
 .../streaming/dstream/MapValuedDStream.scala    |  2 +-
 .../spark/streaming/dstream/MappedDStream.scala |  2 +-
 .../dstream/ReducedWindowedDStream.scala        |  4 +--
 .../streaming/dstream/ShuffledDStream.scala     |  2 +-
 .../spark/streaming/dstream/StateDStream.scala  |  2 +-
 .../streaming/dstream/TransformedDStream.scala  |  2 +-
 .../spark/streaming/dstream/UnionDStream.scala  |  6 ++--
 .../streaming/dstream/WindowedDStream.scala     |  2 +-
 .../rdd/WriteAheadLogBackedBlockRDD.scala       |  6 ++--
 .../streaming/receiver/ActorReceiver.scala      | 18 +++++------
 .../streaming/receiver/BlockGenerator.scala     |  2 +-
 .../spark/streaming/receiver/Receiver.scala     |  2 +-
 .../streaming/receiver/ReceiverSupervisor.scala |  4 +--
 .../receiver/ReceiverSupervisorImpl.scala       |  6 ++--
 .../apache/spark/streaming/scheduler/Job.scala  |  2 +-
 .../streaming/scheduler/JobGenerator.scala      |  8 ++---
 .../streaming/scheduler/JobScheduler.scala      |  2 +-
 .../spark/streaming/scheduler/JobSet.scala      | 11 +++----
 .../streaming/scheduler/ReceiverTracker.scala   | 11 +++----
 .../ui/StreamingJobProgressListener.scala       | 33 +++++++++++---------
 .../spark/streaming/util/RawTextHelper.scala    | 14 +++++----
 38 files changed, 127 insertions(+), 114 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/94598653/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala b/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala
index db64e11..f73b463 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala
@@ -67,12 +67,12 @@ object Checkpoint extends Logging {
   val REGEX = (PREFIX + """([\d]+)([\w\.]*)""").r
 
   /** Get the checkpoint file for the given checkpoint time */
-  def checkpointFile(checkpointDir: String, checkpointTime: Time) = {
+  def checkpointFile(checkpointDir: String, checkpointTime: Time): Path = {
     new Path(checkpointDir, PREFIX + checkpointTime.milliseconds)
   }
 
   /** Get the checkpoint backup file for the given checkpoint time */
-  def checkpointBackupFile(checkpointDir: String, checkpointTime: Time) = {
+  def checkpointBackupFile(checkpointDir: String, checkpointTime: Time): Path = {
     new Path(checkpointDir, PREFIX + checkpointTime.milliseconds + ".bk")
   }
 
@@ -232,6 +232,8 @@ object CheckpointReader extends Logging {
   def read(checkpointDir: String, conf: SparkConf, hadoopConf: Configuration): Option[Checkpoint] =
   {
     val checkpointPath = new Path(checkpointDir)
+
+    // TODO(rxin): Why is this a def?!
     def fs = checkpointPath.getFileSystem(hadoopConf)
 
     // Try to find the checkpoint files
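
The TODO above points at a real Scala distinction: a `def` body is
re-evaluated on every reference, while a `val` is evaluated exactly once.
It is likely harmless here, since Hadoop normally caches FileSystem
instances, but a `val` would state the intent more clearly. A standalone
sketch of the difference (not from the patch):

    object DefVsVal extends App {
      def d = { println("evaluated (def)"); 1 }  // runs on every reference
      val v = { println("evaluated (val)"); 1 }  // runs once, at definition
      d + d  // prints "evaluated (def)" twice
      v + v  // prints nothing further
    }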

http://git-wip-us.apache.org/repos/asf/spark/blob/94598653/streaming/src/main/scala/org/apache/spark/streaming/DStreamGraph.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/DStreamGraph.scala b/streaming/src/main/scala/org/apache/spark/streaming/DStreamGraph.scala
index 0e285d6..1751404 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/DStreamGraph.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/DStreamGraph.scala
@@ -100,11 +100,11 @@ final private[streaming] class DStreamGraph extends Serializable with Logging {
     }
   }
 
-  def getInputStreams() = this.synchronized { inputStreams.toArray }
+  def getInputStreams(): Array[InputDStream[_]] = this.synchronized { inputStreams.toArray }
 
-  def getOutputStreams() = this.synchronized { outputStreams.toArray }
+  def getOutputStreams(): Array[DStream[_]] = this.synchronized { outputStreams.toArray }
 
-  def getReceiverInputStreams() = this.synchronized {
+  def getReceiverInputStreams(): Array[ReceiverInputDStream[_]] = this.synchronized {
     inputStreams.filter(_.isInstanceOf[ReceiverInputDStream[_]])
       .map(_.asInstanceOf[ReceiverInputDStream[_]])
       .toArray

http://git-wip-us.apache.org/repos/asf/spark/blob/94598653/streaming/src/main/scala/org/apache/spark/streaming/Duration.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/Duration.scala b/streaming/src/main/scala/org/apache/spark/streaming/Duration.scala
index a0d8fb5..3249bb3 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/Duration.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/Duration.scala
@@ -55,7 +55,6 @@ case class Duration (private val millis: Long) {
 
   def div(that: Duration): Double = this / that
 
-
   def isMultipleOf(that: Duration): Boolean =
     (this.millis % that.millis == 0)
 
@@ -71,7 +70,7 @@ case class Duration (private val millis: Long) {
 
   def milliseconds: Long = millis
 
-  def prettyPrint = Utils.msDurationToString(millis)
+  def prettyPrint: String = Utils.msDurationToString(millis)
 
 }
 
@@ -80,7 +79,7 @@ case class Duration (private val millis: Long) {
  * a given number of milliseconds.
  */
 object Milliseconds {
-  def apply(milliseconds: Long) = new Duration(milliseconds)
+  def apply(milliseconds: Long): Duration = new Duration(milliseconds)
 }
 
 /**
@@ -88,7 +87,7 @@ object Milliseconds {
  * a given number of seconds.
  */
 object Seconds {
-  def apply(seconds: Long) = new Duration(seconds * 1000)
+  def apply(seconds: Long): Duration = new Duration(seconds * 1000)
 }
 
 /**
@@ -96,7 +95,7 @@ object Seconds {
  * a given number of minutes.
  */
 object Minutes {
-  def apply(minutes: Long) = new Duration(minutes * 60000)
+  def apply(minutes: Long): Duration = new Duration(minutes * 60000)
 }
 
 // Java-friendlier versions of the objects above.
@@ -107,16 +106,16 @@ object Durations {
   /**
    * @return [[org.apache.spark.streaming.Duration]] representing given number of milliseconds.
    */
-  def milliseconds(milliseconds: Long) = Milliseconds(milliseconds)
+  def milliseconds(milliseconds: Long): Duration = Milliseconds(milliseconds)
 
   /**
    * @return [[org.apache.spark.streaming.Duration]] representing given number of seconds.
    */
-  def seconds(seconds: Long) = Seconds(seconds)
+  def seconds(seconds: Long): Duration = Seconds(seconds)
 
   /**
    * @return [[org.apache.spark.streaming.Duration]] representing given number of minutes.
    */
-  def minutes(minutes: Long) = Minutes(minutes)
+  def minutes(minutes: Long): Duration = Minutes(minutes)
 
 }
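
With the explicit Duration return types in place, the factories read like
plain constructors. A short usage sketch (standalone, not from the patch):

    import org.apache.spark.streaming.{Duration, Durations, Minutes, Seconds}

    val batch: Duration = Seconds(10)                  // 10000 ms
    val window: Duration = Minutes(1)                  // 60000 ms
    val sameAsBatch: Duration = Durations.seconds(10)  // Java-friendly form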

http://git-wip-us.apache.org/repos/asf/spark/blob/94598653/streaming/src/main/scala/org/apache/spark/streaming/Interval.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/Interval.scala b/streaming/src/main/scala/org/apache/spark/streaming/Interval.scala
index ad4f3fd..3f5be78 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/Interval.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/Interval.scala
@@ -39,18 +39,18 @@ class Interval(val beginTime: Time, val endTime: Time) {
     this.endTime < that.endTime
   }
 
-  def <= (that: Interval) = (this < that || this == that)
+  def <= (that: Interval): Boolean = (this < that || this == that)
 
-  def > (that: Interval) = !(this <= that)
+  def > (that: Interval): Boolean = !(this <= that)
 
-  def >= (that: Interval) = !(this < that)
+  def >= (that: Interval): Boolean = !(this < that)
 
-  override def toString = "[" + beginTime + ", " + endTime + "]"
+  override def toString: String = "[" + beginTime + ", " + endTime + "]"
 }
 
 private[streaming]
 object Interval {
-  def currentInterval(duration: Duration): Interval  = {
+  def currentInterval(duration: Duration): Interval = {
     val time = new Time(System.currentTimeMillis)
     val intervalBegin = time.floor(duration)
     new Interval(intervalBegin, intervalBegin + duration)

http://git-wip-us.apache.org/repos/asf/spark/blob/94598653/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
index 543224d..f57f295 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
@@ -188,7 +188,7 @@ class StreamingContext private[streaming] (
   /**
    * Return the associated Spark context
    */
-  def sparkContext = sc
+  def sparkContext: SparkContext = sc
 
   /**
    * Set each DStreams in this context to remember RDDs it generated in the last given duration.
@@ -596,7 +596,8 @@ object StreamingContext extends Logging {
   @deprecated("Replaced by implicit functions in the DStream companion object. This is " +
     "kept here only for backward compatibility.", "1.3.0")
   def toPairDStreamFunctions[K, V](stream: DStream[(K, V)])
-      (implicit kt: ClassTag[K], vt: ClassTag[V], ord: Ordering[K] = null) = {
+      (implicit kt: ClassTag[K], vt: ClassTag[V], ord: Ordering[K] = null)
+    : PairDStreamFunctions[K, V] = {
     DStream.toPairDStreamFunctions(stream)(kt, vt, ord)
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/94598653/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaDStreamLike.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaDStreamLike.scala b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaDStreamLike.scala
index 2eabdd9..73030e1 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaDStreamLike.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaDStreamLike.scala
@@ -415,8 +415,9 @@ trait JavaDStreamLike[T, This <: JavaDStreamLike[T, This, R], R <: JavaRDDLike[T
     implicit val cmv2: ClassTag[V2] = fakeClassTag
     implicit val cmw: ClassTag[W] = fakeClassTag
 
-    def scalaTransform (inThis: RDD[T], inThat: RDD[(K2, V2)], time: Time): RDD[W] =
+    def scalaTransform (inThis: RDD[T], inThat: RDD[(K2, V2)], time: Time): RDD[W] = {
       transformFunc.call(wrapRDD(inThis), other.wrapRDD(inThat), time).rdd
+    }
     dstream.transformWith[(K2, V2), W](other.dstream, scalaTransform(_, _, _))
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/94598653/streaming/src/main/scala/org/apache/spark/streaming/api/python/PythonDStream.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/api/python/PythonDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/api/python/PythonDStream.scala
index 7053f47..4c28654 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/api/python/PythonDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/api/python/PythonDStream.scala
@@ -176,11 +176,11 @@ private[python] abstract class PythonDStream(
 
   val func = new TransformFunction(pfunc)
 
-  override def dependencies = List(parent)
+  override def dependencies: List[DStream[_]] = List(parent)
 
   override def slideDuration: Duration = parent.slideDuration
 
-  val asJavaDStream  = JavaDStream.fromDStream(this)
+  val asJavaDStream: JavaDStream[Array[Byte]] = JavaDStream.fromDStream(this)
 }
 
 /**
@@ -212,7 +212,7 @@ private[python] class PythonTransformed2DStream(
 
   val func = new TransformFunction(pfunc)
 
-  override def dependencies = List(parent, parent2)
+  override def dependencies: List[DStream[_]] = List(parent, parent2)
 
   override def slideDuration: Duration = parent.slideDuration
 
@@ -223,7 +223,7 @@ private[python] class PythonTransformed2DStream(
     func(Some(rdd1), Some(rdd2), validTime)
   }
 
-  val asJavaDStream  = JavaDStream.fromDStream(this)
+  val asJavaDStream: JavaDStream[Array[Byte]] = JavaDStream.fromDStream(this)
 }
 
 /**
@@ -260,12 +260,15 @@ private[python] class PythonReducedWindowedDStream(
   extends PythonDStream(parent, preduceFunc) {
 
   super.persist(StorageLevel.MEMORY_ONLY)
-  override val mustCheckpoint = true
 
-  val invReduceFunc = new TransformFunction(pinvReduceFunc)
+  override val mustCheckpoint: Boolean = true
+
+  val invReduceFunc: TransformFunction = new TransformFunction(pinvReduceFunc)
 
   def windowDuration: Duration = _windowDuration
+
   override def slideDuration: Duration = _slideDuration
+
+  override def parentRememberDuration: Duration = rememberDuration + windowDuration
 
   override def compute(validTime: Time): Option[RDD[Array[Byte]]] = {

http://git-wip-us.apache.org/repos/asf/spark/blob/94598653/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala
index b874f56..795c5aa 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala
@@ -104,7 +104,7 @@ abstract class DStream[T: ClassTag] (
   private[streaming] def parentRememberDuration = rememberDuration
 
   /** Return the StreamingContext associated with this DStream */
-  def context = ssc
+  def context: StreamingContext = ssc
 
   /* Set the creation call site */
   private[streaming] val creationSite = DStream.getCreationSite()
@@ -619,14 +619,16 @@ abstract class DStream[T: ClassTag] (
    * operator, so this DStream will be registered as an output stream and there materialized.
    */
   def print(num: Int) {
-    def foreachFunc = (rdd: RDD[T], time: Time) => {
-      val firstNum = rdd.take(num + 1)
-      println ("-------------------------------------------")
-      println ("Time: " + time)
-      println ("-------------------------------------------")
-      firstNum.take(num).foreach(println)
-      if (firstNum.size > num) println("...")
-      println()
+    def foreachFunc: (RDD[T], Time) => Unit = {
+      (rdd: RDD[T], time: Time) => {
+        val firstNum = rdd.take(num + 1)
+        println("-------------------------------------------")
+        println("Time: " + time)
+        println("-------------------------------------------")
+        firstNum.take(num).foreach(println)
+        if (firstNum.size > num) println("...")
+        println()
+      }
     }
     new ForEachDStream(this, context.sparkContext.clean(foreachFunc)).register()
   }
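
The print() rewrite above also gives the helper an explicit function type,
(RDD[T], Time) => Unit, instead of letting it be inferred from the lambda.
The same pattern in miniature (standalone, not from the patch):

    // Inferred: f1's type is deduced from the lambda on the right.
    def f1 = (n: Int) => n.toString
    // Explicit: the function type is part of the declaration, mirroring
    // foreachFunc: (RDD[T], Time) => Unit above.
    def f2: Int => String = n => n.toString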

http://git-wip-us.apache.org/repos/asf/spark/blob/94598653/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStreamCheckpointData.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStreamCheckpointData.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStreamCheckpointData.scala
index 0dc7279..39fd213 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStreamCheckpointData.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStreamCheckpointData.scala
@@ -114,7 +114,7 @@ class DStreamCheckpointData[T: ClassTag] (dstream: DStream[T])
     }
   }
 
-  override def toString() = {
+  override def toString: String = {
     "[\n" + currentCheckpointFiles.size + " checkpoint files \n" +
       currentCheckpointFiles.mkString("\n") + "\n]"
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/94598653/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala
index 22de8c0..66d5191 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala
@@ -298,7 +298,7 @@ class FileInputDStream[K, V, F <: NewInputFormat[K,V]](
   private[streaming]
   class FileInputDStreamCheckpointData extends DStreamCheckpointData(this) {
 
-    def hadoopFiles = data.asInstanceOf[mutable.HashMap[Time, Array[String]]]
+    private def hadoopFiles = data.asInstanceOf[mutable.HashMap[Time, Array[String]]]
 
     override def update(time: Time) {
       hadoopFiles.clear()
@@ -320,7 +320,7 @@ class FileInputDStream[K, V, F <: NewInputFormat[K,V]](
       }
     }
 
-    override def toString() = {
+    override def toString: String = {
       "[\n" + hadoopFiles.size + " file sets\n" +
         hadoopFiles.map(p => (p._1, p._2.mkString(", "))).mkString("\n") + "\n]"
     }

http://git-wip-us.apache.org/repos/asf/spark/blob/94598653/streaming/src/main/scala/org/apache/spark/streaming/dstream/FilteredDStream.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/FilteredDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/FilteredDStream.scala
index c81534a..fcd5216 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/FilteredDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/FilteredDStream.scala
@@ -27,7 +27,7 @@ class FilteredDStream[T: ClassTag](
     filterFunc: T => Boolean
   ) extends DStream[T](parent.ssc) {
 
-  override def dependencies = List(parent)
+  override def dependencies: List[DStream[_]] = List(parent)
 
   override def slideDuration: Duration = parent.slideDuration
 

http://git-wip-us.apache.org/repos/asf/spark/blob/94598653/streaming/src/main/scala/org/apache/spark/streaming/dstream/FlatMapValuedDStream.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/FlatMapValuedDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/FlatMapValuedDStream.scala
index 6586234..9d09a3b 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/FlatMapValuedDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/FlatMapValuedDStream.scala
@@ -28,7 +28,7 @@ class FlatMapValuedDStream[K: ClassTag, V: ClassTag, U: ClassTag](
     flatMapValueFunc: V => TraversableOnce[U]
   ) extends DStream[(K, U)](parent.ssc) {
 
-  override def dependencies = List(parent)
+  override def dependencies: List[DStream[_]] = List(parent)
 
   override def slideDuration: Duration = parent.slideDuration
 

http://git-wip-us.apache.org/repos/asf/spark/blob/94598653/streaming/src/main/scala/org/apache/spark/streaming/dstream/FlatMappedDStream.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/FlatMappedDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/FlatMappedDStream.scala
index c7bb283..475ea2d 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/FlatMappedDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/FlatMappedDStream.scala
@@ -27,7 +27,7 @@ class FlatMappedDStream[T: ClassTag, U: ClassTag](
     flatMapFunc: T => Traversable[U]
   ) extends DStream[U](parent.ssc) {
 
-  override def dependencies = List(parent)
+  override def dependencies: List[DStream[_]] = List(parent)
 
   override def slideDuration: Duration = parent.slideDuration
 

http://git-wip-us.apache.org/repos/asf/spark/blob/94598653/streaming/src/main/scala/org/apache/spark/streaming/dstream/ForEachDStream.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/ForEachDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/ForEachDStream.scala
index 1361c30..685a32e 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/ForEachDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/ForEachDStream.scala
@@ -28,7 +28,7 @@ class ForEachDStream[T: ClassTag] (
     foreachFunc: (RDD[T], Time) => Unit
   ) extends DStream[Unit](parent.ssc) {
 
-  override def dependencies = List(parent)
+  override def dependencies: List[DStream[_]] = List(parent)
 
   override def slideDuration: Duration = parent.slideDuration
 

http://git-wip-us.apache.org/repos/asf/spark/blob/94598653/streaming/src/main/scala/org/apache/spark/streaming/dstream/GlommedDStream.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/GlommedDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/GlommedDStream.scala
index a9bb51f..dbb295f 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/GlommedDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/GlommedDStream.scala
@@ -25,7 +25,7 @@ private[streaming]
 class GlommedDStream[T: ClassTag](parent: DStream[T])
   extends DStream[Array[T]](parent.ssc) {
 
-  override def dependencies = List(parent)
+  override def dependencies: List[DStream[_]] = List(parent)
 
   override def slideDuration: Duration = parent.slideDuration
 

http://git-wip-us.apache.org/repos/asf/spark/blob/94598653/streaming/src/main/scala/org/apache/spark/streaming/dstream/InputDStream.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/InputDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/InputDStream.scala
index aa1993f..e652702 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/InputDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/InputDStream.scala
@@ -61,7 +61,7 @@ abstract class InputDStream[T: ClassTag] (@transient ssc_ : StreamingContext)
     }
   }
 
-  override def dependencies = List()
+  override def dependencies: List[DStream[_]] = List()
 
   override def slideDuration: Duration = {
     if (ssc == null) throw new Exception("ssc is null")

http://git-wip-us.apache.org/repos/asf/spark/blob/94598653/streaming/src/main/scala/org/apache/spark/streaming/dstream/MapPartitionedDStream.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/MapPartitionedDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/MapPartitionedDStream.scala
index 3d8ee29..5994bc1 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/MapPartitionedDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/MapPartitionedDStream.scala
@@ -28,7 +28,7 @@ class MapPartitionedDStream[T: ClassTag, U: ClassTag](
     preservePartitioning: Boolean
   ) extends DStream[U](parent.ssc) {
 
-  override def dependencies = List(parent)
+  override def dependencies: List[DStream[_]] = List(parent)
 
   override def slideDuration: Duration = parent.slideDuration
 

http://git-wip-us.apache.org/repos/asf/spark/blob/94598653/streaming/src/main/scala/org/apache/spark/streaming/dstream/MapValuedDStream.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/MapValuedDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/MapValuedDStream.scala
index 7aea1f9..954d2eb 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/MapValuedDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/MapValuedDStream.scala
@@ -28,7 +28,7 @@ class MapValuedDStream[K: ClassTag, V: ClassTag, U: ClassTag](
     mapValueFunc: V => U
   ) extends DStream[(K, U)](parent.ssc) {
 
-  override def dependencies = List(parent)
+  override def dependencies: List[DStream[_]] = List(parent)
 
   override def slideDuration: Duration = parent.slideDuration
 

http://git-wip-us.apache.org/repos/asf/spark/blob/94598653/streaming/src/main/scala/org/apache/spark/streaming/dstream/MappedDStream.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/MappedDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/MappedDStream.scala
index 02704a8..fa14b2e 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/MappedDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/MappedDStream.scala
@@ -27,7 +27,7 @@ class MappedDStream[T: ClassTag, U: ClassTag] (
     mapFunc: T => U
   ) extends DStream[U](parent.ssc) {
 
-  override def dependencies = List(parent)
+  override def dependencies: List[DStream[_]] = List(parent)
 
   override def slideDuration: Duration = parent.slideDuration
 

http://git-wip-us.apache.org/repos/asf/spark/blob/94598653/streaming/src/main/scala/org/apache/spark/streaming/dstream/ReducedWindowedDStream.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/ReducedWindowedDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/ReducedWindowedDStream.scala
index c0a5af0..1385ccb 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/ReducedWindowedDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/ReducedWindowedDStream.scala
@@ -52,7 +52,7 @@ class ReducedWindowedDStream[K: ClassTag, V: ClassTag](
 
   // Reduce each batch of data using reduceByKey which will be further reduced by window
   // by ReducedWindowedDStream
-  val reducedStream = parent.reduceByKey(reduceFunc, partitioner)
+  private val reducedStream = parent.reduceByKey(reduceFunc, partitioner)
 
   // Persist RDDs to memory by default as these RDDs are going to be reused.
   super.persist(StorageLevel.MEMORY_ONLY_SER)
@@ -60,7 +60,7 @@ class ReducedWindowedDStream[K: ClassTag, V: ClassTag](
 
   def windowDuration: Duration =  _windowDuration
 
-  override def dependencies = List(reducedStream)
+  override def dependencies: List[DStream[_]] = List(reducedStream)
 
   override def slideDuration: Duration = _slideDuration
 

http://git-wip-us.apache.org/repos/asf/spark/blob/94598653/streaming/src/main/scala/org/apache/spark/streaming/dstream/ShuffledDStream.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/ShuffledDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/ShuffledDStream.scala
index 880a89b..7757cca 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/ShuffledDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/ShuffledDStream.scala
@@ -33,7 +33,7 @@ class ShuffledDStream[K: ClassTag, V: ClassTag, C: ClassTag](
     mapSideCombine: Boolean = true
   ) extends DStream[(K,C)] (parent.ssc) {
 
-  override def dependencies = List(parent)
+  override def dependencies: List[DStream[_]] = List(parent)
 
   override def slideDuration: Duration = parent.slideDuration
 

http://git-wip-us.apache.org/repos/asf/spark/blob/94598653/streaming/src/main/scala/org/apache/spark/streaming/dstream/StateDStream.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/StateDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/StateDStream.scala
index ebb04dd..de8718d 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/StateDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/StateDStream.scala
@@ -36,7 +36,7 @@ class StateDStream[K: ClassTag, V: ClassTag, S: ClassTag](
 
   super.persist(StorageLevel.MEMORY_ONLY_SER)
 
-  override def dependencies = List(parent)
+  override def dependencies: List[DStream[_]] = List(parent)
 
   override def slideDuration: Duration = parent.slideDuration
 

http://git-wip-us.apache.org/repos/asf/spark/blob/94598653/streaming/src/main/scala/org/apache/spark/streaming/dstream/TransformedDStream.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/TransformedDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/TransformedDStream.scala
index 71b6185..5d46ca0 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/TransformedDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/TransformedDStream.scala
@@ -32,7 +32,7 @@ class TransformedDStream[U: ClassTag] (
   require(parents.map(_.slideDuration).distinct.size == 1,
     "Some of the DStreams have different slide durations")
 
-  override def dependencies = parents.toList
+  override def dependencies: List[DStream[_]] = parents.toList
 
   override def slideDuration: Duration = parents.head.slideDuration
 

http://git-wip-us.apache.org/repos/asf/spark/blob/94598653/streaming/src/main/scala/org/apache/spark/streaming/dstream/UnionDStream.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/UnionDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/UnionDStream.scala
index abbc40b..9405dba 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/UnionDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/UnionDStream.scala
@@ -33,17 +33,17 @@ class UnionDStream[T: ClassTag](parents: Array[DStream[T]])
   require(parents.map(_.slideDuration).distinct.size == 1,
     "Some of the DStreams have different slide durations")
 
-  override def dependencies = parents.toList
+  override def dependencies: List[DStream[_]] = parents.toList
 
   override def slideDuration: Duration = parents.head.slideDuration
 
   override def compute(validTime: Time): Option[RDD[T]] = {
     val rdds = new ArrayBuffer[RDD[T]]()
-    parents.map(_.getOrCompute(validTime)).foreach(_ match {
+    parents.map(_.getOrCompute(validTime)).foreach {
       case Some(rdd) => rdds += rdd
       case None => throw new Exception("Could not generate RDD from a parent for unifying at time "
         + validTime)
-    })
+    }
     if (rdds.size > 0) {
       Some(new UnionRDD(ssc.sc, rdds))
     } else {
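
Besides typing, the compute() rewrite above drops `_ match { ... }` in
favour of a bare case block, the idiomatic pattern-matching anonymous
function; the two forms are equivalent. A standalone sketch:

    val opts = Seq(Some(1), None, Some(3))
    opts.foreach {
      case Some(n) => println(n)
      case None    => println("missing")
    }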

http://git-wip-us.apache.org/repos/asf/spark/blob/94598653/streaming/src/main/scala/org/apache/spark/streaming/dstream/WindowedDStream.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/WindowedDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/WindowedDStream.scala
index 775b6bf..899865a 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/WindowedDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/WindowedDStream.scala
@@ -46,7 +46,7 @@ class WindowedDStream[T: ClassTag](
 
   def windowDuration: Duration =  _windowDuration
 
-  override def dependencies = List(parent)
+  override def dependencies: List[DStream[_]] = List(parent)
 
   override def slideDuration: Duration = _slideDuration
 

http://git-wip-us.apache.org/repos/asf/spark/blob/94598653/streaming/src/main/scala/org/apache/spark/streaming/rdd/WriteAheadLogBackedBlockRDD.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/rdd/WriteAheadLogBackedBlockRDD.scala b/streaming/src/main/scala/org/apache/spark/streaming/rdd/WriteAheadLogBackedBlockRDD.scala
index dd1e963..93caa4b 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/rdd/WriteAheadLogBackedBlockRDD.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/rdd/WriteAheadLogBackedBlockRDD.scala
@@ -117,8 +117,8 @@ class WriteAheadLogBackedBlockRDD[T: ClassTag](
   override def getPreferredLocations(split: Partition): Seq[String] = {
     val partition = split.asInstanceOf[WriteAheadLogBackedBlockRDDPartition]
     val blockLocations = getBlockIdLocations().get(partition.blockId)
-    def segmentLocations = HdfsUtils.getFileSegmentLocations(
-      partition.segment.path, partition.segment.offset, partition.segment.length, hadoopConfig)
-    blockLocations.getOrElse(segmentLocations)
+    blockLocations.getOrElse(
+      HdfsUtils.getFileSegmentLocations(
+        partition.segment.path, partition.segment.offset, partition.segment.length, hadoopConfig))
   }
 }
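
The rewrite above inlines the HDFS lookup into getOrElse without changing
when it runs: Option.getOrElse takes its default by name (default: => B),
so the segment-location lookup is still evaluated only when blockLocations
is empty. A standalone sketch of the by-name behaviour:

    def fallback(): Seq[String] = { println("computed"); Seq("hdfs-node") }

    Some(Seq("cached-node")).getOrElse(fallback())   // prints nothing
    Option.empty[Seq[String]].getOrElse(fallback())  // prints "computed"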

http://git-wip-us.apache.org/repos/asf/spark/blob/94598653/streaming/src/main/scala/org/apache/spark/streaming/receiver/ActorReceiver.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/receiver/ActorReceiver.scala b/streaming/src/main/scala/org/apache/spark/streaming/receiver/ActorReceiver.scala
index a7d63bd..cd30978 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/receiver/ActorReceiver.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/receiver/ActorReceiver.scala
@@ -17,6 +17,7 @@
 
 package org.apache.spark.streaming.receiver
 
+import java.nio.ByteBuffer
 import java.util.concurrent.atomic.AtomicInteger
 
 import scala.concurrent.duration._
@@ -25,10 +26,10 @@ import scala.reflect.ClassTag
 
 import akka.actor._
 import akka.actor.SupervisorStrategy.{Escalate, Restart}
+
 import org.apache.spark.{Logging, SparkEnv}
-import org.apache.spark.storage.StorageLevel
-import java.nio.ByteBuffer
 import org.apache.spark.annotation.DeveloperApi
+import org.apache.spark.storage.StorageLevel
 
 /**
  * :: DeveloperApi ::
@@ -149,13 +150,13 @@ private[streaming] class ActorReceiver[T: ClassTag](
   class Supervisor extends Actor {
 
     override val supervisorStrategy = receiverSupervisorStrategy
-    val worker = context.actorOf(props, name)
+    private val worker = context.actorOf(props, name)
     logInfo("Started receiver worker at:" + worker.path)
 
-    val n: AtomicInteger = new AtomicInteger(0)
-    val hiccups: AtomicInteger = new AtomicInteger(0)
+    private val n: AtomicInteger = new AtomicInteger(0)
+    private val hiccups: AtomicInteger = new AtomicInteger(0)
 
-    def receive = {
+    override def receive: PartialFunction[Any, Unit] = {
 
       case IteratorData(iterator) =>
         logDebug("received iterator")
@@ -189,13 +190,12 @@ private[streaming] class ActorReceiver[T: ClassTag](
     }
   }
 
-  def onStart() = {
+  def onStart(): Unit = {
     supervisor
     logInfo("Supervision tree for receivers initialized at:" + supervisor.path)
-
   }
 
-  def onStop() = {
+  def onStop(): Unit = {
     supervisor ! PoisonPill
   }
 }
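
The receive members above are spelled out as PartialFunction[Any, Unit],
which is what Akka's Actor.Receive type alias expands to, so the explicit
form changes nothing at runtime. A standalone sketch (not from the patch):

    import akka.actor.Actor

    class Echo extends Actor {
      // Equivalent to `override def receive: Receive = ...`, since Akka
      // defines `type Receive = PartialFunction[Any, Unit]`.
      override def receive: PartialFunction[Any, Unit] = {
        case msg => println(s"echo: $msg")
      }
    }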

http://git-wip-us.apache.org/repos/asf/spark/blob/94598653/streaming/src/main/scala/org/apache/spark/streaming/receiver/BlockGenerator.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/receiver/BlockGenerator.scala b/streaming/src/main/scala/org/apache/spark/streaming/receiver/BlockGenerator.scala
index ee5e639..42514d8 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/receiver/BlockGenerator.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/receiver/BlockGenerator.scala
@@ -120,7 +120,7 @@ private[streaming] class BlockGenerator(
    * `BlockGeneratorListener.onAddData` callback will be called. All received data items
    * will be periodically pushed into BlockManager.
    */
-  def addDataWithCallback(data: Any, metadata: Any) = synchronized {
+  def addDataWithCallback(data: Any, metadata: Any): Unit = synchronized {
     waitToPush()
     currentBuffer += data
     listener.onAddData(data, metadata)

http://git-wip-us.apache.org/repos/asf/spark/blob/94598653/streaming/src/main/scala/org/apache/spark/streaming/receiver/Receiver.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/receiver/Receiver.scala b/streaming/src/main/scala/org/apache/spark/streaming/receiver/Receiver.scala
index 5acf8a9..5b5a3fe 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/receiver/Receiver.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/receiver/Receiver.scala
@@ -245,7 +245,7 @@ abstract class Receiver[T](val storageLevel: StorageLevel) extends Serializable
    * Get the unique identifier the receiver input stream that this
    * receiver is associated with.
    */
-  def streamId = id
+  def streamId: Int = id
 
   /*
    * =================

http://git-wip-us.apache.org/repos/asf/spark/blob/94598653/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisor.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisor.scala b/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisor.scala
index 1f0244c..4943f29 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisor.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisor.scala
@@ -162,13 +162,13 @@ private[streaming] abstract class ReceiverSupervisor(
   }
 
   /** Check if receiver has been marked for stopping */
-  def isReceiverStarted() = {
+  def isReceiverStarted(): Boolean = {
     logDebug("state = " + receiverState)
     receiverState == Started
   }
 
   /** Check if receiver has been marked for stopping */
-  def isReceiverStopped() = {
+  def isReceiverStopped(): Boolean = {
     logDebug("state = " + receiverState)
     receiverState == Stopped
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/94598653/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisorImpl.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisorImpl.scala b/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisorImpl.scala
index 7d29ed8..8f2f1fe 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisorImpl.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisorImpl.scala
@@ -23,7 +23,7 @@ import java.util.concurrent.atomic.AtomicLong
 import scala.collection.mutable.ArrayBuffer
 import scala.concurrent.Await
 
-import akka.actor.{Actor, Props}
+import akka.actor.{ActorRef, Actor, Props}
 import akka.pattern.ask
 import com.google.common.base.Throwables
 import org.apache.hadoop.conf.Configuration
@@ -83,7 +83,7 @@ private[streaming] class ReceiverSupervisorImpl(
   private val actor = env.actorSystem.actorOf(
     Props(new Actor {
 
-      override def receive() = {
+      override def receive: PartialFunction[Any, Unit] = {
         case StopReceiver =>
           logInfo("Received stop signal")
           stop("Stopped by driver", None)
@@ -92,7 +92,7 @@ private[streaming] class ReceiverSupervisorImpl(
           cleanupOldBlocks(threshTime)
       }
 
-      def ref = self
+      def ref: ActorRef = self
     }), "Receiver-" + streamId + "-" + System.currentTimeMillis())
 
   /** Unique block ids if one wants to add blocks directly */

http://git-wip-us.apache.org/repos/asf/spark/blob/94598653/streaming/src/main/scala/org/apache/spark/streaming/scheduler/Job.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/Job.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/Job.scala
index 7e0f6b2..30cf87f 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/Job.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/Job.scala
@@ -36,5 +36,5 @@ class Job(val time: Time, func: () => _) {
     id = "streaming job " + time + "." + number
   }
 
-  override def toString = id
+  override def toString: String = id
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/94598653/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala
index 59488df..4946806 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala
@@ -82,7 +82,7 @@ class JobGenerator(jobScheduler: JobScheduler) extends Logging {
     if (eventActor != null) return // generator has already been started
 
     eventActor = ssc.env.actorSystem.actorOf(Props(new Actor {
-      def receive = {
+      override def receive: PartialFunction[Any, Unit] = {
         case event: JobGeneratorEvent =>  processEvent(event)
       }
     }), "JobGenerator")
@@ -111,8 +111,8 @@ class JobGenerator(jobScheduler: JobScheduler) extends Logging {
       val pollTime = 100
 
       // To prevent graceful stop to get stuck permanently
-      def hasTimedOut = {
-        val timedOut = System.currentTimeMillis() - timeWhenStopStarted > stopTimeout
+      def hasTimedOut: Boolean = {
+        val timedOut = (System.currentTimeMillis() - timeWhenStopStarted) > stopTimeout
         if (timedOut) {
           logWarning("Timed out while stopping the job generator (timeout = " + stopTimeout + ")")
         }
@@ -133,7 +133,7 @@ class JobGenerator(jobScheduler: JobScheduler) extends Logging {
       logInfo("Stopped generation timer")
 
       // Wait for the jobs to complete and checkpoints to be written
-      def haveAllBatchesBeenProcessed = {
+      def haveAllBatchesBeenProcessed: Boolean = {
         lastProcessedBatch != null && lastProcessedBatch.milliseconds == stopTime
       }
       logInfo("Waiting for jobs to be processed and checkpoints to be written")

http://git-wip-us.apache.org/repos/asf/spark/blob/94598653/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala
index 60bc099..d6a93ac 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala
@@ -56,7 +56,7 @@ class JobScheduler(val ssc: StreamingContext) extends Logging {
 
     logDebug("Starting JobScheduler")
     eventActor = ssc.env.actorSystem.actorOf(Props(new Actor {
-      def receive = {
+      override def receive: PartialFunction[Any, Unit] = {
         case event: JobSchedulerEvent => processEvent(event)
       }
     }), "JobScheduler")

http://git-wip-us.apache.org/repos/asf/spark/blob/94598653/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobSet.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobSet.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobSet.scala
index 8c15a75..5b13487 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobSet.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobSet.scala
@@ -28,8 +28,7 @@ private[streaming]
 case class JobSet(
     time: Time,
     jobs: Seq[Job],
-    receivedBlockInfo: Map[Int, Array[ReceivedBlockInfo]] = Map.empty
-  ) {
+    receivedBlockInfo: Map[Int, Array[ReceivedBlockInfo]] = Map.empty) {
 
   private val incompleteJobs = new HashSet[Job]()
   private val submissionTime = System.currentTimeMillis() // when this jobset was submitted
@@ -48,17 +47,17 @@ case class JobSet(
     if (hasCompleted) processingEndTime = System.currentTimeMillis()
   }
 
-  def hasStarted = processingStartTime > 0
+  def hasStarted: Boolean = processingStartTime > 0
 
-  def hasCompleted = incompleteJobs.isEmpty
+  def hasCompleted: Boolean = incompleteJobs.isEmpty
 
   // Time taken to process all the jobs from the time they started processing
   // (i.e. not including the time they wait in the streaming scheduler queue)
-  def processingDelay = processingEndTime - processingStartTime
+  def processingDelay: Long = processingEndTime - processingStartTime
 
   // Time taken to process all the jobs from the time they were submitted
   // (i.e. including the time they wait in the streaming scheduler queue)
-  def totalDelay = {
+  def totalDelay: Long = {
     processingEndTime - time.milliseconds
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/94598653/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala
index b36aeb3..9890047 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala
@@ -72,7 +72,7 @@ class ReceiverTracker(ssc: StreamingContext, skipReceiverLaunch: Boolean = false
   private var actor: ActorRef = null
 
   /** Start the actor and receiver execution thread. */
-  def start() = synchronized {
+  def start(): Unit = synchronized {
     if (actor != null) {
       throw new SparkException("ReceiverTracker already started")
     }
@@ -86,7 +86,7 @@ class ReceiverTracker(ssc: StreamingContext, skipReceiverLaunch: Boolean = false
   }
 
   /** Stop the receiver execution thread. */
-  def stop(graceful: Boolean) = synchronized {
+  def stop(graceful: Boolean): Unit = synchronized {
     if (!receiverInputStreams.isEmpty && actor != null) {
       // First, stop the receivers
       if (!skipReceiverLaunch) receiverExecutor.stop(graceful)
@@ -201,7 +201,7 @@ class ReceiverTracker(ssc: StreamingContext, skipReceiverLaunch: Boolean = false
 
   /** Actor to receive messages from the receivers. */
   private class ReceiverTrackerActor extends Actor {
-    def receive = {
+    override def receive: PartialFunction[Any, Unit] = {
       case RegisterReceiver(streamId, typ, host, receiverActor) =>
         registerReceiver(streamId, typ, host, receiverActor, sender)
         sender ! true
@@ -244,16 +244,15 @@ class ReceiverTracker(ssc: StreamingContext, skipReceiverLaunch: Boolean = false
 
       if (graceful) {
         val pollTime = 100
-        def done = { receiverInfo.isEmpty && !running }
         logInfo("Waiting for receiver job to terminate gracefully")
-        while(!done) {
+        while (receiverInfo.nonEmpty || running) {
           Thread.sleep(pollTime)
         }
         logInfo("Waited for receiver job to terminate gracefully")
       }
 
       // Check if all the receivers have been deregistered or not
-      if (!receiverInfo.isEmpty) {
+      if (receiverInfo.nonEmpty) {
         logWarning("Not all of the receivers have deregistered, " + receiverInfo)
       } else {
         logInfo("All of the receivers have deregistered successfully")
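
The graceful-wait rewrite above replaces a local `def done` tested as
`while (!done)` with the direct condition; the two are equivalent by
De Morgan's law: !(isEmpty && !running) == nonEmpty || running. A
standalone check of the equivalence:

    object DeMorganCheck extends App {
      for (empty <- Seq(true, false); running <- Seq(true, false)) {
        assert(!(empty && !running) == (!empty || running))
      }
      println("equivalent in all four cases")
    }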

http://git-wip-us.apache.org/repos/asf/spark/blob/94598653/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingJobProgressListener.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingJobProgressListener.scala b/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingJobProgressListener.scala
index 5ee53a5..e4bd067 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingJobProgressListener.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingJobProgressListener.scala
@@ -17,9 +17,10 @@
 
 package org.apache.spark.streaming.ui
 
+import scala.collection.mutable.{Queue, HashMap}
+
 import org.apache.spark.streaming.{Time, StreamingContext}
 import org.apache.spark.streaming.scheduler._
-import scala.collection.mutable.{Queue, HashMap}
 import org.apache.spark.streaming.scheduler.StreamingListenerReceiverStarted
 import org.apache.spark.streaming.scheduler.StreamingListenerBatchStarted
 import org.apache.spark.streaming.scheduler.BatchInfo
@@ -59,11 +60,13 @@ private[streaming] class StreamingJobProgressListener(ssc: StreamingContext)
     }
   }
 
-  override def onBatchSubmitted(batchSubmitted: StreamingListenerBatchSubmitted) = synchronized {
-    runningBatchInfos(batchSubmitted.batchInfo.batchTime) = batchSubmitted.batchInfo
+  override def onBatchSubmitted(batchSubmitted: StreamingListenerBatchSubmitted): Unit = {
+    synchronized {
+      runningBatchInfos(batchSubmitted.batchInfo.batchTime) = batchSubmitted.batchInfo
+    }
   }
 
-  override def onBatchStarted(batchStarted: StreamingListenerBatchStarted) = synchronized {
+  override def onBatchStarted(batchStarted: StreamingListenerBatchStarted): Unit = synchronized {
     runningBatchInfos(batchStarted.batchInfo.batchTime) = batchStarted.batchInfo
     waitingBatchInfos.remove(batchStarted.batchInfo.batchTime)
 
@@ -72,19 +75,21 @@ private[streaming] class StreamingJobProgressListener(ssc: StreamingContext)
     }
   }
 
-  override def onBatchCompleted(batchCompleted: StreamingListenerBatchCompleted) = synchronized {
-    waitingBatchInfos.remove(batchCompleted.batchInfo.batchTime)
-    runningBatchInfos.remove(batchCompleted.batchInfo.batchTime)
-    completedaBatchInfos.enqueue(batchCompleted.batchInfo)
-    if (completedaBatchInfos.size > batchInfoLimit) completedaBatchInfos.dequeue()
-    totalCompletedBatches += 1L
-
-    batchCompleted.batchInfo.receivedBlockInfo.foreach { case (_, infos) =>
-      totalProcessedRecords += infos.map(_.numRecords).sum
+  override def onBatchCompleted(batchCompleted: StreamingListenerBatchCompleted): Unit = {
+    synchronized {
+      waitingBatchInfos.remove(batchCompleted.batchInfo.batchTime)
+      runningBatchInfos.remove(batchCompleted.batchInfo.batchTime)
+      completedaBatchInfos.enqueue(batchCompleted.batchInfo)
+      if (completedaBatchInfos.size > batchInfoLimit) completedaBatchInfos.dequeue()
+      totalCompletedBatches += 1L
+      totalCompletedBatches += 1L
+
+      batchCompleted.batchInfo.receivedBlockInfo.foreach { case (_, infos) =>
+        totalProcessedRecords += infos.map(_.numRecords).sum
+      }
     }
   }
 
-  def numReceivers = synchronized {
+  def numReceivers: Int = synchronized {
     ssc.graph.getReceiverInputStreams().size
   }
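
Annotating methods such as numReceivers: Int = synchronized { ... } works
because synchronized on AnyRef is an expression returning its block's
value, so the declared type simply constrains the block. A standalone
sketch (not from the patch):

    class Counter {
      private var n = 0
      // The whole method body is the synchronized expression; its result
      // must conform to the declared Int.
      def increment(): Int = synchronized { n += 1; n }
    }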
 

http://git-wip-us.apache.org/repos/asf/spark/blob/94598653/streaming/src/main/scala/org/apache/spark/streaming/util/RawTextHelper.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/util/RawTextHelper.scala b/streaming/src/main/scala/org/apache/spark/streaming/util/RawTextHelper.scala
index a73d6f3..4d968f8 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/util/RawTextHelper.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/util/RawTextHelper.scala
@@ -18,9 +18,7 @@
 package org.apache.spark.streaming.util
 
 import org.apache.spark.SparkContext
-import org.apache.spark.SparkContext._
 import org.apache.spark.util.collection.OpenHashMap
-import scala.collection.JavaConversions.mapAsScalaMap
 
 private[streaming]
 object RawTextHelper {
@@ -71,7 +69,7 @@ object RawTextHelper {
     var count = 0
 
     while(data.hasNext) {
-      value = data.next
+      value = data.next()
       if (value != null) {
         count += 1
         if (len == 0) {
@@ -108,9 +106,13 @@ object RawTextHelper {
     }
   }
 
-  def add(v1: Long, v2: Long) = (v1 + v2)
+  def add(v1: Long, v2: Long): Long = {
+    v1 + v2
+  }
 
-  def subtract(v1: Long, v2: Long) = (v1 - v2)
+  def subtract(v1: Long, v2: Long): Long = {
+    v1 - v2
+  }
 
-  def max(v1: Long, v2: Long) = math.max(v1, v2)
+  def max(v1: Long, v2: Long): Long = math.max(v1, v2)
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
