Repository: spark Updated Branches: refs/heads/master 40566e10a -> 7edbea41b
http://git-wip-us.apache.org/repos/asf/spark/blob/7edbea41/core/src/test/scala/org/apache/spark/metrics/MetricsSystemSuite.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/org/apache/spark/metrics/MetricsSystemSuite.scala b/core/src/test/scala/org/apache/spark/metrics/MetricsSystemSuite.scala index c1e8b29..96a5a12 100644 --- a/core/src/test/scala/org/apache/spark/metrics/MetricsSystemSuite.scala +++ b/core/src/test/scala/org/apache/spark/metrics/MetricsSystemSuite.scala @@ -18,21 +18,22 @@ package org.apache.spark.metrics import org.scalatest.{BeforeAndAfter, FunSuite} - -import org.apache.spark.SparkConf +import org.apache.spark.{SecurityManager, SparkConf} import org.apache.spark.deploy.master.MasterSource class MetricsSystemSuite extends FunSuite with BeforeAndAfter { var filePath: String = _ var conf: SparkConf = null + var securityMgr: SecurityManager = null before { filePath = getClass.getClassLoader.getResource("test_metrics_system.properties").getFile() conf = new SparkConf(false).set("spark.metrics.conf", filePath) + securityMgr = new SecurityManager(conf) } test("MetricsSystem with default config") { - val metricsSystem = MetricsSystem.createMetricsSystem("default", conf) + val metricsSystem = MetricsSystem.createMetricsSystem("default", conf, securityMgr) val sources = metricsSystem.sources val sinks = metricsSystem.sinks @@ -42,7 +43,7 @@ class MetricsSystemSuite extends FunSuite with BeforeAndAfter { } test("MetricsSystem with sources add") { - val metricsSystem = MetricsSystem.createMetricsSystem("test", conf) + val metricsSystem = MetricsSystem.createMetricsSystem("test", conf, securityMgr) val sources = metricsSystem.sources val sinks = metricsSystem.sinks http://git-wip-us.apache.org/repos/asf/spark/blob/7edbea41/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala index 9f011d9..121e47c 100644 --- a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala +++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala @@ -28,7 +28,7 @@ import org.scalatest.concurrent.Timeouts._ import org.scalatest.matchers.ShouldMatchers._ import org.scalatest.time.SpanSugar._ -import org.apache.spark.{SparkConf, SparkContext} +import org.apache.spark.{SecurityManager, SparkConf, SparkContext} import org.apache.spark.serializer.{JavaSerializer, KryoSerializer} import org.apache.spark.util.{AkkaUtils, ByteBufferInputStream, SizeEstimator, Utils} @@ -39,6 +39,8 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT var actorSystem: ActorSystem = null var master: BlockManagerMaster = null var oldArch: String = null + conf.set("spark.authenticate", "false") + val securityMgr = new SecurityManager(conf) // Reuse a serializer across tests to avoid creating a new thread-local buffer on each test conf.set("spark.kryoserializer.buffer.mb", "1") @@ -49,7 +51,8 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT def rdd(rddId: Int, splitId: Int) = RDDBlockId(rddId, splitId) before { - val (actorSystem, boundPort) = AkkaUtils.createActorSystem("test", "localhost", 0, conf = conf) + val (actorSystem, boundPort) = AkkaUtils.createActorSystem("test", "localhost", 0, conf = conf, + securityManager = securityMgr) this.actorSystem = actorSystem conf.set("spark.driver.port", boundPort.toString) @@ -125,7 +128,7 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT } test("master + 1 manager interaction") { - store = new BlockManager("<driver>", actorSystem, master, serializer, 2000, conf) + store = new BlockManager("<driver>", actorSystem, master, serializer, 2000, conf, securityMgr) val a1 = new Array[Byte](400) val a2 = new Array[Byte](400) val a3 = new Array[Byte](400) @@ -155,8 +158,9 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT } test("master + 2 managers interaction") { - store = new BlockManager("exec1", actorSystem, master, serializer, 2000, conf) - store2 = new BlockManager("exec2", actorSystem, master, new KryoSerializer(conf), 2000, conf) + store = new BlockManager("exec1", actorSystem, master, serializer, 2000, conf, securityMgr) + store2 = new BlockManager("exec2", actorSystem, master, new KryoSerializer(conf), 2000, conf, + securityMgr) val peers = master.getPeers(store.blockManagerId, 1) assert(peers.size === 1, "master did not return the other manager as a peer") @@ -171,7 +175,7 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT } test("removing block") { - store = new BlockManager("<driver>", actorSystem, master, serializer, 2000, conf) + store = new BlockManager("<driver>", actorSystem, master, serializer, 2000, conf, securityMgr) val a1 = new Array[Byte](400) val a2 = new Array[Byte](400) val a3 = new Array[Byte](400) @@ -219,7 +223,7 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT } test("removing rdd") { - store = new BlockManager("<driver>", actorSystem, master, serializer, 2000, conf) + store = new BlockManager("<driver>", actorSystem, master, serializer, 2000, conf, securityMgr) val a1 = new Array[Byte](400) val a2 = new Array[Byte](400) val a3 = new Array[Byte](400) @@ -253,7 +257,7 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT test("reregistration on heart beat") { val heartBeat = PrivateMethod[Unit]('heartBeat) - store = new BlockManager("<driver>", actorSystem, master, serializer, 2000, conf) + store = new BlockManager("<driver>", actorSystem, master, serializer, 2000, conf, securityMgr) val a1 = new Array[Byte](400) store.putSingle("a1", a1, StorageLevel.MEMORY_ONLY) @@ -269,7 +273,7 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT } test("reregistration on block update") { - store = new BlockManager("<driver>", actorSystem, master, serializer, 2000, conf) + store = new BlockManager("<driver>", actorSystem, master, serializer, 2000, conf, securityMgr) val a1 = new Array[Byte](400) val a2 = new Array[Byte](400) @@ -288,7 +292,7 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT test("reregistration doesn't dead lock") { val heartBeat = PrivateMethod[Unit]('heartBeat) - store = new BlockManager("<driver>", actorSystem, master, serializer, 2000, conf) + store = new BlockManager("<driver>", actorSystem, master, serializer, 2000, conf, securityMgr) val a1 = new Array[Byte](400) val a2 = List(new Array[Byte](400)) @@ -325,7 +329,7 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT } test("in-memory LRU storage") { - store = new BlockManager("<driver>", actorSystem, master, serializer, 1200, conf) + store = new BlockManager("<driver>", actorSystem, master, serializer, 1200, conf, securityMgr) val a1 = new Array[Byte](400) val a2 = new Array[Byte](400) val a3 = new Array[Byte](400) @@ -344,7 +348,7 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT } test("in-memory LRU storage with serialization") { - store = new BlockManager("<driver>", actorSystem, master, serializer, 1200, conf) + store = new BlockManager("<driver>", actorSystem, master, serializer, 1200, conf, securityMgr) val a1 = new Array[Byte](400) val a2 = new Array[Byte](400) val a3 = new Array[Byte](400) @@ -363,7 +367,7 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT } test("in-memory LRU for partitions of same RDD") { - store = new BlockManager("<driver>", actorSystem, master, serializer, 1200, conf) + store = new BlockManager("<driver>", actorSystem, master, serializer, 1200, conf, securityMgr) val a1 = new Array[Byte](400) val a2 = new Array[Byte](400) val a3 = new Array[Byte](400) @@ -382,7 +386,7 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT } test("in-memory LRU for partitions of multiple RDDs") { - store = new BlockManager("<driver>", actorSystem, master, serializer, 1200, conf) + store = new BlockManager("<driver>", actorSystem, master, serializer, 1200, conf, securityMgr) store.putSingle(rdd(0, 1), new Array[Byte](400), StorageLevel.MEMORY_ONLY) store.putSingle(rdd(0, 2), new Array[Byte](400), StorageLevel.MEMORY_ONLY) store.putSingle(rdd(1, 1), new Array[Byte](400), StorageLevel.MEMORY_ONLY) @@ -405,7 +409,7 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT } test("on-disk storage") { - store = new BlockManager("<driver>", actorSystem, master, serializer, 1200, conf) + store = new BlockManager("<driver>", actorSystem, master, serializer, 1200, conf, securityMgr) val a1 = new Array[Byte](400) val a2 = new Array[Byte](400) val a3 = new Array[Byte](400) @@ -418,7 +422,7 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT } test("disk and memory storage") { - store = new BlockManager("<driver>", actorSystem, master, serializer, 1200, conf) + store = new BlockManager("<driver>", actorSystem, master, serializer, 1200, conf, securityMgr) val a1 = new Array[Byte](400) val a2 = new Array[Byte](400) val a3 = new Array[Byte](400) @@ -433,7 +437,7 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT } test("disk and memory storage with getLocalBytes") { - store = new BlockManager("<driver>", actorSystem, master, serializer, 1200, conf) + store = new BlockManager("<driver>", actorSystem, master, serializer, 1200, conf, securityMgr) val a1 = new Array[Byte](400) val a2 = new Array[Byte](400) val a3 = new Array[Byte](400) @@ -448,7 +452,7 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT } test("disk and memory storage with serialization") { - store = new BlockManager("<driver>", actorSystem, master, serializer, 1200, conf) + store = new BlockManager("<driver>", actorSystem, master, serializer, 1200, conf, securityMgr) val a1 = new Array[Byte](400) val a2 = new Array[Byte](400) val a3 = new Array[Byte](400) @@ -463,7 +467,7 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT } test("disk and memory storage with serialization and getLocalBytes") { - store = new BlockManager("<driver>", actorSystem, master, serializer, 1200, conf) + store = new BlockManager("<driver>", actorSystem, master, serializer, 1200, conf, securityMgr) val a1 = new Array[Byte](400) val a2 = new Array[Byte](400) val a3 = new Array[Byte](400) @@ -478,7 +482,7 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT } test("LRU with mixed storage levels") { - store = new BlockManager("<driver>", actorSystem, master, serializer, 1200, conf) + store = new BlockManager("<driver>", actorSystem, master, serializer, 1200, conf, securityMgr) val a1 = new Array[Byte](400) val a2 = new Array[Byte](400) val a3 = new Array[Byte](400) @@ -503,7 +507,7 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT } test("in-memory LRU with streams") { - store = new BlockManager("<driver>", actorSystem, master, serializer, 1200, conf) + store = new BlockManager("<driver>", actorSystem, master, serializer, 1200, conf, securityMgr) val list1 = List(new Array[Byte](200), new Array[Byte](200)) val list2 = List(new Array[Byte](200), new Array[Byte](200)) val list3 = List(new Array[Byte](200), new Array[Byte](200)) @@ -527,7 +531,7 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT } test("LRU with mixed storage levels and streams") { - store = new BlockManager("<driver>", actorSystem, master, serializer, 1200, conf) + store = new BlockManager("<driver>", actorSystem, master, serializer, 1200, conf, securityMgr) val list1 = List(new Array[Byte](200), new Array[Byte](200)) val list2 = List(new Array[Byte](200), new Array[Byte](200)) val list3 = List(new Array[Byte](200), new Array[Byte](200)) @@ -573,7 +577,7 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT } test("overly large block") { - store = new BlockManager("<driver>", actorSystem, master, serializer, 500, conf) + store = new BlockManager("<driver>", actorSystem, master, serializer, 500, conf, securityMgr) store.putSingle("a1", new Array[Byte](1000), StorageLevel.MEMORY_ONLY) assert(store.getSingle("a1") === None, "a1 was in store") store.putSingle("a2", new Array[Byte](1000), StorageLevel.MEMORY_AND_DISK) @@ -584,7 +588,7 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT test("block compression") { try { conf.set("spark.shuffle.compress", "true") - store = new BlockManager("exec1", actorSystem, master, serializer, 2000, conf) + store = new BlockManager("exec1", actorSystem, master, serializer, 2000, conf, securityMgr) store.putSingle(ShuffleBlockId(0, 0, 0), new Array[Byte](1000), StorageLevel.MEMORY_ONLY_SER) assert(store.memoryStore.getSize(ShuffleBlockId(0, 0, 0)) <= 100, "shuffle_0_0_0 was not compressed") @@ -592,7 +596,7 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT store = null conf.set("spark.shuffle.compress", "false") - store = new BlockManager("exec2", actorSystem, master, serializer, 2000, conf) + store = new BlockManager("exec2", actorSystem, master, serializer, 2000, conf, securityMgr) store.putSingle(ShuffleBlockId(0, 0, 0), new Array[Byte](1000), StorageLevel.MEMORY_ONLY_SER) assert(store.memoryStore.getSize(ShuffleBlockId(0, 0, 0)) >= 1000, "shuffle_0_0_0 was compressed") @@ -600,7 +604,7 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT store = null conf.set("spark.broadcast.compress", "true") - store = new BlockManager("exec3", actorSystem, master, serializer, 2000, conf) + store = new BlockManager("exec3", actorSystem, master, serializer, 2000, conf, securityMgr) store.putSingle(BroadcastBlockId(0), new Array[Byte](1000), StorageLevel.MEMORY_ONLY_SER) assert(store.memoryStore.getSize(BroadcastBlockId(0)) <= 100, "broadcast_0 was not compressed") @@ -608,28 +612,28 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT store = null conf.set("spark.broadcast.compress", "false") - store = new BlockManager("exec4", actorSystem, master, serializer, 2000, conf) + store = new BlockManager("exec4", actorSystem, master, serializer, 2000, conf, securityMgr) store.putSingle(BroadcastBlockId(0), new Array[Byte](1000), StorageLevel.MEMORY_ONLY_SER) assert(store.memoryStore.getSize(BroadcastBlockId(0)) >= 1000, "broadcast_0 was compressed") store.stop() store = null conf.set("spark.rdd.compress", "true") - store = new BlockManager("exec5", actorSystem, master, serializer, 2000, conf) + store = new BlockManager("exec5", actorSystem, master, serializer, 2000, conf, securityMgr) store.putSingle(rdd(0, 0), new Array[Byte](1000), StorageLevel.MEMORY_ONLY_SER) assert(store.memoryStore.getSize(rdd(0, 0)) <= 100, "rdd_0_0 was not compressed") store.stop() store = null conf.set("spark.rdd.compress", "false") - store = new BlockManager("exec6", actorSystem, master, serializer, 2000, conf) + store = new BlockManager("exec6", actorSystem, master, serializer, 2000, conf, securityMgr) store.putSingle(rdd(0, 0), new Array[Byte](1000), StorageLevel.MEMORY_ONLY_SER) assert(store.memoryStore.getSize(rdd(0, 0)) >= 1000, "rdd_0_0 was compressed") store.stop() store = null // Check that any other block types are also kept uncompressed - store = new BlockManager("exec7", actorSystem, master, serializer, 2000, conf) + store = new BlockManager("exec7", actorSystem, master, serializer, 2000, conf, securityMgr) store.putSingle("other_block", new Array[Byte](1000), StorageLevel.MEMORY_ONLY) assert(store.memoryStore.getSize("other_block") >= 1000, "other_block was compressed") store.stop() @@ -643,7 +647,8 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT test("block store put failure") { // Use Java serializer so we can create an unserializable error. - store = new BlockManager("<driver>", actorSystem, master, new JavaSerializer(conf), 1200, conf) + store = new BlockManager("<driver>", actorSystem, master, new JavaSerializer(conf), 1200, conf, + securityMgr) // The put should fail since a1 is not serializable. class UnserializableClass http://git-wip-us.apache.org/repos/asf/spark/blob/7edbea41/core/src/test/scala/org/apache/spark/ui/UISuite.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/org/apache/spark/ui/UISuite.scala b/core/src/test/scala/org/apache/spark/ui/UISuite.scala index 20ebb18..3041581 100644 --- a/core/src/test/scala/org/apache/spark/ui/UISuite.scala +++ b/core/src/test/scala/org/apache/spark/ui/UISuite.scala @@ -24,6 +24,8 @@ import scala.util.{Failure, Success, Try} import org.eclipse.jetty.server.Server import org.scalatest.FunSuite +import org.apache.spark.SparkConf + class UISuite extends FunSuite { test("jetty port increases under contention") { val startPort = 4040 @@ -34,15 +36,17 @@ class UISuite extends FunSuite { case Failure(e) => // Either case server port is busy hence setup for test complete } - val (jettyServer1, boundPort1) = JettyUtils.startJettyServer("0.0.0.0", startPort, Seq()) - val (jettyServer2, boundPort2) = JettyUtils.startJettyServer("0.0.0.0", startPort, Seq()) + val (jettyServer1, boundPort1) = JettyUtils.startJettyServer("0.0.0.0", startPort, Seq(), + new SparkConf) + val (jettyServer2, boundPort2) = JettyUtils.startJettyServer("0.0.0.0", startPort, Seq(), + new SparkConf) // Allow some wiggle room in case ports on the machine are under contention assert(boundPort1 > startPort && boundPort1 < startPort + 10) assert(boundPort2 > boundPort1 && boundPort2 < boundPort1 + 10) } test("jetty binds to port 0 correctly") { - val (jettyServer, boundPort) = JettyUtils.startJettyServer("0.0.0.0", 0, Seq()) + val (jettyServer, boundPort) = JettyUtils.startJettyServer("0.0.0.0", 0, Seq(), new SparkConf) assert(jettyServer.getState === "STARTED") assert(boundPort != 0) Try {new ServerSocket(boundPort)} match { http://git-wip-us.apache.org/repos/asf/spark/blob/7edbea41/docs/configuration.md ---------------------------------------------------------------------- diff --git a/docs/configuration.md b/docs/configuration.md index 017d509..913c653 100644 --- a/docs/configuration.md +++ b/docs/configuration.md @@ -147,6 +147,34 @@ Apart from these, the following properties are also available, and may be useful How many stages the Spark UI remembers before garbage collecting. </td> </tr> +</tr> + <td>spark.ui.filters</td> + <td>None</td> + <td> + Comma separated list of filter class names to apply to the Spark web ui. The filter should be a + standard javax servlet Filter. Parameters to each filter can also be specified by setting a + java system property of spark.<class name of filter>.params='param1=value1,param2=value2' + (e.g.-Dspark.ui.filters=com.test.filter1 -Dspark.com.test.filter1.params='param1=foo,param2=testing') + </td> +</tr> +<tr> + <td>spark.ui.acls.enable</td> + <td>false</td> + <td> + Whether spark web ui acls should are enabled. If enabled, this checks to see if the user has + access permissions to view the web ui. See <code>spark.ui.view.acls</code> for more details. + Also note this requires the user to be known, if the user comes across as null no checks + are done. Filters can be used to authenticate and set the user. + </td> +</tr> +<tr> + <td>spark.ui.view.acls</td> + <td>Empty</td> + <td> + Comma separated list of users that have view access to the spark web ui. By default only the + user that started the Spark job has view access. + </td> +</tr> <tr> <td>spark.shuffle.compress</td> <td>true</td> @@ -495,6 +523,29 @@ Apart from these, the following properties are also available, and may be useful <td> Whether to overwrite files added through SparkContext.addFile() when the target file exists and its contents do not match those of the source. </td> +<tr> + <td>spark.authenticate</td> + <td>false</td> + <td> + Whether spark authenticates its internal connections. See <code>spark.authenticate.secret</code> if not + running on Yarn. + </td> +</tr> +<tr> + <td>spark.authenticate.secret</td> + <td>None</td> + <td> + Set the secret key used for Spark to authenticate between components. This needs to be set if + not running on Yarn and authentication is enabled. + </td> +</tr> +<tr> + <td>spark.core.connection.auth.wait.timeout</td> + <td>30</td> + <td> + Number of seconds for the connection to wait for authentication to occur before timing + out and giving up. + </td> </tr> </table> http://git-wip-us.apache.org/repos/asf/spark/blob/7edbea41/docs/index.md ---------------------------------------------------------------------- diff --git a/docs/index.md b/docs/index.md index 4eb297d..c4f4d79 100644 --- a/docs/index.md +++ b/docs/index.md @@ -103,6 +103,7 @@ For this version of Spark (0.8.1) Hadoop 2.2.x (or newer) users will have to bui * [Configuration](configuration.html): customize Spark via its configuration system * [Tuning Guide](tuning.html): best practices to optimize performance and memory use +* [Security](security.html): Spark security support * [Hardware Provisioning](hardware-provisioning.html): recommendations for cluster hardware * [Job Scheduling](job-scheduling.html): scheduling resources across and within Spark applications * [Building Spark with Maven](building-with-maven.html): build Spark using the Maven system http://git-wip-us.apache.org/repos/asf/spark/blob/7edbea41/docs/security.md ---------------------------------------------------------------------- diff --git a/docs/security.md b/docs/security.md new file mode 100644 index 0000000..9e4218f --- /dev/null +++ b/docs/security.md @@ -0,0 +1,18 @@ +--- +layout: global +title: Spark Security +--- + +Spark currently supports authentication via a shared secret. Authentication can be configured to be on via the `spark.authenticate` configuration parameter. This parameter controls whether the Spark communication protocols do authentication using the shared secret. This authentication is a basic handshake to make sure both sides have the same shared secret and are allowed to communicate. If the shared secret is not identical they will not be allowed to communicate. + +The Spark UI can also be secured by using javax servlet filters. A user may want to secure the UI if it has data that other users should not be allowed to see. The javax servlet filter specified by the user can authenticate the user and then once the user is logged in, Spark can compare that user versus the view acls to make sure they are authorized to view the UI. The configs 'spark.ui.acls.enable' and 'spark.ui.view.acls' control the behavior of the acls. Note that the person who started the application always has view access to the UI. + +For Spark on Yarn deployments, configuring `spark.authenticate` to true will automatically handle generating and distributing the shared secret. Each application will use a unique shared secret. The Spark UI uses the standard YARN web application proxy mechanism and will authenticate via any installed Hadoop filters. If an authentication filter is enabled, the acls controls can be used by control which users can via the Spark UI. + +For other types of Spark deployments, the spark config `spark.authenticate.secret` should be configured on each of the nodes. This secret will be used by all the Master/Workers and applications. The UI can be secured using a javax servlet filter installed via `spark.ui.filters`. If an authentication filter is enabled, the acls controls can be used by control which users can via the Spark UI. + +IMPORTANT NOTE: The NettyBlockFetcherIterator is not secured so do not use netty for the shuffle is running with authentication on. + +See [Spark Configuration](configuration.html) for more details on the security configs. + +See <a href="api/core/index.html#org.apache.spark.SecurityManager"><code>org.apache.spark.SecurityManager</code></a> for implementation details about security. http://git-wip-us.apache.org/repos/asf/spark/blob/7edbea41/examples/src/main/scala/org/apache/spark/streaming/examples/ActorWordCount.scala ---------------------------------------------------------------------- diff --git a/examples/src/main/scala/org/apache/spark/streaming/examples/ActorWordCount.scala b/examples/src/main/scala/org/apache/spark/streaming/examples/ActorWordCount.scala index 3d7b390..62d3a52 100644 --- a/examples/src/main/scala/org/apache/spark/streaming/examples/ActorWordCount.scala +++ b/examples/src/main/scala/org/apache/spark/streaming/examples/ActorWordCount.scala @@ -23,7 +23,7 @@ import scala.util.Random import akka.actor.{Actor, ActorRef, Props, actorRef2Scala} -import org.apache.spark.SparkConf +import org.apache.spark.{SparkConf, SecurityManager} import org.apache.spark.streaming.{Seconds, StreamingContext} import org.apache.spark.streaming.StreamingContext.toPairDStreamFunctions import org.apache.spark.streaming.receivers.Receiver @@ -112,8 +112,9 @@ object FeederActor { } val Seq(host, port) = args.toSeq - - val actorSystem = AkkaUtils.createActorSystem("test", host, port.toInt, conf = new SparkConf)._1 + val conf = new SparkConf + val actorSystem = AkkaUtils.createActorSystem("test", host, port.toInt, conf = conf, + securityManager = new SecurityManager(conf))._1 val feeder = actorSystem.actorOf(Props[FeederActor], "FeederActor") println("Feeder started as:" + feeder) http://git-wip-us.apache.org/repos/asf/spark/blob/7edbea41/pom.xml ---------------------------------------------------------------------- diff --git a/pom.xml b/pom.xml index c59fada..3b86385 100644 --- a/pom.xml +++ b/pom.xml @@ -157,6 +157,21 @@ <dependencies> <dependency> <groupId>org.eclipse.jetty</groupId> + <artifactId>jetty-util</artifactId> + <version>7.6.8.v20121106</version> + </dependency> + <dependency> + <groupId>org.eclipse.jetty</groupId> + <artifactId>jetty-security</artifactId> + <version>7.6.8.v20121106</version> + </dependency> + <dependency> + <groupId>org.eclipse.jetty</groupId> + <artifactId>jetty-plus</artifactId> + <version>7.6.8.v20121106</version> + </dependency> + <dependency> + <groupId>org.eclipse.jetty</groupId> <artifactId>jetty-server</artifactId> <version>7.6.8.v20121106</version> </dependency> @@ -296,6 +311,11 @@ <version>${mesos.version}</version> </dependency> <dependency> + <groupId>commons-net</groupId> + <artifactId>commons-net</artifactId> + <version>2.2</version> + </dependency> + <dependency> <groupId>io.netty</groupId> <artifactId>netty-all</artifactId> <version>4.0.17.Final</version> http://git-wip-us.apache.org/repos/asf/spark/blob/7edbea41/project/SparkBuild.scala ---------------------------------------------------------------------- diff --git a/project/SparkBuild.scala b/project/SparkBuild.scala index aa17848..138aad7 100644 --- a/project/SparkBuild.scala +++ b/project/SparkBuild.scala @@ -226,6 +226,9 @@ object SparkBuild extends Build { libraryDependencies ++= Seq( "io.netty" % "netty-all" % "4.0.17.Final", "org.eclipse.jetty" % "jetty-server" % "7.6.8.v20121106", + "org.eclipse.jetty" % "jetty-util" % "7.6.8.v20121106", + "org.eclipse.jetty" % "jetty-plus" % "7.6.8.v20121106", + "org.eclipse.jetty" % "jetty-security" % "7.6.8.v20121106", /** Workaround for SPARK-959. Dependency used by org.eclipse.jetty. Fixed in ivy 2.3.0. */ "org.eclipse.jetty.orbit" % "javax.servlet" % "2.5.0.v201103041518" artifacts Artifact("javax.servlet", "jar", "jar"), "org.scalatest" %% "scalatest" % "1.9.1" % "test", @@ -285,6 +288,7 @@ object SparkBuild extends Build { "it.unimi.dsi" % "fastutil" % "6.4.4", "colt" % "colt" % "1.2.0", "org.apache.mesos" % "mesos" % "0.13.0", + "commons-net" % "commons-net" % "2.2", "net.java.dev.jets3t" % "jets3t" % "0.7.1" excludeAll(excludeCommonsLogging), "org.apache.derby" % "derby" % "10.4.2.0" % "test", "org.apache.hadoop" % hadoopClient % hadoopVersion excludeAll(excludeNetty, excludeAsm, excludeCommonsLogging, excludeSLF4J), http://git-wip-us.apache.org/repos/asf/spark/blob/7edbea41/repl/src/main/scala/org/apache/spark/repl/ExecutorClassLoader.scala ---------------------------------------------------------------------- diff --git a/repl/src/main/scala/org/apache/spark/repl/ExecutorClassLoader.scala b/repl/src/main/scala/org/apache/spark/repl/ExecutorClassLoader.scala index e3bcf7f..1aa9407 100644 --- a/repl/src/main/scala/org/apache/spark/repl/ExecutorClassLoader.scala +++ b/repl/src/main/scala/org/apache/spark/repl/ExecutorClassLoader.scala @@ -18,12 +18,15 @@ package org.apache.spark.repl import java.io.{ByteArrayOutputStream, InputStream} -import java.net.{URI, URL, URLClassLoader, URLEncoder} +import java.net.{URI, URL, URLEncoder} import java.util.concurrent.{Executors, ExecutorService} import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.{FileSystem, Path} +import org.apache.spark.SparkEnv +import org.apache.spark.util.Utils + import org.objectweb.asm._ import org.objectweb.asm.Opcodes._ @@ -53,7 +56,13 @@ extends ClassLoader(parent) { if (fileSystem != null) { fileSystem.open(new Path(directory, pathInDirectory)) } else { - new URL(classUri + "/" + urlEncode(pathInDirectory)).openStream() + if (SparkEnv.get.securityManager.isAuthenticationEnabled()) { + val uri = new URI(classUri + "/" + urlEncode(pathInDirectory)) + val newuri = Utils.constructURIForAuthentication(uri, SparkEnv.get.securityManager) + newuri.toURL().openStream() + } else { + new URL(classUri + "/" + urlEncode(pathInDirectory)).openStream() + } } } val bytes = readAndTransformClass(name, inputStream) http://git-wip-us.apache.org/repos/asf/spark/blob/7edbea41/repl/src/main/scala/org/apache/spark/repl/SparkILoop.scala ---------------------------------------------------------------------- diff --git a/repl/src/main/scala/org/apache/spark/repl/SparkILoop.scala b/repl/src/main/scala/org/apache/spark/repl/SparkILoop.scala index f52ebe4..9b1da19 100644 --- a/repl/src/main/scala/org/apache/spark/repl/SparkILoop.scala +++ b/repl/src/main/scala/org/apache/spark/repl/SparkILoop.scala @@ -881,6 +881,8 @@ class SparkILoop(in0: Option[BufferedReader], protected val out: JPrintWriter, }) def process(settings: Settings): Boolean = savingContextLoader { + if (getMaster() == "yarn-client") System.setProperty("SPARK_YARN_MODE", "true") + this.settings = settings createInterpreter() @@ -939,16 +941,9 @@ class SparkILoop(in0: Option[BufferedReader], protected val out: JPrintWriter, def createSparkContext(): SparkContext = { val execUri = System.getenv("SPARK_EXECUTOR_URI") - val master = this.master match { - case Some(m) => m - case None => { - val prop = System.getenv("MASTER") - if (prop != null) prop else "local" - } - } val jars = SparkILoop.getAddedJars.map(new java.io.File(_).getAbsolutePath) val conf = new SparkConf() - .setMaster(master) + .setMaster(getMaster()) .setAppName("Spark shell") .setJars(jars) .set("spark.repl.class.uri", intp.classServer.uri) @@ -963,6 +958,17 @@ class SparkILoop(in0: Option[BufferedReader], protected val out: JPrintWriter, sparkContext } + private def getMaster(): String = { + val master = this.master match { + case Some(m) => m + case None => { + val prop = System.getenv("MASTER") + if (prop != null) prop else "local" + } + } + master + } + /** process command-line arguments and do as they request */ def process(args: Array[String]): Boolean = { val command = new SparkCommandLine(args.toList, msg => echo(msg)) http://git-wip-us.apache.org/repos/asf/spark/blob/7edbea41/repl/src/main/scala/org/apache/spark/repl/SparkIMain.scala ---------------------------------------------------------------------- diff --git a/repl/src/main/scala/org/apache/spark/repl/SparkIMain.scala b/repl/src/main/scala/org/apache/spark/repl/SparkIMain.scala index 1d73d0b..90a96ad 100644 --- a/repl/src/main/scala/org/apache/spark/repl/SparkIMain.scala +++ b/repl/src/main/scala/org/apache/spark/repl/SparkIMain.scala @@ -36,7 +36,7 @@ import scala.tools.reflect.StdRuntimeTags._ import scala.util.control.ControlThrowable import util.stackTraceString -import org.apache.spark.{HttpServer, SparkConf, Logging} +import org.apache.spark.{Logging, HttpServer, SecurityManager, SparkConf} import org.apache.spark.util.Utils // /** directory to save .class files to */ @@ -83,15 +83,17 @@ import org.apache.spark.util.Utils * @author Moez A. Abdel-Gawad * @author Lex Spoon */ - class SparkIMain(initialSettings: Settings, val out: JPrintWriter) extends SparkImports with Logging { + class SparkIMain(initialSettings: Settings, val out: JPrintWriter) + extends SparkImports with Logging { imain => - val SPARK_DEBUG_REPL: Boolean = (System.getenv("SPARK_DEBUG_REPL") == "1") + val conf = new SparkConf() + val SPARK_DEBUG_REPL: Boolean = (System.getenv("SPARK_DEBUG_REPL") == "1") /** Local directory to save .class files too */ val outputDir = { val tmp = System.getProperty("java.io.tmpdir") - val rootDir = new SparkConf().get("spark.repl.classdir", tmp) + val rootDir = conf.get("spark.repl.classdir", tmp) Utils.createTempDir(rootDir) } if (SPARK_DEBUG_REPL) { @@ -99,7 +101,8 @@ import org.apache.spark.util.Utils } val virtualDirectory = new PlainFile(outputDir) // "directory" for classfiles - val classServer = new HttpServer(outputDir) /** Jetty server that will serve our classes to worker nodes */ + val classServer = new HttpServer(outputDir, + new SecurityManager(conf)) /** Jetty server that will serve our classes to worker nodes */ private var currentSettings: Settings = initialSettings var printResults = true // whether to print result lines var totalSilence = false // whether to print anything http://git-wip-us.apache.org/repos/asf/spark/blob/7edbea41/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala ---------------------------------------------------------------------- diff --git a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala index e045b9f..bb574f4 100644 --- a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala +++ b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala @@ -27,7 +27,6 @@ import scala.collection.JavaConversions._ import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.{FileSystem, Path} import org.apache.hadoop.net.NetUtils -import org.apache.hadoop.security.UserGroupInformation import org.apache.hadoop.util.ShutdownHookManager import org.apache.hadoop.yarn.api._ import org.apache.hadoop.yarn.api.records._ @@ -36,7 +35,7 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration import org.apache.hadoop.yarn.ipc.YarnRPC import org.apache.hadoop.yarn.util.{ConverterUtils, Records} -import org.apache.spark.{SparkConf, SparkContext, Logging} +import org.apache.spark.{Logging, SecurityManager, SparkConf, SparkContext} import org.apache.spark.deploy.SparkHadoopUtil import org.apache.spark.util.Utils @@ -87,27 +86,16 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration, isLastAMRetry = appAttemptId.getAttemptId() >= maxAppAttempts resourceManager = registerWithResourceManager() - // Workaround until hadoop moves to something which has - // https://issues.apache.org/jira/browse/HADOOP-8406 - fixed in (2.0.2-alpha but no 0.23 line) - // ignore result. - // This does not, unfortunately, always work reliably ... but alleviates the bug a lot of times - // Hence args.workerCores = numCore disabled above. Any better option? - - // Compute number of threads for akka - //val minimumMemory = appMasterResponse.getMinimumResourceCapability().getMemory() - //if (minimumMemory > 0) { - // val mem = args.workerMemory + YarnAllocationHandler.MEMORY_OVERHEAD - // val numCore = (mem / minimumMemory) + (if (0 != (mem % minimumMemory)) 1 else 0) - - // if (numCore > 0) { - // do not override - hits https://issues.apache.org/jira/browse/HADOOP-8406 - // TODO: Uncomment when hadoop is on a version which has this fixed. - // args.workerCores = numCore - // } - //} - // org.apache.hadoop.io.compress.CompressionCodecFactory.getCodecClasses(conf) + // setup AmIpFilter for the SparkUI - do this before we start the UI + addAmIpFilter() ApplicationMaster.register(this) + + // Call this to force generation of secret so it gets populated into the + // hadoop UGI. This has to happen before the startUserClass which does a + // doAs in order for the credentials to be passed on to the worker containers. + val securityMgr = new SecurityManager(sparkConf) + // Start the user's JAR userThread = startUserClass() @@ -132,6 +120,20 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration, System.exit(0) } + // add the yarn amIpFilter that Yarn requires for properly securing the UI + private def addAmIpFilter() { + val amFilter = "org.apache.hadoop.yarn.server.webproxy.amfilter.AmIpFilter" + System.setProperty("spark.ui.filters", amFilter) + val proxy = YarnConfiguration.getProxyHostAndPort(conf) + val parts : Array[String] = proxy.split(":") + val uriBase = "http://" + proxy + + System.getenv(ApplicationConstants.APPLICATION_WEB_PROXY_BASE_ENV) + + val params = "PROXY_HOST=" + parts(0) + "," + "PROXY_URI_BASE=" + uriBase + System.setProperty("spark.org.apache.hadoop.yarn.server.webproxy.amfilter.AmIpFilter.params", + params) + } + /** Get the Yarn approved local directories. */ private def getLocalDirs(): String = { // Hadoop 0.23 and 2.x have different Environment variable names for the http://git-wip-us.apache.org/repos/asf/spark/blob/7edbea41/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/WorkerLauncher.scala ---------------------------------------------------------------------- diff --git a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/WorkerLauncher.scala b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/WorkerLauncher.scala index 138c279..b735d01 100644 --- a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/WorkerLauncher.scala +++ b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/WorkerLauncher.scala @@ -29,7 +29,7 @@ import org.apache.hadoop.yarn.util.{ConverterUtils, Records} import akka.actor._ import akka.remote._ import akka.actor.Terminated -import org.apache.spark.{SparkConf, SparkContext, Logging} +import org.apache.spark.{Logging, SecurityManager, SparkConf, SparkContext} import org.apache.spark.util.{Utils, AkkaUtils} import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend import org.apache.spark.scheduler.SplitInfo @@ -50,8 +50,9 @@ class WorkerLauncher(args: ApplicationMasterArguments, conf: Configuration, spar private var yarnAllocator: YarnAllocationHandler = _ private var driverClosed:Boolean = false + val securityManager = new SecurityManager(sparkConf) val actorSystem : ActorSystem = AkkaUtils.createActorSystem("sparkYarnAM", Utils.localHostName, 0, - conf = sparkConf)._1 + conf = sparkConf, securityManager = securityManager)._1 var actor: ActorRef = _ // This actor just working as a monitor to watch on Driver Actor. @@ -110,6 +111,7 @@ class WorkerLauncher(args: ApplicationMasterArguments, conf: Configuration, spar // we want to be reasonably responsive without causing too many requests to RM. val schedulerInterval = System.getProperty("spark.yarn.scheduler.heartbeat.interval-ms", "5000").toLong + // must be <= timeoutInterval / 2. val interval = math.min(timeoutInterval / 2, schedulerInterval) http://git-wip-us.apache.org/repos/asf/spark/blob/7edbea41/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala ---------------------------------------------------------------------- diff --git a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala index fe37168..11322b1 100644 --- a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala +++ b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala @@ -134,7 +134,7 @@ class ClientArguments(val args: Array[String], val sparkConf: SparkConf) { " --args ARGS Arguments to be passed to your application's main class.\n" + " Mutliple invocations are possible, each will be passed in order.\n" + " --num-workers NUM Number of workers to start (Default: 2)\n" + - " --worker-cores NUM Number of cores for the workers (Default: 1). This is unsused right now.\n" + + " --worker-cores NUM Number of cores for the workers (Default: 1).\n" + " --master-class CLASS_NAME Class Name for Master (Default: spark.deploy.yarn.ApplicationMaster)\n" + " --master-memory MEM Memory for Master (e.g. 1000M, 2G) (Default: 512 Mb)\n" + " --worker-memory MEM Memory per Worker (e.g. 1000M, 2G) (Default: 1G)\n" + http://git-wip-us.apache.org/repos/asf/spark/blob/7edbea41/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala ---------------------------------------------------------------------- diff --git a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala index d6c12a9..4c6e1dc 100644 --- a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala +++ b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala @@ -17,11 +17,13 @@ package org.apache.spark.deploy.yarn -import org.apache.spark.deploy.SparkHadoopUtil +import org.apache.hadoop.io.Text import org.apache.hadoop.mapred.JobConf +import org.apache.hadoop.security.Credentials import org.apache.hadoop.security.UserGroupInformation import org.apache.hadoop.yarn.conf.YarnConfiguration import org.apache.hadoop.conf.Configuration +import org.apache.spark.deploy.SparkHadoopUtil /** * Contains util methods to interact with Hadoop from spark. @@ -44,4 +46,24 @@ class YarnSparkHadoopUtil extends SparkHadoopUtil { val jobCreds = conf.getCredentials() jobCreds.mergeAll(UserGroupInformation.getCurrentUser().getCredentials()) } + + override def getCurrentUserCredentials(): Credentials = { + UserGroupInformation.getCurrentUser().getCredentials() + } + + override def addCurrentUserCredentials(creds: Credentials) { + UserGroupInformation.getCurrentUser().addCredentials(creds) + } + + override def addSecretKeyToUserCredentials(key: String, secret: String) { + val creds = new Credentials() + creds.addSecretKey(new Text(key), secret.getBytes()) + addCurrentUserCredentials(creds) + } + + override def getSecretKeyFromUserCredentials(key: String): Array[Byte] = { + val credentials = getCurrentUserCredentials() + if (credentials != null) credentials.getSecretKey(new Text(key)) else null + } + } http://git-wip-us.apache.org/repos/asf/spark/blob/7edbea41/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala ---------------------------------------------------------------------- diff --git a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala index dd117d5..b48a2d5 100644 --- a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala +++ b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala @@ -27,7 +27,6 @@ import scala.collection.JavaConversions._ import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.{FileSystem, Path} import org.apache.hadoop.net.NetUtils -import org.apache.hadoop.security.UserGroupInformation import org.apache.hadoop.util.ShutdownHookManager import org.apache.hadoop.yarn.api._ import org.apache.hadoop.yarn.api.protocolrecords._ @@ -37,8 +36,9 @@ import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest import org.apache.hadoop.yarn.conf.YarnConfiguration import org.apache.hadoop.yarn.ipc.YarnRPC import org.apache.hadoop.yarn.util.{ConverterUtils, Records} +import org.apache.hadoop.yarn.webapp.util.WebAppUtils; -import org.apache.spark.{SparkConf, SparkContext, Logging} +import org.apache.spark.{Logging, SecurityManager, SparkConf, SparkContext} import org.apache.spark.deploy.SparkHadoopUtil import org.apache.spark.util.Utils @@ -91,12 +91,16 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration, amClient.init(yarnConf) amClient.start() - // Workaround until hadoop moves to something which has - // https://issues.apache.org/jira/browse/HADOOP-8406 - fixed in (2.0.2-alpha but no 0.23 line) - // org.apache.hadoop.io.compress.CompressionCodecFactory.getCodecClasses(conf) + // setup AmIpFilter for the SparkUI - do this before we start the UI + addAmIpFilter() ApplicationMaster.register(this) + // Call this to force generation of secret so it gets populated into the + // hadoop UGI. This has to happen before the startUserClass which does a + // doAs in order for the credentials to be passed on to the worker containers. + val securityMgr = new SecurityManager(sparkConf) + // Start the user's JAR userThread = startUserClass() @@ -121,6 +125,19 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration, System.exit(0) } + // add the yarn amIpFilter that Yarn requires for properly securing the UI + private def addAmIpFilter() { + val amFilter = "org.apache.hadoop.yarn.server.webproxy.amfilter.AmIpFilter" + System.setProperty("spark.ui.filters", amFilter) + val proxy = WebAppUtils.getProxyHostAndPort(conf) + val parts : Array[String] = proxy.split(":") + val uriBase = "http://" + proxy + + System.getenv(ApplicationConstants.APPLICATION_WEB_PROXY_BASE_ENV) + + val params = "PROXY_HOST=" + parts(0) + "," + "PROXY_URI_BASE=" + uriBase + System.setProperty("spark.org.apache.hadoop.yarn.server.webproxy.amfilter.AmIpFilter.params", params) + } + /** Get the Yarn approved local directories. */ private def getLocalDirs(): String = { // Hadoop 0.23 and 2.x have different Environment variable names for the @@ -261,7 +278,6 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration, val schedulerInterval = sparkConf.getLong("spark.yarn.scheduler.heartbeat.interval-ms", 5000) - // must be <= timeoutInterval / 2. val interval = math.min(timeoutInterval / 2, schedulerInterval) http://git-wip-us.apache.org/repos/asf/spark/blob/7edbea41/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/WorkerLauncher.scala ---------------------------------------------------------------------- diff --git a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/WorkerLauncher.scala b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/WorkerLauncher.scala index 40600f3..f1c1fea 100644 --- a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/WorkerLauncher.scala +++ b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/WorkerLauncher.scala @@ -28,7 +28,7 @@ import org.apache.hadoop.yarn.util.{ConverterUtils, Records} import akka.actor._ import akka.remote._ import akka.actor.Terminated -import org.apache.spark.{SparkConf, SparkContext, Logging} +import org.apache.spark.{Logging, SecurityManager, SparkConf, SparkContext} import org.apache.spark.util.{Utils, AkkaUtils} import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend import org.apache.spark.scheduler.SplitInfo @@ -52,8 +52,9 @@ class WorkerLauncher(args: ApplicationMasterArguments, conf: Configuration, spar private var amClient: AMRMClient[ContainerRequest] = _ + val securityManager = new SecurityManager(sparkConf) val actorSystem: ActorSystem = AkkaUtils.createActorSystem("sparkYarnAM", Utils.localHostName, 0, - conf = sparkConf)._1 + conf = sparkConf, securityManager = securityManager)._1 var actor: ActorRef = _ // This actor just working as a monitor to watch on Driver Actor. @@ -105,6 +106,7 @@ class WorkerLauncher(args: ApplicationMasterArguments, conf: Configuration, spar val interval = math.min(timeoutInterval / 2, schedulerInterval) reporterThread = launchReporterThread(interval) + // Wait for the reporter thread to Finish. reporterThread.join()
