http://git-wip-us.apache.org/repos/asf/spark/blob/81e5619c/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala ---------------------------------------------------------------------- diff --git a/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala b/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala deleted file mode 100644 index 7deaf0a..0000000 --- a/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala +++ /dev/null @@ -1,462 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.deploy.yarn - -import java.io.{File, FileInputStream, FileOutputStream} -import java.net.URI -import java.util.Properties - -import scala.collection.JavaConverters._ -import scala.collection.mutable.{HashMap => MutableHashMap} -import scala.reflect.ClassTag -import scala.util.Try - -import org.apache.commons.lang3.SerializationUtils -import org.apache.hadoop.conf.Configuration -import org.apache.hadoop.fs.Path -import org.apache.hadoop.mapreduce.MRJobConfig -import org.apache.hadoop.yarn.api.ApplicationConstants.Environment -import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationResponse -import org.apache.hadoop.yarn.api.records._ -import org.apache.hadoop.yarn.client.api.YarnClientApplication -import org.apache.hadoop.yarn.conf.YarnConfiguration -import org.apache.hadoop.yarn.util.Records -import org.mockito.Matchers.{eq => meq, _} -import org.mockito.Mockito._ -import org.scalatest.{BeforeAndAfterAll, Matchers} - -import org.apache.spark.{SparkConf, SparkFunSuite, TestUtils} -import org.apache.spark.deploy.yarn.config._ -import org.apache.spark.util.{ResetSystemProperties, SparkConfWithEnv, Utils} - -class ClientSuite extends SparkFunSuite with Matchers with BeforeAndAfterAll - with ResetSystemProperties { - - import Client._ - - var oldSystemProperties: Properties = null - - override def beforeAll(): Unit = { - super.beforeAll() - oldSystemProperties = SerializationUtils.clone(System.getProperties) - System.setProperty("SPARK_YARN_MODE", "true") - } - - override def afterAll(): Unit = { - try { - System.setProperties(oldSystemProperties) - oldSystemProperties = null - } finally { - super.afterAll() - } - } - - test("default Yarn application classpath") { - getDefaultYarnApplicationClasspath should be(Some(Fixtures.knownDefYarnAppCP)) - } - - test("default MR application classpath") { - getDefaultMRApplicationClasspath should be(Some(Fixtures.knownDefMRAppCP)) - } - - test("resultant classpath for an application that defines a classpath for YARN") { - withAppConf(Fixtures.mapYARNAppConf) { conf => - val env = newEnv - populateHadoopClasspath(conf, env) - classpath(env) should be( - flatten(Fixtures.knownYARNAppCP, getDefaultMRApplicationClasspath)) - } - } - - 
test("resultant classpath for an application that defines a classpath for MR") { - withAppConf(Fixtures.mapMRAppConf) { conf => - val env = newEnv - populateHadoopClasspath(conf, env) - classpath(env) should be( - flatten(getDefaultYarnApplicationClasspath, Fixtures.knownMRAppCP)) - } - } - - test("resultant classpath for an application that defines both classpaths, YARN and MR") { - withAppConf(Fixtures.mapAppConf) { conf => - val env = newEnv - populateHadoopClasspath(conf, env) - classpath(env) should be(flatten(Fixtures.knownYARNAppCP, Fixtures.knownMRAppCP)) - } - } - - private val SPARK = "local:/sparkJar" - private val USER = "local:/userJar" - private val ADDED = "local:/addJar1,local:/addJar2,/addJar3" - - private val PWD = - if (classOf[Environment].getMethods().exists(_.getName == "$$")) { - "{{PWD}}" - } else if (Utils.isWindows) { - "%PWD%" - } else { - Environment.PWD.$() - } - - test("Local jar URIs") { - val conf = new Configuration() - val sparkConf = new SparkConf() - .set(SPARK_JARS, Seq(SPARK)) - .set(USER_CLASS_PATH_FIRST, true) - .set("spark.yarn.dist.jars", ADDED) - val env = new MutableHashMap[String, String]() - val args = new ClientArguments(Array("--jar", USER)) - - populateClasspath(args, conf, sparkConf, env) - - val cp = env("CLASSPATH").split(":|;|<CPS>") - s"$SPARK,$USER,$ADDED".split(",").foreach({ entry => - val uri = new URI(entry) - if (LOCAL_SCHEME.equals(uri.getScheme())) { - cp should contain (uri.getPath()) - } else { - cp should not contain (uri.getPath()) - } - }) - cp should contain(PWD) - cp should contain (s"$PWD${Path.SEPARATOR}${LOCALIZED_CONF_DIR}") - cp should not contain (APP_JAR) - } - - test("Jar path propagation through SparkConf") { - val conf = new Configuration() - val sparkConf = new SparkConf() - .set(SPARK_JARS, Seq(SPARK)) - .set("spark.yarn.dist.jars", ADDED) - val client = createClient(sparkConf, args = Array("--jar", USER)) - doReturn(new Path("/")).when(client).copyFileToRemote(any(classOf[Path]), - any(classOf[Path]), anyShort(), anyBoolean(), any()) - - val tempDir = Utils.createTempDir() - try { - // Because we mocked "copyFileToRemote" above to avoid having to create fake local files, - // we need to create a fake config archive in the temp dir to avoid having - // prepareLocalResources throw an exception. - new FileOutputStream(new File(tempDir, LOCALIZED_CONF_ARCHIVE)).close() - - client.prepareLocalResources(new Path(tempDir.getAbsolutePath()), Nil) - sparkConf.get(APP_JAR) should be (Some(USER)) - - // The non-local path should be propagated by name only, since it will end up in the app's - // staging dir. 
- val expected = ADDED.split(",") - .map(p => { - val uri = new URI(p) - if (LOCAL_SCHEME == uri.getScheme()) { - p - } else { - Option(uri.getFragment()).getOrElse(new File(p).getName()) - } - }) - .mkString(",") - - sparkConf.get(SECONDARY_JARS) should be (Some(expected.split(",").toSeq)) - } finally { - Utils.deleteRecursively(tempDir) - } - } - - test("Cluster path translation") { - val conf = new Configuration() - val sparkConf = new SparkConf() - .set(SPARK_JARS, Seq("local:/localPath/spark.jar")) - .set(GATEWAY_ROOT_PATH, "/localPath") - .set(REPLACEMENT_ROOT_PATH, "/remotePath") - - getClusterPath(sparkConf, "/localPath") should be ("/remotePath") - getClusterPath(sparkConf, "/localPath/1:/localPath/2") should be ( - "/remotePath/1:/remotePath/2") - - val env = new MutableHashMap[String, String]() - populateClasspath(null, conf, sparkConf, env, extraClassPath = Some("/localPath/my1.jar")) - val cp = classpath(env) - cp should contain ("/remotePath/spark.jar") - cp should contain ("/remotePath/my1.jar") - } - - test("configuration and args propagate through createApplicationSubmissionContext") { - val conf = new Configuration() - // When parsing tags, duplicates and leading/trailing whitespace should be removed. - // Spaces between non-comma strings should be preserved as single tags. Empty strings may or - // may not be removed depending on the version of Hadoop being used. - val sparkConf = new SparkConf() - .set(APPLICATION_TAGS.key, ",tag1, dup,tag2 , ,multi word , dup") - .set(MAX_APP_ATTEMPTS, 42) - .set("spark.app.name", "foo-test-app") - .set(QUEUE_NAME, "staging-queue") - val args = new ClientArguments(Array()) - - val appContext = Records.newRecord(classOf[ApplicationSubmissionContext]) - val getNewApplicationResponse = Records.newRecord(classOf[GetNewApplicationResponse]) - val containerLaunchContext = Records.newRecord(classOf[ContainerLaunchContext]) - - val client = new Client(args, conf, sparkConf) - client.createApplicationSubmissionContext( - new YarnClientApplication(getNewApplicationResponse, appContext), - containerLaunchContext) - - appContext.getApplicationName should be ("foo-test-app") - appContext.getQueue should be ("staging-queue") - appContext.getAMContainerSpec should be (containerLaunchContext) - appContext.getApplicationType should be ("SPARK") - appContext.getClass.getMethods.filter(_.getName.equals("getApplicationTags")).foreach{ method => - val tags = method.invoke(appContext).asInstanceOf[java.util.Set[String]] - tags should contain allOf ("tag1", "dup", "tag2", "multi word") - tags.asScala.count(_.nonEmpty) should be (4) - } - appContext.getMaxAppAttempts should be (42) - } - - test("spark.yarn.jars with multiple paths and globs") { - val libs = Utils.createTempDir() - val single = Utils.createTempDir() - val jar1 = TestUtils.createJarWithFiles(Map(), libs) - val jar2 = TestUtils.createJarWithFiles(Map(), libs) - val jar3 = TestUtils.createJarWithFiles(Map(), single) - val jar4 = TestUtils.createJarWithFiles(Map(), single) - - val jarsConf = Seq( - s"${libs.getAbsolutePath()}/*", - jar3.getPath(), - s"local:${jar4.getPath()}", - s"local:${single.getAbsolutePath()}/*") - - val sparkConf = new SparkConf().set(SPARK_JARS, jarsConf) - val client = createClient(sparkConf) - - val tempDir = Utils.createTempDir() - client.prepareLocalResources(new Path(tempDir.getAbsolutePath()), Nil) - - assert(sparkConf.get(SPARK_JARS) === - Some(Seq(s"local:${jar4.getPath()}", s"local:${single.getAbsolutePath()}/*"))) - - 
verify(client).copyFileToRemote(any(classOf[Path]), meq(new Path(jar1.toURI())), anyShort(), - anyBoolean(), any()) - verify(client).copyFileToRemote(any(classOf[Path]), meq(new Path(jar2.toURI())), anyShort(), - anyBoolean(), any()) - verify(client).copyFileToRemote(any(classOf[Path]), meq(new Path(jar3.toURI())), anyShort(), - anyBoolean(), any()) - - val cp = classpath(client) - cp should contain (buildPath(PWD, LOCALIZED_LIB_DIR, "*")) - cp should not contain (jar3.getPath()) - cp should contain (jar4.getPath()) - cp should contain (buildPath(single.getAbsolutePath(), "*")) - } - - test("distribute jars archive") { - val temp = Utils.createTempDir() - val archive = TestUtils.createJarWithFiles(Map(), temp) - - val sparkConf = new SparkConf().set(SPARK_ARCHIVE, archive.getPath()) - val client = createClient(sparkConf) - client.prepareLocalResources(new Path(temp.getAbsolutePath()), Nil) - - verify(client).copyFileToRemote(any(classOf[Path]), meq(new Path(archive.toURI())), anyShort(), - anyBoolean(), any()) - classpath(client) should contain (buildPath(PWD, LOCALIZED_LIB_DIR, "*")) - - sparkConf.set(SPARK_ARCHIVE, LOCAL_SCHEME + ":" + archive.getPath()) - intercept[IllegalArgumentException] { - client.prepareLocalResources(new Path(temp.getAbsolutePath()), Nil) - } - } - - test("distribute archive multiple times") { - val libs = Utils.createTempDir() - // Create jars dir and RELEASE file to avoid IllegalStateException. - val jarsDir = new File(libs, "jars") - assert(jarsDir.mkdir()) - new FileOutputStream(new File(libs, "RELEASE")).close() - - val userLib1 = Utils.createTempDir() - val testJar = TestUtils.createJarWithFiles(Map(), userLib1) - - // Case 1: FILES_TO_DISTRIBUTE and ARCHIVES_TO_DISTRIBUTE can't have duplicate files - val sparkConf = new SparkConfWithEnv(Map("SPARK_HOME" -> libs.getAbsolutePath)) - .set(FILES_TO_DISTRIBUTE, Seq(testJar.getPath)) - .set(ARCHIVES_TO_DISTRIBUTE, Seq(testJar.getPath)) - - val client = createClient(sparkConf) - val tempDir = Utils.createTempDir() - intercept[IllegalArgumentException] { - client.prepareLocalResources(new Path(tempDir.getAbsolutePath()), Nil) - } - - // Case 2: FILES_TO_DISTRIBUTE can't have duplicate files. - val sparkConfFiles = new SparkConfWithEnv(Map("SPARK_HOME" -> libs.getAbsolutePath)) - .set(FILES_TO_DISTRIBUTE, Seq(testJar.getPath, testJar.getPath)) - - val clientFiles = createClient(sparkConfFiles) - val tempDirForFiles = Utils.createTempDir() - intercept[IllegalArgumentException] { - clientFiles.prepareLocalResources(new Path(tempDirForFiles.getAbsolutePath()), Nil) - } - - // Case 3: ARCHIVES_TO_DISTRIBUTE can't have duplicate files. - val sparkConfArchives = new SparkConfWithEnv(Map("SPARK_HOME" -> libs.getAbsolutePath)) - .set(ARCHIVES_TO_DISTRIBUTE, Seq(testJar.getPath, testJar.getPath)) - - val clientArchives = createClient(sparkConfArchives) - val tempDirForArchives = Utils.createTempDir() - intercept[IllegalArgumentException] { - clientArchives.prepareLocalResources(new Path(tempDirForArchives.getAbsolutePath()), Nil) - } - - // Case 4: FILES_TO_DISTRIBUTE can have unique file. - val sparkConfFilesUniq = new SparkConfWithEnv(Map("SPARK_HOME" -> libs.getAbsolutePath)) - .set(FILES_TO_DISTRIBUTE, Seq(testJar.getPath)) - - val clientFilesUniq = createClient(sparkConfFilesUniq) - val tempDirForFilesUniq = Utils.createTempDir() - clientFilesUniq.prepareLocalResources(new Path(tempDirForFilesUniq.getAbsolutePath()), Nil) - - // Case 5: ARCHIVES_TO_DISTRIBUTE can have unique file. 
- val sparkConfArchivesUniq = new SparkConfWithEnv(Map("SPARK_HOME" -> libs.getAbsolutePath)) - .set(ARCHIVES_TO_DISTRIBUTE, Seq(testJar.getPath)) - - val clientArchivesUniq = createClient(sparkConfArchivesUniq) - val tempDirArchivesUniq = Utils.createTempDir() - clientArchivesUniq.prepareLocalResources(new Path(tempDirArchivesUniq.getAbsolutePath()), Nil) - - } - - test("distribute local spark jars") { - val temp = Utils.createTempDir() - val jarsDir = new File(temp, "jars") - assert(jarsDir.mkdir()) - val jar = TestUtils.createJarWithFiles(Map(), jarsDir) - new FileOutputStream(new File(temp, "RELEASE")).close() - - val sparkConf = new SparkConfWithEnv(Map("SPARK_HOME" -> temp.getAbsolutePath())) - val client = createClient(sparkConf) - client.prepareLocalResources(new Path(temp.getAbsolutePath()), Nil) - classpath(client) should contain (buildPath(PWD, LOCALIZED_LIB_DIR, "*")) - } - - test("ignore same name jars") { - val libs = Utils.createTempDir() - val jarsDir = new File(libs, "jars") - assert(jarsDir.mkdir()) - new FileOutputStream(new File(libs, "RELEASE")).close() - val userLib1 = Utils.createTempDir() - val userLib2 = Utils.createTempDir() - - val jar1 = TestUtils.createJarWithFiles(Map(), jarsDir) - val jar2 = TestUtils.createJarWithFiles(Map(), userLib1) - // Copy jar2 to jar3 with same name - val jar3 = { - val target = new File(userLib2, new File(jar2.toURI).getName) - val input = new FileInputStream(jar2.getPath) - val output = new FileOutputStream(target) - Utils.copyStream(input, output, closeStreams = true) - target.toURI.toURL - } - - val sparkConf = new SparkConfWithEnv(Map("SPARK_HOME" -> libs.getAbsolutePath)) - .set(JARS_TO_DISTRIBUTE, Seq(jar2.getPath, jar3.getPath)) - - val client = createClient(sparkConf) - val tempDir = Utils.createTempDir() - client.prepareLocalResources(new Path(tempDir.getAbsolutePath()), Nil) - - // Only jar2 will be added to SECONDARY_JARS, jar3 which has the same name with jar2 will be - // ignored. 
- sparkConf.get(SECONDARY_JARS) should be (Some(Seq(new File(jar2.toURI).getName))) - } - - object Fixtures { - - val knownDefYarnAppCP: Seq[String] = - getFieldValue[Array[String], Seq[String]](classOf[YarnConfiguration], - "DEFAULT_YARN_APPLICATION_CLASSPATH", - Seq[String]())(a => a.toSeq) - - - val knownDefMRAppCP: Seq[String] = - getFieldValue2[String, Array[String], Seq[String]]( - classOf[MRJobConfig], - "DEFAULT_MAPREDUCE_APPLICATION_CLASSPATH", - Seq[String]())(a => a.split(","))(a => a.toSeq) - - val knownYARNAppCP = Some(Seq("/known/yarn/path")) - - val knownMRAppCP = Some(Seq("/known/mr/path")) - - val mapMRAppConf = - Map("mapreduce.application.classpath" -> knownMRAppCP.map(_.mkString(":")).get) - - val mapYARNAppConf = - Map(YarnConfiguration.YARN_APPLICATION_CLASSPATH -> knownYARNAppCP.map(_.mkString(":")).get) - - val mapAppConf = mapYARNAppConf ++ mapMRAppConf - } - - def withAppConf(m: Map[String, String] = Map())(testCode: (Configuration) => Any) { - val conf = new Configuration - m.foreach { case (k, v) => conf.set(k, v, "ClientSpec") } - testCode(conf) - } - - def newEnv: MutableHashMap[String, String] = MutableHashMap[String, String]() - - def classpath(env: MutableHashMap[String, String]): Array[String] = - env(Environment.CLASSPATH.name).split(":|;|<CPS>") - - def flatten(a: Option[Seq[String]], b: Option[Seq[String]]): Array[String] = - (a ++ b).flatten.toArray - - def getFieldValue[A, B](clazz: Class[_], field: String, defaults: => B)(mapTo: A => B): B = { - Try(clazz.getField(field)) - .map(_.get(null).asInstanceOf[A]) - .toOption - .map(mapTo) - .getOrElse(defaults) - } - - def getFieldValue2[A: ClassTag, A1: ClassTag, B]( - clazz: Class[_], - field: String, - defaults: => B)(mapTo: A => B)(mapTo1: A1 => B): B = { - Try(clazz.getField(field)).map(_.get(null)).map { - case v: A => mapTo(v) - case v1: A1 => mapTo1(v1) - case _ => defaults - }.toOption.getOrElse(defaults) - } - - private def createClient( - sparkConf: SparkConf, - conf: Configuration = new Configuration(), - args: Array[String] = Array()): Client = { - val clientArgs = new ClientArguments(args) - spy(new Client(clientArgs, conf, sparkConf)) - } - - private def classpath(client: Client): Array[String] = { - val env = new MutableHashMap[String, String]() - populateClasspath(null, client.hadoopConf, client.sparkConf, env) - classpath(env) - } - -}
http://git-wip-us.apache.org/repos/asf/spark/blob/81e5619c/yarn/src/test/scala/org/apache/spark/deploy/yarn/ContainerPlacementStrategySuite.scala ---------------------------------------------------------------------- diff --git a/yarn/src/test/scala/org/apache/spark/deploy/yarn/ContainerPlacementStrategySuite.scala b/yarn/src/test/scala/org/apache/spark/deploy/yarn/ContainerPlacementStrategySuite.scala deleted file mode 100644 index afb4b69..0000000 --- a/yarn/src/test/scala/org/apache/spark/deploy/yarn/ContainerPlacementStrategySuite.scala +++ /dev/null @@ -1,153 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.deploy.yarn - -import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest -import org.scalatest.{BeforeAndAfterEach, Matchers} - -import org.apache.spark.SparkFunSuite - -class ContainerPlacementStrategySuite extends SparkFunSuite with Matchers with BeforeAndAfterEach { - - private val yarnAllocatorSuite = new YarnAllocatorSuite - import yarnAllocatorSuite._ - - def createContainerRequest(nodes: Array[String]): ContainerRequest = - new ContainerRequest(containerResource, nodes, null, YarnSparkHadoopUtil.RM_REQUEST_PRIORITY) - - override def beforeEach() { - yarnAllocatorSuite.beforeEach() - } - - override def afterEach() { - yarnAllocatorSuite.afterEach() - } - - test("allocate locality preferred containers with enough resource and no matched existed " + - "containers") { - // 1. All the locations of current containers cannot satisfy the new requirements - // 2. Current requested container number can fully satisfy the pending tasks. - - val handler = createAllocator(2) - handler.updateResourceRequests() - handler.handleAllocatedContainers(Array(createContainer("host1"), createContainer("host2"))) - - val localities = handler.containerPlacementStrategy.localityOfRequestedContainers( - 3, 15, Map("host3" -> 15, "host4" -> 15, "host5" -> 10), - handler.allocatedHostToContainersMap, Seq.empty) - - assert(localities.map(_.nodes) === Array( - Array("host3", "host4", "host5"), - Array("host3", "host4", "host5"), - Array("host3", "host4"))) - } - - test("allocate locality preferred containers with enough resource and partially matched " + - "containers") { - // 1. Parts of current containers' locations can satisfy the new requirements - // 2. Current requested container number can fully satisfy the pending tasks. 
- - val handler = createAllocator(3) - handler.updateResourceRequests() - handler.handleAllocatedContainers(Array( - createContainer("host1"), - createContainer("host1"), - createContainer("host2") - )) - - val localities = handler.containerPlacementStrategy.localityOfRequestedContainers( - 3, 15, Map("host1" -> 15, "host2" -> 15, "host3" -> 10), - handler.allocatedHostToContainersMap, Seq.empty) - - assert(localities.map(_.nodes) === - Array(null, Array("host2", "host3"), Array("host2", "host3"))) - } - - test("allocate locality preferred containers with limited resource and partially matched " + - "containers") { - // 1. Parts of current containers' locations can satisfy the new requirements - // 2. Current requested container number cannot fully satisfy the pending tasks. - - val handler = createAllocator(3) - handler.updateResourceRequests() - handler.handleAllocatedContainers(Array( - createContainer("host1"), - createContainer("host1"), - createContainer("host2") - )) - - val localities = handler.containerPlacementStrategy.localityOfRequestedContainers( - 1, 15, Map("host1" -> 15, "host2" -> 15, "host3" -> 10), - handler.allocatedHostToContainersMap, Seq.empty) - - assert(localities.map(_.nodes) === Array(Array("host2", "host3"))) - } - - test("allocate locality preferred containers with fully matched containers") { - // Current containers' locations can fully satisfy the new requirements - - val handler = createAllocator(5) - handler.updateResourceRequests() - handler.handleAllocatedContainers(Array( - createContainer("host1"), - createContainer("host1"), - createContainer("host2"), - createContainer("host2"), - createContainer("host3") - )) - - val localities = handler.containerPlacementStrategy.localityOfRequestedContainers( - 3, 15, Map("host1" -> 15, "host2" -> 15, "host3" -> 10), - handler.allocatedHostToContainersMap, Seq.empty) - - assert(localities.map(_.nodes) === Array(null, null, null)) - } - - test("allocate containers with no locality preference") { - // Request new container without locality preference - - val handler = createAllocator(2) - handler.updateResourceRequests() - handler.handleAllocatedContainers(Array(createContainer("host1"), createContainer("host2"))) - - val localities = handler.containerPlacementStrategy.localityOfRequestedContainers( - 1, 0, Map.empty, handler.allocatedHostToContainersMap, Seq.empty) - - assert(localities.map(_.nodes) === Array(null)) - } - - test("allocate locality preferred containers by considering the localities of pending requests") { - val handler = createAllocator(3) - handler.updateResourceRequests() - handler.handleAllocatedContainers(Array( - createContainer("host1"), - createContainer("host1"), - createContainer("host2") - )) - - val pendingAllocationRequests = Seq( - createContainerRequest(Array("host2", "host3")), - createContainerRequest(Array("host1", "host4"))) - - val localities = handler.containerPlacementStrategy.localityOfRequestedContainers( - 1, 15, Map("host1" -> 15, "host2" -> 15, "host3" -> 10), - handler.allocatedHostToContainersMap, pendingAllocationRequests) - - assert(localities.map(_.nodes) === Array(Array("host3"))) - } -} http://git-wip-us.apache.org/repos/asf/spark/blob/81e5619c/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala ---------------------------------------------------------------------- diff --git a/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala b/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala deleted file mode 
100644 index 994dc75..0000000 --- a/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala +++ /dev/null @@ -1,344 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.deploy.yarn - -import java.util.{Arrays, List => JList} - -import org.apache.hadoop.fs.CommonConfigurationKeysPublic -import org.apache.hadoop.net.DNSToSwitchMapping -import org.apache.hadoop.yarn.api.records._ -import org.apache.hadoop.yarn.client.api.AMRMClient -import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest -import org.apache.hadoop.yarn.conf.YarnConfiguration -import org.mockito.Mockito._ -import org.scalatest.{BeforeAndAfterEach, Matchers} - -import org.apache.spark.{SecurityManager, SparkConf, SparkFunSuite} -import org.apache.spark.deploy.yarn.config._ -import org.apache.spark.deploy.yarn.YarnAllocator._ -import org.apache.spark.deploy.yarn.YarnSparkHadoopUtil._ -import org.apache.spark.rpc.RpcEndpointRef -import org.apache.spark.scheduler.SplitInfo -import org.apache.spark.util.ManualClock - -class MockResolver extends DNSToSwitchMapping { - - override def resolve(names: JList[String]): JList[String] = { - if (names.size > 0 && names.get(0) == "host3") Arrays.asList("/rack2") - else Arrays.asList("/rack1") - } - - override def reloadCachedMappings() {} - - def reloadCachedMappings(names: JList[String]) {} -} - -class YarnAllocatorSuite extends SparkFunSuite with Matchers with BeforeAndAfterEach { - val conf = new YarnConfiguration() - conf.setClass( - CommonConfigurationKeysPublic.NET_TOPOLOGY_NODE_SWITCH_MAPPING_IMPL_KEY, - classOf[MockResolver], classOf[DNSToSwitchMapping]) - - val sparkConf = new SparkConf() - sparkConf.set("spark.driver.host", "localhost") - sparkConf.set("spark.driver.port", "4040") - sparkConf.set(SPARK_JARS, Seq("notarealjar.jar")) - sparkConf.set("spark.yarn.launchContainers", "false") - - val appAttemptId = ApplicationAttemptId.newInstance(ApplicationId.newInstance(0, 0), 0) - - // Resource returned by YARN. YARN can give larger containers than requested, so give 6 cores - // instead of the 5 requested and 3 GB instead of the 2 requested. 
- val containerResource = Resource.newInstance(3072, 6) - - var rmClient: AMRMClient[ContainerRequest] = _ - - var containerNum = 0 - - override def beforeEach() { - super.beforeEach() - rmClient = AMRMClient.createAMRMClient() - rmClient.init(conf) - rmClient.start() - } - - override def afterEach() { - try { - rmClient.stop() - } finally { - super.afterEach() - } - } - - class MockSplitInfo(host: String) extends SplitInfo(null, host, null, 1, null) { - override def hashCode(): Int = 0 - override def equals(other: Any): Boolean = false - } - - def createAllocator(maxExecutors: Int = 5): YarnAllocator = { - val args = Array( - "--jar", "somejar.jar", - "--class", "SomeClass") - val sparkConfClone = sparkConf.clone() - sparkConfClone - .set("spark.executor.instances", maxExecutors.toString) - .set("spark.executor.cores", "5") - .set("spark.executor.memory", "2048") - new YarnAllocator( - "not used", - mock(classOf[RpcEndpointRef]), - conf, - sparkConfClone, - rmClient, - appAttemptId, - new SecurityManager(sparkConf), - Map()) - } - - def createContainer(host: String): Container = { - // When YARN 2.6+ is required, avoid deprecation by using version with long second arg - val containerId = ContainerId.newInstance(appAttemptId, containerNum) - containerNum += 1 - val nodeId = NodeId.newInstance(host, 1000) - Container.newInstance(containerId, nodeId, "", containerResource, RM_REQUEST_PRIORITY, null) - } - - test("single container allocated") { - // request a single container and receive it - val handler = createAllocator(1) - handler.updateResourceRequests() - handler.getNumExecutorsRunning should be (0) - handler.getPendingAllocate.size should be (1) - - val container = createContainer("host1") - handler.handleAllocatedContainers(Array(container)) - - handler.getNumExecutorsRunning should be (1) - handler.allocatedContainerToHostMap.get(container.getId).get should be ("host1") - handler.allocatedHostToContainersMap.get("host1").get should contain (container.getId) - - val size = rmClient.getMatchingRequests(container.getPriority, "host1", containerResource).size - size should be (0) - } - - test("container should not be created if requested number if met") { - // request a single container and receive it - val handler = createAllocator(1) - handler.updateResourceRequests() - handler.getNumExecutorsRunning should be (0) - handler.getPendingAllocate.size should be (1) - - val container = createContainer("host1") - handler.handleAllocatedContainers(Array(container)) - - handler.getNumExecutorsRunning should be (1) - handler.allocatedContainerToHostMap.get(container.getId).get should be ("host1") - handler.allocatedHostToContainersMap.get("host1").get should contain (container.getId) - - val container2 = createContainer("host2") - handler.handleAllocatedContainers(Array(container2)) - handler.getNumExecutorsRunning should be (1) - } - - test("some containers allocated") { - // request a few containers and receive some of them - val handler = createAllocator(4) - handler.updateResourceRequests() - handler.getNumExecutorsRunning should be (0) - handler.getPendingAllocate.size should be (4) - - val container1 = createContainer("host1") - val container2 = createContainer("host1") - val container3 = createContainer("host2") - handler.handleAllocatedContainers(Array(container1, container2, container3)) - - handler.getNumExecutorsRunning should be (3) - handler.allocatedContainerToHostMap.get(container1.getId).get should be ("host1") - handler.allocatedContainerToHostMap.get(container2.getId).get 
should be ("host1") - handler.allocatedContainerToHostMap.get(container3.getId).get should be ("host2") - handler.allocatedHostToContainersMap.get("host1").get should contain (container1.getId) - handler.allocatedHostToContainersMap.get("host1").get should contain (container2.getId) - handler.allocatedHostToContainersMap.get("host2").get should contain (container3.getId) - } - - test("receive more containers than requested") { - val handler = createAllocator(2) - handler.updateResourceRequests() - handler.getNumExecutorsRunning should be (0) - handler.getPendingAllocate.size should be (2) - - val container1 = createContainer("host1") - val container2 = createContainer("host2") - val container3 = createContainer("host4") - handler.handleAllocatedContainers(Array(container1, container2, container3)) - - handler.getNumExecutorsRunning should be (2) - handler.allocatedContainerToHostMap.get(container1.getId).get should be ("host1") - handler.allocatedContainerToHostMap.get(container2.getId).get should be ("host2") - handler.allocatedContainerToHostMap.contains(container3.getId) should be (false) - handler.allocatedHostToContainersMap.get("host1").get should contain (container1.getId) - handler.allocatedHostToContainersMap.get("host2").get should contain (container2.getId) - handler.allocatedHostToContainersMap.contains("host4") should be (false) - } - - test("decrease total requested executors") { - val handler = createAllocator(4) - handler.updateResourceRequests() - handler.getNumExecutorsRunning should be (0) - handler.getPendingAllocate.size should be (4) - - handler.requestTotalExecutorsWithPreferredLocalities(3, 0, Map.empty) - handler.updateResourceRequests() - handler.getPendingAllocate.size should be (3) - - val container = createContainer("host1") - handler.handleAllocatedContainers(Array(container)) - - handler.getNumExecutorsRunning should be (1) - handler.allocatedContainerToHostMap.get(container.getId).get should be ("host1") - handler.allocatedHostToContainersMap.get("host1").get should contain (container.getId) - - handler.requestTotalExecutorsWithPreferredLocalities(2, 0, Map.empty) - handler.updateResourceRequests() - handler.getPendingAllocate.size should be (1) - } - - test("decrease total requested executors to less than currently running") { - val handler = createAllocator(4) - handler.updateResourceRequests() - handler.getNumExecutorsRunning should be (0) - handler.getPendingAllocate.size should be (4) - - handler.requestTotalExecutorsWithPreferredLocalities(3, 0, Map.empty) - handler.updateResourceRequests() - handler.getPendingAllocate.size should be (3) - - val container1 = createContainer("host1") - val container2 = createContainer("host2") - handler.handleAllocatedContainers(Array(container1, container2)) - - handler.getNumExecutorsRunning should be (2) - - handler.requestTotalExecutorsWithPreferredLocalities(1, 0, Map.empty) - handler.updateResourceRequests() - handler.getPendingAllocate.size should be (0) - handler.getNumExecutorsRunning should be (2) - } - - test("kill executors") { - val handler = createAllocator(4) - handler.updateResourceRequests() - handler.getNumExecutorsRunning should be (0) - handler.getPendingAllocate.size should be (4) - - val container1 = createContainer("host1") - val container2 = createContainer("host2") - handler.handleAllocatedContainers(Array(container1, container2)) - - handler.requestTotalExecutorsWithPreferredLocalities(1, 0, Map.empty) - handler.executorIdToContainer.keys.foreach { id => handler.killExecutor(id ) } - - val 
statuses = Seq(container1, container2).map { c => - ContainerStatus.newInstance(c.getId(), ContainerState.COMPLETE, "Finished", 0) - } - handler.updateResourceRequests() - handler.processCompletedContainers(statuses.toSeq) - handler.getNumExecutorsRunning should be (0) - handler.getPendingAllocate.size should be (1) - } - - test("lost executor removed from backend") { - val handler = createAllocator(4) - handler.updateResourceRequests() - handler.getNumExecutorsRunning should be (0) - handler.getPendingAllocate.size should be (4) - - val container1 = createContainer("host1") - val container2 = createContainer("host2") - handler.handleAllocatedContainers(Array(container1, container2)) - - handler.requestTotalExecutorsWithPreferredLocalities(2, 0, Map()) - - val statuses = Seq(container1, container2).map { c => - ContainerStatus.newInstance(c.getId(), ContainerState.COMPLETE, "Failed", -1) - } - handler.updateResourceRequests() - handler.processCompletedContainers(statuses.toSeq) - handler.updateResourceRequests() - handler.getNumExecutorsRunning should be (0) - handler.getPendingAllocate.size should be (2) - handler.getNumExecutorsFailed should be (2) - handler.getNumUnexpectedContainerRelease should be (2) - } - - test("memory exceeded diagnostic regexes") { - val diagnostics = - "Container [pid=12465,containerID=container_1412887393566_0003_01_000002] is running " + - "beyond physical memory limits. Current usage: 2.1 MB of 2 GB physical memory used; " + - "5.8 GB of 4.2 GB virtual memory used. Killing container." - val vmemMsg = memLimitExceededLogMessage(diagnostics, VMEM_EXCEEDED_PATTERN) - val pmemMsg = memLimitExceededLogMessage(diagnostics, PMEM_EXCEEDED_PATTERN) - assert(vmemMsg.contains("5.8 GB of 4.2 GB virtual memory used.")) - assert(pmemMsg.contains("2.1 MB of 2 GB physical memory used.")) - } - - test("window based failure executor counting") { - sparkConf.set("spark.yarn.executor.failuresValidityInterval", "100s") - val handler = createAllocator(4) - val clock = new ManualClock(0L) - handler.setClock(clock) - - handler.updateResourceRequests() - handler.getNumExecutorsRunning should be (0) - handler.getPendingAllocate.size should be (4) - - val containers = Seq( - createContainer("host1"), - createContainer("host2"), - createContainer("host3"), - createContainer("host4") - ) - handler.handleAllocatedContainers(containers) - - val failedStatuses = containers.map { c => - ContainerStatus.newInstance(c.getId, ContainerState.COMPLETE, "Failed", -1) - } - - handler.getNumExecutorsFailed should be (0) - - clock.advance(100 * 1000L) - handler.processCompletedContainers(failedStatuses.slice(0, 1)) - handler.getNumExecutorsFailed should be (1) - - clock.advance(101 * 1000L) - handler.getNumExecutorsFailed should be (0) - - handler.processCompletedContainers(failedStatuses.slice(1, 3)) - handler.getNumExecutorsFailed should be (2) - - clock.advance(50 * 1000L) - handler.processCompletedContainers(failedStatuses.slice(3, 4)) - handler.getNumExecutorsFailed should be (3) - - clock.advance(51 * 1000L) - handler.getNumExecutorsFailed should be (1) - - clock.advance(50 * 1000L) - handler.getNumExecutorsFailed should be (0) - } -} http://git-wip-us.apache.org/repos/asf/spark/blob/81e5619c/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala ---------------------------------------------------------------------- diff --git a/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala b/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala 
deleted file mode 100644 index 99fb58a..0000000 --- a/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala +++ /dev/null @@ -1,493 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.deploy.yarn - -import java.io.File -import java.net.URL -import java.nio.charset.StandardCharsets -import java.util.{HashMap => JHashMap} - -import scala.collection.mutable -import scala.concurrent.duration._ -import scala.language.postfixOps - -import com.google.common.io.{ByteStreams, Files} -import org.apache.hadoop.yarn.conf.YarnConfiguration -import org.scalatest.Matchers -import org.scalatest.concurrent.Eventually._ - -import org.apache.spark._ -import org.apache.spark.deploy.SparkHadoopUtil -import org.apache.spark.deploy.yarn.config._ -import org.apache.spark.internal.Logging -import org.apache.spark.launcher._ -import org.apache.spark.scheduler.{SparkListener, SparkListenerApplicationStart, - SparkListenerExecutorAdded} -import org.apache.spark.scheduler.cluster.ExecutorInfo -import org.apache.spark.tags.ExtendedYarnTest -import org.apache.spark.util.Utils - -/** - * Integration tests for YARN; these tests use a mini Yarn cluster to run Spark-on-YARN - * applications, and require the Spark assembly to be built before they can be successfully - * run. 
- */ -@ExtendedYarnTest -class YarnClusterSuite extends BaseYarnClusterSuite { - - override def newYarnConfig(): YarnConfiguration = new YarnConfiguration() - - private val TEST_PYFILE = """ - |import mod1, mod2 - |import sys - |from operator import add - | - |from pyspark import SparkConf , SparkContext - |if __name__ == "__main__": - | if len(sys.argv) != 2: - | print >> sys.stderr, "Usage: test.py [result file]" - | exit(-1) - | sc = SparkContext(conf=SparkConf()) - | status = open(sys.argv[1],'w') - | result = "failure" - | rdd = sc.parallelize(range(10)).map(lambda x: x * mod1.func() * mod2.func()) - | cnt = rdd.count() - | if cnt == 10: - | result = "success" - | status.write(result) - | status.close() - | sc.stop() - """.stripMargin - - private val TEST_PYMODULE = """ - |def func(): - | return 42 - """.stripMargin - - test("run Spark in yarn-client mode") { - testBasicYarnApp(true) - } - - test("run Spark in yarn-cluster mode") { - testBasicYarnApp(false) - } - - test("run Spark in yarn-client mode with different configurations") { - testBasicYarnApp(true, - Map( - "spark.driver.memory" -> "512m", - "spark.executor.cores" -> "1", - "spark.executor.memory" -> "512m", - "spark.executor.instances" -> "2" - )) - } - - test("run Spark in yarn-cluster mode with different configurations") { - testBasicYarnApp(false, - Map( - "spark.driver.memory" -> "512m", - "spark.driver.cores" -> "1", - "spark.executor.cores" -> "1", - "spark.executor.memory" -> "512m", - "spark.executor.instances" -> "2" - )) - } - - test("run Spark in yarn-cluster mode with using SparkHadoopUtil.conf") { - testYarnAppUseSparkHadoopUtilConf() - } - - test("run Spark in yarn-client mode with additional jar") { - testWithAddJar(true) - } - - test("run Spark in yarn-cluster mode with additional jar") { - testWithAddJar(false) - } - - test("run Spark in yarn-cluster mode unsuccessfully") { - // Don't provide arguments so the driver will fail. 
- val finalState = runSpark(false, mainClassName(YarnClusterDriver.getClass)) - finalState should be (SparkAppHandle.State.FAILED) - } - - test("run Spark in yarn-cluster mode failure after sc initialized") { - val finalState = runSpark(false, mainClassName(YarnClusterDriverWithFailure.getClass)) - finalState should be (SparkAppHandle.State.FAILED) - } - - test("run Python application in yarn-client mode") { - testPySpark(true) - } - - test("run Python application in yarn-cluster mode") { - testPySpark(false) - } - - test("run Python application in yarn-cluster mode using " + - " spark.yarn.appMasterEnv to override local envvar") { - testPySpark( - clientMode = false, - extraConf = Map( - "spark.yarn.appMasterEnv.PYSPARK_DRIVER_PYTHON" - -> sys.env.getOrElse("PYSPARK_DRIVER_PYTHON", "python"), - "spark.yarn.appMasterEnv.PYSPARK_PYTHON" - -> sys.env.getOrElse("PYSPARK_PYTHON", "python")), - extraEnv = Map( - "PYSPARK_DRIVER_PYTHON" -> "not python", - "PYSPARK_PYTHON" -> "not python")) - } - - test("user class path first in client mode") { - testUseClassPathFirst(true) - } - - test("user class path first in cluster mode") { - testUseClassPathFirst(false) - } - - test("monitor app using launcher library") { - val env = new JHashMap[String, String]() - env.put("YARN_CONF_DIR", hadoopConfDir.getAbsolutePath()) - - val propsFile = createConfFile() - val handle = new SparkLauncher(env) - .setSparkHome(sys.props("spark.test.home")) - .setConf("spark.ui.enabled", "false") - .setPropertiesFile(propsFile) - .setMaster("yarn") - .setDeployMode("client") - .setAppResource(SparkLauncher.NO_RESOURCE) - .setMainClass(mainClassName(YarnLauncherTestApp.getClass)) - .startApplication() - - try { - eventually(timeout(30 seconds), interval(100 millis)) { - handle.getState() should be (SparkAppHandle.State.RUNNING) - } - - handle.getAppId() should not be (null) - handle.getAppId() should startWith ("application_") - handle.stop() - - eventually(timeout(30 seconds), interval(100 millis)) { - handle.getState() should be (SparkAppHandle.State.KILLED) - } - } finally { - handle.kill() - } - } - - test("timeout to get SparkContext in cluster mode triggers failure") { - val timeout = 2000 - val finalState = runSpark(false, mainClassName(SparkContextTimeoutApp.getClass), - appArgs = Seq((timeout * 4).toString), - extraConf = Map(AM_MAX_WAIT_TIME.key -> timeout.toString)) - finalState should be (SparkAppHandle.State.FAILED) - } - - private def testBasicYarnApp(clientMode: Boolean, conf: Map[String, String] = Map()): Unit = { - val result = File.createTempFile("result", null, tempDir) - val finalState = runSpark(clientMode, mainClassName(YarnClusterDriver.getClass), - appArgs = Seq(result.getAbsolutePath()), - extraConf = conf) - checkResult(finalState, result) - } - - private def testYarnAppUseSparkHadoopUtilConf(): Unit = { - val result = File.createTempFile("result", null, tempDir) - val finalState = runSpark(false, - mainClassName(YarnClusterDriverUseSparkHadoopUtilConf.getClass), - appArgs = Seq("key=value", result.getAbsolutePath()), - extraConf = Map("spark.hadoop.key" -> "value")) - checkResult(finalState, result) - } - - private def testWithAddJar(clientMode: Boolean): Unit = { - val originalJar = TestUtils.createJarWithFiles(Map("test.resource" -> "ORIGINAL"), tempDir) - val driverResult = File.createTempFile("driver", null, tempDir) - val executorResult = File.createTempFile("executor", null, tempDir) - val finalState = runSpark(clientMode, mainClassName(YarnClasspathTest.getClass), - appArgs = 
Seq(driverResult.getAbsolutePath(), executorResult.getAbsolutePath()), - extraClassPath = Seq(originalJar.getPath()), - extraJars = Seq("local:" + originalJar.getPath())) - checkResult(finalState, driverResult, "ORIGINAL") - checkResult(finalState, executorResult, "ORIGINAL") - } - - private def testPySpark( - clientMode: Boolean, - extraConf: Map[String, String] = Map(), - extraEnv: Map[String, String] = Map()): Unit = { - val primaryPyFile = new File(tempDir, "test.py") - Files.write(TEST_PYFILE, primaryPyFile, StandardCharsets.UTF_8) - - // When running tests, let's not assume the user has built the assembly module, which also - // creates the pyspark archive. Instead, let's use PYSPARK_ARCHIVES_PATH to point at the - // needed locations. - val sparkHome = sys.props("spark.test.home") - val pythonPath = Seq( - s"$sparkHome/python/lib/py4j-0.10.4-src.zip", - s"$sparkHome/python") - val extraEnvVars = Map( - "PYSPARK_ARCHIVES_PATH" -> pythonPath.map("local:" + _).mkString(File.pathSeparator), - "PYTHONPATH" -> pythonPath.mkString(File.pathSeparator)) ++ extraEnv - - val moduleDir = - if (clientMode) { - // In client-mode, .py files added with --py-files are not visible in the driver. - // This is something that the launcher library would have to handle. - tempDir - } else { - val subdir = new File(tempDir, "pyModules") - subdir.mkdir() - subdir - } - val pyModule = new File(moduleDir, "mod1.py") - Files.write(TEST_PYMODULE, pyModule, StandardCharsets.UTF_8) - - val mod2Archive = TestUtils.createJarWithFiles(Map("mod2.py" -> TEST_PYMODULE), moduleDir) - val pyFiles = Seq(pyModule.getAbsolutePath(), mod2Archive.getPath()).mkString(",") - val result = File.createTempFile("result", null, tempDir) - - val finalState = runSpark(clientMode, primaryPyFile.getAbsolutePath(), - sparkArgs = Seq("--py-files" -> pyFiles), - appArgs = Seq(result.getAbsolutePath()), - extraEnv = extraEnvVars, - extraConf = extraConf) - checkResult(finalState, result) - } - - private def testUseClassPathFirst(clientMode: Boolean): Unit = { - // Create a jar file that contains a different version of "test.resource". 
- val originalJar = TestUtils.createJarWithFiles(Map("test.resource" -> "ORIGINAL"), tempDir) - val userJar = TestUtils.createJarWithFiles(Map("test.resource" -> "OVERRIDDEN"), tempDir) - val driverResult = File.createTempFile("driver", null, tempDir) - val executorResult = File.createTempFile("executor", null, tempDir) - val finalState = runSpark(clientMode, mainClassName(YarnClasspathTest.getClass), - appArgs = Seq(driverResult.getAbsolutePath(), executorResult.getAbsolutePath()), - extraClassPath = Seq(originalJar.getPath()), - extraJars = Seq("local:" + userJar.getPath()), - extraConf = Map( - "spark.driver.userClassPathFirst" -> "true", - "spark.executor.userClassPathFirst" -> "true")) - checkResult(finalState, driverResult, "OVERRIDDEN") - checkResult(finalState, executorResult, "OVERRIDDEN") - } - -} - -private[spark] class SaveExecutorInfo extends SparkListener { - val addedExecutorInfos = mutable.Map[String, ExecutorInfo]() - var driverLogs: Option[collection.Map[String, String]] = None - - override def onExecutorAdded(executor: SparkListenerExecutorAdded) { - addedExecutorInfos(executor.executorId) = executor.executorInfo - } - - override def onApplicationStart(appStart: SparkListenerApplicationStart): Unit = { - driverLogs = appStart.driverLogs - } -} - -private object YarnClusterDriverWithFailure extends Logging with Matchers { - def main(args: Array[String]): Unit = { - val sc = new SparkContext(new SparkConf() - .set("spark.extraListeners", classOf[SaveExecutorInfo].getName) - .setAppName("yarn test with failure")) - - throw new Exception("exception after sc initialized") - } -} - -private object YarnClusterDriverUseSparkHadoopUtilConf extends Logging with Matchers { - def main(args: Array[String]): Unit = { - if (args.length != 2) { - // scalastyle:off println - System.err.println( - s""" - |Invalid command line: ${args.mkString(" ")} - | - |Usage: YarnClusterDriverUseSparkHadoopUtilConf [hadoopConfKey=value] [result file] - """.stripMargin) - // scalastyle:on println - System.exit(1) - } - - val sc = new SparkContext(new SparkConf() - .set("spark.extraListeners", classOf[SaveExecutorInfo].getName) - .setAppName("yarn test using SparkHadoopUtil's conf")) - - val kv = args(0).split("=") - val status = new File(args(1)) - var result = "failure" - try { - SparkHadoopUtil.get.conf.get(kv(0)) should be (kv(1)) - result = "success" - } finally { - Files.write(result, status, StandardCharsets.UTF_8) - sc.stop() - } - } -} - -private object YarnClusterDriver extends Logging with Matchers { - - val WAIT_TIMEOUT_MILLIS = 10000 - - def main(args: Array[String]): Unit = { - if (args.length != 1) { - // scalastyle:off println - System.err.println( - s""" - |Invalid command line: ${args.mkString(" ")} - | - |Usage: YarnClusterDriver [result file] - """.stripMargin) - // scalastyle:on println - System.exit(1) - } - - val sc = new SparkContext(new SparkConf() - .set("spark.extraListeners", classOf[SaveExecutorInfo].getName) - .setAppName("yarn \"test app\" 'with quotes' and \\back\\slashes and $dollarSigns")) - val conf = sc.getConf - val status = new File(args(0)) - var result = "failure" - try { - val data = sc.parallelize(1 to 4, 4).collect().toSet - sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS) - data should be (Set(1, 2, 3, 4)) - result = "success" - - // Verify that the config archive is correctly placed in the classpath of all containers. 
- val confFile = "/" + Client.SPARK_CONF_FILE - assert(getClass().getResource(confFile) != null) - val configFromExecutors = sc.parallelize(1 to 4, 4) - .map { _ => Option(getClass().getResource(confFile)).map(_.toString).orNull } - .collect() - assert(configFromExecutors.find(_ == null) === None) - } finally { - Files.write(result, status, StandardCharsets.UTF_8) - sc.stop() - } - - // verify log urls are present - val listeners = sc.listenerBus.findListenersByClass[SaveExecutorInfo] - assert(listeners.size === 1) - val listener = listeners(0) - val executorInfos = listener.addedExecutorInfos.values - assert(executorInfos.nonEmpty) - executorInfos.foreach { info => - assert(info.logUrlMap.nonEmpty) - } - - // If we are running in yarn-cluster mode, verify that driver logs links and present and are - // in the expected format. - if (conf.get("spark.submit.deployMode") == "cluster") { - assert(listener.driverLogs.nonEmpty) - val driverLogs = listener.driverLogs.get - assert(driverLogs.size === 2) - assert(driverLogs.contains("stderr")) - assert(driverLogs.contains("stdout")) - val urlStr = driverLogs("stderr") - // Ensure that this is a valid URL, else this will throw an exception - new URL(urlStr) - val containerId = YarnSparkHadoopUtil.get.getContainerId - val user = Utils.getCurrentUserName() - assert(urlStr.endsWith(s"/node/containerlogs/$containerId/$user/stderr?start=-4096")) - } - } - -} - -private object YarnClasspathTest extends Logging { - def error(m: String, ex: Throwable = null): Unit = { - logError(m, ex) - // scalastyle:off println - System.out.println(m) - if (ex != null) { - ex.printStackTrace(System.out) - } - // scalastyle:on println - } - - def main(args: Array[String]): Unit = { - if (args.length != 2) { - error( - s""" - |Invalid command line: ${args.mkString(" ")} - | - |Usage: YarnClasspathTest [driver result file] [executor result file] - """.stripMargin) - // scalastyle:on println - } - - readResource(args(0)) - val sc = new SparkContext(new SparkConf()) - try { - sc.parallelize(Seq(1)).foreach { x => readResource(args(1)) } - } finally { - sc.stop() - } - } - - private def readResource(resultPath: String): Unit = { - var result = "failure" - try { - val ccl = Thread.currentThread().getContextClassLoader() - val resource = ccl.getResourceAsStream("test.resource") - val bytes = ByteStreams.toByteArray(resource) - result = new String(bytes, 0, bytes.length, StandardCharsets.UTF_8) - } catch { - case t: Throwable => - error(s"loading test.resource to $resultPath", t) - } finally { - Files.write(result, new File(resultPath), StandardCharsets.UTF_8) - } - } - -} - -private object YarnLauncherTestApp { - - def main(args: Array[String]): Unit = { - // Do not stop the application; the test will stop it using the launcher lib. Just run a task - // that will prevent the process from exiting. - val sc = new SparkContext(new SparkConf()) - sc.parallelize(Seq(1)).foreach { i => - this.synchronized { - wait() - } - } - } - -} - -/** - * Used to test code in the AM that detects the SparkContext instance. Expects a single argument - * with the duration to sleep for, in ms. 
- */ -private object SparkContextTimeoutApp { - - def main(args: Array[String]): Unit = { - val Array(sleepTime) = args - Thread.sleep(java.lang.Long.parseLong(sleepTime)) - } - -} http://git-wip-us.apache.org/repos/asf/spark/blob/81e5619c/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnShuffleIntegrationSuite.scala ---------------------------------------------------------------------- diff --git a/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnShuffleIntegrationSuite.scala b/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnShuffleIntegrationSuite.scala deleted file mode 100644 index 950ebd9..0000000 --- a/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnShuffleIntegrationSuite.scala +++ /dev/null @@ -1,112 +0,0 @@ -/* -* Licensed to the Apache Software Foundation (ASF) under one or more -* contributor license agreements. See the NOTICE file distributed with -* this work for additional information regarding copyright ownership. -* The ASF licenses this file to You under the Apache License, Version 2.0 -* (the "License"); you may not use this file except in compliance with -* the License. You may obtain a copy of the License at -* -* http://www.apache.org/licenses/LICENSE-2.0 -* -* Unless required by applicable law or agreed to in writing, software -* distributed under the License is distributed on an "AS IS" BASIS, -* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -* See the License for the specific language governing permissions and -* limitations under the License. -*/ - -package org.apache.spark.deploy.yarn - -import java.io.File -import java.nio.charset.StandardCharsets - -import com.google.common.io.Files -import org.apache.commons.io.FileUtils -import org.apache.hadoop.yarn.conf.YarnConfiguration -import org.scalatest.Matchers - -import org.apache.spark._ -import org.apache.spark.internal.Logging -import org.apache.spark.network.shuffle.ShuffleTestAccessor -import org.apache.spark.network.yarn.{YarnShuffleService, YarnTestAccessor} -import org.apache.spark.tags.ExtendedYarnTest - -/** - * Integration test for the external shuffle service with a yarn mini-cluster - */ -@ExtendedYarnTest -class YarnShuffleIntegrationSuite extends BaseYarnClusterSuite { - - override def newYarnConfig(): YarnConfiguration = { - val yarnConfig = new YarnConfiguration() - yarnConfig.set(YarnConfiguration.NM_AUX_SERVICES, "spark_shuffle") - yarnConfig.set(YarnConfiguration.NM_AUX_SERVICE_FMT.format("spark_shuffle"), - classOf[YarnShuffleService].getCanonicalName) - yarnConfig.set("spark.shuffle.service.port", "0") - yarnConfig - } - - test("external shuffle service") { - val shuffleServicePort = YarnTestAccessor.getShuffleServicePort - val shuffleService = YarnTestAccessor.getShuffleServiceInstance - - val registeredExecFile = YarnTestAccessor.getRegisteredExecutorFile(shuffleService) - - logInfo("Shuffle service port = " + shuffleServicePort) - val result = File.createTempFile("result", null, tempDir) - val finalState = runSpark( - false, - mainClassName(YarnExternalShuffleDriver.getClass), - appArgs = Seq(result.getAbsolutePath(), registeredExecFile.getAbsolutePath), - extraConf = Map( - "spark.shuffle.service.enabled" -> "true", - "spark.shuffle.service.port" -> shuffleServicePort.toString - ) - ) - checkResult(finalState, result) - assert(YarnTestAccessor.getRegisteredExecutorFile(shuffleService).exists()) - } -} - -private object YarnExternalShuffleDriver extends Logging with Matchers { - - val WAIT_TIMEOUT_MILLIS = 10000 - - def main(args: Array[String]): 
Unit = { - if (args.length != 2) { - // scalastyle:off println - System.err.println( - s""" - |Invalid command line: ${args.mkString(" ")} - | - |Usage: ExternalShuffleDriver [result file] [registered exec file] - """.stripMargin) - // scalastyle:on println - System.exit(1) - } - - val sc = new SparkContext(new SparkConf() - .setAppName("External Shuffle Test")) - val conf = sc.getConf - val status = new File(args(0)) - val registeredExecFile = new File(args(1)) - logInfo("shuffle service executor file = " + registeredExecFile) - var result = "failure" - val execStateCopy = new File(registeredExecFile.getAbsolutePath + "_dup") - try { - val data = sc.parallelize(0 until 100, 10).map { x => (x % 10) -> x }.reduceByKey{ _ + _ }. - collect().toSet - sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS) - data should be ((0 until 10).map{x => x -> (x * 10 + 450)}.toSet) - result = "success" - // only one process can open a leveldb file at a time, so we copy the files - FileUtils.copyDirectory(registeredExecFile, execStateCopy) - assert(!ShuffleTestAccessor.reloadRegisteredExecutors(execStateCopy).isEmpty) - } finally { - sc.stop() - FileUtils.deleteDirectory(execStateCopy) - Files.write(result, status, StandardCharsets.UTF_8) - } - } - -} http://git-wip-us.apache.org/repos/asf/spark/blob/81e5619c/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtilSuite.scala ---------------------------------------------------------------------- diff --git a/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtilSuite.scala b/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtilSuite.scala deleted file mode 100644 index 7fbbe12..0000000 --- a/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtilSuite.scala +++ /dev/null @@ -1,213 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.spark.deploy.yarn - -import java.io.{File, IOException} -import java.nio.charset.StandardCharsets - -import com.google.common.io.{ByteStreams, Files} -import org.apache.hadoop.io.Text -import org.apache.hadoop.yarn.api.ApplicationConstants -import org.apache.hadoop.yarn.api.ApplicationConstants.Environment -import org.apache.hadoop.yarn.api.records.ApplicationAccessType -import org.apache.hadoop.yarn.conf.YarnConfiguration -import org.scalatest.Matchers - -import org.apache.spark.{SecurityManager, SparkConf, SparkFunSuite} -import org.apache.spark.deploy.SparkHadoopUtil -import org.apache.spark.internal.Logging -import org.apache.spark.util.{ResetSystemProperties, Utils} - -class YarnSparkHadoopUtilSuite extends SparkFunSuite with Matchers with Logging - with ResetSystemProperties { - - val hasBash = - try { - val exitCode = Runtime.getRuntime().exec(Array("bash", "--version")).waitFor() - exitCode == 0 - } catch { - case e: IOException => - false - } - - if (!hasBash) { - logWarning("Cannot execute bash, skipping bash tests.") - } - - def bashTest(name: String)(fn: => Unit): Unit = - if (hasBash) test(name)(fn) else ignore(name)(fn) - - bashTest("shell script escaping") { - val scriptFile = File.createTempFile("script.", ".sh", Utils.createTempDir()) - val args = Array("arg1", "${arg.2}", "\"arg3\"", "'arg4'", "$arg5", "\\arg6") - try { - val argLine = args.map(a => YarnSparkHadoopUtil.escapeForShell(a)).mkString(" ") - Files.write(("bash -c \"echo " + argLine + "\"").getBytes(StandardCharsets.UTF_8), scriptFile) - scriptFile.setExecutable(true) - - val proc = Runtime.getRuntime().exec(Array(scriptFile.getAbsolutePath())) - val out = new String(ByteStreams.toByteArray(proc.getInputStream())).trim() - val err = new String(ByteStreams.toByteArray(proc.getErrorStream())) - val exitCode = proc.waitFor() - exitCode should be (0) - out should be (args.mkString(" ")) - } finally { - scriptFile.delete() - } - } - - test("Yarn configuration override") { - val key = "yarn.nodemanager.hostname" - val default = new YarnConfiguration() - - val sparkConf = new SparkConf() - .set("spark.hadoop." 
+ key, "someHostName") - val yarnConf = new YarnSparkHadoopUtil().newConfiguration(sparkConf) - - yarnConf.getClass() should be (classOf[YarnConfiguration]) - yarnConf.get(key) should not be default.get(key) - } - - - test("test getApplicationAclsForYarn acls on") { - - // spark acls on, just pick up default user - val sparkConf = new SparkConf() - sparkConf.set("spark.acls.enable", "true") - - val securityMgr = new SecurityManager(sparkConf) - val acls = YarnSparkHadoopUtil.getApplicationAclsForYarn(securityMgr) - - val viewAcls = acls.get(ApplicationAccessType.VIEW_APP) - val modifyAcls = acls.get(ApplicationAccessType.MODIFY_APP) - - viewAcls match { - case Some(vacls) => - val aclSet = vacls.split(',').map(_.trim).toSet - assert(aclSet.contains(System.getProperty("user.name", "invalid"))) - case None => - fail() - } - modifyAcls match { - case Some(macls) => - val aclSet = macls.split(',').map(_.trim).toSet - assert(aclSet.contains(System.getProperty("user.name", "invalid"))) - case None => - fail() - } - } - - test("test getApplicationAclsForYarn acls on and specify users") { - - // default spark acls are on and specify acls - val sparkConf = new SparkConf() - sparkConf.set("spark.acls.enable", "true") - sparkConf.set("spark.ui.view.acls", "user1,user2") - sparkConf.set("spark.modify.acls", "user3,user4") - - val securityMgr = new SecurityManager(sparkConf) - val acls = YarnSparkHadoopUtil.getApplicationAclsForYarn(securityMgr) - - val viewAcls = acls.get(ApplicationAccessType.VIEW_APP) - val modifyAcls = acls.get(ApplicationAccessType.MODIFY_APP) - - viewAcls match { - case Some(vacls) => - val aclSet = vacls.split(',').map(_.trim).toSet - assert(aclSet.contains("user1")) - assert(aclSet.contains("user2")) - assert(aclSet.contains(System.getProperty("user.name", "invalid"))) - case None => - fail() - } - modifyAcls match { - case Some(macls) => - val aclSet = macls.split(',').map(_.trim).toSet - assert(aclSet.contains("user3")) - assert(aclSet.contains("user4")) - assert(aclSet.contains(System.getProperty("user.name", "invalid"))) - case None => - fail() - } - - } - - test("test expandEnvironment result") { - val target = Environment.PWD - if (classOf[Environment].getMethods().exists(_.getName == "$$")) { - YarnSparkHadoopUtil.expandEnvironment(target) should be ("{{" + target + "}}") - } else if (Utils.isWindows) { - YarnSparkHadoopUtil.expandEnvironment(target) should be ("%" + target + "%") - } else { - YarnSparkHadoopUtil.expandEnvironment(target) should be ("$" + target) - } - - } - - test("test getClassPathSeparator result") { - if (classOf[ApplicationConstants].getFields().exists(_.getName == "CLASS_PATH_SEPARATOR")) { - YarnSparkHadoopUtil.getClassPathSeparator() should be ("<CPS>") - } else if (Utils.isWindows) { - YarnSparkHadoopUtil.getClassPathSeparator() should be (";") - } else { - YarnSparkHadoopUtil.getClassPathSeparator() should be (":") - } - } - - test("check different hadoop utils based on env variable") { - try { - System.setProperty("SPARK_YARN_MODE", "true") - assert(SparkHadoopUtil.get.getClass === classOf[YarnSparkHadoopUtil]) - System.setProperty("SPARK_YARN_MODE", "false") - assert(SparkHadoopUtil.get.getClass === classOf[SparkHadoopUtil]) - } finally { - System.clearProperty("SPARK_YARN_MODE") - } - } - - - - // This test needs to live here because it depends on isYarnMode returning true, which can only - // happen in the YARN module. 
- test("security manager token generation") { - try { - System.setProperty("SPARK_YARN_MODE", "true") - val initial = SparkHadoopUtil.get - .getSecretKeyFromUserCredentials(SecurityManager.SECRET_LOOKUP_KEY) - assert(initial === null || initial.length === 0) - - val conf = new SparkConf() - .set(SecurityManager.SPARK_AUTH_CONF, "true") - .set(SecurityManager.SPARK_AUTH_SECRET_CONF, "unused") - val sm = new SecurityManager(conf) - - val generated = SparkHadoopUtil.get - .getSecretKeyFromUserCredentials(SecurityManager.SECRET_LOOKUP_KEY) - assert(generated != null) - val genString = new Text(generated).toString() - assert(genString != "unused") - assert(sm.getSecretKey() === genString) - } finally { - // removeSecretKey() was only added in Hadoop 2.6, so instead we just set the secret - // to an empty string. - SparkHadoopUtil.get.addSecretKeyToUserCredentials(SecurityManager.SECRET_LOOKUP_KEY, "") - System.clearProperty("SPARK_YARN_MODE") - } - } - -} http://git-wip-us.apache.org/repos/asf/spark/blob/81e5619c/yarn/src/test/scala/org/apache/spark/deploy/yarn/security/ConfigurableCredentialManagerSuite.scala ---------------------------------------------------------------------- diff --git a/yarn/src/test/scala/org/apache/spark/deploy/yarn/security/ConfigurableCredentialManagerSuite.scala b/yarn/src/test/scala/org/apache/spark/deploy/yarn/security/ConfigurableCredentialManagerSuite.scala deleted file mode 100644 index db4619e..0000000 --- a/yarn/src/test/scala/org/apache/spark/deploy/yarn/security/ConfigurableCredentialManagerSuite.scala +++ /dev/null @@ -1,150 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.spark.deploy.yarn.security - -import org.apache.hadoop.conf.Configuration -import org.apache.hadoop.io.Text -import org.apache.hadoop.security.Credentials -import org.apache.hadoop.security.token.Token -import org.scalatest.{BeforeAndAfter, Matchers} - -import org.apache.spark.{SparkConf, SparkFunSuite} -import org.apache.spark.deploy.yarn.config._ - -class ConfigurableCredentialManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfter { - private var credentialManager: ConfigurableCredentialManager = null - private var sparkConf: SparkConf = null - private var hadoopConf: Configuration = null - - override def beforeAll(): Unit = { - super.beforeAll() - - sparkConf = new SparkConf() - hadoopConf = new Configuration() - System.setProperty("SPARK_YARN_MODE", "true") - } - - override def afterAll(): Unit = { - System.clearProperty("SPARK_YARN_MODE") - - super.afterAll() - } - - test("Correctly load default credential providers") { - credentialManager = new ConfigurableCredentialManager(sparkConf, hadoopConf) - - credentialManager.getServiceCredentialProvider("hdfs") should not be (None) - credentialManager.getServiceCredentialProvider("hbase") should not be (None) - credentialManager.getServiceCredentialProvider("hive") should not be (None) - } - - test("disable hive credential provider") { - sparkConf.set("spark.yarn.security.credentials.hive.enabled", "false") - credentialManager = new ConfigurableCredentialManager(sparkConf, hadoopConf) - - credentialManager.getServiceCredentialProvider("hdfs") should not be (None) - credentialManager.getServiceCredentialProvider("hbase") should not be (None) - credentialManager.getServiceCredentialProvider("hive") should be (None) - } - - test("using deprecated configurations") { - sparkConf.set("spark.yarn.security.tokens.hdfs.enabled", "false") - sparkConf.set("spark.yarn.security.tokens.hive.enabled", "false") - credentialManager = new ConfigurableCredentialManager(sparkConf, hadoopConf) - - credentialManager.getServiceCredentialProvider("hdfs") should be (None) - credentialManager.getServiceCredentialProvider("hive") should be (None) - credentialManager.getServiceCredentialProvider("test") should not be (None) - credentialManager.getServiceCredentialProvider("hbase") should not be (None) - } - - test("verify obtaining credentials from provider") { - credentialManager = new ConfigurableCredentialManager(sparkConf, hadoopConf) - val creds = new Credentials() - - // Tokens can only be obtained from TestTokenProvider, for hdfs, hbase and hive tokens cannot - // be obtained. 
- credentialManager.obtainCredentials(hadoopConf, creds) - val tokens = creds.getAllTokens - tokens.size() should be (1) - tokens.iterator().next().getService should be (new Text("test")) - } - - test("verify getting credential renewal info") { - credentialManager = new ConfigurableCredentialManager(sparkConf, hadoopConf) - val creds = new Credentials() - - val testCredentialProvider = credentialManager.getServiceCredentialProvider("test").get - .asInstanceOf[TestCredentialProvider] - // Only TestTokenProvider can get the time of next token renewal - val nextRenewal = credentialManager.obtainCredentials(hadoopConf, creds) - nextRenewal should be (testCredentialProvider.timeOfNextTokenRenewal) - } - - test("obtain tokens For HiveMetastore") { - val hadoopConf = new Configuration() - hadoopConf.set("hive.metastore.kerberos.principal", "bob") - // thrift picks up on port 0 and bails out, without trying to talk to endpoint - hadoopConf.set("hive.metastore.uris", "http://localhost:0") - - val hiveCredentialProvider = new HiveCredentialProvider() - val credentials = new Credentials() - hiveCredentialProvider.obtainCredentials(hadoopConf, sparkConf, credentials) - - credentials.getAllTokens.size() should be (0) - } - - test("Obtain tokens For HBase") { - val hadoopConf = new Configuration() - hadoopConf.set("hbase.security.authentication", "kerberos") - - val hbaseTokenProvider = new HBaseCredentialProvider() - val creds = new Credentials() - hbaseTokenProvider.obtainCredentials(hadoopConf, sparkConf, creds) - - creds.getAllTokens.size should be (0) - } -} - -class TestCredentialProvider extends ServiceCredentialProvider { - val tokenRenewalInterval = 86400 * 1000L - var timeOfNextTokenRenewal = 0L - - override def serviceName: String = "test" - - override def credentialsRequired(conf: Configuration): Boolean = true - - override def obtainCredentials( - hadoopConf: Configuration, - sparkConf: SparkConf, - creds: Credentials): Option[Long] = { - if (creds == null) { - // Guard out other unit test failures. - return None - } - - val emptyToken = new Token() - emptyToken.setService(new Text("test")) - creds.addToken(emptyToken.getService, emptyToken) - - val currTime = System.currentTimeMillis() - timeOfNextTokenRenewal = (currTime - currTime % tokenRenewalInterval) + tokenRenewalInterval - - Some(timeOfNextTokenRenewal) - } -} http://git-wip-us.apache.org/repos/asf/spark/blob/81e5619c/yarn/src/test/scala/org/apache/spark/deploy/yarn/security/HDFSCredentialProviderSuite.scala ---------------------------------------------------------------------- diff --git a/yarn/src/test/scala/org/apache/spark/deploy/yarn/security/HDFSCredentialProviderSuite.scala b/yarn/src/test/scala/org/apache/spark/deploy/yarn/security/HDFSCredentialProviderSuite.scala deleted file mode 100644 index 7b2da3f..0000000 --- a/yarn/src/test/scala/org/apache/spark/deploy/yarn/security/HDFSCredentialProviderSuite.scala +++ /dev/null @@ -1,71 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.deploy.yarn.security - -import org.apache.hadoop.conf.Configuration -import org.apache.hadoop.fs.Path -import org.scalatest.{Matchers, PrivateMethodTester} - -import org.apache.spark.{SparkConf, SparkException, SparkFunSuite} - -class HDFSCredentialProviderSuite - extends SparkFunSuite - with PrivateMethodTester - with Matchers { - private val _getTokenRenewer = PrivateMethod[String]('getTokenRenewer) - - private def getTokenRenewer( - hdfsCredentialProvider: HDFSCredentialProvider, conf: Configuration): String = { - hdfsCredentialProvider invokePrivate _getTokenRenewer(conf) - } - - private var hdfsCredentialProvider: HDFSCredentialProvider = null - - override def beforeAll() { - super.beforeAll() - - if (hdfsCredentialProvider == null) { - hdfsCredentialProvider = new HDFSCredentialProvider() - } - } - - override def afterAll() { - if (hdfsCredentialProvider != null) { - hdfsCredentialProvider = null - } - - super.afterAll() - } - - test("check token renewer") { - val hadoopConf = new Configuration() - hadoopConf.set("yarn.resourcemanager.address", "myrm:8033") - hadoopConf.set("yarn.resourcemanager.principal", "yarn/myrm:[email protected]") - val renewer = getTokenRenewer(hdfsCredentialProvider, hadoopConf) - renewer should be ("yarn/myrm:[email protected]") - } - - test("check token renewer default") { - val hadoopConf = new Configuration() - val caught = - intercept[SparkException] { - getTokenRenewer(hdfsCredentialProvider, hadoopConf) - } - assert(caught.getMessage === "Can't get Master Kerberos principal for use as renewer") - } -} http://git-wip-us.apache.org/repos/asf/spark/blob/81e5619c/yarn/src/test/scala/org/apache/spark/launcher/TestClasspathBuilder.scala ---------------------------------------------------------------------- diff --git a/yarn/src/test/scala/org/apache/spark/launcher/TestClasspathBuilder.scala b/yarn/src/test/scala/org/apache/spark/launcher/TestClasspathBuilder.scala deleted file mode 100644 index da9e8e2..0000000 --- a/yarn/src/test/scala/org/apache/spark/launcher/TestClasspathBuilder.scala +++ /dev/null @@ -1,36 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.spark.launcher - -import java.util.{List => JList, Map => JMap} - -/** - * Exposes AbstractCommandBuilder to the YARN tests, so that they can build classpaths the same - * way other cluster managers do. - */ -private[spark] class TestClasspathBuilder extends AbstractCommandBuilder { - - childEnv.put(CommandBuilderUtils.ENV_SPARK_HOME, sys.props("spark.test.home")) - - override def buildClassPath(extraCp: String): JList[String] = super.buildClassPath(extraCp) - - /** Not used by the YARN tests. */ - override def buildCommand(env: JMap[String, String]): JList[String] = - throw new UnsupportedOperationException() - -} http://git-wip-us.apache.org/repos/asf/spark/blob/81e5619c/yarn/src/test/scala/org/apache/spark/network/shuffle/ShuffleTestAccessor.scala ---------------------------------------------------------------------- diff --git a/yarn/src/test/scala/org/apache/spark/network/shuffle/ShuffleTestAccessor.scala b/yarn/src/test/scala/org/apache/spark/network/shuffle/ShuffleTestAccessor.scala deleted file mode 100644 index 1fed256..0000000 --- a/yarn/src/test/scala/org/apache/spark/network/shuffle/ShuffleTestAccessor.scala +++ /dev/null @@ -1,70 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.spark.network.shuffle - -import java.io.File -import java.util.concurrent.ConcurrentMap - -import org.apache.hadoop.yarn.api.records.ApplicationId -import org.fusesource.leveldbjni.JniDBFactory -import org.iq80.leveldb.{DB, Options} - -import org.apache.spark.network.shuffle.ExternalShuffleBlockResolver.AppExecId -import org.apache.spark.network.shuffle.protocol.ExecutorShuffleInfo - -/** - * just a cheat to get package-visible members in tests - */ -object ShuffleTestAccessor { - - def getBlockResolver(handler: ExternalShuffleBlockHandler): ExternalShuffleBlockResolver = { - handler.blockManager - } - - def getExecutorInfo( - appId: ApplicationId, - execId: String, - resolver: ExternalShuffleBlockResolver - ): Option[ExecutorShuffleInfo] = { - val id = new AppExecId(appId.toString, execId) - Option(resolver.executors.get(id)) - } - - def registeredExecutorFile(resolver: ExternalShuffleBlockResolver): File = { - resolver.registeredExecutorFile - } - - def shuffleServiceLevelDB(resolver: ExternalShuffleBlockResolver): DB = { - resolver.db - } - - def reloadRegisteredExecutors( - file: File): ConcurrentMap[ExternalShuffleBlockResolver.AppExecId, ExecutorShuffleInfo] = { - val options: Options = new Options - options.createIfMissing(true) - val factory = new JniDBFactory - val db = factory.open(file, options) - val result = ExternalShuffleBlockResolver.reloadRegisteredExecutors(db) - db.close() - result - } - - def reloadRegisteredExecutors( - db: DB): ConcurrentMap[ExternalShuffleBlockResolver.AppExecId, ExecutorShuffleInfo] = { - ExternalShuffleBlockResolver.reloadRegisteredExecutors(db) - } -}
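
For readers following the shuffle-service pieces above, here is a minimal standalone sketch (not part of the diff) of the pattern that YarnShuffleIntegrationSuite and ShuffleTestAccessor rely on: leveldb allows only one open handle per directory, so the registered-executor state is copied aside before it is reloaded. The object name and the path taken from args(0) are assumptions made for illustration; it assumes the ShuffleTestAccessor helper shown above is on the test classpath.

import java.io.File

import org.apache.commons.io.FileUtils

import org.apache.spark.network.shuffle.ShuffleTestAccessor

object ReloadRegisteredExecutorsSketch {
  def main(args: Array[String]): Unit = {
    // Hypothetical path to the shuffle service's registered-executor leveldb directory.
    val registeredExecFile = new File(args(0))
    // Copy the directory first: the running shuffle service keeps its own handle open,
    // and leveldb refuses a second handle on the same directory.
    val execStateCopy = new File(registeredExecFile.getAbsolutePath + "_dup")
    try {
      FileUtils.copyDirectory(registeredExecFile, execStateCopy)
      // Reload the persisted executor registrations from the copy via the test helper.
      val executors = ShuffleTestAccessor.reloadRegisteredExecutors(execStateCopy)
      assert(!executors.isEmpty, "expected at least one registered executor")
    } finally {
      FileUtils.deleteDirectory(execStateCopy)
    }
  }
}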

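Similarly, HDFSCredentialProviderSuite above reaches a private method through ScalaTest's PrivateMethodTester. A self-contained sketch of that pattern follows; the Greeter class and method are hypothetical stand-ins for the provider and its getTokenRenewer.

import org.scalatest.PrivateMethodTester

object PrivateMethodSketch extends PrivateMethodTester {
  // Hypothetical class with a private method, standing in for HDFSCredentialProvider.
  class Greeter {
    private def greeting(name: String): String = s"hello, $name"
  }

  def main(args: Array[String]): Unit = {
    // Name the private method by symbol, then invoke it reflectively on an instance.
    val _greeting = PrivateMethod[String]('greeting)
    val result = new Greeter() invokePrivate _greeting("yarn")
    assert(result == "hello, yarn")
  }
}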