Repository: spark Updated Branches: refs/heads/branch-2.0 e3f000a36 -> 40d24686a
[SPARK-10653][CORE] Remove unnecessary things from SparkEnv ## What changes were proposed in this pull request? Removed blockTransferService and sparkFilesDir from SparkEnv since they're rarely used and don't need to be stored in the env. Edited their few usages to accommodate the change. ## How was this patch tested? ran dev/run-tests locally Author: Alex Bozarth <[email protected]> Closes #12970 from ajbozarth/spark10653. (cherry picked from commit c3e23bc0c3e87546d0575c3c4c45a2b0e2dfec6a) Signed-off-by: Andrew Or <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/40d24686 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/40d24686 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/40d24686 Branch: refs/heads/branch-2.0 Commit: 40d24686aecc6b655b497b01303d0fdd4d4d480f Parents: e3f000a Author: Alex Bozarth <[email protected]> Authored: Mon May 9 11:51:37 2016 -0700 Committer: Andrew Or <[email protected]> Committed: Mon May 9 11:51:47 2016 -0700 ---------------------------------------------------------------------- .../main/scala/org/apache/spark/SparkEnv.scala | 26 ++++---------------------- .../scala/org/apache/spark/SparkFiles.scala | 2 +- .../org/apache/spark/storage/BlockManager.scala | 2 +- .../org/apache/spark/DistributedSuite.scala | 2 +- project/MimaExcludes.scala | 4 +++ 5 files changed, 12 insertions(+), 24 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/40d24686/core/src/main/scala/org/apache/spark/SparkEnv.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/SparkEnv.scala b/core/src/main/scala/org/apache/spark/SparkEnv.scala index 4bf8890..af50a6d 100644 --- a/core/src/main/scala/org/apache/spark/SparkEnv.scala +++ b/core/src/main/scala/org/apache/spark/SparkEnv.scala @@ -31,7 +31,6 @@ import 
org.apache.spark.broadcast.BroadcastManager import org.apache.spark.internal.Logging import org.apache.spark.memory.{MemoryManager, StaticMemoryManager, UnifiedMemoryManager} import org.apache.spark.metrics.MetricsSystem -import org.apache.spark.network.BlockTransferService import org.apache.spark.network.netty.NettyBlockTransferService import org.apache.spark.rpc.{RpcEndpoint, RpcEndpointRef, RpcEnv} import org.apache.spark.scheduler.{LiveListenerBus, OutputCommitCoordinator} @@ -61,10 +60,8 @@ class SparkEnv ( val mapOutputTracker: MapOutputTracker, val shuffleManager: ShuffleManager, val broadcastManager: BroadcastManager, - val blockTransferService: BlockTransferService, val blockManager: BlockManager, val securityManager: SecurityManager, - val sparkFilesDir: String, val metricsSystem: MetricsSystem, val memoryManager: MemoryManager, val outputCommitCoordinator: OutputCommitCoordinator, @@ -77,7 +74,7 @@ class SparkEnv ( // (e.g., HadoopFileRDD uses this to cache JobConfs and InputFormats). private[spark] val hadoopJobMetadata = new MapMaker().softValues().makeMap[String, Any]() - private var driverTmpDirToDelete: Option[String] = None + private[spark] var driverTmpDir: Option[String] = None private[spark] def stop() { @@ -94,13 +91,10 @@ class SparkEnv ( rpcEnv.shutdown() rpcEnv.awaitTermination() - // Note that blockTransferService is stopped by BlockManager since it is started by it. - // If we only stop sc, but the driver process still run as a services then we need to delete // the tmp dir, if not, it will create too many tmp dirs. - // We only need to delete the tmp dir create by driver, because sparkFilesDir is point to the - // current working dir in executor which we do not need to delete. 
- driverTmpDirToDelete match { + // We only need to delete the tmp dir create by driver + driverTmpDir match { case Some(path) => try { Utils.deleteRecursively(new File(path)) @@ -342,15 +336,6 @@ object SparkEnv extends Logging { ms } - // Set the sparkFiles directory, used when downloading dependencies. In local mode, - // this is a temporary directory; in distributed mode, this is the executor's current working - // directory. - val sparkFilesDir: String = if (isDriver) { - Utils.createTempDir(Utils.getLocalDir(conf), "userFiles").getAbsolutePath - } else { - "." - } - val outputCommitCoordinator = mockOutputCommitCoordinator.getOrElse { new OutputCommitCoordinator(conf, isDriver) } @@ -367,10 +352,8 @@ object SparkEnv extends Logging { mapOutputTracker, shuffleManager, broadcastManager, - blockTransferService, blockManager, securityManager, - sparkFilesDir, metricsSystem, memoryManager, outputCommitCoordinator, @@ -380,7 +363,8 @@ object SparkEnv extends Logging { // called, and we only need to do it for driver. Because driver may run as a service, and if we // don't delete this tmp dir when sc is stopped, then will create too many tmp dirs. if (isDriver) { - envInstance.driverTmpDirToDelete = Some(sparkFilesDir) + val sparkFilesDir = Utils.createTempDir(Utils.getLocalDir(conf), "userFiles").getAbsolutePath + envInstance.driverTmpDir = Some(sparkFilesDir) } envInstance http://git-wip-us.apache.org/repos/asf/spark/blob/40d24686/core/src/main/scala/org/apache/spark/SparkFiles.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/SparkFiles.scala b/core/src/main/scala/org/apache/spark/SparkFiles.scala index e85b89f..44f4444 100644 --- a/core/src/main/scala/org/apache/spark/SparkFiles.scala +++ b/core/src/main/scala/org/apache/spark/SparkFiles.scala @@ -34,6 +34,6 @@ object SparkFiles { * Get the root directory that contains files added through `SparkContext.addFile()`. 
*/ def getRootDirectory(): String = - SparkEnv.get.sparkFilesDir + SparkEnv.get.driverTmpDir.getOrElse(".") } http://git-wip-us.apache.org/repos/asf/spark/blob/40d24686/core/src/main/scala/org/apache/spark/storage/BlockManager.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala index f2d06c7..c56e451 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala @@ -65,7 +65,7 @@ private[spark] class BlockManager( memoryManager: MemoryManager, mapOutputTracker: MapOutputTracker, shuffleManager: ShuffleManager, - blockTransferService: BlockTransferService, + val blockTransferService: BlockTransferService, securityManager: SecurityManager, numUsableCores: Int) extends BlockDataManager with BlockEvictionHandler with Logging { http://git-wip-us.apache.org/repos/asf/spark/blob/40d24686/core/src/test/scala/org/apache/spark/DistributedSuite.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/org/apache/spark/DistributedSuite.scala b/core/src/test/scala/org/apache/spark/DistributedSuite.scala index a0086e1..0be25e9 100644 --- a/core/src/test/scala/org/apache/spark/DistributedSuite.scala +++ b/core/src/test/scala/org/apache/spark/DistributedSuite.scala @@ -196,7 +196,7 @@ class DistributedSuite extends SparkFunSuite with Matchers with LocalSparkContex val blockIds = data.partitions.indices.map(index => RDDBlockId(data.id, index)).toArray val blockId = blockIds(0) val blockManager = SparkEnv.get.blockManager - val blockTransfer = SparkEnv.get.blockTransferService + val blockTransfer = blockManager.blockTransferService val serializerManager = SparkEnv.get.serializerManager blockManager.master.getLocations(blockId).foreach { cmId => val bytes = 
blockTransfer.fetchBlockSync(cmId.host, cmId.port, cmId.executorId, http://git-wip-us.apache.org/repos/asf/spark/blob/40d24686/project/MimaExcludes.scala ---------------------------------------------------------------------- diff --git a/project/MimaExcludes.scala b/project/MimaExcludes.scala index 33e0db6..a5d57e1 100644 --- a/project/MimaExcludes.scala +++ b/project/MimaExcludes.scala @@ -694,6 +694,10 @@ object MimaExcludes { ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.ml.classification.LogisticRegressionModel.weights"), ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.ml.regression.LinearRegressionModel.weights") ) ++ Seq( + // [SPARK-10653] [Core] Remove unnecessary things from SparkEnv + ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.SparkEnv.sparkFilesDir"), + ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.SparkEnv.blockTransferService") + ) ++ Seq( // SPARK-14654: New accumulator API ProblemFilters.exclude[MissingTypesProblem]("org.apache.spark.ExceptionFailure$"), ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.ExceptionFailure.apply"), --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
