[SPARK-21415] Triage scapegoat warnings, part 1

## What changes were proposed in this pull request?
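As a rough illustration of the warning categories listed below, here is a minimal, hypothetical before/after sketch. The object and method names are invented for illustration; none of these snippets are taken from the diff itself.

```scala
// Hypothetical snippets illustrating the Scapegoat fix categories below;
// names are made up and do not appear in this change.
object ScapegoatFixSketch {

  // Prefer Seq.empty / Set.empty over empty literals like Seq[String]() or Set[Int]().
  val noNames: Seq[String] = Seq.empty[String]
  val noIds: Set[Int] = Set.empty[Int]

  // BigDecimal double constructor: build from a String to avoid binary
  // floating-point artifacts (new java.math.BigDecimal(0.1) is not exactly 0.1).
  val tenth = new java.math.BigDecimal("0.1")

  // Use .log1p: log1p(x) is more accurate than log(1 + x) for small x.
  def smoothedLog(count: Double): Double = math.log1p(count)   // was math.log(count + 1)

  // Var could be val: the reference is never reassigned, so declare it val.
  def sumOfSquares(xs: Seq[Int]): Int = {
    val squares = xs.map(x => x * x)                           // was declared with var
    squares.sum
  }

  // List.size is O(n): prefer nonEmpty/isEmpty over comparing size with 0 or 1.
  def hasReplicas(isr: List[Int]): Boolean = isr.nonEmpty      // was isr.size >= 1

  // Unnecessary if condition: the boolean expression is already the result.
  def isDefaultSort(column: String): Boolean = column == "Stage Id"  // was if (...) true else false

  // Catching NPE: check for the missing value up front instead of catching
  // NullPointerException after the fact.
  def describe(name: String): String =
    if (name == null) "<unknown>" else name.trim

  // Finalizer without super: an overridden finalize() should chain to super.finalize().
  class NativeHandle {
    override def finalize(): Unit = {
      try {
        // release the underlying resource here
      } finally {
        super.finalize()
      }
    }
  }
}
```

The actual changes below follow the same patterns, e.g. `Set()` becomes `Set.empty` in SSLOptions.scala and `math.log(c + 1)` becomes `math.log1p(c)` in LogisticRegression.scala.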
Address scapegoat warnings for:

- BigDecimal double constructor
- Catching NPE
- Finalizer without super
- List.size is O(n)
- Prefer Seq.empty
- Prefer Set.empty
- reverse.map instead of reverseMap
- Type shadowing
- Unnecessary if condition
- Use .log1p
- Var could be val

In some instances, such as Seq.empty, I avoided making the change even where it would be valid in test code, to keep the scope of the change smaller. Those issues concern performance, which does not matter for tests.

## How was this patch tested?

Existing tests

Author: Sean Owen <[email protected]>

Closes #18635 from srowen/Scapegoat1.

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/e26dac5f
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/e26dac5f
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/e26dac5f

Branch: refs/heads/master
Commit: e26dac5feb02033f980b1e69c9b0ff50869b6f9e
Parents: 26cd2ca
Author: Sean Owen <[email protected]>
Authored: Tue Jul 18 08:47:17 2017 +0100
Committer: Sean Owen <[email protected]>
Committed: Tue Jul 18 08:47:17 2017 +0100

----------------------------------------------------------------------
 .../scala/org/apache/spark/SSLOptions.scala | 26 +++++++++---------
 .../main/scala/org/apache/spark/SparkEnv.scala | 2 +-
 .../main/scala/org/apache/spark/TestUtils.scala | 8 +++---
 .../org/apache/spark/api/python/PythonRDD.scala | 1 +
 .../scala/org/apache/spark/api/r/SerDe.scala | 3 +--
 .../apache/spark/deploy/SparkHadoopUtil.scala | 2 +-
 .../deploy/history/FsHistoryProvider.scala | 2 +-
 .../spark/deploy/master/ui/MasterWebUI.scala | 2 +-
 .../spark/deploy/worker/CommandUtils.scala | 6 ++---
 .../main/scala/org/apache/spark/package.scala | 10 +++----
 .../org/apache/spark/rdd/CoalescedRDD.scala | 2 +-
 .../scala/org/apache/spark/scheduler/Pool.scala | 2 +-
 .../org/apache/spark/scheduler/TaskResult.scala | 2 +-
 .../apache/spark/scheduler/TaskSetManager.scala | 4 +--
 .../cluster/CoarseGrainedSchedulerBackend.scala | 5 ++--
 .../org/apache/spark/storage/BlockManager.scala | 6 ++---
 .../org/apache/spark/ui/jobs/AllJobsPage.scala | 2 +-
 .../apache/spark/ui/jobs/AllStagesPage.scala | 2 +-
 .../org/apache/spark/ui/jobs/PoolPage.scala | 2 +-
 .../org/apache/spark/ui/jobs/StagePage.scala | 2 +-
 .../org/apache/spark/ui/jobs/StageTable.scala | 2 +-
 .../org/apache/spark/ui/storage/RDDPage.scala | 2 +-
 .../org/apache/spark/util/ClosureCleaner.scala | 26 +++++++++---------
 .../org/apache/spark/util/JsonProtocol.scala | 4 +--
 .../scala/org/apache/spark/util/Utils.scala | 6 ++---
 .../org/apache/spark/examples/LocalFileLR.scala | 6 ++---
 .../org/apache/spark/examples/LocalKMeans.scala | 15 +++++------
 .../org/apache/spark/examples/LocalLR.scala | 4 +--
 .../org/apache/spark/examples/SparkHdfsLR.scala | 6 ++---
 .../org/apache/spark/examples/SparkLR.scala | 2 +-
 .../spark/examples/ml/DecisionTreeExample.scala | 6 ++---
 .../apache/spark/examples/ml/GBTExample.scala | 6 ++---
 .../spark/examples/ml/RandomForestExample.scala | 6 ++---
 .../examples/mllib/DecisionTreeRunner.scala | 2 +-
 .../spark/sql/kafka010/KafkaTestUtils.scala | 2 +-
 .../spark/streaming/kafka010/KafkaRDD.scala | 2 +-
 .../streaming/kafka010/KafkaTestUtils.scala | 2 +-
 .../spark/graphx/impl/EdgePartition.scala | 4 +--
 .../apache/spark/graphx/impl/GraphImpl.scala | 2 +-
 .../spark/graphx/util/GraphGenerators.scala | 3 ++-
 .../ml/classification/LogisticRegression.scala | 4 +--
 .../spark/ml/feature/RFormulaParser.scala | 2 +-
 .../spark/ml/regression/LinearRegression.scala | 2 +-
 .../apache/spark/mllib/clustering/KMeans.scala | 2 +-
 .../mllib/linalg/EigenValueDecomposition.scala | 14 +++++-----
 .../apache/spark/mllib/optimization/LBFGS.scala | 2 +-
 .../stat/correlation/SpearmanCorrelation.scala | 2 +-
 .../spark/mllib/stat/test/StreamingTest.scala | 2 +-
 project/SparkBuild.scala | 4 +--
 .../spark/deploy/mesos/ui/DriverPage.scala | 4 +--
 .../cluster/mesos/MesosSchedulerUtils.scala | 2 +-
 .../spark/sql/catalyst/analysis/Analyzer.scala | 5 ++--
 .../expressions/collectionOperations.scala | 4 +--
 .../optimizer/CostBasedJoinReorder.scala | 2 +-
 .../sql/catalyst/optimizer/expressions.scala | 2 +-
 .../spark/sql/catalyst/parser/AstBuilder.scala | 2 +-
 .../spark/sql/catalyst/planning/patterns.scala | 2 +-
 .../statsEstimation/FilterEstimation.scala | 14 +++++-----
 .../expressions/MathExpressionsSuite.scala | 6 ++---
 .../apache/spark/sql/types/DecimalSuite.scala | 4 +--
 .../spark/sql/execution/command/ddl.scala | 4 +--
 .../datasources/jdbc/JDBCRelation.scala | 3 +--
 .../datasources/parquet/ParquetFileFormat.scala | 2 +-
 .../apache/spark/sql/execution/objects.scala | 5 +---
 .../sql/execution/r/MapPartitionsRWrapper.scala | 3 +--
 .../streaming/CompactibleFileStreamLog.scala | 8 ++----
 .../execution/streaming/StreamExecution.scala | 2 +-
 .../spark/sql/DataFrameAggregateSuite.scala | 28 ++++++++++----------
 .../org/apache/spark/sql/DataFrameSuite.scala | 4 +--
 .../org/apache/spark/sql/SQLQuerySuite.scala | 4 +--
 .../apache/spark/sql/StringFunctionsSuite.scala | 2 +-
 .../execution/datasources/json/JsonSuite.scala | 4 +--
 .../ParquetPartitionDiscoverySuite.scala | 2 +-
 .../org/apache/spark/sql/hive/HiveUtils.scala | 2 +-
 .../org/apache/spark/sql/hive/TableReader.scala | 8 +++---
 .../spark/sql/hive/client/HiveClientImpl.scala | 2 +-
 .../spark/sql/hive/HiveInspectorSuite.scala | 2 +-
 .../spark/sql/hive/client/HiveClientSuite.scala | 9 +++----
 .../spark/streaming/StreamingContext.scala | 2 +-
 .../streaming/api/java/JavaDStreamLike.scala | 2 +-
 .../spark/streaming/dstream/StateDStream.scala | 2 +-
 .../spark/streaming/util/RawTextHelper.scala | 1 -
 82 files changed, 186 insertions(+), 195 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/spark/blob/e26dac5f/core/src/main/scala/org/apache/spark/SSLOptions.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/SSLOptions.scala b/core/src/main/scala/org/apache/spark/SSLOptions.scala index f86fd20..477b019 100644 --- a/core/src/main/scala/org/apache/spark/SSLOptions.scala +++ b/core/src/main/scala/org/apache/spark/SSLOptions.scala @@ -94,21 +94,23 @@ private[spark] case class SSLOptions( * are supported by the current Java security provider for this protocol. */ private val supportedAlgorithms: Set[String] = if (enabledAlgorithms.isEmpty) { - Set() + Set.empty } else { var context: SSLContext = null - try { - context = SSLContext.getInstance(protocol.orNull) - /* The set of supported algorithms does not depend upon the keys, trust, or + if (protocol.isEmpty) { + logDebug("No SSL protocol specified") + context = SSLContext.getDefault + } else { + try { + context = SSLContext.getInstance(protocol.get) + /* The set of supported algorithms does not depend upon the keys, trust, or rng, although they will influence which algorithms are eventually used.
*/ - context.init(null, null, null) - } catch { - case npe: NullPointerException => - logDebug("No SSL protocol specified") - context = SSLContext.getDefault - case nsa: NoSuchAlgorithmException => - logDebug(s"No support for requested SSL protocol ${protocol.get}") - context = SSLContext.getDefault + context.init(null, null, null) + } catch { + case nsa: NoSuchAlgorithmException => + logDebug(s"No support for requested SSL protocol ${protocol.get}") + context = SSLContext.getDefault + } } val providerAlgorithms = context.getServerSocketFactory.getSupportedCipherSuites.toSet http://git-wip-us.apache.org/repos/asf/spark/blob/e26dac5f/core/src/main/scala/org/apache/spark/SparkEnv.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/SparkEnv.scala b/core/src/main/scala/org/apache/spark/SparkEnv.scala index 3196c1e..45ed986 100644 --- a/core/src/main/scala/org/apache/spark/SparkEnv.scala +++ b/core/src/main/scala/org/apache/spark/SparkEnv.scala @@ -420,7 +420,7 @@ object SparkEnv extends Logging { if (!conf.contains("spark.scheduler.mode")) { Seq(("spark.scheduler.mode", schedulingMode)) } else { - Seq[(String, String)]() + Seq.empty[(String, String)] } val sparkProperties = (conf.getAll ++ schedulerMode).sorted http://git-wip-us.apache.org/repos/asf/spark/blob/e26dac5f/core/src/main/scala/org/apache/spark/TestUtils.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/TestUtils.scala b/core/src/main/scala/org/apache/spark/TestUtils.scala index 3f912dc..a80016d 100644 --- a/core/src/main/scala/org/apache/spark/TestUtils.scala +++ b/core/src/main/scala/org/apache/spark/TestUtils.scala @@ -58,8 +58,8 @@ private[spark] object TestUtils { def createJarWithClasses( classNames: Seq[String], toStringValue: String = "", - classNamesWithBase: Seq[(String, String)] = Seq(), - classpathUrls: Seq[URL] = Seq()): URL = { + classNamesWithBase: Seq[(String, String)] = Seq.empty, + classpathUrls: Seq[URL] = Seq.empty): URL = { val tempDir = Utils.createTempDir() val files1 = for (name <- classNames) yield { createCompiledClass(name, tempDir, toStringValue, classpathUrls = classpathUrls) @@ -137,7 +137,7 @@ private[spark] object TestUtils { val options = if (classpathUrls.nonEmpty) { Seq("-classpath", classpathUrls.map { _.getFile }.mkString(File.pathSeparator)) } else { - Seq() + Seq.empty } compiler.getTask(null, null, null, options.asJava, null, Arrays.asList(sourceFile)).call() @@ -160,7 +160,7 @@ private[spark] object TestUtils { destDir: File, toStringValue: String = "", baseClass: String = null, - classpathUrls: Seq[URL] = Seq()): File = { + classpathUrls: Seq[URL] = Seq.empty): File = { val extendsText = Option(baseClass).map { c => s" extends ${c}" }.getOrElse("") val sourceFile = new JavaSourceFromString(className, "public class " + className + extendsText + " implements java.io.Serializable {" + http://git-wip-us.apache.org/repos/asf/spark/blob/e26dac5f/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala index fb0405b..6a81752 100644 --- a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala +++ b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala @@ -974,6 +974,7 @@ private[spark] class 
PythonBroadcast(@transient var path: String) extends Serial } } } + super.finalize() } } // scalastyle:on no.finalize http://git-wip-us.apache.org/repos/asf/spark/blob/e26dac5f/core/src/main/scala/org/apache/spark/api/r/SerDe.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/api/r/SerDe.scala b/core/src/main/scala/org/apache/spark/api/r/SerDe.scala index dad928c..537ab57 100644 --- a/core/src/main/scala/org/apache/spark/api/r/SerDe.scala +++ b/core/src/main/scala/org/apache/spark/api/r/SerDe.scala @@ -128,8 +128,7 @@ private[spark] object SerDe { } def readBoolean(in: DataInputStream): Boolean = { - val intVal = in.readInt() - if (intVal == 0) false else true + in.readInt() != 0 } def readDate(in: DataInputStream): Date = { http://git-wip-us.apache.org/repos/asf/spark/blob/e26dac5f/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala b/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala index 6afe58b..ccbabf0 100644 --- a/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala +++ b/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala @@ -337,7 +337,7 @@ class SparkHadoopUtil extends Logging { if (credentials != null) { credentials.getAllTokens.asScala.map(tokenToString) } else { - Seq() + Seq.empty } } http://git-wip-us.apache.org/repos/asf/spark/blob/e26dac5f/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala index b2a50bd..687fd2d 100644 --- a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala +++ b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala @@ -317,7 +317,7 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock) val newLastScanTime = getNewLastScanTime() logDebug(s"Scanning $logDir with lastScanTime==$lastScanTime") val statusList = Option(fs.listStatus(new Path(logDir))).map(_.toSeq) - .getOrElse(Seq[FileStatus]()) + .getOrElse(Seq.empty[FileStatus]) // scan for modified applications, replay and merge them val logInfos: Seq[FileStatus] = statusList .filter { entry => http://git-wip-us.apache.org/repos/asf/spark/blob/e26dac5f/core/src/main/scala/org/apache/spark/deploy/master/ui/MasterWebUI.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/deploy/master/ui/MasterWebUI.scala b/core/src/main/scala/org/apache/spark/deploy/master/ui/MasterWebUI.scala index 8cfd0f6..e42f41b 100644 --- a/core/src/main/scala/org/apache/spark/deploy/master/ui/MasterWebUI.scala +++ b/core/src/main/scala/org/apache/spark/deploy/master/ui/MasterWebUI.scala @@ -55,7 +55,7 @@ class MasterWebUI( } def addProxyTargets(id: String, target: String): Unit = { - var endTarget = target.stripSuffix("/") + val endTarget = target.stripSuffix("/") val handler = createProxyHandler("/proxy/" + id, endTarget) attachHandler(handler) proxyHandlers(id) = handler http://git-wip-us.apache.org/repos/asf/spark/blob/e26dac5f/core/src/main/scala/org/apache/spark/deploy/worker/CommandUtils.scala 
---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/CommandUtils.scala b/core/src/main/scala/org/apache/spark/deploy/worker/CommandUtils.scala index cba4aaf..12e0dae 100644 --- a/core/src/main/scala/org/apache/spark/deploy/worker/CommandUtils.scala +++ b/core/src/main/scala/org/apache/spark/deploy/worker/CommandUtils.scala @@ -44,7 +44,7 @@ object CommandUtils extends Logging { memory: Int, sparkHome: String, substituteArguments: String => String, - classPaths: Seq[String] = Seq[String](), + classPaths: Seq[String] = Seq.empty, env: Map[String, String] = sys.env): ProcessBuilder = { val localCommand = buildLocalCommand( command, securityMgr, substituteArguments, classPaths, env) @@ -73,7 +73,7 @@ object CommandUtils extends Logging { command: Command, securityMgr: SecurityManager, substituteArguments: String => String, - classPath: Seq[String] = Seq[String](), + classPath: Seq[String] = Seq.empty, env: Map[String, String]): Command = { val libraryPathName = Utils.libraryPathEnvName val libraryPathEntries = command.libraryPathEntries @@ -96,7 +96,7 @@ object CommandUtils extends Logging { command.arguments.map(substituteArguments), newEnvironment, command.classPathEntries ++ classPath, - Seq[String](), // library path already captured in environment variable + Seq.empty, // library path already captured in environment variable // filter out auth secret from java options command.javaOpts.filterNot(_.startsWith("-D" + SecurityManager.SPARK_AUTH_SECRET_CONF))) } http://git-wip-us.apache.org/repos/asf/spark/blob/e26dac5f/core/src/main/scala/org/apache/spark/package.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/package.scala b/core/src/main/scala/org/apache/spark/package.scala index 2610d6f..8058a4d 100644 --- a/core/src/main/scala/org/apache/spark/package.scala +++ b/core/src/main/scala/org/apache/spark/package.scala @@ -17,6 +17,8 @@ package org.apache +import java.util.Properties + /** * Core Spark functionality. [[org.apache.spark.SparkContext]] serves as the main entry point to * Spark, while [[org.apache.spark.rdd.RDD]] is the data type representing a distributed collection, @@ -40,9 +42,6 @@ package org.apache * Developer API</span> are intended for advanced users want to extend Spark through lower * level interfaces. These are subject to changes or removal in minor releases. */ - -import java.util.Properties - package object spark { private object SparkBuildInfo { @@ -57,6 +56,9 @@ package object spark { val resourceStream = Thread.currentThread().getContextClassLoader. 
getResourceAsStream("spark-version-info.properties") + if (resourceStream == null) { + throw new SparkException("Could not find spark-version-info.properties") + } try { val unknownProp = "<unknown>" @@ -71,8 +73,6 @@ package object spark { props.getProperty("date", unknownProp) ) } catch { - case npe: NullPointerException => - throw new SparkException("Error while locating file spark-version-info.properties", npe) case e: Exception => throw new SparkException("Error loading properties from spark-version-info.properties", e) } finally { http://git-wip-us.apache.org/repos/asf/spark/blob/e26dac5f/core/src/main/scala/org/apache/spark/rdd/CoalescedRDD.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/rdd/CoalescedRDD.scala b/core/src/main/scala/org/apache/spark/rdd/CoalescedRDD.scala index 2cba1fe..10451a3 100644 --- a/core/src/main/scala/org/apache/spark/rdd/CoalescedRDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/CoalescedRDD.scala @@ -269,7 +269,7 @@ private class DefaultPartitionCoalescer(val balanceSlack: Double = 0.10) tries = 0 // if we don't have enough partition groups, create duplicates while (numCreated < targetLen) { - var (nxt_replica, nxt_part) = partitionLocs.partsWithLocs(tries) + val (nxt_replica, nxt_part) = partitionLocs.partsWithLocs(tries) tries += 1 val pgroup = new PartitionGroup(Some(nxt_replica)) groupArr += pgroup http://git-wip-us.apache.org/repos/asf/spark/blob/e26dac5f/core/src/main/scala/org/apache/spark/scheduler/Pool.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/scheduler/Pool.scala b/core/src/main/scala/org/apache/spark/scheduler/Pool.scala index 1181371..f4b0ab1 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/Pool.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/Pool.scala @@ -97,7 +97,7 @@ private[spark] class Pool( } override def getSortedTaskSetQueue: ArrayBuffer[TaskSetManager] = { - var sortedTaskSetQueue = new ArrayBuffer[TaskSetManager] + val sortedTaskSetQueue = new ArrayBuffer[TaskSetManager] val sortedSchedulableQueue = schedulableQueue.asScala.toSeq.sortWith(taskSetSchedulingAlgorithm.comparator) for (schedulable <- sortedSchedulableQueue) { http://git-wip-us.apache.org/repos/asf/spark/blob/e26dac5f/core/src/main/scala/org/apache/spark/scheduler/TaskResult.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskResult.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskResult.scala index 366b92c..836769e 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/TaskResult.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskResult.scala @@ -60,7 +60,7 @@ private[spark] class DirectTaskResult[T]( val numUpdates = in.readInt if (numUpdates == 0) { - accumUpdates = Seq() + accumUpdates = Seq.empty } else { val _accumUpdates = new ArrayBuffer[AccumulatorV2[_, _]] for (i <- 0 until numUpdates) { http://git-wip-us.apache.org/repos/asf/spark/blob/e26dac5f/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala index 3968fb7..589fe67 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala +++ 
b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala @@ -891,7 +891,7 @@ private[spark] class TaskSetManager( override def removeSchedulable(schedulable: Schedulable) {} override def getSortedTaskSetQueue(): ArrayBuffer[TaskSetManager] = { - var sortedTaskSetQueue = new ArrayBuffer[TaskSetManager]() + val sortedTaskSetQueue = new ArrayBuffer[TaskSetManager]() sortedTaskSetQueue += this sortedTaskSetQueue } @@ -948,7 +948,7 @@ private[spark] class TaskSetManager( if (tasksSuccessful >= minFinishedForSpeculation && tasksSuccessful > 0) { val time = clock.getTimeMillis() - var medianDuration = successfulTaskDurations.median + val medianDuration = successfulTaskDurations.median val threshold = max(SPECULATION_MULTIPLIER * medianDuration, minTimeToSpeculation) // TODO: Threshold should also look at standard deviation of task durations and have a lower // bound based on that. http://git-wip-us.apache.org/repos/asf/spark/blob/e26dac5f/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala index 0b396b7..a46824a 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala @@ -23,7 +23,6 @@ import javax.annotation.concurrent.GuardedBy import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet} import scala.concurrent.Future -import scala.concurrent.duration.Duration import org.apache.spark.{ExecutorAllocationClient, SparkEnv, SparkException, TaskState} import org.apache.spark.internal.Logging @@ -427,11 +426,11 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp * be called in the yarn-client mode when AM re-registers after a failure. * */ protected def reset(): Unit = { - val executors = synchronized { + val executors: Set[String] = synchronized { requestedTotalExecutors = 0 numPendingExecutors = 0 executorsPendingToRemove.clear() - Set() ++ executorDataMap.keys + executorDataMap.keys.toSet } // Remove all the lingering executors that should be removed but not yet. 
The reason might be http://git-wip-us.apache.org/repos/asf/spark/blob/e26dac5f/core/src/main/scala/org/apache/spark/storage/BlockManager.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala index adbe3cf..aaacabe 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala @@ -1275,11 +1275,11 @@ private[spark] class BlockManager( val numPeersToReplicateTo = level.replication - 1 val startTime = System.nanoTime - var peersReplicatedTo = mutable.HashSet.empty ++ existingReplicas - var peersFailedToReplicateTo = mutable.HashSet.empty[BlockManagerId] + val peersReplicatedTo = mutable.HashSet.empty ++ existingReplicas + val peersFailedToReplicateTo = mutable.HashSet.empty[BlockManagerId] var numFailures = 0 - val initialPeers = getPeers(false).filterNot(existingReplicas.contains(_)) + val initialPeers = getPeers(false).filterNot(existingReplicas.contains) var peersForReplication = blockReplicationPolicy.prioritize( blockManagerId, http://git-wip-us.apache.org/repos/asf/spark/blob/e26dac5f/core/src/main/scala/org/apache/spark/ui/jobs/AllJobsPage.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/AllJobsPage.scala b/core/src/main/scala/org/apache/spark/ui/jobs/AllJobsPage.scala index cce7a76..a7f2caa 100644 --- a/core/src/main/scala/org/apache/spark/ui/jobs/AllJobsPage.scala +++ b/core/src/main/scala/org/apache/spark/ui/jobs/AllJobsPage.scala @@ -241,7 +241,7 @@ private[ui] class AllJobsPage(parent: JobsTab) extends WebUIPage("") { }.getOrElse(jobIdTitle) val jobSortDesc = Option(parameterJobSortDesc).map(_.toBoolean).getOrElse( // New jobs should be shown above old jobs by default. 
- if (jobSortColumn == jobIdTitle) true else false + jobSortColumn == jobIdTitle ) val jobPageSize = Option(parameterJobPageSize).map(_.toInt).getOrElse(100) val jobPrevPageSize = Option(parameterJobPrevPageSize).map(_.toInt).getOrElse(jobPageSize) http://git-wip-us.apache.org/repos/asf/spark/blob/e26dac5f/core/src/main/scala/org/apache/spark/ui/jobs/AllStagesPage.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/AllStagesPage.scala b/core/src/main/scala/org/apache/spark/ui/jobs/AllStagesPage.scala index 2b0816e..a30c135 100644 --- a/core/src/main/scala/org/apache/spark/ui/jobs/AllStagesPage.scala +++ b/core/src/main/scala/org/apache/spark/ui/jobs/AllStagesPage.scala @@ -115,7 +115,7 @@ private[ui] class AllStagesPage(parent: StagesTab) extends WebUIPage("") { if (sc.isDefined && isFairScheduler) { <h4>{pools.size} Fair Scheduler Pools</h4> ++ poolTable.toNodeSeq } else { - Seq[Node]() + Seq.empty[Node] } } if (shouldShowActiveStages) { http://git-wip-us.apache.org/repos/asf/spark/blob/e26dac5f/core/src/main/scala/org/apache/spark/ui/jobs/PoolPage.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/PoolPage.scala b/core/src/main/scala/org/apache/spark/ui/jobs/PoolPage.scala index b164f32..819fe57 100644 --- a/core/src/main/scala/org/apache/spark/ui/jobs/PoolPage.scala +++ b/core/src/main/scala/org/apache/spark/ui/jobs/PoolPage.scala @@ -41,7 +41,7 @@ private[ui] class PoolPage(parent: StagesTab) extends WebUIPage("pool") { val poolToActiveStages = listener.poolToActiveStages val activeStages = poolToActiveStages.get(poolName) match { case Some(s) => s.values.toSeq - case None => Seq[StageInfo]() + case None => Seq.empty[StageInfo] } val shouldShowActiveStages = activeStages.nonEmpty val activeStagesTable = http://git-wip-us.apache.org/repos/asf/spark/blob/e26dac5f/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala b/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala index 6b3dadc..8ed5174 100644 --- a/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala +++ b/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala @@ -565,7 +565,7 @@ private[ui] class StagePage(parent: StagesTab) extends WebUIPage("stage") { val executorTable = new ExecutorTable(stageId, stageAttemptId, parent) val maybeAccumulableTable: Seq[Node] = - if (hasAccumulators) { <h4>Accumulators</h4> ++ accumulableTable } else Seq() + if (hasAccumulators) { <h4>Accumulators</h4> ++ accumulableTable } else Seq.empty val aggMetrics = <span class="collapse-aggregated-metrics collapse-table" http://git-wip-us.apache.org/repos/asf/spark/blob/e26dac5f/core/src/main/scala/org/apache/spark/ui/jobs/StageTable.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/StageTable.scala b/core/src/main/scala/org/apache/spark/ui/jobs/StageTable.scala index a28daf7..f0a12a2 100644 --- a/core/src/main/scala/org/apache/spark/ui/jobs/StageTable.scala +++ b/core/src/main/scala/org/apache/spark/ui/jobs/StageTable.scala @@ -60,7 +60,7 @@ private[ui] class StageTableBase( }.getOrElse("Stage Id") val stageSortDesc = Option(parameterStageSortDesc).map(_.toBoolean).getOrElse( // New stages should be shown above old jobs by default. 
- if (stageSortColumn == "Stage Id") true else false + stageSortColumn == "Stage Id" ) val stagePageSize = Option(parameterStagePageSize).map(_.toInt).getOrElse(100) val stagePrevPageSize = Option(parameterStagePrevPageSize).map(_.toInt) http://git-wip-us.apache.org/repos/asf/spark/blob/e26dac5f/core/src/main/scala/org/apache/spark/ui/storage/RDDPage.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/ui/storage/RDDPage.scala b/core/src/main/scala/org/apache/spark/ui/storage/RDDPage.scala index 317e0aa..e8ff08f 100644 --- a/core/src/main/scala/org/apache/spark/ui/storage/RDDPage.scala +++ b/core/src/main/scala/org/apache/spark/ui/storage/RDDPage.scala @@ -51,7 +51,7 @@ private[ui] class RDDPage(parent: StorageTab) extends WebUIPage("rdd") { val rddStorageInfo = AllRDDResource.getRDDStorageInfo(rddId, listener, includeDetails = true) .getOrElse { // Rather than crashing, render an "RDD Not Found" page - return UIUtils.headerSparkPage("RDD Not Found", Seq[Node](), parent) + return UIUtils.headerSparkPage("RDD Not Found", Seq.empty[Node], parent) } // Worker table http://git-wip-us.apache.org/repos/asf/spark/blob/e26dac5f/core/src/main/scala/org/apache/spark/util/ClosureCleaner.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/util/ClosureCleaner.scala b/core/src/main/scala/org/apache/spark/util/ClosureCleaner.scala index 489688c..48a1d7b 100644 --- a/core/src/main/scala/org/apache/spark/util/ClosureCleaner.scala +++ b/core/src/main/scala/org/apache/spark/util/ClosureCleaner.scala @@ -81,7 +81,7 @@ private[spark] object ClosureCleaner extends Logging { val stack = Stack[Class[_]](obj.getClass) while (!stack.isEmpty) { val cr = getClassReader(stack.pop()) - val set = Set[Class[_]]() + val set = Set.empty[Class[_]] cr.accept(new InnerClosureFinder(set), 0) for (cls <- set -- seen) { seen += cls @@ -180,16 +180,18 @@ private[spark] object ClosureCleaner extends Logging { val declaredFields = func.getClass.getDeclaredFields val declaredMethods = func.getClass.getDeclaredMethods - logDebug(" + declared fields: " + declaredFields.size) - declaredFields.foreach { f => logDebug(" " + f) } - logDebug(" + declared methods: " + declaredMethods.size) - declaredMethods.foreach { m => logDebug(" " + m) } - logDebug(" + inner classes: " + innerClasses.size) - innerClasses.foreach { c => logDebug(" " + c.getName) } - logDebug(" + outer classes: " + outerClasses.size) - outerClasses.foreach { c => logDebug(" " + c.getName) } - logDebug(" + outer objects: " + outerObjects.size) - outerObjects.foreach { o => logDebug(" " + o) } + if (log.isDebugEnabled) { + logDebug(" + declared fields: " + declaredFields.size) + declaredFields.foreach { f => logDebug(" " + f) } + logDebug(" + declared methods: " + declaredMethods.size) + declaredMethods.foreach { m => logDebug(" " + m) } + logDebug(" + inner classes: " + innerClasses.size) + innerClasses.foreach { c => logDebug(" " + c.getName) } + logDebug(" + outer classes: " + outerClasses.size) + outerClasses.foreach { c => logDebug(" " + c.getName) } + logDebug(" + outer objects: " + outerObjects.size) + outerObjects.foreach { o => logDebug(" " + o) } + } // Fail fast if we detect return statements in closures getClassReader(func.getClass).accept(new ReturnStatementFinder(), 0) @@ -201,7 +203,7 @@ private[spark] object ClosureCleaner extends Logging { // Initialize accessed fields with the outer classes first // This step 
is needed to associate the fields to the correct classes later for (cls <- outerClasses) { - accessedFields(cls) = Set[String]() + accessedFields(cls) = Set.empty[String] } // Populate accessed fields by visiting all fields and methods accessed by this and // all of its inner closures. If transitive cleaning is enabled, this may recursively http://git-wip-us.apache.org/repos/asf/spark/blob/e26dac5f/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala index 806d14e..8406826 100644 --- a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala +++ b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala @@ -696,7 +696,7 @@ private[spark] object JsonProtocol { val accumulatedValues = { Utils.jsonOption(json \ "Accumulables").map(_.extract[List[JValue]]) match { case Some(values) => values.map(accumulableInfoFromJson) - case None => Seq[AccumulableInfo]() + case None => Seq.empty[AccumulableInfo] } } @@ -726,7 +726,7 @@ private[spark] object JsonProtocol { val killed = Utils.jsonOption(json \ "Killed").exists(_.extract[Boolean]) val accumulables = Utils.jsonOption(json \ "Accumulables").map(_.extract[Seq[JValue]]) match { case Some(values) => values.map(accumulableInfoFromJson) - case None => Seq[AccumulableInfo]() + case None => Seq.empty[AccumulableInfo] } val taskInfo = http://git-wip-us.apache.org/repos/asf/spark/blob/e26dac5f/core/src/main/scala/org/apache/spark/util/Utils.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala index 584337a..d661293 100644 --- a/core/src/main/scala/org/apache/spark/util/Utils.scala +++ b/core/src/main/scala/org/apache/spark/util/Utils.scala @@ -1443,7 +1443,7 @@ private[spark] object Utils extends Logging { var firstUserFile = "<unknown>" var firstUserLine = 0 var insideSpark = true - var callStack = new ArrayBuffer[String]() :+ "<unknown>" + val callStack = new ArrayBuffer[String]() :+ "<unknown>" Thread.currentThread.getStackTrace().foreach { ste: StackTraceElement => // When running under some profilers, the current stack trace might contain some bogus @@ -2438,7 +2438,7 @@ private[spark] object Utils extends Logging { .getOrElse(UserGroupInformation.getCurrentUser().getShortUserName()) } - val EMPTY_USER_GROUPS = Set[String]() + val EMPTY_USER_GROUPS = Set.empty[String] // Returns the groups to which the current user belongs. def getCurrentUserGroups(sparkConf: SparkConf, username: String): Set[String] = { @@ -2587,7 +2587,7 @@ private[spark] object Utils extends Logging { * Unions two comma-separated lists of files and filters out empty strings. 
*/ def unionFileLists(leftList: Option[String], rightList: Option[String]): Set[String] = { - var allFiles = Set[String]() + var allFiles = Set.empty[String] leftList.foreach { value => allFiles ++= value.split(",") } rightList.foreach { value => allFiles ++= value.split(",") } allFiles.filter { _.nonEmpty } http://git-wip-us.apache.org/repos/asf/spark/blob/e26dac5f/examples/src/main/scala/org/apache/spark/examples/LocalFileLR.scala ---------------------------------------------------------------------- diff --git a/examples/src/main/scala/org/apache/spark/examples/LocalFileLR.scala b/examples/src/main/scala/org/apache/spark/examples/LocalFileLR.scala index a897cad..8dbb7ee 100644 --- a/examples/src/main/scala/org/apache/spark/examples/LocalFileLR.scala +++ b/examples/src/main/scala/org/apache/spark/examples/LocalFileLR.scala @@ -53,16 +53,16 @@ object LocalFileLR { val fileSrc = scala.io.Source.fromFile(args(0)) val lines = fileSrc.getLines().toArray - val points = lines.map(parsePoint _) + val points = lines.map(parsePoint) val ITERATIONS = args(1).toInt // Initialize w to a random value - var w = DenseVector.fill(D) {2 * rand.nextDouble - 1} + val w = DenseVector.fill(D) {2 * rand.nextDouble - 1} println("Initial w: " + w) for (i <- 1 to ITERATIONS) { println("On iteration " + i) - var gradient = DenseVector.zeros[Double](D) + val gradient = DenseVector.zeros[Double](D) for (p <- points) { val scale = (1 / (1 + math.exp(-p.y * (w.dot(p.x)))) - 1) * p.y gradient += p.x * scale http://git-wip-us.apache.org/repos/asf/spark/blob/e26dac5f/examples/src/main/scala/org/apache/spark/examples/LocalKMeans.scala ---------------------------------------------------------------------- diff --git a/examples/src/main/scala/org/apache/spark/examples/LocalKMeans.scala b/examples/src/main/scala/org/apache/spark/examples/LocalKMeans.scala index fca585c..963c9a5 100644 --- a/examples/src/main/scala/org/apache/spark/examples/LocalKMeans.scala +++ b/examples/src/main/scala/org/apache/spark/examples/LocalKMeans.scala @@ -47,12 +47,11 @@ object LocalKMeans { } def closestPoint(p: Vector[Double], centers: HashMap[Int, Vector[Double]]): Int = { - var index = 0 var bestIndex = 0 var closest = Double.PositiveInfinity for (i <- 1 to centers.size) { - val vCurr = centers.get(i).get + val vCurr = centers(i) val tempDist = squaredDistance(p, vCurr) if (tempDist < closest) { closest = tempDist @@ -76,8 +75,8 @@ object LocalKMeans { showWarning() val data = generateData - var points = new HashSet[Vector[Double]] - var kPoints = new HashMap[Int, Vector[Double]] + val points = new HashSet[Vector[Double]] + val kPoints = new HashMap[Int, Vector[Double]] var tempDist = 1.0 while (points.size < K) { @@ -92,11 +91,11 @@ object LocalKMeans { println("Initial centers: " + kPoints) while(tempDist > convergeDist) { - var closest = data.map (p => (closestPoint(p, kPoints), (p, 1))) + val closest = data.map (p => (closestPoint(p, kPoints), (p, 1))) - var mappings = closest.groupBy[Int] (x => x._1) + val mappings = closest.groupBy[Int] (x => x._1) - var pointStats = mappings.map { pair => + val pointStats = mappings.map { pair => pair._2.reduceLeft [(Int, (Vector[Double], Int))] { case ((id1, (p1, c1)), (id2, (p2, c2))) => (id1, (p1 + p2, c1 + c2)) } @@ -107,7 +106,7 @@ object LocalKMeans { tempDist = 0.0 for (mapping <- newPoints) { - tempDist += squaredDistance(kPoints.get(mapping._1).get, mapping._2) + tempDist += squaredDistance(kPoints(mapping._1), mapping._2) } for (newP <- newPoints) { 
http://git-wip-us.apache.org/repos/asf/spark/blob/e26dac5f/examples/src/main/scala/org/apache/spark/examples/LocalLR.scala ---------------------------------------------------------------------- diff --git a/examples/src/main/scala/org/apache/spark/examples/LocalLR.scala b/examples/src/main/scala/org/apache/spark/examples/LocalLR.scala index 13ccc2a..eb5221f 100644 --- a/examples/src/main/scala/org/apache/spark/examples/LocalLR.scala +++ b/examples/src/main/scala/org/apache/spark/examples/LocalLR.scala @@ -60,12 +60,12 @@ object LocalLR { val data = generateData // Initialize w to a random value - var w = DenseVector.fill(D) {2 * rand.nextDouble - 1} + val w = DenseVector.fill(D) {2 * rand.nextDouble - 1} println("Initial w: " + w) for (i <- 1 to ITERATIONS) { println("On iteration " + i) - var gradient = DenseVector.zeros[Double](D) + val gradient = DenseVector.zeros[Double](D) for (p <- data) { val scale = (1 / (1 + math.exp(-p.y * (w.dot(p.x)))) - 1) * p.y gradient += p.x * scale http://git-wip-us.apache.org/repos/asf/spark/blob/e26dac5f/examples/src/main/scala/org/apache/spark/examples/SparkHdfsLR.scala ---------------------------------------------------------------------- diff --git a/examples/src/main/scala/org/apache/spark/examples/SparkHdfsLR.scala b/examples/src/main/scala/org/apache/spark/examples/SparkHdfsLR.scala index 05ac6cb..9d675bb 100644 --- a/examples/src/main/scala/org/apache/spark/examples/SparkHdfsLR.scala +++ b/examples/src/main/scala/org/apache/spark/examples/SparkHdfsLR.scala @@ -40,8 +40,8 @@ object SparkHdfsLR { def parsePoint(line: String): DataPoint = { val tok = new java.util.StringTokenizer(line, " ") - var y = tok.nextToken.toDouble - var x = new Array[Double](D) + val y = tok.nextToken.toDouble + val x = new Array[Double](D) var i = 0 while (i < D) { x(i) = tok.nextToken.toDouble; i += 1 @@ -78,7 +78,7 @@ object SparkHdfsLR { val ITERATIONS = args(1).toInt // Initialize w to a random value - var w = DenseVector.fill(D) {2 * rand.nextDouble - 1} + val w = DenseVector.fill(D) {2 * rand.nextDouble - 1} println("Initial w: " + w) for (i <- 1 to ITERATIONS) { http://git-wip-us.apache.org/repos/asf/spark/blob/e26dac5f/examples/src/main/scala/org/apache/spark/examples/SparkLR.scala ---------------------------------------------------------------------- diff --git a/examples/src/main/scala/org/apache/spark/examples/SparkLR.scala b/examples/src/main/scala/org/apache/spark/examples/SparkLR.scala index cb2be09..c18e3d3 100644 --- a/examples/src/main/scala/org/apache/spark/examples/SparkLR.scala +++ b/examples/src/main/scala/org/apache/spark/examples/SparkLR.scala @@ -72,7 +72,7 @@ object SparkLR { val points = spark.sparkContext.parallelize(generateData, numSlices).cache() // Initialize w to a random value - var w = DenseVector.fill(D) {2 * rand.nextDouble - 1} + val w = DenseVector.fill(D) {2 * rand.nextDouble - 1} println("Initial w: " + w) for (i <- 1 to ITERATIONS) { http://git-wip-us.apache.org/repos/asf/spark/blob/e26dac5f/examples/src/main/scala/org/apache/spark/examples/ml/DecisionTreeExample.scala ---------------------------------------------------------------------- diff --git a/examples/src/main/scala/org/apache/spark/examples/ml/DecisionTreeExample.scala b/examples/src/main/scala/org/apache/spark/examples/ml/DecisionTreeExample.scala index b03701e..19f2d77 100644 --- a/examples/src/main/scala/org/apache/spark/examples/ml/DecisionTreeExample.scala +++ b/examples/src/main/scala/org/apache/spark/examples/ml/DecisionTreeExample.scala @@ -251,7 +251,7 @@ object 
DecisionTreeExample { .setMinInfoGain(params.minInfoGain) .setCacheNodeIds(params.cacheNodeIds) .setCheckpointInterval(params.checkpointInterval) - case _ => throw new IllegalArgumentException("Algo ${params.algo} not supported.") + case _ => throw new IllegalArgumentException(s"Algo ${params.algo} not supported.") } stages += dt val pipeline = new Pipeline().setStages(stages.toArray) @@ -278,7 +278,7 @@ object DecisionTreeExample { } else { println(treeModel) // Print model summary. } - case _ => throw new IllegalArgumentException("Algo ${params.algo} not supported.") + case _ => throw new IllegalArgumentException(s"Algo ${params.algo} not supported.") } // Evaluate model on training, test data. @@ -294,7 +294,7 @@ object DecisionTreeExample { println("Test data results:") evaluateRegressionModel(pipelineModel, test, labelColName) case _ => - throw new IllegalArgumentException("Algo ${params.algo} not supported.") + throw new IllegalArgumentException(s"Algo ${params.algo} not supported.") } spark.stop() http://git-wip-us.apache.org/repos/asf/spark/blob/e26dac5f/examples/src/main/scala/org/apache/spark/examples/ml/GBTExample.scala ---------------------------------------------------------------------- diff --git a/examples/src/main/scala/org/apache/spark/examples/ml/GBTExample.scala b/examples/src/main/scala/org/apache/spark/examples/ml/GBTExample.scala index 3bd8ff5..8f3ce4b 100644 --- a/examples/src/main/scala/org/apache/spark/examples/ml/GBTExample.scala +++ b/examples/src/main/scala/org/apache/spark/examples/ml/GBTExample.scala @@ -190,7 +190,7 @@ object GBTExample { .setCacheNodeIds(params.cacheNodeIds) .setCheckpointInterval(params.checkpointInterval) .setMaxIter(params.maxIter) - case _ => throw new IllegalArgumentException("Algo ${params.algo} not supported.") + case _ => throw new IllegalArgumentException(s"Algo ${params.algo} not supported.") } stages += dt val pipeline = new Pipeline().setStages(stages.toArray) @@ -217,7 +217,7 @@ object GBTExample { } else { println(rfModel) // Print model summary. } - case _ => throw new IllegalArgumentException("Algo ${params.algo} not supported.") + case _ => throw new IllegalArgumentException(s"Algo ${params.algo} not supported.") } // Evaluate model on training, test data. 
@@ -233,7 +233,7 @@ object GBTExample { println("Test data results:") DecisionTreeExample.evaluateRegressionModel(pipelineModel, test, labelColName) case _ => - throw new IllegalArgumentException("Algo ${params.algo} not supported.") + throw new IllegalArgumentException(s"Algo ${params.algo} not supported.") } spark.stop() http://git-wip-us.apache.org/repos/asf/spark/blob/e26dac5f/examples/src/main/scala/org/apache/spark/examples/ml/RandomForestExample.scala ---------------------------------------------------------------------- diff --git a/examples/src/main/scala/org/apache/spark/examples/ml/RandomForestExample.scala b/examples/src/main/scala/org/apache/spark/examples/ml/RandomForestExample.scala index a735c21..3c127a4 100644 --- a/examples/src/main/scala/org/apache/spark/examples/ml/RandomForestExample.scala +++ b/examples/src/main/scala/org/apache/spark/examples/ml/RandomForestExample.scala @@ -198,7 +198,7 @@ object RandomForestExample { .setCheckpointInterval(params.checkpointInterval) .setFeatureSubsetStrategy(params.featureSubsetStrategy) .setNumTrees(params.numTrees) - case _ => throw new IllegalArgumentException("Algo ${params.algo} not supported.") + case _ => throw new IllegalArgumentException(s"Algo ${params.algo} not supported.") } stages += dt val pipeline = new Pipeline().setStages(stages.toArray) @@ -225,7 +225,7 @@ object RandomForestExample { } else { println(rfModel) // Print model summary. } - case _ => throw new IllegalArgumentException("Algo ${params.algo} not supported.") + case _ => throw new IllegalArgumentException(s"Algo ${params.algo} not supported.") } // Evaluate model on training, test data. @@ -241,7 +241,7 @@ object RandomForestExample { println("Test data results:") DecisionTreeExample.evaluateRegressionModel(pipelineModel, test, labelColName) case _ => - throw new IllegalArgumentException("Algo ${params.algo} not supported.") + throw new IllegalArgumentException(s"Algo ${params.algo} not supported.") } spark.stop() http://git-wip-us.apache.org/repos/asf/spark/blob/e26dac5f/examples/src/main/scala/org/apache/spark/examples/mllib/DecisionTreeRunner.scala ---------------------------------------------------------------------- diff --git a/examples/src/main/scala/org/apache/spark/examples/mllib/DecisionTreeRunner.scala b/examples/src/main/scala/org/apache/spark/examples/mllib/DecisionTreeRunner.scala index 0ad0465..fa47e12 100644 --- a/examples/src/main/scala/org/apache/spark/examples/mllib/DecisionTreeRunner.scala +++ b/examples/src/main/scala/org/apache/spark/examples/mllib/DecisionTreeRunner.scala @@ -211,7 +211,7 @@ object DecisionTreeRunner { case Regression => (origExamples, null, 0) case _ => - throw new IllegalArgumentException("Algo ${params.algo} not supported.") + throw new IllegalArgumentException(s"Algo $algo not supported.") } // Create training, test sets. 
http://git-wip-us.apache.org/repos/asf/spark/blob/e26dac5f/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaTestUtils.scala ---------------------------------------------------------------------- diff --git a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaTestUtils.scala b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaTestUtils.scala index f86b8f5..5915d9f 100644 --- a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaTestUtils.scala +++ b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaTestUtils.scala @@ -378,7 +378,7 @@ class KafkaTestUtils(withBrokerProps: Map[String, Object] = Map.empty) extends L zkUtils.getLeaderForPartition(topic, partition).isDefined && Request.isValidBrokerId(leaderAndInSyncReplicas.leader) && - leaderAndInSyncReplicas.isr.size >= 1 + leaderAndInSyncReplicas.isr.nonEmpty case _ => false http://git-wip-us.apache.org/repos/asf/spark/blob/e26dac5f/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaRDD.scala ---------------------------------------------------------------------- diff --git a/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaRDD.scala b/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaRDD.scala index 62cdf5b..d9fc9cc 100644 --- a/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaRDD.scala +++ b/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaRDD.scala @@ -156,7 +156,7 @@ private[spark] class KafkaRDD[K, V]( val prefExecs = if (null == prefHost) allExecs else allExecs.filter(_.host == prefHost) val execs = if (prefExecs.isEmpty) allExecs else prefExecs if (execs.isEmpty) { - Seq() + Seq.empty } else { // execs is sorted, tp.hashCode depends only on topic and partition, so consistent index val index = Math.floorMod(tp.hashCode, execs.length) http://git-wip-us.apache.org/repos/asf/spark/blob/e26dac5f/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaTestUtils.scala ---------------------------------------------------------------------- diff --git a/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaTestUtils.scala b/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaTestUtils.scala index 8273c2b..6c7024e 100644 --- a/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaTestUtils.scala +++ b/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaTestUtils.scala @@ -257,7 +257,7 @@ private[kafka010] class KafkaTestUtils extends Logging { zkUtils.getLeaderForPartition(topic, partition).isDefined && Request.isValidBrokerId(leaderAndInSyncReplicas.leader) && - leaderAndInSyncReplicas.isr.size >= 1 + leaderAndInSyncReplicas.isr.nonEmpty case _ => false http://git-wip-us.apache.org/repos/asf/spark/blob/e26dac5f/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgePartition.scala ---------------------------------------------------------------------- diff --git a/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgePartition.scala b/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgePartition.scala index 26349f4..0e6a340 100644 --- a/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgePartition.scala +++ b/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgePartition.scala @@ -388,7 +388,7 @@ class EdgePartition[ val aggregates = new Array[A](vertexAttrs.length) val bitset = new 
BitSet(vertexAttrs.length) - var ctx = new AggregatingEdgeContext[VD, ED, A](mergeMsg, aggregates, bitset) + val ctx = new AggregatingEdgeContext[VD, ED, A](mergeMsg, aggregates, bitset) var i = 0 while (i < size) { val localSrcId = localSrcIds(i) @@ -433,7 +433,7 @@ class EdgePartition[ val aggregates = new Array[A](vertexAttrs.length) val bitset = new BitSet(vertexAttrs.length) - var ctx = new AggregatingEdgeContext[VD, ED, A](mergeMsg, aggregates, bitset) + val ctx = new AggregatingEdgeContext[VD, ED, A](mergeMsg, aggregates, bitset) index.iterator.foreach { cluster => val clusterSrcId = cluster._1 val clusterPos = cluster._2 http://git-wip-us.apache.org/repos/asf/spark/blob/e26dac5f/graphx/src/main/scala/org/apache/spark/graphx/impl/GraphImpl.scala ---------------------------------------------------------------------- diff --git a/graphx/src/main/scala/org/apache/spark/graphx/impl/GraphImpl.scala b/graphx/src/main/scala/org/apache/spark/graphx/impl/GraphImpl.scala index 5d2a537..34e1253 100644 --- a/graphx/src/main/scala/org/apache/spark/graphx/impl/GraphImpl.scala +++ b/graphx/src/main/scala/org/apache/spark/graphx/impl/GraphImpl.scala @@ -74,7 +74,7 @@ class GraphImpl[VD: ClassTag, ED: ClassTag] protected ( override def getCheckpointFiles: Seq[String] = { Seq(vertices.getCheckpointFile, replicatedVertexView.edges.getCheckpointFile).flatMap { case Some(path) => Seq(path) - case None => Seq() + case None => Seq.empty } } http://git-wip-us.apache.org/repos/asf/spark/blob/e26dac5f/graphx/src/main/scala/org/apache/spark/graphx/util/GraphGenerators.scala ---------------------------------------------------------------------- diff --git a/graphx/src/main/scala/org/apache/spark/graphx/util/GraphGenerators.scala b/graphx/src/main/scala/org/apache/spark/graphx/util/GraphGenerators.scala index 2b3e5f9..4197311 100644 --- a/graphx/src/main/scala/org/apache/spark/graphx/util/GraphGenerators.scala +++ b/graphx/src/main/scala/org/apache/spark/graphx/util/GraphGenerators.scala @@ -18,6 +18,7 @@ package org.apache.spark.graphx.util import scala.annotation.tailrec +import scala.collection.mutable import scala.reflect.ClassTag import scala.util._ @@ -133,7 +134,7 @@ object GraphGenerators extends Logging { throw new IllegalArgumentException( s"numEdges must be <= $numEdgesUpperBound but was $numEdges") } - var edges: Set[Edge[Int]] = Set() + var edges = mutable.Set.empty[Edge[Int]] while (edges.size < numEdges) { if (edges.size % 100 == 0) { logDebug(edges.size + " edges") http://git-wip-us.apache.org/repos/asf/spark/blob/e26dac5f/mllib/src/main/scala/org/apache/spark/ml/classification/LogisticRegression.scala ---------------------------------------------------------------------- diff --git a/mllib/src/main/scala/org/apache/spark/ml/classification/LogisticRegression.scala b/mllib/src/main/scala/org/apache/spark/ml/classification/LogisticRegression.scala index b234bc4..65b09e5 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/classification/LogisticRegression.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/classification/LogisticRegression.scala @@ -736,7 +736,7 @@ class LogisticRegression @Since("1.2.0") ( b_k' = b_k - \mean(b_k) }}} */ - val rawIntercepts = histogram.map(c => math.log(c + 1)) // add 1 for smoothing + val rawIntercepts = histogram.map(math.log1p) // add 1 for smoothing (log1p(x) = log(1+x)) val rawMean = rawIntercepts.sum / rawIntercepts.length rawIntercepts.indices.foreach { i => initialCoefWithInterceptMatrix.update(i, numFeatures, rawIntercepts(i) - rawMean) @@ -820,7 +820,7 
@@ class LogisticRegression @Since("1.2.0") ( val interceptVec = if ($(fitIntercept) || !isMultinomial) { Vectors.zeros(numCoefficientSets) } else { - Vectors.sparse(numCoefficientSets, Seq()) + Vectors.sparse(numCoefficientSets, Seq.empty) } // separate intercepts and coefficients from the combined matrix allCoefMatrix.foreachActive { (classIndex, featureIndex, value) => http://git-wip-us.apache.org/repos/asf/spark/blob/e26dac5f/mllib/src/main/scala/org/apache/spark/ml/feature/RFormulaParser.scala ---------------------------------------------------------------------- diff --git a/mllib/src/main/scala/org/apache/spark/ml/feature/RFormulaParser.scala b/mllib/src/main/scala/org/apache/spark/ml/feature/RFormulaParser.scala index 2dd565a..32835fb 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/feature/RFormulaParser.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/feature/RFormulaParser.scala @@ -99,7 +99,7 @@ private[ml] case class ParsedRFormula(label: ColumnRef, terms: Seq[Term]) { }).map(_.distinct) // Deduplicates feature interactions, for example, a:b is the same as b:a. - var seen = mutable.Set[Set[String]]() + val seen = mutable.Set[Set[String]]() validInteractions.flatMap { case t if seen.contains(t.toSet) => None http://git-wip-us.apache.org/repos/asf/spark/blob/e26dac5f/mllib/src/main/scala/org/apache/spark/ml/regression/LinearRegression.scala ---------------------------------------------------------------------- diff --git a/mllib/src/main/scala/org/apache/spark/ml/regression/LinearRegression.scala b/mllib/src/main/scala/org/apache/spark/ml/regression/LinearRegression.scala index 91cd229..ccc61fe 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/regression/LinearRegression.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/regression/LinearRegression.scala @@ -286,7 +286,7 @@ class LinearRegression @Since("1.3.0") (@Since("1.3.0") override val uid: String s"training is not needed.") } if (handlePersistence) instances.unpersist() - val coefficients = Vectors.sparse(numFeatures, Seq()) + val coefficients = Vectors.sparse(numFeatures, Seq.empty) val intercept = yMean val model = copyValues(new LinearRegressionModel(uid, coefficients, intercept)) http://git-wip-us.apache.org/repos/asf/spark/blob/e26dac5f/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeans.scala ---------------------------------------------------------------------- diff --git a/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeans.scala b/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeans.scala index 98e50c5..49043b5 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeans.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeans.scala @@ -363,7 +363,7 @@ class KMeans private ( // to their squared distance from the centers. Note that only distances between points // and new centers are computed in each iteration. 
var step = 0 - var bcNewCentersList = ArrayBuffer[Broadcast[_]]() + val bcNewCentersList = ArrayBuffer[Broadcast[_]]() while (step < initializationSteps) { val bcNewCenters = data.context.broadcast(newCenters) bcNewCentersList += bcNewCenters http://git-wip-us.apache.org/repos/asf/spark/blob/e26dac5f/mllib/src/main/scala/org/apache/spark/mllib/linalg/EigenValueDecomposition.scala ---------------------------------------------------------------------- diff --git a/mllib/src/main/scala/org/apache/spark/mllib/linalg/EigenValueDecomposition.scala b/mllib/src/main/scala/org/apache/spark/mllib/linalg/EigenValueDecomposition.scala index 7695aab..c7c1a54 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/linalg/EigenValueDecomposition.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/linalg/EigenValueDecomposition.scala @@ -78,13 +78,13 @@ private[mllib] object EigenValueDecomposition { require(n * ncv.toLong <= Integer.MAX_VALUE && ncv * (ncv.toLong + 8) <= Integer.MAX_VALUE, s"k = $k and/or n = $n are too large to compute an eigendecomposition") - var ido = new intW(0) - var info = new intW(0) - var resid = new Array[Double](n) - var v = new Array[Double](n * ncv) - var workd = new Array[Double](n * 3) - var workl = new Array[Double](ncv * (ncv + 8)) - var ipntr = new Array[Int](11) + val ido = new intW(0) + val info = new intW(0) + val resid = new Array[Double](n) + val v = new Array[Double](n * ncv) + val workd = new Array[Double](n * 3) + val workl = new Array[Double](ncv * (ncv + 8)) + val ipntr = new Array[Int](11) // call ARPACK's reverse communication, first iteration with ido = 0 arpack.dsaupd(ido, bmat, n, which, nev.`val`, tolW, resid, ncv, v, n, iparam, ipntr, workd, http://git-wip-us.apache.org/repos/asf/spark/blob/e26dac5f/mllib/src/main/scala/org/apache/spark/mllib/optimization/LBFGS.scala ---------------------------------------------------------------------- diff --git a/mllib/src/main/scala/org/apache/spark/mllib/optimization/LBFGS.scala b/mllib/src/main/scala/org/apache/spark/mllib/optimization/LBFGS.scala index efedebe..21ec287 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/optimization/LBFGS.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/optimization/LBFGS.scala @@ -257,7 +257,7 @@ object LBFGS extends Logging { (denseGrad1, loss1 + loss2) } - val zeroSparseVector = Vectors.sparse(n, Seq()) + val zeroSparseVector = Vectors.sparse(n, Seq.empty) val (gradientSum, lossSum) = data.treeAggregate((zeroSparseVector, 0.0))(seqOp, combOp) // broadcasted model is not needed anymore http://git-wip-us.apache.org/repos/asf/spark/blob/e26dac5f/mllib/src/main/scala/org/apache/spark/mllib/stat/correlation/SpearmanCorrelation.scala ---------------------------------------------------------------------- diff --git a/mllib/src/main/scala/org/apache/spark/mllib/stat/correlation/SpearmanCorrelation.scala b/mllib/src/main/scala/org/apache/spark/mllib/stat/correlation/SpearmanCorrelation.scala index b760347..ee51d33 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/stat/correlation/SpearmanCorrelation.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/stat/correlation/SpearmanCorrelation.scala @@ -57,7 +57,7 @@ private[stat] object SpearmanCorrelation extends Correlation with Logging { var preCol = -1 var preVal = Double.NaN var startRank = -1.0 - var cachedUids = ArrayBuffer.empty[Long] + val cachedUids = ArrayBuffer.empty[Long] val flush: () => Iterable[(Long, (Int, Double))] = () => { val averageRank = startRank + (cachedUids.size - 1) / 2.0 val output = 
cachedUids.map { uid => http://git-wip-us.apache.org/repos/asf/spark/blob/e26dac5f/mllib/src/main/scala/org/apache/spark/mllib/stat/test/StreamingTest.scala ---------------------------------------------------------------------- diff --git a/mllib/src/main/scala/org/apache/spark/mllib/stat/test/StreamingTest.scala b/mllib/src/main/scala/org/apache/spark/mllib/stat/test/StreamingTest.scala index 551ea35..80c6ef0 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/stat/test/StreamingTest.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/stat/test/StreamingTest.scala @@ -133,7 +133,7 @@ class StreamingTest @Since("1.6.0") () extends Logging with Serializable { if (time.milliseconds > data.slideDuration.milliseconds * peacePeriod) { rdd } else { - data.context.sparkContext.parallelize(Seq()) + data.context.sparkContext.parallelize(Seq.empty) } } } http://git-wip-us.apache.org/repos/asf/spark/blob/e26dac5f/project/SparkBuild.scala ---------------------------------------------------------------------- diff --git a/project/SparkBuild.scala b/project/SparkBuild.scala index 41f3a04..b9db1df 100644 --- a/project/SparkBuild.scala +++ b/project/SparkBuild.scala @@ -589,7 +589,7 @@ object PySparkAssembly { val zipFile = new File(BuildCommons.sparkHome , "python/lib/pyspark.zip") zipFile.delete() zipRecursive(src, zipFile) - Seq[File]() + Seq.empty[File] }).value ) @@ -810,7 +810,7 @@ object TestSettings { require(d.mkdir() || d.isDirectory(), s"Failed to create directory $d") } } - Seq[File]() + Seq.empty[File] }).value, concurrentRestrictions in Global += Tags.limit(Tags.Test, 1), // Remove certain packages from Scaladoc http://git-wip-us.apache.org/repos/asf/spark/blob/e26dac5f/resource-managers/mesos/src/main/scala/org/apache/spark/deploy/mesos/ui/DriverPage.scala ---------------------------------------------------------------------- diff --git a/resource-managers/mesos/src/main/scala/org/apache/spark/deploy/mesos/ui/DriverPage.scala b/resource-managers/mesos/src/main/scala/org/apache/spark/deploy/mesos/ui/DriverPage.scala index a6bb5d5..022191d 100644 --- a/resource-managers/mesos/src/main/scala/org/apache/spark/deploy/mesos/ui/DriverPage.scala +++ b/resource-managers/mesos/src/main/scala/org/apache/spark/deploy/mesos/ui/DriverPage.scala @@ -112,7 +112,7 @@ private[ui] class DriverPage(parent: MesosClusterUI) extends WebUIPage("driver") <td>Last Task Status</td> <td>{state.mesosTaskStatus.map(_.toString).getOrElse("")}</td> </tr> - }.getOrElse(Seq[Node]()) + }.getOrElse(Seq.empty[Node]) } private def propertiesRow(properties: collection.Map[String, String]): Seq[Node] = { @@ -175,6 +175,6 @@ private[ui] class DriverPage(parent: MesosClusterUI) extends WebUIPage("driver") {state.retries} </td> </tr> - }.getOrElse(Seq[Node]()) + }.getOrElse(Seq.empty[Node]) } } http://git-wip-us.apache.org/repos/asf/spark/blob/e26dac5f/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala ---------------------------------------------------------------------- diff --git a/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala b/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala index 062ed1f..7ec116c 100644 --- a/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala +++ b/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala @@ -333,7 +333,7 @@ trait 
MesosSchedulerUtils extends Logging { try { splitter.split(constraintsVal).asScala.toMap.mapValues(v => if (v == null || v.isEmpty) { - Set[String]() + Set.empty[String] } else { v.split(',').toSet } http://git-wip-us.apache.org/repos/asf/spark/blob/e26dac5f/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala ---------------------------------------------------------------------- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala index 7745709..501e7e3 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala @@ -2334,8 +2334,9 @@ object TimeWindowing extends Rule[LogicalPlan] { val windowExpressions = p.expressions.flatMap(_.collect { case t: TimeWindow => t }).toSet + val numWindowExpr = windowExpressions.size // Only support a single window expression for now - if (windowExpressions.size == 1 && + if (numWindowExpr == 1 && windowExpressions.head.timeColumn.resolved && windowExpressions.head.checkInputDataTypes().isSuccess) { @@ -2402,7 +2403,7 @@ object TimeWindowing extends Rule[LogicalPlan] { renamedPlan.withNewChildren(substitutedPlan :: Nil) } - } else if (windowExpressions.size > 1) { + } else if (numWindowExpr > 1) { p.failAnalysis("Multiple time window expressions would result in a cartesian product " + "of rows, therefore they are currently not supported.") } else { http://git-wip-us.apache.org/repos/asf/spark/blob/e26dac5f/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala ---------------------------------------------------------------------- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala index c863ba4..83a23cc 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala @@ -228,10 +228,10 @@ case class ArrayContains(left: Expression, right: Expression) override def dataType: DataType = BooleanType override def inputTypes: Seq[AbstractDataType] = right.dataType match { - case NullType => Seq() + case NullType => Seq.empty case _ => left.dataType match { case n @ ArrayType(element, _) => Seq(n, element) - case _ => Seq() + case _ => Seq.empty } } http://git-wip-us.apache.org/repos/asf/spark/blob/e26dac5f/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/CostBasedJoinReorder.scala ---------------------------------------------------------------------- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/CostBasedJoinReorder.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/CostBasedJoinReorder.scala index db7baf6..064ca68 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/CostBasedJoinReorder.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/CostBasedJoinReorder.scala @@ -150,7 +150,7 @@ object JoinReorderDP extends PredicateHelper with Logging { // Create the initial plans: each plan is a single item with zero cost. 
val itemIndex = items.zipWithIndex val foundPlans = mutable.Buffer[JoinPlanMap](itemIndex.map { - case (item, id) => Set(id) -> JoinPlan(Set(id), item, Set(), Cost(0, 0)) + case (item, id) => Set(id) -> JoinPlan(Set(id), item, Set.empty, Cost(0, 0)) }.toMap) // Build filters from the join graph to be used by the search algorithm. http://git-wip-us.apache.org/repos/asf/spark/blob/e26dac5f/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/expressions.scala ---------------------------------------------------------------------- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/expressions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/expressions.scala index 6c83f47..79a6c86 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/expressions.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/expressions.scala @@ -134,7 +134,7 @@ object ReorderAssociativeOperator extends Rule[LogicalPlan] { private def collectGroupingExpressions(plan: LogicalPlan): ExpressionSet = plan match { case Aggregate(groupingExpressions, aggregateExpressions, child) => ExpressionSet.apply(groupingExpressions) - case _ => ExpressionSet(Seq()) + case _ => ExpressionSet(Seq.empty) } def apply(plan: LogicalPlan): LogicalPlan = plan transform { http://git-wip-us.apache.org/repos/asf/spark/blob/e26dac5f/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala ---------------------------------------------------------------------- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala index ad359e7..45c1d3d 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala @@ -877,7 +877,7 @@ class AstBuilder(conf: SQLConf) extends SqlBaseBaseVisitor[AnyRef] with Logging // Reverse the contexts to have them in the same sequence as in the SQL statement & turn them // into expressions. - val expressions = contexts.reverse.map(expression) + val expressions = contexts.reverseMap(expression) // Create a balanced tree. 
def reduceToExpressionTree(low: Int, high: Int): Expression = high - low match { http://git-wip-us.apache.org/repos/asf/spark/blob/e26dac5f/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/patterns.scala ---------------------------------------------------------------------- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/patterns.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/patterns.scala index 7f370fb..8d034c2 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/patterns.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/patterns.scala @@ -173,7 +173,7 @@ object ExtractFiltersAndInnerJoins extends PredicateHelper { val (plans, conditions) = flattenJoin(j) (plans, conditions ++ splitConjunctivePredicates(filterCondition)) - case _ => (Seq((plan, parentJoinType)), Seq()) + case _ => (Seq((plan, parentJoinType)), Seq.empty) } def unapply(plan: LogicalPlan): Option[(Seq[(LogicalPlan, InnerLike)], Seq[Expression])] http://git-wip-us.apache.org/repos/asf/spark/blob/e26dac5f/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/FilterEstimation.scala ---------------------------------------------------------------------- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/FilterEstimation.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/FilterEstimation.scala index e13db85..74820eb 100755 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/FilterEstimation.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/FilterEstimation.scala @@ -47,7 +47,7 @@ case class FilterEstimation(plan: Filter) extends Logging { // Estimate selectivity of this filter predicate, and update column stats if needed. 
// For not-supported condition, set filter selectivity to a conservative estimate 100% - val filterSelectivity = calculateFilterSelectivity(plan.condition).getOrElse(BigDecimal(1.0)) + val filterSelectivity = calculateFilterSelectivity(plan.condition).getOrElse(BigDecimal(1)) val filteredRowCount: BigInt = ceil(BigDecimal(childStats.rowCount.get) * filterSelectivity) val newColStats = if (filteredRowCount == 0) { @@ -83,13 +83,13 @@ case class FilterEstimation(plan: Filter) extends Logging { : Option[BigDecimal] = { condition match { case And(cond1, cond2) => - val percent1 = calculateFilterSelectivity(cond1, update).getOrElse(BigDecimal(1.0)) - val percent2 = calculateFilterSelectivity(cond2, update).getOrElse(BigDecimal(1.0)) + val percent1 = calculateFilterSelectivity(cond1, update).getOrElse(BigDecimal(1)) + val percent2 = calculateFilterSelectivity(cond2, update).getOrElse(BigDecimal(1)) Some(percent1 * percent2) case Or(cond1, cond2) => - val percent1 = calculateFilterSelectivity(cond1, update = false).getOrElse(BigDecimal(1.0)) - val percent2 = calculateFilterSelectivity(cond2, update = false).getOrElse(BigDecimal(1.0)) + val percent1 = calculateFilterSelectivity(cond1, update = false).getOrElse(BigDecimal(1)) + val percent2 = calculateFilterSelectivity(cond2, update = false).getOrElse(BigDecimal(1)) Some(percent1 + percent2 - (percent1 * percent2)) // Not-operator pushdown @@ -464,7 +464,7 @@ case class FilterEstimation(plan: Filter) extends Logging { (numericLiteral > max, numericLiteral <= min) } - var percent = BigDecimal(1.0) + var percent = BigDecimal(1) if (noOverlap) { percent = 0.0 } else if (completeOverlap) { @@ -630,7 +630,7 @@ case class FilterEstimation(plan: Filter) extends Logging { ) } - var percent = BigDecimal(1.0) + var percent = BigDecimal(1) if (noOverlap) { percent = 0.0 } else if (completeOverlap) { http://git-wip-us.apache.org/repos/asf/spark/blob/e26dac5f/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/MathExpressionsSuite.scala ---------------------------------------------------------------------- diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/MathExpressionsSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/MathExpressionsSuite.scala index f4d5a44..9ee7775 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/MathExpressionsSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/MathExpressionsSuite.scala @@ -609,9 +609,9 @@ class MathExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper { checkEvaluation(BRound(floatPi, scale), floatResults(i), EmptyRow) } - val bdResults: Seq[BigDecimal] = Seq(BigDecimal(3.0), BigDecimal(3.1), BigDecimal(3.14), - BigDecimal(3.142), BigDecimal(3.1416), BigDecimal(3.14159), - BigDecimal(3.141593), BigDecimal(3.1415927)) + val bdResults: Seq[BigDecimal] = Seq(BigDecimal(3), BigDecimal("3.1"), BigDecimal("3.14"), + BigDecimal("3.142"), BigDecimal("3.1416"), BigDecimal("3.14159"), + BigDecimal("3.141593"), BigDecimal("3.1415927")) (0 to 7).foreach { i => checkEvaluation(Round(bdPi, i), bdResults(i), EmptyRow) http://git-wip-us.apache.org/repos/asf/spark/blob/e26dac5f/sql/catalyst/src/test/scala/org/apache/spark/sql/types/DecimalSuite.scala ---------------------------------------------------------------------- diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/types/DecimalSuite.scala 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/types/DecimalSuite.scala index 144f3d6..3193d13 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/types/DecimalSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/types/DecimalSuite.scala @@ -109,8 +109,8 @@ class DecimalSuite extends SparkFunSuite with PrivateMethodTester { test("small decimals represented as unscaled long") { checkCompact(new Decimal(), true) - checkCompact(Decimal(BigDecimal(10.03)), false) - checkCompact(Decimal(BigDecimal(1e20)), false) + checkCompact(Decimal(BigDecimal("10.03")), false) + checkCompact(Decimal(BigDecimal("100000000000000000000")), false) checkCompact(Decimal(17L), true) checkCompact(Decimal(17), true) checkCompact(Decimal(17L, 2, 1), true) http://git-wip-us.apache.org/repos/asf/spark/blob/e26dac5f/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala index ba7ca84..dae160f 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala @@ -671,11 +671,11 @@ case class AlterTableRecoverPartitionsCommand( } else { logWarning( s"expected partition column ${partitionNames.head}, but got ${ps(0)}, ignoring it") - Seq() + Seq.empty } } else { logWarning(s"ignore ${new Path(path, name)}") - Seq() + Seq.empty } } } http://git-wip-us.apache.org/repos/asf/spark/blob/e26dac5f/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRelation.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRelation.scala index a521fd1..658d137 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRelation.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRelation.scala @@ -23,7 +23,6 @@ import org.apache.spark.internal.Logging import org.apache.spark.Partition import org.apache.spark.rdd.RDD import org.apache.spark.sql.{DataFrame, Row, SaveMode, SparkSession, SQLContext} -import org.apache.spark.sql.execution.SQLExecution import org.apache.spark.sql.jdbc.JdbcDialects import org.apache.spark.sql.sources._ import org.apache.spark.sql.types.StructType @@ -81,7 +80,7 @@ private[sql] object JDBCRelation extends Logging { val column = partitioning.column var i: Int = 0 var currentValue: Long = lowerBound - var ans = new ArrayBuffer[Partition]() + val ans = new ArrayBuffer[Partition]() while (i < numPartitions) { val lBound = if (i != 0) s"$column >= $currentValue" else null currentValue += stride http://git-wip-us.apache.org/repos/asf/spark/blob/e26dac5f/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala index 87fbf8b..64eea26 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala +++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala @@ -220,7 +220,7 @@ class ParquetFileFormat val needMerged: Seq[FileStatus] = if (mergeRespectSummaries) { - Seq() + Seq.empty } else { filesByType.data } http://git-wip-us.apache.org/repos/asf/spark/blob/e26dac5f/sql/core/src/main/scala/org/apache/spark/sql/execution/objects.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/objects.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/objects.scala index 3439181..9e4e02b 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/objects.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/objects.scala @@ -35,8 +35,6 @@ import org.apache.spark.sql.catalyst.plans.logical.LogicalGroupState import org.apache.spark.sql.execution.streaming.GroupStateImpl import org.apache.spark.sql.streaming.GroupStateTimeout import org.apache.spark.sql.types._ -import org.apache.spark.util.Utils - /** * Physical version of `ObjectProducer`. @@ -403,8 +401,7 @@ case class FlatMapGroupsInRExec( Seq(groupingAttributes.map(SortOrder(_, Ascending))) override protected def doExecute(): RDD[InternalRow] = { - val isSerializedRData = - if (outputSchema == SERIALIZED_R_DATA_SCHEMA) true else false + val isSerializedRData = outputSchema == SERIALIZED_R_DATA_SCHEMA val serializerForR = if (!isSerializedRData) { SerializationFormats.ROW } else { http://git-wip-us.apache.org/repos/asf/spark/blob/e26dac5f/sql/core/src/main/scala/org/apache/spark/sql/execution/r/MapPartitionsRWrapper.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/r/MapPartitionsRWrapper.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/r/MapPartitionsRWrapper.scala index d2178e9..b9835c7 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/r/MapPartitionsRWrapper.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/r/MapPartitionsRWrapper.scala @@ -34,8 +34,7 @@ case class MapPartitionsRWrapper( outputSchema: StructType) extends (Iterator[Any] => Iterator[Any]) { def apply(iter: Iterator[Any]): Iterator[Any] = { // If the content of current DataFrame is serialized R data? 
- val isSerializedRData = - if (inputSchema == SERIALIZED_R_DATA_SCHEMA) true else false + val isSerializedRData = inputSchema == SERIALIZED_R_DATA_SCHEMA val (newIter, deserializer, colNames) = if (!isSerializedRData) { http://git-wip-us.apache.org/repos/asf/spark/blob/e26dac5f/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CompactibleFileStreamLog.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CompactibleFileStreamLog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CompactibleFileStreamLog.scala index 408c8f8..e37033b 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CompactibleFileStreamLog.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CompactibleFileStreamLog.scala @@ -170,12 +170,8 @@ abstract class CompactibleFileStreamLog[T <: AnyRef : ClassTag]( private def compact(batchId: Long, logs: Array[T]): Boolean = { val validBatches = getValidBatchesBeforeCompactionBatch(batchId, compactInterval) val allLogs = validBatches.flatMap(batchId => super.get(batchId)).flatten ++ logs - if (super.add(batchId, compactLogs(allLogs).toArray)) { - true - } else { - // Return false as there is another writer. - false - } + // Return false as there is another writer. + super.add(batchId, compactLogs(allLogs).toArray) } /** http://git-wip-us.apache.org/repos/asf/spark/blob/e26dac5f/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala index 5ee596e..5711262 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala @@ -609,7 +609,7 @@ class StreamExecution( } // A list of attributes that will need to be updated. - var replacements = new ArrayBuffer[(Attribute, Attribute)] + val replacements = new ArrayBuffer[(Attribute, Attribute)] // Replace sources in the logical plan with data that has arrived since the last batch. val withNewSources = logicalPlan transform { case StreamingExecutionRelation(source, output) =>
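----------------------------------------------------------------------
The hunks above reduce to a handful of recurring scapegoat patterns. The short, self-contained Scala sketches that follow are editorial illustrations of those patterns, not code from the patch. This first one covers "Prefer Seq.empty" / "Prefer Set.empty"; the Vectors.sparse line assumes spark-mllib-local (org.apache.spark.ml.linalg) is on the classpath, matching the LogisticRegression, LinearRegression and LBFGS hunks.

    import org.apache.spark.ml.linalg.Vectors

    object PreferEmptySketch {
      def main(args: Array[String]): Unit = {
        // Seq() and Set() go through the varargs factory; Seq.empty and
        // Set.empty return the shared empty instance directly, which is the
        // spelling the scapegoat rules prefer.
        val noNames: Seq[String] = Seq.empty[String]
        val noTags: Set[String] = Set.empty[String]

        // The same idea in the MLlib hunks: an all-zero vector represented
        // sparsely, with no stored (index, value) pairs.
        val zeros = Vectors.sparse(5, Seq.empty[(Int, Double)])

        println(s"$noNames $noTags $zeros") // List() Set() (5,[],[])
      }
    }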
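Next, "Var could be val": a minimal sketch of the pattern behind the bcNewCentersList, cachedUids, replacements and ARPACK-workspace changes above. Only the reference needs to stay immutable; the buffer it points to can still be mutated in place.

    import scala.collection.mutable.ArrayBuffer

    object VarCouldBeValSketch {
      def main(args: Array[String]): Unit = {
        val partitions = ArrayBuffer.empty[String] // mutated, never reassigned
        var i = 0                                  // still a var: it is reassigned
        while (i < 3) {
          partitions += s"part-$i"
          i += 1
        }
        println(partitions.mkString(", ")) // part-0, part-1, part-2
      }
    }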
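"List.size is O(n)": the Analyzer.scala hunk hoists windowExpressions.size into numWindowExpr so the size is computed once rather than in every branch. Below is the same hoisting sketched on a plain List, where repeated .size calls really are linear; the describe helper is hypothetical.

    object HoistSizeSketch {
      def describe(windowExprs: List[String]): String = {
        val numWindowExpr = windowExprs.size // single traversal
        if (numWindowExpr == 1) s"single window: ${windowExprs.head}"
        else if (numWindowExpr > 1) s"$numWindowExpr windows are not supported"
        else "no window expression"
      }

      def main(args: Array[String]): Unit = {
        println(describe(List("w1")))
        println(describe(List("w1", "w2")))
        println(describe(Nil))
      }
    }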
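"reverse.map instead of reverseMap" (the AstBuilder.scala hunk): reverse.map materialises an intermediate reversed list and then maps over it, while reverseMap (available on Seq in Scala 2.11/2.12) applies the function in a single reversed pass. A sketch on plain lists:

    object ReverseMapSketch {
      def main(args: Array[String]): Unit = {
        val contexts = List("a", "bb", "ccc")
        val lengths = contexts.reverseMap(_.length)
        println(lengths)                                   // List(3, 2, 1)
        println(lengths == contexts.reverse.map(_.length)) // true
      }
    }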
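"BigDecimal double constructor" (FilterEstimation, MathExpressionsSuite, DecimalSuite): a Double literal is converted to binary floating point before BigDecimal ever sees it, so the String and Int forms are the safer spellings. In this sketch the long decimal expansion is shown through the raw java.math.BigDecimal constructor, where the effect is easiest to see.

    object BigDecimalLiteralSketch {
      def main(args: Array[String]): Unit = {
        // The binary representation of the literal 0.1 leaks through:
        println(new java.math.BigDecimal(0.1))
        // 0.1000000000000000055511151231257827021181583404541015625

        // A String (or an exact Int, as in BigDecimal(1)) keeps exactly the
        // digits that were written, with no Double round trip:
        println(BigDecimal("3.1415927")) // 3.1415927
        println(BigDecimal(1))           // 1
      }
    }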
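"Unnecessary if condition" (objects.scala, MapPartitionsRWrapper, CompactibleFileStreamLog): an if expression that only chooses between the literals true and false is just the condition itself. A sketch; serializedSchema is a hypothetical stand-in for SERIALIZED_R_DATA_SCHEMA.

    object RedundantIfSketch {
      // Before: if (schema == serializedSchema) true else false
      // After:  schema == serializedSchema
      def isSerialized(schema: String, serializedSchema: String): Boolean =
        schema == serializedSchema

      def main(args: Array[String]): Unit = {
        println(isSerialized("binary", "binary")) // true
        println(isSerialized("row", "binary"))    // false
      }
    }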
