Repository: spark Updated Branches: refs/heads/master 96136f222 -> 6e9ef10fd
[SPARK-4277] Support external shuffle service on Standalone Worker Author: Aaron Davidson <[email protected]> Closes #3142 from aarondav/worker and squashes the following commits: 3780bd7 [Aaron Davidson] Address comments 2dcdfc1 [Aaron Davidson] Add private[worker] 47f49d3 [Aaron Davidson] NettyBlockTransferService shouldn't care about app ids (it's only b/t executors) 258417c [Aaron Davidson] [SPARK-4277] Support external shuffle service on executor Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/6e9ef10f Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/6e9ef10f Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/6e9ef10f Branch: refs/heads/master Commit: 6e9ef10fd7446a11f37446c961916ba2a8e02cb8 Parents: 96136f2 Author: Aaron Davidson <[email protected]> Authored: Thu Nov 6 17:20:46 2014 -0800 Committer: Andrew Or <[email protected]> Committed: Thu Nov 6 17:20:46 2014 -0800 ---------------------------------------------------------------------- .../org/apache/spark/SecurityManager.scala | 14 +---- .../worker/StandaloneWorkerShuffleService.scala | 66 ++++++++++++++++++++ .../org/apache/spark/deploy/worker/Worker.scala | 8 ++- .../storage/ShuffleBlockFetcherIterator.scala | 2 +- .../netty/NettyBlockTransferSecuritySuite.scala | 12 ---- .../apache/spark/network/sasl/SaslMessage.java | 3 +- 6 files changed, 79 insertions(+), 26 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/6e9ef10f/core/src/main/scala/org/apache/spark/SecurityManager.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/SecurityManager.scala b/core/src/main/scala/org/apache/spark/SecurityManager.scala index dee935f..dbff9d1 100644 --- a/core/src/main/scala/org/apache/spark/SecurityManager.scala +++ b/core/src/main/scala/org/apache/spark/SecurityManager.scala @@ -343,15 +343,7 @@ private[spark] class SecurityManager(sparkConf: SparkConf) extends Logging with */ def getSecretKey(): String = secretKey - override def getSaslUser(appId: String): String = { - val myAppId = sparkConf.getAppId - require(appId == myAppId, s"SASL appId $appId did not match my appId ${myAppId}") - getSaslUser() - } - - override def getSecretKey(appId: String): String = { - val myAppId = sparkConf.getAppId - require(appId == myAppId, s"SASL appId $appId did not match my appId ${myAppId}") - getSecretKey() - } + // Default SecurityManager only has a single secret key, so ignore appId. + override def getSaslUser(appId: String): String = getSaslUser() + override def getSecretKey(appId: String): String = getSecretKey() } http://git-wip-us.apache.org/repos/asf/spark/blob/6e9ef10f/core/src/main/scala/org/apache/spark/deploy/worker/StandaloneWorkerShuffleService.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/StandaloneWorkerShuffleService.scala b/core/src/main/scala/org/apache/spark/deploy/worker/StandaloneWorkerShuffleService.scala new file mode 100644 index 0000000..88118e2 --- /dev/null +++ b/core/src/main/scala/org/apache/spark/deploy/worker/StandaloneWorkerShuffleService.scala @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.deploy.worker + +import org.apache.spark.{Logging, SparkConf, SecurityManager} +import org.apache.spark.network.TransportContext +import org.apache.spark.network.netty.SparkTransportConf +import org.apache.spark.network.sasl.SaslRpcHandler +import org.apache.spark.network.server.TransportServer +import org.apache.spark.network.shuffle.ExternalShuffleBlockHandler + +/** + * Provides a server from which Executors can read shuffle files (rather than reading directly from + * each other), to provide uninterrupted access to the files in the face of executors being turned + * off or killed. + * + * Optionally requires SASL authentication in order to read. See [[SecurityManager]]. + */ +private[worker] +class StandaloneWorkerShuffleService(sparkConf: SparkConf, securityManager: SecurityManager) + extends Logging { + + private val enabled = sparkConf.getBoolean("spark.shuffle.service.enabled", false) + private val port = sparkConf.getInt("spark.shuffle.service.port", 7337) + private val useSasl: Boolean = securityManager.isAuthenticationEnabled() + + private val transportConf = SparkTransportConf.fromSparkConf(sparkConf) + private val blockHandler = new ExternalShuffleBlockHandler() + private val transportContext: TransportContext = { + val handler = if (useSasl) new SaslRpcHandler(blockHandler, securityManager) else blockHandler + new TransportContext(transportConf, handler) + } + + private var server: TransportServer = _ + + /** Starts the external shuffle service if the user has configured us to. */ + def startIfEnabled() { + if (enabled) { + require(server == null, "Shuffle server already started") + logInfo(s"Starting shuffle service on port $port with useSasl = $useSasl") + server = transportContext.createServer(port) + } + } + + def stop() { + if (enabled && server != null) { + server.close() + server = null + } + } +} http://git-wip-us.apache.org/repos/asf/spark/blob/6e9ef10f/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala index f1f66d0..ca262de 100755 --- a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala +++ b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala @@ -111,6 +111,9 @@ private[spark] class Worker( val drivers = new HashMap[String, DriverRunner] val finishedDrivers = new HashMap[String, DriverRunner] + // The shuffle service is not actually started unless configured. + val shuffleService = new StandaloneWorkerShuffleService(conf, securityMgr) + val publicAddress = { val envVar = System.getenv("SPARK_PUBLIC_DNS") if (envVar != null) envVar else host @@ -154,6 +157,7 @@ private[spark] class Worker( logInfo("Spark home: " + sparkHome) createWorkDir() context.system.eventStream.subscribe(self, classOf[RemotingLifecycleEvent]) + shuffleService.startIfEnabled() webUi = new WorkerWebUI(this, workDir, webUiPort) webUi.bind() registerWithMaster() @@ -419,6 +423,7 @@ private[spark] class Worker( registrationRetryTimer.foreach(_.cancel()) executors.values.foreach(_.kill()) drivers.values.foreach(_.kill()) + shuffleService.stop() webUi.stop() metricsSystem.stop() } @@ -441,7 +446,8 @@ private[spark] object Worker extends Logging { cores: Int, memory: Int, masterUrls: Array[String], - workDir: String, workerNumber: Option[Int] = None): (ActorSystem, Int) = { + workDir: String, + workerNumber: Option[Int] = None): (ActorSystem, Int) = { // The LocalSparkCluster runs multiple local sparkWorkerX actor systems val conf = new SparkConf http://git-wip-us.apache.org/repos/asf/spark/blob/6e9ef10f/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala b/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala index 1e57918..6b1f57a 100644 --- a/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala +++ b/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala @@ -92,7 +92,7 @@ final class ShuffleBlockFetcherIterator( * Current [[FetchResult]] being processed. We track this so we can release the current buffer * in case of a runtime exception when processing the current buffer. */ - private[this] var currentResult: FetchResult = null + @volatile private[this] var currentResult: FetchResult = null /** * Queue of fetch requests to issue; we'll pull requests off this gradually to make sure that http://git-wip-us.apache.org/repos/asf/spark/blob/6e9ef10f/core/src/test/scala/org/apache/spark/network/netty/NettyBlockTransferSecuritySuite.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/org/apache/spark/network/netty/NettyBlockTransferSecuritySuite.scala b/core/src/test/scala/org/apache/spark/network/netty/NettyBlockTransferSecuritySuite.scala index bed0ed9..9162ec9 100644 --- a/core/src/test/scala/org/apache/spark/network/netty/NettyBlockTransferSecuritySuite.scala +++ b/core/src/test/scala/org/apache/spark/network/netty/NettyBlockTransferSecuritySuite.scala @@ -89,18 +89,6 @@ class NettyBlockTransferSecuritySuite extends FunSuite with MockitoSugar with Sh } } - test("security mismatch app ids") { - val conf0 = new SparkConf() - .set("spark.authenticate", "true") - .set("spark.authenticate.secret", "good") - .set("spark.app.id", "app-id") - val conf1 = conf0.clone.set("spark.app.id", "other-id") - testConnection(conf0, conf1) match { - case Success(_) => fail("Should have failed") - case Failure(t) => t.getMessage should include ("SASL appId app-id did not match") - } - } - /** * Creates two servers with different configurations and sees if they can talk. * Returns Success() if they can transfer a block, and Failure() if the block transfer was failed http://git-wip-us.apache.org/repos/asf/spark/blob/6e9ef10f/network/shuffle/src/main/java/org/apache/spark/network/sasl/SaslMessage.java ---------------------------------------------------------------------- diff --git a/network/shuffle/src/main/java/org/apache/spark/network/sasl/SaslMessage.java b/network/shuffle/src/main/java/org/apache/spark/network/sasl/SaslMessage.java index 5b77e18..599cc64 100644 --- a/network/shuffle/src/main/java/org/apache/spark/network/sasl/SaslMessage.java +++ b/network/shuffle/src/main/java/org/apache/spark/network/sasl/SaslMessage.java @@ -58,7 +58,8 @@ class SaslMessage implements Encodable { public static SaslMessage decode(ByteBuf buf) { if (buf.readByte() != TAG_BYTE) { - throw new IllegalStateException("Expected SaslMessage, received something else"); + throw new IllegalStateException("Expected SaslMessage, received something else" + + " (maybe your client does not have SASL enabled?)"); } int idLength = buf.readInt(); --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
