[SPARK-6428] Added explicit types for all public methods in core.

Author: Reynold Xin <[email protected]>

Closes #5125 from rxin/core-explicit-type and squashes the following commits:

f471415 [Reynold Xin] Revert style checker changes.
81b66e4 [Reynold Xin] Code review feedback.
a7533e3 [Reynold Xin] Mima excludes.
1d795f5 [Reynold Xin] [SPARK-6428] Added explicit types for all public methods in core.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/4ce2782a
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/4ce2782a
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/4ce2782a

Branch: refs/heads/master
Commit: 4ce2782a61e23ed0326faac2ee97a9bd36ec8963
Parents: bfd3ee9
Author: Reynold Xin <[email protected]>
Authored: Mon Mar 23 23:41:06 2015 -0700
Committer: Reynold Xin <[email protected]>
Committed: Mon Mar 23 23:41:06 2015 -0700

----------------------------------------------------------------------
 .../scala/org/apache/spark/Accumulators.scala   | 23 +++--
 .../scala/org/apache/spark/Dependency.scala     |  6 +-
 .../scala/org/apache/spark/FutureAction.scala   |  4 +-
 .../org/apache/spark/HeartbeatReceiver.scala    |  2 +-
 .../org/apache/spark/MapOutputTracker.scala     |  2 +-
 .../scala/org/apache/spark/Partitioner.scala    |  4 +-
 .../org/apache/spark/SerializableWritable.scala |  6 +-
 .../main/scala/org/apache/spark/SparkConf.scala |  2 +-
 .../scala/org/apache/spark/SparkContext.scala   | 48 ++++++-----
 .../main/scala/org/apache/spark/TaskState.scala |  4 +-
 .../main/scala/org/apache/spark/TestUtils.scala |  2 +-
 .../org/apache/spark/api/java/JavaRDD.scala     |  2 +-
 .../spark/api/java/JavaSparkContext.scala       |  2 +-
 .../org/apache/spark/api/java/JavaUtils.scala   | 35 ++++----
 .../org/apache/spark/api/python/PythonRDD.scala | 39 ++++-----
 .../org/apache/spark/api/python/SerDeUtil.scala |  2 +-
 .../WriteInputFormatTestDataGenerator.scala     | 27 +++---
 .../org/apache/spark/broadcast/Broadcast.scala  |  2 +-
 .../spark/broadcast/BroadcastManager.scala      |  2 +-
 .../apache/spark/broadcast/HttpBroadcast.scala  |  4 +-
 .../spark/broadcast/HttpBroadcastFactory.scala  |  2 +-
 .../spark/broadcast/TorrentBroadcast.scala      |  2 +-
 .../broadcast/TorrentBroadcastFactory.scala     |  2 +-
 .../scala/org/apache/spark/deploy/Client.scala  |  4 +-
 .../apache/spark/deploy/ClientArguments.scala   |  2 +-
 .../org/apache/spark/deploy/DeployMessage.scala |  2 +-
 .../spark/deploy/FaultToleranceTest.scala       |  2 +-
 .../org/apache/spark/deploy/JsonProtocol.scala  | 15 ++--
 .../apache/spark/deploy/SparkHadoopUtil.scala   |  2 +-
 .../org/apache/spark/deploy/SparkSubmit.scala   |  2 +-
 .../spark/deploy/SparkSubmitArguments.scala     |  2 +-
 .../apache/spark/deploy/client/AppClient.scala  |  2 +-
 .../deploy/history/FsHistoryProvider.scala      |  4 +-
 .../spark/deploy/history/HistoryServer.scala    | 10 +--
 .../master/FileSystemPersistenceEngine.scala    |  2 +-
 .../org/apache/spark/deploy/master/Master.scala |  2 +-
 .../deploy/master/RecoveryModeFactory.scala     | 15 +++-
 .../apache/spark/deploy/master/WorkerInfo.scala |  2 +-
 .../master/ZooKeeperPersistenceEngine.scala     |  2 +-
 .../spark/deploy/master/ui/MasterPage.scala     |  2 +-
 .../spark/deploy/worker/DriverRunner.scala      | 22 +++--
 .../org/apache/spark/deploy/worker/Worker.scala |  2 +-
 .../spark/deploy/worker/WorkerWatcher.scala     |  2 +-
 .../executor/CoarseGrainedExecutorBackend.scala |  2 +-
 .../apache/spark/executor/ExecutorActor.scala   |  2 +-
 .../org/apache/spark/executor/TaskMetrics.scala | 90 ++++++++++----------
 .../apache/spark/input/PortableDataStream.scala | 17 ++--
 .../spark/mapred/SparkHadoopMapRedUtil.scala    |  2 +-
 .../mapreduce/SparkHadoopMapReduceUtil.scala    |  2 +-
 .../apache/spark/metrics/MetricsSystem.scala    |  3 +-
 .../spark/metrics/sink/MetricsServlet.scala     | 18 ++--
 .../org/apache/spark/metrics/sink/Sink.scala    |  4 +-
 .../spark/network/nio/BlockMessageArray.scala   |  6 +-
 .../spark/network/nio/BufferMessage.scala       | 10 +--
 .../apache/spark/network/nio/Connection.scala   |  8 +-
 .../apache/spark/network/nio/ConnectionId.scala |  4 +-
 .../spark/network/nio/ConnectionManager.scala   |  2 +-
 .../spark/network/nio/ConnectionManagerId.scala |  2 +-
 .../org/apache/spark/network/nio/Message.scala  |  6 +-
 .../apache/spark/network/nio/MessageChunk.scala |  6 +-
 .../spark/network/nio/MessageChunkHeader.scala  |  4 +-
 .../apache/spark/partial/PartialResult.scala    |  2 +-
 .../org/apache/spark/rdd/CartesianRDD.scala     |  2 +-
 .../org/apache/spark/rdd/CoalescedRDD.scala     | 10 +--
 .../scala/org/apache/spark/rdd/HadoopRDD.scala  | 13 ++-
 .../scala/org/apache/spark/rdd/JdbcRDD.scala    | 10 ++-
 .../org/apache/spark/rdd/MapPartitionsRDD.scala |  2 +-
 .../org/apache/spark/rdd/NewHadoopRDD.scala     |  2 +-
 .../spark/rdd/ParallelCollectionRDD.scala       |  2 +-
 .../apache/spark/rdd/PartitionPruningRDD.scala  | 10 ++-
 .../scala/org/apache/spark/rdd/PipedRDD.scala   |  8 +-
 .../main/scala/org/apache/spark/rdd/RDD.scala   | 12 +--
 .../org/apache/spark/rdd/ShuffledRDD.scala      |  4 +-
 .../org/apache/spark/rdd/SubtractedRDD.scala    |  2 +-
 .../scala/org/apache/spark/rdd/UnionRDD.scala   |  2 +-
 .../apache/spark/rdd/ZippedPartitionsRDD.scala  |  2 +-
 .../spark/scheduler/AccumulableInfo.scala       |  7 +-
 .../apache/spark/scheduler/DAGScheduler.scala   |  2 +-
 .../spark/scheduler/EventLoggingListener.scala  | 61 +++++++------
 .../org/apache/spark/scheduler/JobLogger.scala  |  2 +-
 .../org/apache/spark/scheduler/JobWaiter.scala  |  2 +-
 .../scheduler/OutputCommitCoordinator.scala     |  2 +-
 .../org/apache/spark/scheduler/ResultTask.scala |  2 +-
 .../apache/spark/scheduler/ShuffleMapTask.scala |  4 +-
 .../apache/spark/scheduler/SparkListener.scala  |  4 +-
 .../org/apache/spark/scheduler/Stage.scala      |  2 +-
 .../apache/spark/scheduler/TaskLocation.scala   | 19 +++--
 .../spark/scheduler/TaskSchedulerImpl.scala     | 16 ++--
 .../apache/spark/scheduler/TaskSetManager.scala | 13 +--
 .../cluster/CoarseGrainedSchedulerBackend.scala |  2 +-
 .../cluster/YarnSchedulerBackend.scala          |  2 +-
 .../scheduler/cluster/mesos/MemoryUtils.scala   |  2 +-
 .../cluster/mesos/MesosSchedulerBackend.scala   |  2 +-
 .../spark/scheduler/local/LocalBackend.scala    |  4 +-
 .../spark/serializer/JavaSerializer.scala       |  5 +-
 .../spark/serializer/KryoSerializer.scala       |  2 +-
 .../spark/shuffle/FileShuffleBlockManager.scala |  4 +-
 .../shuffle/IndexShuffleBlockManager.scala      |  4 +-
 .../org/apache/spark/storage/BlockId.scala      | 34 ++++----
 .../apache/spark/storage/BlockManagerId.scala   |  8 +-
 .../spark/storage/BlockManagerMaster.scala      |  2 +-
 .../spark/storage/BlockManagerMasterActor.scala |  4 +-
 .../spark/storage/BlockManagerSlaveActor.scala  |  2 +-
 .../spark/storage/BlockObjectWriter.scala       | 17 ++--
 .../org/apache/spark/storage/FileSegment.scala  |  4 +-
 .../org/apache/spark/storage/RDDInfo.scala      |  4 +-
 .../org/apache/spark/storage/StorageLevel.scala | 16 ++--
 .../spark/storage/StorageStatusListener.scala   |  7 +-
 .../spark/storage/TachyonFileSegment.scala      |  4 +-
 .../scala/org/apache/spark/ui/SparkUI.scala     |  2 +-
 .../scala/org/apache/spark/ui/UIUtils.scala     |  6 +-
 .../apache/spark/ui/UIWorkloadGenerator.scala   |  4 +-
 .../org/apache/spark/ui/exec/ExecutorsTab.scala | 10 +--
 .../spark/ui/jobs/JobProgressListener.scala     | 14 +--
 .../org/apache/spark/ui/jobs/JobsTab.scala      |  2 +-
 .../org/apache/spark/ui/jobs/StagePage.scala    | 10 ++-
 .../org/apache/spark/ui/jobs/StagesTab.scala    |  6 +-
 .../scala/org/apache/spark/ui/jobs/UIData.scala | 10 +--
 .../apache/spark/ui/storage/StorageTab.scala    | 12 +--
 .../apache/spark/util/CompletionIterator.scala  | 10 +--
 .../org/apache/spark/util/Distribution.scala    |  2 +-
 .../org/apache/spark/util/ManualClock.scala     | 30 +++----
 .../org/apache/spark/util/MetadataCleaner.scala |  7 +-
 .../org/apache/spark/util/MutablePair.scala     |  2 +-
 .../apache/spark/util/ParentClassLoader.scala   |  2 +-
 .../apache/spark/util/SerializableBuffer.scala  |  2 +-
 .../org/apache/spark/util/StatCounter.scala     |  4 +-
 .../util/TimeStampedWeakValueHashMap.scala      |  6 +-
 .../scala/org/apache/spark/util/Utils.scala     | 30 +++----
 .../apache/spark/util/collection/BitSet.scala   |  4 +-
 .../util/collection/ExternalAppendOnlyMap.scala |  4 +-
 .../spark/util/collection/ExternalSorter.scala  |  2 +-
 .../spark/util/collection/OpenHashMap.scala     |  6 +-
 .../spark/util/collection/OpenHashSet.scala     |  4 +-
 .../collection/PrimitiveKeyOpenHashMap.scala    |  8 +-
 .../apache/spark/util/collection/Utils.scala    |  2 +-
 .../spark/util/logging/FileAppender.scala       |  4 +-
 .../spark/util/random/RandomSampler.scala       | 44 +++++-----
 .../util/random/StratifiedSamplingUtils.scala   |  2 +-
 .../spark/util/random/XORShiftRandom.scala      |  2 +-
 project/MimaExcludes.scala                      |  8 +-
 scalastyle-config.xml                           |  4 +-
 142 files changed, 597 insertions(+), 526 deletions(-)
----------------------------------------------------------------------
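
Summary of the change: every public method and field in core that previously relied on Scala's
result-type inference now declares its type explicitly. Explicit types keep the public API stable
(an edit to a method body can no longer silently change its inferred signature) and make the MiMa
binary-compatibility exclusions touched above meaningful. A minimal standalone sketch of the
before/after shape; the Registry class and its members are illustrative only, not part of this patch:

    // Illustrative only: the pattern this commit applies across core.
    class Registry(entries: Map[String, Int]) {
      // Before: result type inferred from the body, so changing the body
      // could silently change the public signature.
      //   def lookup(key: String) = entries.get(key)

      // After: the contract is written out and checked by the compiler.
      def lookup(key: String): Option[Int] = entries.get(key)

      override def toString: String = s"Registry(${entries.size} entries)"
    }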


http://git-wip-us.apache.org/repos/asf/spark/blob/4ce2782a/core/src/main/scala/org/apache/spark/Accumulators.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/Accumulators.scala 
b/core/src/main/scala/org/apache/spark/Accumulators.scala
index bcf8324..330df1d 100644
--- a/core/src/main/scala/org/apache/spark/Accumulators.scala
+++ b/core/src/main/scala/org/apache/spark/Accumulators.scala
@@ -18,8 +18,6 @@
 package org.apache.spark
 
 import java.io.{ObjectInputStream, Serializable}
-import java.util.concurrent.atomic.AtomicLong
-import java.lang.ThreadLocal
 
 import scala.collection.generic.Growable
 import scala.collection.mutable.Map
@@ -109,7 +107,7 @@ class Accumulable[R, T] (
    * The typical use of this method is to directly mutate the local value, eg., to add
    * an element to a Set.
    */
-  def localValue = value_
+  def localValue: R = value_
 
   /**
    * Set the accumulator's value; only allowed on master.
@@ -137,7 +135,7 @@ class Accumulable[R, T] (
     Accumulators.register(this, false)
   }
 
-  override def toString = if (value_ == null) "null" else value_.toString
+  override def toString: String = if (value_ == null) "null" else value_.toString
 }
 
 /**
@@ -257,22 +255,22 @@ object AccumulatorParam {
 
   implicit object DoubleAccumulatorParam extends AccumulatorParam[Double] {
     def addInPlace(t1: Double, t2: Double): Double = t1 + t2
-    def zero(initialValue: Double) = 0.0
+    def zero(initialValue: Double): Double = 0.0
   }
 
   implicit object IntAccumulatorParam extends AccumulatorParam[Int] {
     def addInPlace(t1: Int, t2: Int): Int = t1 + t2
-    def zero(initialValue: Int) = 0
+    def zero(initialValue: Int): Int = 0
   }
 
   implicit object LongAccumulatorParam extends AccumulatorParam[Long] {
-    def addInPlace(t1: Long, t2: Long) = t1 + t2
-    def zero(initialValue: Long) = 0L
+    def addInPlace(t1: Long, t2: Long): Long = t1 + t2
+    def zero(initialValue: Long): Long = 0L
   }
 
   implicit object FloatAccumulatorParam extends AccumulatorParam[Float] {
-    def addInPlace(t1: Float, t2: Float) = t1 + t2
-    def zero(initialValue: Float) = 0f
+    def addInPlace(t1: Float, t2: Float): Float = t1 + t2
+    def zero(initialValue: Float): Float = 0f
   }
 
   // TODO: Add AccumulatorParams for other types, e.g. lists and strings
@@ -351,6 +349,7 @@ private[spark] object Accumulators extends Logging {
     }
   }
 
-  def stringifyPartialValue(partialValue: Any) = "%s".format(partialValue)
-  def stringifyValue(value: Any) = "%s".format(value)
+  def stringifyPartialValue(partialValue: Any): String = "%s".format(partialValue)
+
+  def stringifyValue(value: Any): String = "%s".format(value)
 }
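
The AccumulatorParam objects above now spell out the result types of both addInPlace and zero.
A hedged sketch of a user-defined param written in the same style; StringConcatParam is a made-up
name, not part of this patch:

    import org.apache.spark.AccumulatorParam

    // Illustrative custom AccumulatorParam with explicit result types,
    // mirroring the built-in Double/Int/Long/Float params above.
    object StringConcatParam extends AccumulatorParam[String] {
      def addInPlace(t1: String, t2: String): String = t1 + t2
      def zero(initialValue: String): String = ""
    }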

http://git-wip-us.apache.org/repos/asf/spark/blob/4ce2782a/core/src/main/scala/org/apache/spark/Dependency.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/Dependency.scala 
b/core/src/main/scala/org/apache/spark/Dependency.scala
index 9a7cd45..fc8cdde 100644
--- a/core/src/main/scala/org/apache/spark/Dependency.scala
+++ b/core/src/main/scala/org/apache/spark/Dependency.scala
@@ -74,7 +74,7 @@ class ShuffleDependency[K, V, C](
     val mapSideCombine: Boolean = false)
   extends Dependency[Product2[K, V]] {
 
-  override def rdd = _rdd.asInstanceOf[RDD[Product2[K, V]]]
+  override def rdd: RDD[Product2[K, V]] = _rdd.asInstanceOf[RDD[Product2[K, V]]]
 
   val shuffleId: Int = _rdd.context.newShuffleId()
 
@@ -91,7 +91,7 @@ class ShuffleDependency[K, V, C](
  */
 @DeveloperApi
 class OneToOneDependency[T](rdd: RDD[T]) extends NarrowDependency[T](rdd) {
-  override def getParents(partitionId: Int) = List(partitionId)
+  override def getParents(partitionId: Int): List[Int] = List(partitionId)
 }
 
 
@@ -107,7 +107,7 @@ class OneToOneDependency[T](rdd: RDD[T]) extends 
NarrowDependency[T](rdd) {
 class RangeDependency[T](rdd: RDD[T], inStart: Int, outStart: Int, length: Int)
   extends NarrowDependency[T](rdd) {
 
-  override def getParents(partitionId: Int) = {
+  override def getParents(partitionId: Int): List[Int] = {
     if (partitionId >= outStart && partitionId < outStart + length) {
       List(partitionId - outStart + inStart)
     } else {

http://git-wip-us.apache.org/repos/asf/spark/blob/4ce2782a/core/src/main/scala/org/apache/spark/FutureAction.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/FutureAction.scala 
b/core/src/main/scala/org/apache/spark/FutureAction.scala
index e97a737..91f9ef8 100644
--- a/core/src/main/scala/org/apache/spark/FutureAction.scala
+++ b/core/src/main/scala/org/apache/spark/FutureAction.scala
@@ -168,7 +168,7 @@ class SimpleFutureAction[T] private[spark](jobWaiter: 
JobWaiter[_], resultFunc:
     }
   }
 
-  def jobIds = Seq(jobWaiter.jobId)
+  def jobIds: Seq[Int] = Seq(jobWaiter.jobId)
 }
 
 
@@ -276,7 +276,7 @@ class ComplexFutureAction[T] extends FutureAction[T] {
 
   override def value: Option[Try[T]] = p.future.value
 
-  def jobIds = jobs
+  def jobIds: Seq[Int] = jobs
 
 }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/4ce2782a/core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala 
b/core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala
index 69178da..715f292 100644
--- a/core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala
+++ b/core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala
@@ -65,7 +65,7 @@ private[spark] class HeartbeatReceiver(sc: SparkContext, 
scheduler: TaskSchedule
     super.preStart()
   }
   
-  override def receiveWithLogging = {
+  override def receiveWithLogging: PartialFunction[Any, Unit] = {
     case Heartbeat(executorId, taskMetrics, blockManagerId) =>
       val unknownExecutor = !scheduler.executorHeartbeatReceived(
         executorId, taskMetrics, blockManagerId)
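
Several actor hunks in this patch (HeartbeatReceiver, MapOutputTrackerMasterActor, ClientActor)
annotate receiveWithLogging as PartialFunction[Any, Unit], the type the pattern-match block already
has; writing it out documents the contract. A small sketch of the same shape outside of Akka; the
Ping/Echo messages are invented for illustration:

    object HandlerSketch {
      sealed trait Msg
      case object Ping extends Msg
      case class Echo(text: String) extends Msg

      // Explicitly typed, like the receiveWithLogging members in this patch.
      val handler: PartialFunction[Any, Unit] = {
        case Ping       => println("pong")
        case Echo(text) => println(text)
      }

      def main(args: Array[String]): Unit = {
        assert(handler.isDefinedAt(Ping))   // defined for known messages
        assert(!handler.isDefinedAt(42))    // undefined for anything else
        handler(Echo("hello"))
      }
    }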

http://git-wip-us.apache.org/repos/asf/spark/blob/4ce2782a/core/src/main/scala/org/apache/spark/MapOutputTracker.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/MapOutputTracker.scala 
b/core/src/main/scala/org/apache/spark/MapOutputTracker.scala
index 6e4edc7..c9426c5 100644
--- a/core/src/main/scala/org/apache/spark/MapOutputTracker.scala
+++ b/core/src/main/scala/org/apache/spark/MapOutputTracker.scala
@@ -43,7 +43,7 @@ private[spark] class MapOutputTrackerMasterActor(tracker: 
MapOutputTrackerMaster
   extends Actor with ActorLogReceive with Logging {
   val maxAkkaFrameSize = AkkaUtils.maxFrameSizeBytes(conf)
 
-  override def receiveWithLogging = {
+  override def receiveWithLogging: PartialFunction[Any, Unit] = {
     case GetMapOutputStatuses(shuffleId: Int) =>
       val hostPort = sender.path.address.hostPort
       logInfo("Asked to send map output locations for shuffle " + shuffleId + 
" to " + hostPort)

http://git-wip-us.apache.org/repos/asf/spark/blob/4ce2782a/core/src/main/scala/org/apache/spark/Partitioner.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/Partitioner.scala 
b/core/src/main/scala/org/apache/spark/Partitioner.scala
index e53a78e..b8d2444 100644
--- a/core/src/main/scala/org/apache/spark/Partitioner.scala
+++ b/core/src/main/scala/org/apache/spark/Partitioner.scala
@@ -76,7 +76,7 @@ object Partitioner {
  * produce an unexpected or incorrect result.
  */
 class HashPartitioner(partitions: Int) extends Partitioner {
-  def numPartitions = partitions
+  def numPartitions: Int = partitions
 
   def getPartition(key: Any): Int = key match {
     case null => 0
@@ -154,7 +154,7 @@ class RangePartitioner[K : Ordering : ClassTag, V](
     }
   }
 
-  def numPartitions = rangeBounds.length + 1
+  def numPartitions: Int = rangeBounds.length + 1
 
   private var binarySearch: ((Array[K], K) => Int) = CollectionsUtils.makeBinarySearch[K]
 

http://git-wip-us.apache.org/repos/asf/spark/blob/4ce2782a/core/src/main/scala/org/apache/spark/SerializableWritable.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/SerializableWritable.scala 
b/core/src/main/scala/org/apache/spark/SerializableWritable.scala
index 55cb259..cb2cae1 100644
--- a/core/src/main/scala/org/apache/spark/SerializableWritable.scala
+++ b/core/src/main/scala/org/apache/spark/SerializableWritable.scala
@@ -28,8 +28,10 @@ import org.apache.spark.util.Utils
 
 @DeveloperApi
 class SerializableWritable[T <: Writable](@transient var t: T) extends Serializable {
-  def value = t
-  override def toString = t.toString
+
+  def value: T = t
+
+  override def toString: String = t.toString
 
   private def writeObject(out: ObjectOutputStream): Unit = Utils.tryOrIOException {
     out.defaultWriteObject()

http://git-wip-us.apache.org/repos/asf/spark/blob/4ce2782a/core/src/main/scala/org/apache/spark/SparkConf.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/SparkConf.scala 
b/core/src/main/scala/org/apache/spark/SparkConf.scala
index 2ca19f5..0c123c9 100644
--- a/core/src/main/scala/org/apache/spark/SparkConf.scala
+++ b/core/src/main/scala/org/apache/spark/SparkConf.scala
@@ -133,7 +133,7 @@ class SparkConf(loadDefaults: Boolean) extends Cloneable 
with Logging {
   }
 
   /** Set multiple parameters together */
-  def setAll(settings: Traversable[(String, String)]) = {
+  def setAll(settings: Traversable[(String, String)]): SparkConf = {
     this.settings.putAll(settings.toMap.asJava)
     this
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/4ce2782a/core/src/main/scala/org/apache/spark/SparkContext.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala 
b/core/src/main/scala/org/apache/spark/SparkContext.scala
index 228ff71..a70be16 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -986,7 +986,7 @@ class SparkContext(config: SparkConf) extends Logging with 
ExecutorAllocationCli
     union(Seq(first) ++ rest)
 
   /** Get an RDD that has no partitions or elements. */
-  def emptyRDD[T: ClassTag] = new EmptyRDD[T](this)
+  def emptyRDD[T: ClassTag]: EmptyRDD[T] = new EmptyRDD[T](this)
 
   // Methods for creating shared variables
 
@@ -994,7 +994,7 @@ class SparkContext(config: SparkConf) extends Logging with 
ExecutorAllocationCli
    * Create an [[org.apache.spark.Accumulator]] variable of a given type, which tasks can "add"
    * values to using the `+=` method. Only the driver can access the accumulator's `value`.
    */
-  def accumulator[T](initialValue: T)(implicit param: AccumulatorParam[T]) =
+  def accumulator[T](initialValue: T)(implicit param: AccumulatorParam[T]): Accumulator[T] =
   {
     val acc = new Accumulator(initialValue, param)
     cleaner.foreach(_.registerAccumulatorForCleanup(acc))
@@ -1006,7 +1006,8 @@ class SparkContext(config: SparkConf) extends Logging 
with ExecutorAllocationCli
    * in the Spark UI. Tasks can "add" values to the accumulator using the `+=` method. Only the
    * driver can access the accumulator's `value`.
    */
-  def accumulator[T](initialValue: T, name: String)(implicit param: AccumulatorParam[T]) = {
+  def accumulator[T](initialValue: T, name: String)(implicit param: AccumulatorParam[T])
+    : Accumulator[T] = {
     val acc = new Accumulator(initialValue, param, Some(name))
     cleaner.foreach(_.registerAccumulatorForCleanup(acc))
     acc
@@ -1018,7 +1019,8 @@ class SparkContext(config: SparkConf) extends Logging 
with ExecutorAllocationCli
    * @tparam R accumulator result type
    * @tparam T type that can be added to the accumulator
    */
-  def accumulable[R, T](initialValue: R)(implicit param: AccumulableParam[R, T]) = {
+  def accumulable[R, T](initialValue: R)(implicit param: AccumulableParam[R, T])
+    : Accumulable[R, T] = {
     val acc = new Accumulable(initialValue, param)
     cleaner.foreach(_.registerAccumulatorForCleanup(acc))
     acc
@@ -1031,7 +1033,8 @@ class SparkContext(config: SparkConf) extends Logging 
with ExecutorAllocationCli
    * @tparam R accumulator result type
    * @tparam T type that can be added to the accumulator
    */
-  def accumulable[R, T](initialValue: R, name: String)(implicit param: AccumulableParam[R, T]) = {
+  def accumulable[R, T](initialValue: R, name: String)(implicit param: AccumulableParam[R, T])
+    : Accumulable[R, T] = {
     val acc = new Accumulable(initialValue, param, Some(name))
     cleaner.foreach(_.registerAccumulatorForCleanup(acc))
     acc
@@ -1209,7 +1212,7 @@ class SparkContext(config: SparkConf) extends Logging 
with ExecutorAllocationCli
   override def killExecutor(executorId: String): Boolean = super.killExecutor(executorId)
 
   /** The version of Spark on which this application is running. */
-  def version = SPARK_VERSION
+  def version: String = SPARK_VERSION
 
   /**
    * Return a map from the slave to the max memory available for caching and the remaining
@@ -1659,7 +1662,7 @@ class SparkContext(config: SparkConf) extends Logging 
with ExecutorAllocationCli
     }
   }
 
-  def getCheckpointDir = checkpointDir
+  def getCheckpointDir: Option[String] = checkpointDir
 
   /** Default level of parallelism to use when not given by user (e.g. parallelize and makeRDD). */
   def defaultParallelism: Int = {
@@ -1900,28 +1903,28 @@ object SparkContext extends Logging {
     "backward compatibility.", "1.3.0")
   object DoubleAccumulatorParam extends AccumulatorParam[Double] {
     def addInPlace(t1: Double, t2: Double): Double = t1 + t2
-    def zero(initialValue: Double) = 0.0
+    def zero(initialValue: Double): Double = 0.0
   }
 
   @deprecated("Replaced by implicit objects in AccumulatorParam. This is kept 
here only for " +
     "backward compatibility.", "1.3.0")
   object IntAccumulatorParam extends AccumulatorParam[Int] {
     def addInPlace(t1: Int, t2: Int): Int = t1 + t2
-    def zero(initialValue: Int) = 0
+    def zero(initialValue: Int): Int = 0
   }
 
   @deprecated("Replaced by implicit objects in AccumulatorParam. This is kept 
here only for " +
     "backward compatibility.", "1.3.0")
   object LongAccumulatorParam extends AccumulatorParam[Long] {
-    def addInPlace(t1: Long, t2: Long) = t1 + t2
-    def zero(initialValue: Long) = 0L
+    def addInPlace(t1: Long, t2: Long): Long = t1 + t2
+    def zero(initialValue: Long): Long = 0L
   }
 
   @deprecated("Replaced by implicit objects in AccumulatorParam. This is kept 
here only for " +
     "backward compatibility.", "1.3.0")
   object FloatAccumulatorParam extends AccumulatorParam[Float] {
-    def addInPlace(t1: Float, t2: Float) = t1 + t2
-    def zero(initialValue: Float) = 0f
+    def addInPlace(t1: Float, t2: Float): Float = t1 + t2
+    def zero(initialValue: Float): Float = 0f
   }
 
   // The following deprecated functions have already been moved to `object 
RDD` to
@@ -1931,18 +1934,18 @@ object SparkContext extends Logging {
   @deprecated("Replaced by implicit functions in the RDD companion object. 
This is " +
     "kept here only for backward compatibility.", "1.3.0")
   def rddToPairRDDFunctions[K, V](rdd: RDD[(K, V)])
-      (implicit kt: ClassTag[K], vt: ClassTag[V], ord: Ordering[K] = null) = {
+      (implicit kt: ClassTag[K], vt: ClassTag[V], ord: Ordering[K] = null): PairRDDFunctions[K, V] =
     RDD.rddToPairRDDFunctions(rdd)
-  }
 
   @deprecated("Replaced by implicit functions in the RDD companion object. 
This is " +
     "kept here only for backward compatibility.", "1.3.0")
-  def rddToAsyncRDDActions[T: ClassTag](rdd: RDD[T]) = RDD.rddToAsyncRDDActions(rdd)
+  def rddToAsyncRDDActions[T: ClassTag](rdd: RDD[T]): AsyncRDDActions[T] =
+    RDD.rddToAsyncRDDActions(rdd)
 
   @deprecated("Replaced by implicit functions in the RDD companion object. 
This is " +
     "kept here only for backward compatibility.", "1.3.0")
   def rddToSequenceFileRDDFunctions[K <% Writable: ClassTag, V <% Writable: ClassTag](
-      rdd: RDD[(K, V)]) = {
+      rdd: RDD[(K, V)]): SequenceFileRDDFunctions[K, V] = {
     val kf = implicitly[K => Writable]
     val vf = implicitly[V => Writable]
     // Set the Writable class to null and `SequenceFileRDDFunctions` will use 
Reflection to get it
@@ -1954,16 +1957,17 @@ object SparkContext extends Logging {
   @deprecated("Replaced by implicit functions in the RDD companion object. 
This is " +
     "kept here only for backward compatibility.", "1.3.0")
   def rddToOrderedRDDFunctions[K : Ordering : ClassTag, V: ClassTag](
-      rdd: RDD[(K, V)]) =
+      rdd: RDD[(K, V)]): OrderedRDDFunctions[K, V, (K, V)] =
     RDD.rddToOrderedRDDFunctions(rdd)
 
   @deprecated("Replaced by implicit functions in the RDD companion object. 
This is " +
     "kept here only for backward compatibility.", "1.3.0")
-  def doubleRDDToDoubleRDDFunctions(rdd: RDD[Double]) = RDD.doubleRDDToDoubleRDDFunctions(rdd)
+  def doubleRDDToDoubleRDDFunctions(rdd: RDD[Double]): DoubleRDDFunctions =
+    RDD.doubleRDDToDoubleRDDFunctions(rdd)
 
   @deprecated("Replaced by implicit functions in the RDD companion object. 
This is " +
     "kept here only for backward compatibility.", "1.3.0")
-  def numericRDDToDoubleRDDFunctions[T](rdd: RDD[T])(implicit num: Numeric[T]) =
+  def numericRDDToDoubleRDDFunctions[T](rdd: RDD[T])(implicit num: Numeric[T]): DoubleRDDFunctions =
     RDD.numericRDDToDoubleRDDFunctions(rdd)
 
   // The following deprecated functions have already been moved to `object 
WritableFactory` to
@@ -2134,7 +2138,7 @@ object SparkContext extends Logging {
         (backend, scheduler)
 
       case LOCAL_N_REGEX(threads) =>
-        def localCpuCount = Runtime.getRuntime.availableProcessors()
+        def localCpuCount: Int = Runtime.getRuntime.availableProcessors()
         // local[*] estimates the number of cores on the machine; local[N] uses exactly N threads.
         val threadCount = if (threads == "*") localCpuCount else threads.toInt
         if (threadCount <= 0) {
@@ -2146,7 +2150,7 @@ object SparkContext extends Logging {
         (backend, scheduler)
 
       case LOCAL_N_FAILURES_REGEX(threads, maxFailures) =>
-        def localCpuCount = Runtime.getRuntime.availableProcessors()
+        def localCpuCount: Int = Runtime.getRuntime.availableProcessors()
         // local[*, M] means the number of cores on the computer with M failures
         // local[N, M] means exactly N threads with M failures
         val threadCount = if (threads == "*") localCpuCount else threads.toInt
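
With the annotations above, sc.accumulator(...) and sc.accumulable(...) now advertise their
Accumulator[T] / Accumulable[R, T] result types instead of leaving them to inference. A short usage
sketch, under the assumption that sc is an already-constructed SparkContext:

    import org.apache.spark.Accumulator

    // Assumes `sc` is an existing org.apache.spark.SparkContext.
    val errorCount: Accumulator[Int] = sc.accumulator(0, "errors")

    sc.parallelize(Seq("ok", "boom", "ok")).foreach { record =>
      if (record == "boom") errorCount += 1   // tasks may only add
    }
    println(errorCount.value)                 // only the driver reads the value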

http://git-wip-us.apache.org/repos/asf/spark/blob/4ce2782a/core/src/main/scala/org/apache/spark/TaskState.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/TaskState.scala 
b/core/src/main/scala/org/apache/spark/TaskState.scala
index c415fe9..fe19f07 100644
--- a/core/src/main/scala/org/apache/spark/TaskState.scala
+++ b/core/src/main/scala/org/apache/spark/TaskState.scala
@@ -27,9 +27,9 @@ private[spark] object TaskState extends Enumeration {
 
   type TaskState = Value
 
-  def isFailed(state: TaskState) = (LOST == state) || (FAILED == state)
+  def isFailed(state: TaskState): Boolean = (LOST == state) || (FAILED == state)
 
-  def isFinished(state: TaskState) = FINISHED_STATES.contains(state)
+  def isFinished(state: TaskState): Boolean = FINISHED_STATES.contains(state)
 
   def toMesos(state: TaskState): MesosTaskState = state match {
     case LAUNCHING => MesosTaskState.TASK_STARTING

http://git-wip-us.apache.org/repos/asf/spark/blob/4ce2782a/core/src/main/scala/org/apache/spark/TestUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/TestUtils.scala 
b/core/src/main/scala/org/apache/spark/TestUtils.scala
index 35b324b..398ca41 100644
--- a/core/src/main/scala/org/apache/spark/TestUtils.scala
+++ b/core/src/main/scala/org/apache/spark/TestUtils.scala
@@ -107,7 +107,7 @@ private[spark] object TestUtils {
 
   private class JavaSourceFromString(val name: String, val code: String)
     extends SimpleJavaFileObject(createURI(name), SOURCE) {
-    override def getCharContent(ignoreEncodingErrors: Boolean) = code
+    override def getCharContent(ignoreEncodingErrors: Boolean): String = code
   }
 
   /** Creates a compiled class with the given name. Class file will be placed 
in destDir. */

http://git-wip-us.apache.org/repos/asf/spark/blob/4ce2782a/core/src/main/scala/org/apache/spark/api/java/JavaRDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/api/java/JavaRDD.scala 
b/core/src/main/scala/org/apache/spark/api/java/JavaRDD.scala
index 3e9beb6..18ccd62 100644
--- a/core/src/main/scala/org/apache/spark/api/java/JavaRDD.scala
+++ b/core/src/main/scala/org/apache/spark/api/java/JavaRDD.scala
@@ -179,7 +179,7 @@ class JavaRDD[T](val rdd: RDD[T])(implicit val classTag: 
ClassTag[T])
   def subtract(other: JavaRDD[T], p: Partitioner): JavaRDD[T] =
     wrapRDD(rdd.subtract(other, p))
 
-  override def toString = rdd.toString
+  override def toString: String = rdd.toString
 
   /** Assign a name to this RDD */
   def setName(name: String): JavaRDD[T] = {

http://git-wip-us.apache.org/repos/asf/spark/blob/4ce2782a/core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala
----------------------------------------------------------------------
diff --git 
a/core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala 
b/core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala
index 6d6ed69..3be6783 100644
--- a/core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala
@@ -108,7 +108,7 @@ class JavaSparkContext(val sc: SparkContext)
 
   private[spark] val env = sc.env
 
-  def statusTracker = new JavaSparkStatusTracker(sc)
+  def statusTracker: JavaSparkStatusTracker = new JavaSparkStatusTracker(sc)
 
   def isLocal: java.lang.Boolean = sc.isLocal
 

http://git-wip-us.apache.org/repos/asf/spark/blob/4ce2782a/core/src/main/scala/org/apache/spark/api/java/JavaUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/api/java/JavaUtils.scala 
b/core/src/main/scala/org/apache/spark/api/java/JavaUtils.scala
index 71b2673..8f9647e 100644
--- a/core/src/main/scala/org/apache/spark/api/java/JavaUtils.scala
+++ b/core/src/main/scala/org/apache/spark/api/java/JavaUtils.scala
@@ -17,6 +17,8 @@
 
 package org.apache.spark.api.java
 
+import java.util.Map.Entry
+
 import com.google.common.base.Optional
 
 import java.{util => ju}
@@ -30,8 +32,8 @@ private[spark] object JavaUtils {
     }
 
   // Workaround for SPARK-3926 / SI-8911
-  def mapAsSerializableJavaMap[A, B](underlying: collection.Map[A, B]) =
-    new SerializableMapWrapper(underlying)
+  def mapAsSerializableJavaMap[A, B](underlying: collection.Map[A, B]): SerializableMapWrapper[A, B]
+    = new SerializableMapWrapper(underlying)
 
   // Implementation is copied from scala.collection.convert.Wrappers.MapWrapper,
   // but implements java.io.Serializable. It can't just be subclassed to make it
@@ -40,36 +42,33 @@ private[spark] object JavaUtils {
   class SerializableMapWrapper[A, B](underlying: collection.Map[A, B])
     extends ju.AbstractMap[A, B] with java.io.Serializable { self =>
 
-    override def size = underlying.size
+    override def size: Int = underlying.size
 
     override def get(key: AnyRef): B = try {
-      underlying get key.asInstanceOf[A] match {
-        case None => null.asInstanceOf[B]
-        case Some(v) => v
-      }
+      underlying.getOrElse(key.asInstanceOf[A], null.asInstanceOf[B])
     } catch {
       case ex: ClassCastException => null.asInstanceOf[B]
     }
 
     override def entrySet: ju.Set[ju.Map.Entry[A, B]] = new ju.AbstractSet[ju.Map.Entry[A, B]] {
-      def size = self.size
+      override def size: Int = self.size
 
-      def iterator = new ju.Iterator[ju.Map.Entry[A, B]] {
+      override def iterator: ju.Iterator[ju.Map.Entry[A, B]] = new ju.Iterator[ju.Map.Entry[A, B]] {
         val ui = underlying.iterator
         var prev : Option[A] = None
 
-        def hasNext = ui.hasNext
+        def hasNext: Boolean = ui.hasNext
 
-        def next() = {
-          val (k, v) = ui.next
+        def next(): Entry[A, B] = {
+          val (k, v) = ui.next()
           prev = Some(k)
           new ju.Map.Entry[A, B] {
             import scala.util.hashing.byteswap32
-            def getKey = k
-            def getValue = v
-            def setValue(v1 : B) = self.put(k, v1)
+            override def hashCode: Int = byteswap32(k.hashCode) + (byteswap32(v.hashCode) << 16)
-            override def equals(other: Any) = other match {
+            override def getKey: A = k
+            override def getValue: B = v
+            override def setValue(v1 : B): B = self.put(k, v1)
+            override def hashCode: Int = byteswap32(k.hashCode) + 
(byteswap32(v.hashCode) << 16)
+            override def equals(other: Any): Boolean = other match {
               case e: ju.Map.Entry[_, _] => k == e.getKey && v == e.getValue
               case _ => false
             }
@@ -81,7 +80,7 @@ private[spark] object JavaUtils {
             case Some(k) =>
               underlying match {
                 case mm: mutable.Map[A, _] =>
-                  mm remove k
+                  mm.remove(k)
                   prev = None
                 case _ =>
                   throw new UnsupportedOperationException("remove")
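
SerializableMapWrapper above re-implements scala.collection.convert.Wrappers.MapWrapper with
java.io.Serializable added (the SPARK-3926 / SI-8911 workaround). A minimal standalone sketch of
the same idea, deliberately read-only and independent of Spark's private class:

    import java.{util => ju}

    // Expose an immutable Scala Map through java.util.Map while staying
    // Java-serializable; a simplified stand-in, not Spark's implementation.
    class SerializableJavaMap[A, B](underlying: Map[A, B])
      extends ju.AbstractMap[A, B] with Serializable {

      override def size: Int = underlying.size

      override def entrySet(): ju.Set[ju.Map.Entry[A, B]] = {
        val set = new ju.LinkedHashSet[ju.Map.Entry[A, B]]()
        underlying.foreach { case (k, v) =>
          set.add(new ju.AbstractMap.SimpleImmutableEntry(k, v))
        }
        set
      }
    }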

http://git-wip-us.apache.org/repos/asf/spark/blob/4ce2782a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala 
b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
index 4c71b69..19f4c95 100644
--- a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
+++ b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
@@ -54,9 +54,11 @@ private[spark] class PythonRDD(
   val bufferSize = conf.getInt("spark.buffer.size", 65536)
   val reuse_worker = conf.getBoolean("spark.python.worker.reuse", true)
 
-  override def getPartitions = firstParent.partitions
+  override def getPartitions: Array[Partition] = firstParent.partitions
 
-  override val partitioner = if (preservePartitoning) firstParent.partitioner else None
+  override val partitioner: Option[Partitioner] = {
+    if (preservePartitoning) firstParent.partitioner else None
+  }
 
   override def compute(split: Partition, context: TaskContext): Iterator[Array[Byte]] = {
     val startTime = System.currentTimeMillis
@@ -92,7 +94,7 @@ private[spark] class PythonRDD(
     // Return an iterator that read lines from the process's stdout
     val stream = new DataInputStream(new BufferedInputStream(worker.getInputStream, bufferSize))
     val stdoutIterator = new Iterator[Array[Byte]] {
-      def next(): Array[Byte] = {
+      override def next(): Array[Byte] = {
         val obj = _nextObj
         if (hasNext) {
           _nextObj = read()
@@ -175,7 +177,7 @@ private[spark] class PythonRDD(
 
       var _nextObj = read()
 
-      def hasNext = _nextObj != null
+      override def hasNext: Boolean = _nextObj != null
     }
     new InterruptibleIterator(context, stdoutIterator)
   }
@@ -303,11 +305,10 @@ private class PythonException(msg: String, cause: 
Exception) extends RuntimeExce
  * Form an RDD[(Array[Byte], Array[Byte])] from key-value pairs returned from 
Python.
  * This is used by PySpark's shuffle operations.
  */
-private class PairwiseRDD(prev: RDD[Array[Byte]]) extends
-  RDD[(Long, Array[Byte])](prev) {
-  override def getPartitions = prev.partitions
-  override val partitioner = prev.partitioner
-  override def compute(split: Partition, context: TaskContext) =
+private class PairwiseRDD(prev: RDD[Array[Byte]]) extends RDD[(Long, Array[Byte])](prev) {
+  override def getPartitions: Array[Partition] = prev.partitions
+  override val partitioner: Option[Partitioner] = prev.partitioner
+  override def compute(split: Partition, context: TaskContext): Iterator[(Long, Array[Byte])] =
     prev.iterator(split, context).grouped(2).map {
       case Seq(a, b) => (Utils.deserializeLongValue(a), b)
       case x => throw new SparkException("PairwiseRDD: unexpected value: " + x)
@@ -435,7 +436,7 @@ private[spark] object PythonRDD extends Logging {
       keyConverterClass: String,
       valueConverterClass: String,
       minSplits: Int,
-      batchSize: Int) = {
+      batchSize: Int): JavaRDD[Array[Byte]] = {
     val keyClass = Option(keyClassMaybeNull).getOrElse("org.apache.hadoop.io.Text")
     val valueClass = Option(valueClassMaybeNull).getOrElse("org.apache.hadoop.io.Text")
     val kc = Utils.classForName(keyClass).asInstanceOf[Class[K]]
@@ -462,7 +463,7 @@ private[spark] object PythonRDD extends Logging {
       keyConverterClass: String,
       valueConverterClass: String,
       confAsMap: java.util.HashMap[String, String],
-      batchSize: Int) = {
+      batchSize: Int): JavaRDD[Array[Byte]] = {
     val mergedConf = getMergedConf(confAsMap, sc.hadoopConfiguration())
     val rdd =
       newAPIHadoopRDDFromClassNames[K, V, F](sc,
@@ -488,7 +489,7 @@ private[spark] object PythonRDD extends Logging {
       keyConverterClass: String,
       valueConverterClass: String,
       confAsMap: java.util.HashMap[String, String],
-      batchSize: Int) = {
+      batchSize: Int): JavaRDD[Array[Byte]] = {
     val conf = PythonHadoopUtil.mapToConf(confAsMap)
     val rdd =
       newAPIHadoopRDDFromClassNames[K, V, F](sc,
@@ -505,7 +506,7 @@ private[spark] object PythonRDD extends Logging {
       inputFormatClass: String,
       keyClass: String,
       valueClass: String,
-      conf: Configuration) = {
+      conf: Configuration): RDD[(K, V)] = {
     val kc = Utils.classForName(keyClass).asInstanceOf[Class[K]]
     val vc = Utils.classForName(valueClass).asInstanceOf[Class[V]]
     val fc = Utils.classForName(inputFormatClass).asInstanceOf[Class[F]]
@@ -531,7 +532,7 @@ private[spark] object PythonRDD extends Logging {
       keyConverterClass: String,
       valueConverterClass: String,
       confAsMap: java.util.HashMap[String, String],
-      batchSize: Int) = {
+      batchSize: Int): JavaRDD[Array[Byte]] = {
     val mergedConf = getMergedConf(confAsMap, sc.hadoopConfiguration())
     val rdd =
       hadoopRDDFromClassNames[K, V, F](sc,
@@ -557,7 +558,7 @@ private[spark] object PythonRDD extends Logging {
       keyConverterClass: String,
       valueConverterClass: String,
       confAsMap: java.util.HashMap[String, String],
-      batchSize: Int) = {
+      batchSize: Int): JavaRDD[Array[Byte]] = {
     val conf = PythonHadoopUtil.mapToConf(confAsMap)
     val rdd =
       hadoopRDDFromClassNames[K, V, F](sc,
@@ -686,7 +687,7 @@ private[spark] object PythonRDD extends Logging {
       pyRDD: JavaRDD[Array[Byte]],
       batchSerialized: Boolean,
       path: String,
-      compressionCodecClass: String) = {
+      compressionCodecClass: String): Unit = {
     saveAsHadoopFile(
       pyRDD, batchSerialized, path, 
"org.apache.hadoop.mapred.SequenceFileOutputFormat",
       null, null, null, null, new java.util.HashMap(), compressionCodecClass)
@@ -711,7 +712,7 @@ private[spark] object PythonRDD extends Logging {
       keyConverterClass: String,
       valueConverterClass: String,
       confAsMap: java.util.HashMap[String, String],
-      compressionCodecClass: String) = {
+      compressionCodecClass: String): Unit = {
     val rdd = SerDeUtil.pythonToPairRDD(pyRDD, batchSerialized)
     val (kc, vc) = getKeyValueTypes(keyClass, valueClass).getOrElse(
       inferKeyValueTypes(rdd, keyConverterClass, valueConverterClass))
@@ -741,7 +742,7 @@ private[spark] object PythonRDD extends Logging {
       valueClass: String,
       keyConverterClass: String,
       valueConverterClass: String,
-      confAsMap: java.util.HashMap[String, String]) = {
+      confAsMap: java.util.HashMap[String, String]): Unit = {
     val rdd = SerDeUtil.pythonToPairRDD(pyRDD, batchSerialized)
     val (kc, vc) = getKeyValueTypes(keyClass, valueClass).getOrElse(
       inferKeyValueTypes(rdd, keyConverterClass, valueConverterClass))
@@ -766,7 +767,7 @@ private[spark] object PythonRDD extends Logging {
       confAsMap: java.util.HashMap[String, String],
       keyConverterClass: String,
       valueConverterClass: String,
-      useNewAPI: Boolean) = {
+      useNewAPI: Boolean): Unit = {
     val conf = PythonHadoopUtil.mapToConf(confAsMap)
     val converted = convertRDD(SerDeUtil.pythonToPairRDD(pyRDD, 
batchSerialized),
       keyConverterClass, valueConverterClass, new JavaToWritableConverter)

http://git-wip-us.apache.org/repos/asf/spark/blob/4ce2782a/core/src/main/scala/org/apache/spark/api/python/SerDeUtil.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/api/python/SerDeUtil.scala 
b/core/src/main/scala/org/apache/spark/api/python/SerDeUtil.scala
index fb52a96..257491e 100644
--- a/core/src/main/scala/org/apache/spark/api/python/SerDeUtil.scala
+++ b/core/src/main/scala/org/apache/spark/api/python/SerDeUtil.scala
@@ -84,7 +84,7 @@ private[spark] object SerDeUtil extends Logging {
   private var initialized = false
   // This should be called before trying to unpickle array.array from Python
   // In cluster mode, this should be put in closure
-  def initialize() = {
+  def initialize(): Unit = {
     synchronized{
       if (!initialized) {
         Unpickler.registerConstructor("array", "array", new ArrayConstructor())

http://git-wip-us.apache.org/repos/asf/spark/blob/4ce2782a/core/src/main/scala/org/apache/spark/api/python/WriteInputFormatTestDataGenerator.scala
----------------------------------------------------------------------
diff --git 
a/core/src/main/scala/org/apache/spark/api/python/WriteInputFormatTestDataGenerator.scala
 
b/core/src/main/scala/org/apache/spark/api/python/WriteInputFormatTestDataGenerator.scala
index cf289fb..8f30ff9 100644
--- 
a/core/src/main/scala/org/apache/spark/api/python/WriteInputFormatTestDataGenerator.scala
+++ 
b/core/src/main/scala/org/apache/spark/api/python/WriteInputFormatTestDataGenerator.scala
@@ -18,38 +18,37 @@
 package org.apache.spark.api.python
 
 import java.io.{DataOutput, DataInput}
+import java.{util => ju}
 
 import com.google.common.base.Charsets.UTF_8
 
 import org.apache.hadoop.io._
 import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat
+
+import org.apache.spark.SparkException
 import org.apache.spark.api.java.JavaSparkContext
-import org.apache.spark.{SparkContext, SparkException}
 
 /**
  * A class to test Pyrolite serialization on the Scala side, that will be deserialized
  * in Python
- * @param str
- * @param int
- * @param double
  */
 case class TestWritable(var str: String, var int: Int, var double: Double) extends Writable {
   def this() = this("", 0, 0.0)
 
-  def getStr = str
+  def getStr: String = str
   def setStr(str: String) { this.str = str }
-  def getInt = int
+  def getInt: Int = int
   def setInt(int: Int) { this.int = int }
-  def getDouble = double
+  def getDouble: Double = double
   def setDouble(double: Double) { this.double = double }
 
-  def write(out: DataOutput) = {
+  def write(out: DataOutput): Unit = {
     out.writeUTF(str)
     out.writeInt(int)
     out.writeDouble(double)
   }
 
-  def readFields(in: DataInput) = {
+  def readFields(in: DataInput): Unit = {
     str = in.readUTF()
     int = in.readInt()
     double = in.readDouble()
@@ -57,28 +56,28 @@ case class TestWritable(var str: String, var int: Int, var 
double: Double) exten
 }
 
 private[python] class TestInputKeyConverter extends Converter[Any, Any] {
-  override def convert(obj: Any) = {
+  override def convert(obj: Any): Char = {
     obj.asInstanceOf[IntWritable].get().toChar
   }
 }
 
 private[python] class TestInputValueConverter extends Converter[Any, Any] {
   import collection.JavaConversions._
-  override def convert(obj: Any) = {
+  override def convert(obj: Any): ju.List[Double] = {
     val m = obj.asInstanceOf[MapWritable]
     seqAsJavaList(m.keySet.map(w => w.asInstanceOf[DoubleWritable].get()).toSeq)
   }
 }
 
 private[python] class TestOutputKeyConverter extends Converter[Any, Any] {
-  override def convert(obj: Any) = {
+  override def convert(obj: Any): Text = {
     new Text(obj.asInstanceOf[Int].toString)
   }
 }
 
 private[python] class TestOutputValueConverter extends Converter[Any, Any] {
   import collection.JavaConversions._
-  override def convert(obj: Any) = {
+  override def convert(obj: Any): DoubleWritable = {
     new DoubleWritable(obj.asInstanceOf[java.util.Map[Double, _]].keySet().head)
   }
 }
@@ -86,7 +85,7 @@ private[python] class TestOutputValueConverter extends 
Converter[Any, Any] {
 private[python] class DoubleArrayWritable extends ArrayWritable(classOf[DoubleWritable])
 
 private[python] class DoubleArrayToWritableConverter extends Converter[Any, Writable] {
-  override def convert(obj: Any) = obj match {
+  override def convert(obj: Any): DoubleArrayWritable = obj match {
     case arr if arr.getClass.isArray && arr.getClass.getComponentType == classOf[Double] =>
       val daw = new DoubleArrayWritable
       daw.set(arr.asInstanceOf[Array[Double]].map(new DoubleWritable(_)))
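
TestWritable above shows the Hadoop Writable contract with the write and readFields result types
written out as Unit. A minimal Writable in the same style; PointWritable is a made-up example, not
part of this patch:

    import java.io.{DataInput, DataOutput}

    import org.apache.hadoop.io.Writable

    // Illustrative Writable with explicit result types.
    class PointWritable(var x: Double, var y: Double) extends Writable {
      def this() = this(0.0, 0.0)   // Hadoop instantiates Writables reflectively

      override def write(out: DataOutput): Unit = {
        out.writeDouble(x)
        out.writeDouble(y)
      }

      override def readFields(in: DataInput): Unit = {
        x = in.readDouble()
        y = in.readDouble()
      }
    }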

http://git-wip-us.apache.org/repos/asf/spark/blob/4ce2782a/core/src/main/scala/org/apache/spark/broadcast/Broadcast.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/broadcast/Broadcast.scala 
b/core/src/main/scala/org/apache/spark/broadcast/Broadcast.scala
index a5ea478..12d79f6 100644
--- a/core/src/main/scala/org/apache/spark/broadcast/Broadcast.scala
+++ b/core/src/main/scala/org/apache/spark/broadcast/Broadcast.scala
@@ -146,5 +146,5 @@ abstract class Broadcast[T: ClassTag](val id: Long) extends 
Serializable with Lo
     }
   }
 
-  override def toString = "Broadcast(" + id + ")"
+  override def toString: String = "Broadcast(" + id + ")"
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/4ce2782a/core/src/main/scala/org/apache/spark/broadcast/BroadcastManager.scala
----------------------------------------------------------------------
diff --git 
a/core/src/main/scala/org/apache/spark/broadcast/BroadcastManager.scala 
b/core/src/main/scala/org/apache/spark/broadcast/BroadcastManager.scala
index 8f8a0b1..685313a 100644
--- a/core/src/main/scala/org/apache/spark/broadcast/BroadcastManager.scala
+++ b/core/src/main/scala/org/apache/spark/broadcast/BroadcastManager.scala
@@ -58,7 +58,7 @@ private[spark] class BroadcastManager(
 
   private val nextBroadcastId = new AtomicLong(0)
 
-  def newBroadcast[T: ClassTag](value_ : T, isLocal: Boolean) = {
+  def newBroadcast[T: ClassTag](value_ : T, isLocal: Boolean): Broadcast[T] = {
     broadcastFactory.newBroadcast[T](value_, isLocal, 
nextBroadcastId.getAndIncrement())
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/4ce2782a/core/src/main/scala/org/apache/spark/broadcast/HttpBroadcast.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/broadcast/HttpBroadcast.scala 
b/core/src/main/scala/org/apache/spark/broadcast/HttpBroadcast.scala
index 1444c0d..74ccfa6 100644
--- a/core/src/main/scala/org/apache/spark/broadcast/HttpBroadcast.scala
+++ b/core/src/main/scala/org/apache/spark/broadcast/HttpBroadcast.scala
@@ -160,7 +160,7 @@ private[broadcast] object HttpBroadcast extends Logging {
     logInfo("Broadcast server started at " + serverUri)
   }
 
-  def getFile(id: Long) = new File(broadcastDir, BroadcastBlockId(id).name)
+  def getFile(id: Long): File = new File(broadcastDir, BroadcastBlockId(id).name)
 
   private def write(id: Long, value: Any) {
     val file = getFile(id)
@@ -222,7 +222,7 @@ private[broadcast] object HttpBroadcast extends Logging {
    * If removeFromDriver is true, also remove these persisted blocks on the 
driver
    * and delete the associated broadcast file.
    */
-  def unpersist(id: Long, removeFromDriver: Boolean, blocking: Boolean) = synchronized {
+  def unpersist(id: Long, removeFromDriver: Boolean, blocking: Boolean): Unit = synchronized {
     SparkEnv.get.blockManager.master.removeBroadcast(id, removeFromDriver, 
blocking)
     if (removeFromDriver) {
       val file = getFile(id)

http://git-wip-us.apache.org/repos/asf/spark/blob/4ce2782a/core/src/main/scala/org/apache/spark/broadcast/HttpBroadcastFactory.scala
----------------------------------------------------------------------
diff --git 
a/core/src/main/scala/org/apache/spark/broadcast/HttpBroadcastFactory.scala 
b/core/src/main/scala/org/apache/spark/broadcast/HttpBroadcastFactory.scala
index c7ef02d..cf3ae36 100644
--- a/core/src/main/scala/org/apache/spark/broadcast/HttpBroadcastFactory.scala
+++ b/core/src/main/scala/org/apache/spark/broadcast/HttpBroadcastFactory.scala
@@ -31,7 +31,7 @@ class HttpBroadcastFactory extends BroadcastFactory {
     HttpBroadcast.initialize(isDriver, conf, securityMgr)
   }
 
-  override def newBroadcast[T: ClassTag](value_ : T, isLocal: Boolean, id: Long) =
+  override def newBroadcast[T: ClassTag](value_ : T, isLocal: Boolean, id: Long): Broadcast[T] =
     new HttpBroadcast[T](value_, isLocal, id)
 
   override def stop() { HttpBroadcast.stop() }

http://git-wip-us.apache.org/repos/asf/spark/blob/4ce2782a/core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala
----------------------------------------------------------------------
diff --git 
a/core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala 
b/core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala
index 94142d3..23b02e6 100644
--- a/core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala
+++ b/core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala
@@ -222,7 +222,7 @@ private object TorrentBroadcast extends Logging {
    * Remove all persisted blocks associated with this torrent broadcast on the 
executors.
    * If removeFromDriver is true, also remove these persisted blocks on the 
driver.
    */
-  def unpersist(id: Long, removeFromDriver: Boolean, blocking: Boolean) = {
+  def unpersist(id: Long, removeFromDriver: Boolean, blocking: Boolean): Unit = {
     logDebug(s"Unpersisting TorrentBroadcast $id")
     SparkEnv.get.blockManager.master.removeBroadcast(id, removeFromDriver, 
blocking)
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/4ce2782a/core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcastFactory.scala
----------------------------------------------------------------------
diff --git 
a/core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcastFactory.scala 
b/core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcastFactory.scala
index fb024c1..96d8dd7 100644
--- 
a/core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcastFactory.scala
+++ 
b/core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcastFactory.scala
@@ -30,7 +30,7 @@ class TorrentBroadcastFactory extends BroadcastFactory {
 
   override def initialize(isDriver: Boolean, conf: SparkConf, securityMgr: 
SecurityManager) { }
 
-  override def newBroadcast[T: ClassTag](value_ : T, isLocal: Boolean, id: Long) = {
+  override def newBroadcast[T: ClassTag](value_ : T, isLocal: Boolean, id: Long): Broadcast[T] = {
     new TorrentBroadcast[T](value_, id)
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/4ce2782a/core/src/main/scala/org/apache/spark/deploy/Client.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/Client.scala 
b/core/src/main/scala/org/apache/spark/deploy/Client.scala
index 237d26f..65238af 100644
--- a/core/src/main/scala/org/apache/spark/deploy/Client.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/Client.scala
@@ -38,7 +38,7 @@ private class ClientActor(driverArgs: ClientArguments, conf: 
SparkConf)
   var masterActor: ActorSelection = _
   val timeout = AkkaUtils.askTimeout(conf)
 
-  override def preStart() = {
+  override def preStart(): Unit = {
     masterActor = context.actorSelection(
       Master.toAkkaUrl(driverArgs.master, AkkaUtils.protocol(context.system)))
 
@@ -118,7 +118,7 @@ private class ClientActor(driverArgs: ClientArguments, 
conf: SparkConf)
     }
   }
 
-  override def receiveWithLogging = {
+  override def receiveWithLogging: PartialFunction[Any, Unit] = {
 
     case SubmitDriverResponse(success, driverId, message) =>
       println(message)

http://git-wip-us.apache.org/repos/asf/spark/blob/4ce2782a/core/src/main/scala/org/apache/spark/deploy/ClientArguments.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/ClientArguments.scala 
b/core/src/main/scala/org/apache/spark/deploy/ClientArguments.scala
index 53bc62a..5cbac78 100644
--- a/core/src/main/scala/org/apache/spark/deploy/ClientArguments.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/ClientArguments.scala
@@ -42,7 +42,7 @@ private[deploy] class ClientArguments(args: Array[String]) {
   var memory: Int = DEFAULT_MEMORY
   var cores: Int = DEFAULT_CORES
   private var _driverOptions = ListBuffer[String]()
-  def driverOptions = _driverOptions.toSeq
+  def driverOptions: Seq[String] = _driverOptions.toSeq
 
   // kill parameters
   var driverId: String = ""

http://git-wip-us.apache.org/repos/asf/spark/blob/4ce2782a/core/src/main/scala/org/apache/spark/deploy/DeployMessage.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/DeployMessage.scala 
b/core/src/main/scala/org/apache/spark/deploy/DeployMessage.scala
index 7f600d8..0997507 100644
--- a/core/src/main/scala/org/apache/spark/deploy/DeployMessage.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/DeployMessage.scala
@@ -162,7 +162,7 @@ private[deploy] object DeployMessages {
     Utils.checkHost(host, "Required hostname")
     assert (port > 0)
 
-    def uri = "spark://" + host + ":" + port
+    def uri: String = "spark://" + host + ":" + port
     def restUri: Option[String] = restPort.map { p => "spark://" + host + ":" 
+ p }
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/4ce2782a/core/src/main/scala/org/apache/spark/deploy/FaultToleranceTest.scala
----------------------------------------------------------------------
diff --git 
a/core/src/main/scala/org/apache/spark/deploy/FaultToleranceTest.scala 
b/core/src/main/scala/org/apache/spark/deploy/FaultToleranceTest.scala
index 5668b53..a7c8927 100644
--- a/core/src/main/scala/org/apache/spark/deploy/FaultToleranceTest.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/FaultToleranceTest.scala
@@ -426,7 +426,7 @@ private object SparkDocker {
 }
 
 private class DockerId(val id: String) {
-  override def toString = id
+  override def toString: String = id
 }
 
 private object Docker extends Logging {

http://git-wip-us.apache.org/repos/asf/spark/blob/4ce2782a/core/src/main/scala/org/apache/spark/deploy/JsonProtocol.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/JsonProtocol.scala 
b/core/src/main/scala/org/apache/spark/deploy/JsonProtocol.scala
index 458a7c3..dfc5b97 100644
--- a/core/src/main/scala/org/apache/spark/deploy/JsonProtocol.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/JsonProtocol.scala
@@ -17,6 +17,7 @@
 
 package org.apache.spark.deploy
 
+import org.json4s.JsonAST.JObject
 import org.json4s.JsonDSL._
 
 import org.apache.spark.deploy.DeployMessages.{MasterStateResponse, 
WorkerStateResponse}
@@ -24,7 +25,7 @@ import org.apache.spark.deploy.master.{ApplicationInfo, 
DriverInfo, WorkerInfo}
 import org.apache.spark.deploy.worker.ExecutorRunner
 
 private[deploy] object JsonProtocol {
- def writeWorkerInfo(obj: WorkerInfo) = {
+ def writeWorkerInfo(obj: WorkerInfo): JObject = {
    ("id" -> obj.id) ~
    ("host" -> obj.host) ~
    ("port" -> obj.port) ~
@@ -39,7 +40,7 @@ private[deploy] object JsonProtocol {
    ("lastheartbeat" -> obj.lastHeartbeat)
  }
 
-  def writeApplicationInfo(obj: ApplicationInfo) = {
+  def writeApplicationInfo(obj: ApplicationInfo): JObject = {
     ("starttime" -> obj.startTime) ~
     ("id" -> obj.id) ~
     ("name" -> obj.desc.name) ~
@@ -51,7 +52,7 @@ private[deploy] object JsonProtocol {
     ("duration" -> obj.duration)
   }
 
-  def writeApplicationDescription(obj: ApplicationDescription) = {
+  def writeApplicationDescription(obj: ApplicationDescription): JObject = {
     ("name" -> obj.name) ~
     ("cores" -> obj.maxCores) ~
     ("memoryperslave" -> obj.memoryPerSlave) ~
@@ -59,14 +60,14 @@ private[deploy] object JsonProtocol {
     ("command" -> obj.command.toString)
   }
 
-  def writeExecutorRunner(obj: ExecutorRunner) = {
+  def writeExecutorRunner(obj: ExecutorRunner): JObject = {
     ("id" -> obj.execId) ~
     ("memory" -> obj.memory) ~
     ("appid" -> obj.appId) ~
     ("appdesc" -> writeApplicationDescription(obj.appDesc))
   }
 
-  def writeDriverInfo(obj: DriverInfo) = {
+  def writeDriverInfo(obj: DriverInfo): JObject = {
     ("id" -> obj.id) ~
     ("starttime" -> obj.startTime.toString) ~
     ("state" -> obj.state.toString) ~
@@ -74,7 +75,7 @@ private[deploy] object JsonProtocol {
     ("memory" -> obj.desc.mem)
   }
 
-  def writeMasterState(obj: MasterStateResponse) = {
+  def writeMasterState(obj: MasterStateResponse): JObject = {
     ("url" -> obj.uri) ~
     ("workers" -> obj.workers.toList.map(writeWorkerInfo)) ~
     ("cores" -> obj.workers.map(_.cores).sum) ~
@@ -87,7 +88,7 @@ private[deploy] object JsonProtocol {
     ("status" -> obj.status.toString)
   }
 
-  def writeWorkerState(obj: WorkerStateResponse) = {
+  def writeWorkerState(obj: WorkerStateResponse): JObject = {
     ("id" -> obj.workerId) ~
     ("masterurl" -> obj.masterUrl) ~
     ("masterwebuiurl" -> obj.masterWebUiUrl) ~

http://git-wip-us.apache.org/repos/asf/spark/blob/4ce2782a/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala 
b/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
index e0a32fb..c2568eb 100644
--- a/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
@@ -193,7 +193,7 @@ class SparkHadoopUtil extends Logging {
    * that file.
    */
   def listLeafStatuses(fs: FileSystem, basePath: Path): Seq[FileStatus] = {
-    def recurse(path: Path) = {
+    def recurse(path: Path): Array[FileStatus] = {
       val (directories, leaves) = fs.listStatus(path).partition(_.isDir)
       leaves ++ directories.flatMap(f => listLeafStatuses(fs, f.getPath))
     }
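
The nested helper's Array[FileStatus] result was previously inferred. The same recursion
can be sketched against the local filesystem (java.io.File stands in for Hadoop's
FileSystem here, an assumption of this example only; readable directories are assumed):

    import java.io.File

    object ListLeavesSketch {
      def listLeafFiles(base: File): Seq[File] = {
        // Explicit Array result type, mirroring the annotated recurse() above.
        def recurse(dir: File): Array[File] = {
          val (dirs, leaves) = dir.listFiles().partition(_.isDirectory)
          leaves ++ dirs.flatMap(d => listLeafFiles(d))
        }
        recurse(base)
      }

      def main(args: Array[String]): Unit =
        listLeafFiles(new File(".")).take(5).foreach(println)
    }
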

http://git-wip-us.apache.org/repos/asf/spark/blob/4ce2782a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala 
b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
index 4f506be..660307d 100644
--- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
@@ -777,7 +777,7 @@ private[deploy] object SparkSubmitUtils {
   }
 
   /** A nice function to use in tests as well. Values are dummy strings. */
-  def getModuleDescriptor = DefaultModuleDescriptor.newDefaultInstance(
+  def getModuleDescriptor: DefaultModuleDescriptor = DefaultModuleDescriptor.newDefaultInstance(
     ModuleRevisionId.newInstance("org.apache.spark", "spark-submit-parent", "1.0"))
 
   /**

http://git-wip-us.apache.org/repos/asf/spark/blob/4ce2782a/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala
----------------------------------------------------------------------
diff --git 
a/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala 
b/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala
index 2250d5a..6eb73c4 100644
--- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala
@@ -252,7 +252,7 @@ private[deploy] class SparkSubmitArguments(args: 
Seq[String], env: Map[String, S
     master.startsWith("spark://") && deployMode == "cluster"
   }
 
-  override def toString = {
+  override def toString: String = {
     s"""Parsed arguments:
     |  master                  $master
     |  deployMode              $deployMode

http://git-wip-us.apache.org/repos/asf/spark/blob/4ce2782a/core/src/main/scala/org/apache/spark/deploy/client/AppClient.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/client/AppClient.scala 
b/core/src/main/scala/org/apache/spark/deploy/client/AppClient.scala
index 2d24083..3b72972 100644
--- a/core/src/main/scala/org/apache/spark/deploy/client/AppClient.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/client/AppClient.scala
@@ -116,7 +116,7 @@ private[spark] class AppClient(
       
masterAkkaUrls.map(AddressFromURIString(_).hostPort).contains(remoteUrl.hostPort)
     }
 
-    override def receiveWithLogging = {
+    override def receiveWithLogging: PartialFunction[Any, Unit] = {
       case RegisteredApplication(appId_, masterUrl) =>
         appId = appId_
         registered = true

http://git-wip-us.apache.org/repos/asf/spark/blob/4ce2782a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
----------------------------------------------------------------------
diff --git 
a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala 
b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
index db7c499..80c9c13 100644
--- 
a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
+++ 
b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
@@ -93,7 +93,7 @@ private[history] class FsHistoryProvider(conf: SparkConf) 
extends ApplicationHis
    */
   private def getRunner(operateFun: () => Unit): Runnable = {
     new Runnable() {
-      override def run() = Utils.tryOrExit {
+      override def run(): Unit = Utils.tryOrExit {
         operateFun()
       }
     }
@@ -141,7 +141,7 @@ private[history] class FsHistoryProvider(conf: SparkConf) 
extends ApplicationHis
     }
   }
 
-  override def getListing() = applications.values
+  override def getListing(): Iterable[FsApplicationHistoryInfo] = applications.values
 
   override def getAppUI(appId: String): Option[SparkUI] = {
     try {

http://git-wip-us.apache.org/repos/asf/spark/blob/4ce2782a/core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala
----------------------------------------------------------------------
diff --git 
a/core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala 
b/core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala
index af483d5..72f6048 100644
--- a/core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala
@@ -61,7 +61,7 @@ class HistoryServer(
   private val appCache = CacheBuilder.newBuilder()
     .maximumSize(retainedApplications)
     .removalListener(new RemovalListener[String, SparkUI] {
-      override def onRemoval(rm: RemovalNotification[String, SparkUI]) = {
+      override def onRemoval(rm: RemovalNotification[String, SparkUI]): Unit = {
         detachSparkUI(rm.getValue())
       }
     })
@@ -149,14 +149,14 @@ class HistoryServer(
    *
    * @return List of all known applications.
    */
-  def getApplicationList() = provider.getListing()
+  def getApplicationList(): Iterable[ApplicationHistoryInfo] = provider.getListing()
 
   /**
    * Returns the provider configuration to show in the listing page.
    *
    * @return A map with the provider's configuration.
    */
-  def getProviderConfig() = provider.getConfig()
+  def getProviderConfig(): Map[String, String] = provider.getConfig()
 
 }
 
@@ -195,9 +195,7 @@ object HistoryServer extends Logging {
     server.bind()
 
     Runtime.getRuntime().addShutdownHook(new Thread("HistoryServerStopper") {
-      override def run() = {
-        server.stop()
-      }
+      override def run(): Unit = server.stop()
     })
 
     // Wait until the end of the world... or if the HistoryServer process is 
manually stopped
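
The shutdown-hook body above collapses to a one-line run(): Unit. The pattern itself is
plain JVM API; a runnable sketch (server.stop() replaced by a println, since this example
has no server to stop):

    object ShutdownHookSketch {
      def main(args: Array[String]): Unit = {
        Runtime.getRuntime.addShutdownHook(new Thread("HistoryServerStopperSketch") {
          override def run(): Unit = println("stopping server")
        })
        println("working; the hook runs when the JVM exits")
      }
    }
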

http://git-wip-us.apache.org/repos/asf/spark/blob/4ce2782a/core/src/main/scala/org/apache/spark/deploy/master/FileSystemPersistenceEngine.scala
----------------------------------------------------------------------
diff --git 
a/core/src/main/scala/org/apache/spark/deploy/master/FileSystemPersistenceEngine.scala
 
b/core/src/main/scala/org/apache/spark/deploy/master/FileSystemPersistenceEngine.scala
index d2d30bf..32499b3 100644
--- 
a/core/src/main/scala/org/apache/spark/deploy/master/FileSystemPersistenceEngine.scala
+++ 
b/core/src/main/scala/org/apache/spark/deploy/master/FileSystemPersistenceEngine.scala
@@ -48,7 +48,7 @@ private[master] class FileSystemPersistenceEngine(
     new File(dir + File.separator + name).delete()
   }
 
-  override def read[T: ClassTag](prefix: String) = {
+  override def read[T: ClassTag](prefix: String): Seq[T] = {
     val files = new File(dir).listFiles().filter(_.getName.startsWith(prefix))
     files.map(deserializeFromFile[T])
   }
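
Here the body maps over an Array, so the expression's own type is Array[T]; the explicit
Seq[T] annotation relies on the standard implicit Array-to-Seq wrapping to widen it. A
self-contained sketch of that detail (generic Scala, no Spark types):

    import scala.reflect.ClassTag

    object ReadSketch {
      // values.map(...) yields Array[T]; the declared Seq[T] result type lets the
      // standard implicit Array wrapper do the widening, as in read() above.
      def read[T: ClassTag](values: Array[String], parse: String => T): Seq[T] =
        values.map(parse)

      def main(args: Array[String]): Unit =
        println(read(Array("1", "2", "3"), _.toInt))
    }
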

http://git-wip-us.apache.org/repos/asf/spark/blob/4ce2782a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala 
b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
index 1b42121..8050662 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
@@ -204,7 +204,7 @@ private[master] class Master(
     self ! RevokedLeadership
   }
 
-  override def receiveWithLogging = {
+  override def receiveWithLogging: PartialFunction[Any, Unit] = {
     case ElectedLeader => {
       val (storedApps, storedDrivers, storedWorkers) = 
persistenceEngine.readPersistedData()
       state = if (storedApps.isEmpty && storedDrivers.isEmpty && 
storedWorkers.isEmpty) {

http://git-wip-us.apache.org/repos/asf/spark/blob/4ce2782a/core/src/main/scala/org/apache/spark/deploy/master/RecoveryModeFactory.scala
----------------------------------------------------------------------
diff --git 
a/core/src/main/scala/org/apache/spark/deploy/master/RecoveryModeFactory.scala 
b/core/src/main/scala/org/apache/spark/deploy/master/RecoveryModeFactory.scala
index 1583bf1..351db8f 100644
--- 
a/core/src/main/scala/org/apache/spark/deploy/master/RecoveryModeFactory.scala
+++ 
b/core/src/main/scala/org/apache/spark/deploy/master/RecoveryModeFactory.scala
@@ -51,20 +51,27 @@ abstract class StandaloneRecoveryModeFactory(conf: 
SparkConf, serializer: Serial
  */
 private[master] class FileSystemRecoveryModeFactory(conf: SparkConf, 
serializer: Serialization)
   extends StandaloneRecoveryModeFactory(conf, serializer) with Logging {
+
   val RECOVERY_DIR = conf.get("spark.deploy.recoveryDirectory", "")
 
-  def createPersistenceEngine() = {
+  def createPersistenceEngine(): PersistenceEngine = {
     logInfo("Persisting recovery state to directory: " + RECOVERY_DIR)
     new FileSystemPersistenceEngine(RECOVERY_DIR, serializer)
   }
 
-  def createLeaderElectionAgent(master: LeaderElectable) = new MonarchyLeaderAgent(master)
+  def createLeaderElectionAgent(master: LeaderElectable): LeaderElectionAgent = {
+    new MonarchyLeaderAgent(master)
+  }
 }
 
 private[master] class ZooKeeperRecoveryModeFactory(conf: SparkConf, 
serializer: Serialization)
   extends StandaloneRecoveryModeFactory(conf, serializer) {
-  def createPersistenceEngine() = new ZooKeeperPersistenceEngine(conf, serializer)
 
-  def createLeaderElectionAgent(master: LeaderElectable) =
+  def createPersistenceEngine(): PersistenceEngine = {
+    new ZooKeeperPersistenceEngine(conf, serializer)
+  }
+
+  def createLeaderElectionAgent(master: LeaderElectable): LeaderElectionAgent = {
     new ZooKeeperLeaderElectionAgent(master, conf)
+  }
 }
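
Both factories now state the trait-level result types instead of the concrete engine or
agent their bodies happen to construct. A standalone sketch of the pattern (local
stand-in types, not the Spark classes):

    trait PersistenceEngineLike
    trait LeaderElectionAgentLike

    class FileEngine(dir: String) extends PersistenceEngineLike
    class MonarchyAgent extends LeaderElectionAgentLike

    class FileSystemFactorySketch(dir: String) {
      // The declared types keep the factory's API at the abstraction level.
      def createPersistenceEngine(): PersistenceEngineLike = new FileEngine(dir)
      def createLeaderElectionAgent(): LeaderElectionAgentLike = new MonarchyAgent
    }
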

http://git-wip-us.apache.org/repos/asf/spark/blob/4ce2782a/core/src/main/scala/org/apache/spark/deploy/master/WorkerInfo.scala
----------------------------------------------------------------------
diff --git 
a/core/src/main/scala/org/apache/spark/deploy/master/WorkerInfo.scala 
b/core/src/main/scala/org/apache/spark/deploy/master/WorkerInfo.scala
index e94aae9..9b3d48c 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/WorkerInfo.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/WorkerInfo.scala
@@ -104,7 +104,7 @@ private[spark] class WorkerInfo(
     "http://"; + this.publicAddress + ":" + this.webUiPort
   }
 
-  def setState(state: WorkerState.Value) = {
+  def setState(state: WorkerState.Value): Unit = {
     this.state = state
   }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/4ce2782a/core/src/main/scala/org/apache/spark/deploy/master/ZooKeeperPersistenceEngine.scala
----------------------------------------------------------------------
diff --git 
a/core/src/main/scala/org/apache/spark/deploy/master/ZooKeeperPersistenceEngine.scala
 
b/core/src/main/scala/org/apache/spark/deploy/master/ZooKeeperPersistenceEngine.scala
index 1ac6677..a285783 100644
--- 
a/core/src/main/scala/org/apache/spark/deploy/master/ZooKeeperPersistenceEngine.scala
+++ 
b/core/src/main/scala/org/apache/spark/deploy/master/ZooKeeperPersistenceEngine.scala
@@ -46,7 +46,7 @@ private[master] class ZooKeeperPersistenceEngine(conf: 
SparkConf, val serializat
     zk.delete().forPath(WORKING_DIR + "/" + name)
   }
 
-  override def read[T: ClassTag](prefix: String) = {
+  override def read[T: ClassTag](prefix: String): Seq[T] = {
     val file = zk.getChildren.forPath(WORKING_DIR).filter(_.startsWith(prefix))
     file.map(deserializeFromFile[T]).flatten
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/4ce2782a/core/src/main/scala/org/apache/spark/deploy/master/ui/MasterPage.scala
----------------------------------------------------------------------
diff --git 
a/core/src/main/scala/org/apache/spark/deploy/master/ui/MasterPage.scala 
b/core/src/main/scala/org/apache/spark/deploy/master/ui/MasterPage.scala
index dee2e4a..46509e3 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/ui/MasterPage.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/ui/MasterPage.scala
@@ -95,7 +95,7 @@ private[ui] class MasterPage(parent: MasterWebUI) extends 
WebUIPage("") {
 
     // For now we only show driver information if the user has submitted 
drivers to the cluster.
     // This is until we integrate the notion of drivers and applications in 
the UI.
-    def hasDrivers = activeDrivers.length > 0 || completedDrivers.length > 0
+    def hasDrivers: Boolean = activeDrivers.length > 0 || completedDrivers.length > 0
 
     val content =
         <div class="row-fluid">

http://git-wip-us.apache.org/repos/asf/spark/blob/4ce2782a/core/src/main/scala/org/apache/spark/deploy/worker/DriverRunner.scala
----------------------------------------------------------------------
diff --git 
a/core/src/main/scala/org/apache/spark/deploy/worker/DriverRunner.scala 
b/core/src/main/scala/org/apache/spark/deploy/worker/DriverRunner.scala
index 27a9eab..e0948e1 100644
--- a/core/src/main/scala/org/apache/spark/deploy/worker/DriverRunner.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/DriverRunner.scala
@@ -56,8 +56,14 @@ private[deploy] class DriverRunner(
   private var finalExitCode: Option[Int] = None
 
   // Decoupled for testing
-  def setClock(_clock: Clock) = clock = _clock
-  def setSleeper(_sleeper: Sleeper) = sleeper = _sleeper
+  def setClock(_clock: Clock): Unit = {
+    clock = _clock
+  }
+
+  def setSleeper(_sleeper: Sleeper): Unit = {
+    sleeper = _sleeper
+  }
+
   private var clock: Clock = new SystemClock()
   private var sleeper = new Sleeper {
     def sleep(seconds: Int): Unit = (0 until seconds).takeWhile(f => 
{Thread.sleep(1000); !killed})
@@ -155,7 +161,7 @@ private[deploy] class DriverRunner(
 
   private def launchDriver(builder: ProcessBuilder, baseDir: File, supervise: 
Boolean) {
     builder.directory(baseDir)
-    def initialize(process: Process) = {
+    def initialize(process: Process): Unit = {
       // Redirect stdout and stderr to files
       val stdout = new File(baseDir, "stdout")
       CommandUtils.redirectStream(process.getInputStream, stdout)
@@ -169,8 +175,8 @@ private[deploy] class DriverRunner(
     runCommandWithRetry(ProcessBuilderLike(builder), initialize, supervise)
   }
 
-  def runCommandWithRetry(command: ProcessBuilderLike, initialize: Process => Unit,
-    supervise: Boolean) {
+  def runCommandWithRetry(
+      command: ProcessBuilderLike, initialize: Process => Unit, supervise: Boolean): Unit = {
     // Time to wait between submission retries.
     var waitSeconds = 1
     // A run of this many seconds resets the exponential back-off.
@@ -216,8 +222,8 @@ private[deploy] trait ProcessBuilderLike {
 }
 
 private[deploy] object ProcessBuilderLike {
-  def apply(processBuilder: ProcessBuilder) = new ProcessBuilderLike {
-    def start() = processBuilder.start()
-    def command = processBuilder.command()
+  def apply(processBuilder: ProcessBuilder): ProcessBuilderLike = new ProcessBuilderLike {
+    override def start(): Process = processBuilder.start()
+    override def command: Seq[String] = processBuilder.command()
   }
 }
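
The anonymous implementation now marks its members with override and explicit result
types, so the object exposes Process and Seq[String] rather than whatever the bodies
infer (processBuilder.command() itself returns a java.util.List). A sketch of the same
shape (the trait below is a local stand-in, and the Java-to-Scala conversion is made
explicit as an assumption of this example):

    import scala.collection.JavaConverters._

    trait ProcessBuilderLikeSketch {
      def start(): Process
      def command: Seq[String]
    }

    object ProcessBuilderLikeSketch {
      def apply(pb: ProcessBuilder): ProcessBuilderLikeSketch = new ProcessBuilderLikeSketch {
        override def start(): Process = pb.start()
        // command() returns java.util.List[String]; convert and widen to Seq[String].
        override def command: Seq[String] = pb.command().asScala.toSeq
      }
    }
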

http://git-wip-us.apache.org/repos/asf/spark/blob/4ce2782a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala 
b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
index c1b0a29..c4c24a7 100755
--- a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
@@ -275,7 +275,7 @@ private[worker] class Worker(
     }
   }
 
-  override def receiveWithLogging = {
+  override def receiveWithLogging: PartialFunction[Any, Unit] = {
     case RegisteredWorker(masterUrl, masterWebUiUrl) =>
       logInfo("Successfully registered with master " + masterUrl)
       registered = true

http://git-wip-us.apache.org/repos/asf/spark/blob/4ce2782a/core/src/main/scala/org/apache/spark/deploy/worker/WorkerWatcher.scala
----------------------------------------------------------------------
diff --git 
a/core/src/main/scala/org/apache/spark/deploy/worker/WorkerWatcher.scala 
b/core/src/main/scala/org/apache/spark/deploy/worker/WorkerWatcher.scala
index 09d866f..e079027 100644
--- a/core/src/main/scala/org/apache/spark/deploy/worker/WorkerWatcher.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/WorkerWatcher.scala
@@ -50,7 +50,7 @@ private[spark] class WorkerWatcher(workerUrl: String)
 
   private def exitNonZero() = if (isTesting) isShutDown = true else 
System.exit(-1)
 
-  override def receiveWithLogging = {
+  override def receiveWithLogging: PartialFunction[Any, Unit] = {
     case AssociatedEvent(localAddress, remoteAddress, inbound) if 
isWorker(remoteAddress) =>
       logInfo(s"Successfully connected to $workerUrl")
 

http://git-wip-us.apache.org/repos/asf/spark/blob/4ce2782a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
----------------------------------------------------------------------
diff --git 
a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
 
b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
index dd19e49..b5205d4 100644
--- 
a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
+++ 
b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
@@ -62,7 +62,7 @@ private[spark] class CoarseGrainedExecutorBackend(
       .map(e => (e._1.substring(prefix.length).toLowerCase, e._2))
   }
 
-  override def receiveWithLogging = {
+  override def receiveWithLogging: PartialFunction[Any, Unit] = {
     case RegisteredExecutor =>
       logInfo("Successfully registered with driver")
       val (hostname, _) = Utils.parseHostPort(hostPort)

http://git-wip-us.apache.org/repos/asf/spark/blob/4ce2782a/core/src/main/scala/org/apache/spark/executor/ExecutorActor.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/executor/ExecutorActor.scala 
b/core/src/main/scala/org/apache/spark/executor/ExecutorActor.scala
index 41925f7..3e47d13 100644
--- a/core/src/main/scala/org/apache/spark/executor/ExecutorActor.scala
+++ b/core/src/main/scala/org/apache/spark/executor/ExecutorActor.scala
@@ -33,7 +33,7 @@ private[spark] case object TriggerThreadDump
 private[spark]
 class ExecutorActor(executorId: String) extends Actor with ActorLogReceive 
with Logging {
 
-  override def receiveWithLogging = {
+  override def receiveWithLogging: PartialFunction[Any, Unit] = {
     case TriggerThreadDump =>
       sender ! Utils.getThreadDump()
   }

