Repository: spark Updated Branches: refs/heads/master 7bf9b1201 -> c3e23bc0c
[SPARK-10653][CORE] Remove unnecessary things from SparkEnv ## What changes were proposed in this pull request? Removed blockTransferService and sparkFilesDir from SparkEnv since they're rarely used and don't need to be in stored in the env. Edited their few usages to accommodate the change. ## How was this patch tested? ran dev/run-tests locally Author: Alex Bozarth <[email protected]> Closes #12970 from ajbozarth/spark10653. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/c3e23bc0 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/c3e23bc0 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/c3e23bc0 Branch: refs/heads/master Commit: c3e23bc0c3e87546d0575c3c4c45a2b0e2dfec6a Parents: 7bf9b12 Author: Alex Bozarth <[email protected]> Authored: Mon May 9 11:51:37 2016 -0700 Committer: Andrew Or <[email protected]> Committed: Mon May 9 11:51:37 2016 -0700 ---------------------------------------------------------------------- .../main/scala/org/apache/spark/SparkEnv.scala | 26 ++++---------------- .../scala/org/apache/spark/SparkFiles.scala | 2 +- .../org/apache/spark/storage/BlockManager.scala | 2 +- .../org/apache/spark/DistributedSuite.scala | 2 +- project/MimaExcludes.scala | 4 +++ 5 files changed, 12 insertions(+), 24 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/c3e23bc0/core/src/main/scala/org/apache/spark/SparkEnv.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/SparkEnv.scala b/core/src/main/scala/org/apache/spark/SparkEnv.scala index 4bf8890..af50a6d 100644 --- a/core/src/main/scala/org/apache/spark/SparkEnv.scala +++ b/core/src/main/scala/org/apache/spark/SparkEnv.scala @@ -31,7 +31,6 @@ import org.apache.spark.broadcast.BroadcastManager import org.apache.spark.internal.Logging import org.apache.spark.memory.{MemoryManager, StaticMemoryManager, UnifiedMemoryManager} import org.apache.spark.metrics.MetricsSystem -import org.apache.spark.network.BlockTransferService import org.apache.spark.network.netty.NettyBlockTransferService import org.apache.spark.rpc.{RpcEndpoint, RpcEndpointRef, RpcEnv} import org.apache.spark.scheduler.{LiveListenerBus, OutputCommitCoordinator} @@ -61,10 +60,8 @@ class SparkEnv ( val mapOutputTracker: MapOutputTracker, val shuffleManager: ShuffleManager, val broadcastManager: BroadcastManager, - val blockTransferService: BlockTransferService, val blockManager: BlockManager, val securityManager: SecurityManager, - val sparkFilesDir: String, val metricsSystem: MetricsSystem, val memoryManager: MemoryManager, val outputCommitCoordinator: OutputCommitCoordinator, @@ -77,7 +74,7 @@ class SparkEnv ( // (e.g., HadoopFileRDD uses this to cache JobConfs and InputFormats). private[spark] val hadoopJobMetadata = new MapMaker().softValues().makeMap[String, Any]() - private var driverTmpDirToDelete: Option[String] = None + private[spark] var driverTmpDir: Option[String] = None private[spark] def stop() { @@ -94,13 +91,10 @@ class SparkEnv ( rpcEnv.shutdown() rpcEnv.awaitTermination() - // Note that blockTransferService is stopped by BlockManager since it is started by it. - // If we only stop sc, but the driver process still run as a services then we need to delete // the tmp dir, if not, it will create too many tmp dirs. - // We only need to delete the tmp dir create by driver, because sparkFilesDir is point to the - // current working dir in executor which we do not need to delete. - driverTmpDirToDelete match { + // We only need to delete the tmp dir create by driver + driverTmpDir match { case Some(path) => try { Utils.deleteRecursively(new File(path)) @@ -342,15 +336,6 @@ object SparkEnv extends Logging { ms } - // Set the sparkFiles directory, used when downloading dependencies. In local mode, - // this is a temporary directory; in distributed mode, this is the executor's current working - // directory. - val sparkFilesDir: String = if (isDriver) { - Utils.createTempDir(Utils.getLocalDir(conf), "userFiles").getAbsolutePath - } else { - "." - } - val outputCommitCoordinator = mockOutputCommitCoordinator.getOrElse { new OutputCommitCoordinator(conf, isDriver) } @@ -367,10 +352,8 @@ object SparkEnv extends Logging { mapOutputTracker, shuffleManager, broadcastManager, - blockTransferService, blockManager, securityManager, - sparkFilesDir, metricsSystem, memoryManager, outputCommitCoordinator, @@ -380,7 +363,8 @@ object SparkEnv extends Logging { // called, and we only need to do it for driver. Because driver may run as a service, and if we // don't delete this tmp dir when sc is stopped, then will create too many tmp dirs. if (isDriver) { - envInstance.driverTmpDirToDelete = Some(sparkFilesDir) + val sparkFilesDir = Utils.createTempDir(Utils.getLocalDir(conf), "userFiles").getAbsolutePath + envInstance.driverTmpDir = Some(sparkFilesDir) } envInstance http://git-wip-us.apache.org/repos/asf/spark/blob/c3e23bc0/core/src/main/scala/org/apache/spark/SparkFiles.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/SparkFiles.scala b/core/src/main/scala/org/apache/spark/SparkFiles.scala index e85b89f..44f4444 100644 --- a/core/src/main/scala/org/apache/spark/SparkFiles.scala +++ b/core/src/main/scala/org/apache/spark/SparkFiles.scala @@ -34,6 +34,6 @@ object SparkFiles { * Get the root directory that contains files added through `SparkContext.addFile()`. */ def getRootDirectory(): String = - SparkEnv.get.sparkFilesDir + SparkEnv.get.driverTmpDir.getOrElse(".") } http://git-wip-us.apache.org/repos/asf/spark/blob/c3e23bc0/core/src/main/scala/org/apache/spark/storage/BlockManager.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala index f2d06c7..c56e451 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala @@ -65,7 +65,7 @@ private[spark] class BlockManager( memoryManager: MemoryManager, mapOutputTracker: MapOutputTracker, shuffleManager: ShuffleManager, - blockTransferService: BlockTransferService, + val blockTransferService: BlockTransferService, securityManager: SecurityManager, numUsableCores: Int) extends BlockDataManager with BlockEvictionHandler with Logging { http://git-wip-us.apache.org/repos/asf/spark/blob/c3e23bc0/core/src/test/scala/org/apache/spark/DistributedSuite.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/org/apache/spark/DistributedSuite.scala b/core/src/test/scala/org/apache/spark/DistributedSuite.scala index a0086e1..0be25e9 100644 --- a/core/src/test/scala/org/apache/spark/DistributedSuite.scala +++ b/core/src/test/scala/org/apache/spark/DistributedSuite.scala @@ -196,7 +196,7 @@ class DistributedSuite extends SparkFunSuite with Matchers with LocalSparkContex val blockIds = data.partitions.indices.map(index => RDDBlockId(data.id, index)).toArray val blockId = blockIds(0) val blockManager = SparkEnv.get.blockManager - val blockTransfer = SparkEnv.get.blockTransferService + val blockTransfer = blockManager.blockTransferService val serializerManager = SparkEnv.get.serializerManager blockManager.master.getLocations(blockId).foreach { cmId => val bytes = blockTransfer.fetchBlockSync(cmId.host, cmId.port, cmId.executorId, http://git-wip-us.apache.org/repos/asf/spark/blob/c3e23bc0/project/MimaExcludes.scala ---------------------------------------------------------------------- diff --git a/project/MimaExcludes.scala b/project/MimaExcludes.scala index 33e0db6..a5d57e1 100644 --- a/project/MimaExcludes.scala +++ b/project/MimaExcludes.scala @@ -694,6 +694,10 @@ object MimaExcludes { ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.ml.classification.LogisticRegressionModel.weights"), ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.ml.regression.LinearRegressionModel.weights") ) ++ Seq( + // [SPARK-10653] [Core] Remove unnecessary things from SparkEnv + ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.SparkEnv.sparkFilesDir"), + ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.SparkEnv.blockTransferService") + ) ++ Seq( // SPARK-14654: New accumulator API ProblemFilters.exclude[MissingTypesProblem]("org.apache.spark.ExceptionFailure$"), ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.ExceptionFailure.apply"), --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
