http://git-wip-us.apache.org/repos/asf/spark/blob/81e5619c/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala ---------------------------------------------------------------------- diff --git a/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala b/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala deleted file mode 100644 index 1d742fe..0000000 --- a/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala +++ /dev/null @@ -1,524 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.scheduler.cluster.mesos - -import java.util.{List => JList} -import java.util.concurrent.CountDownLatch - -import scala.collection.JavaConverters._ -import scala.collection.mutable.ArrayBuffer -import scala.util.control.NonFatal - -import com.google.common.base.Splitter -import org.apache.mesos.{MesosSchedulerDriver, Protos, Scheduler, SchedulerDriver} -import org.apache.mesos.Protos.{TaskState => MesosTaskState, _} -import org.apache.mesos.Protos.FrameworkInfo.Capability -import org.apache.mesos.protobuf.{ByteString, GeneratedMessage} - -import org.apache.spark.{SparkConf, SparkContext, SparkException} -import org.apache.spark.TaskState -import org.apache.spark.internal.Logging -import org.apache.spark.internal.config._ -import org.apache.spark.util.Utils - - - -/** - * Shared trait for implementing a Mesos Scheduler. This holds common state and helper - * methods and Mesos scheduler will use. - */ -trait MesosSchedulerUtils extends Logging { - // Lock used to wait for scheduler to be registered - private final val registerLatch = new CountDownLatch(1) - - // Driver for talking to Mesos - protected var mesosDriver: SchedulerDriver = null - - /** - * Creates a new MesosSchedulerDriver that communicates to the Mesos master. - * - * @param masterUrl The url to connect to Mesos master - * @param scheduler the scheduler class to receive scheduler callbacks - * @param sparkUser User to impersonate with when running tasks - * @param appName The framework name to display on the Mesos UI - * @param conf Spark configuration - * @param webuiUrl The WebUI url to link from Mesos UI - * @param checkpoint Option to checkpoint tasks for failover - * @param failoverTimeout Duration Mesos master expect scheduler to reconnect on disconnect - * @param frameworkId The id of the new framework - */ - protected def createSchedulerDriver( - masterUrl: String, - scheduler: Scheduler, - sparkUser: String, - appName: String, - conf: SparkConf, - webuiUrl: Option[String] = None, - checkpoint: Option[Boolean] = None, - failoverTimeout: Option[Double] = None, - frameworkId: Option[String] = None): SchedulerDriver = { - val fwInfoBuilder = FrameworkInfo.newBuilder().setUser(sparkUser).setName(appName) - val credBuilder = Credential.newBuilder() - webuiUrl.foreach { url => fwInfoBuilder.setWebuiUrl(url) } - checkpoint.foreach { checkpoint => fwInfoBuilder.setCheckpoint(checkpoint) } - failoverTimeout.foreach { timeout => fwInfoBuilder.setFailoverTimeout(timeout) } - frameworkId.foreach { id => - fwInfoBuilder.setId(FrameworkID.newBuilder().setValue(id).build()) - } - fwInfoBuilder.setHostname(Option(conf.getenv("SPARK_PUBLIC_DNS")).getOrElse( - conf.get(DRIVER_HOST_ADDRESS))) - conf.getOption("spark.mesos.principal").foreach { principal => - fwInfoBuilder.setPrincipal(principal) - credBuilder.setPrincipal(principal) - } - conf.getOption("spark.mesos.secret").foreach { secret => - credBuilder.setSecret(secret) - } - if (credBuilder.hasSecret && !fwInfoBuilder.hasPrincipal) { - throw new SparkException( - "spark.mesos.principal must be configured when spark.mesos.secret is set") - } - conf.getOption("spark.mesos.role").foreach { role => - fwInfoBuilder.setRole(role) - } - val maxGpus = conf.getInt("spark.mesos.gpus.max", 0) - if (maxGpus > 0) { - fwInfoBuilder.addCapabilities(Capability.newBuilder().setType(Capability.Type.GPU_RESOURCES)) - } - if (credBuilder.hasPrincipal) { - new MesosSchedulerDriver( - scheduler, fwInfoBuilder.build(), masterUrl, credBuilder.build()) - } else { - new MesosSchedulerDriver(scheduler, fwInfoBuilder.build(), masterUrl) - } - } - - /** - * Starts the MesosSchedulerDriver and stores the current running driver to this new instance. - * This driver is expected to not be running. - * This method returns only after the scheduler has registered with Mesos. - */ - def startScheduler(newDriver: SchedulerDriver): Unit = { - synchronized { - if (mesosDriver != null) { - registerLatch.await() - return - } - @volatile - var error: Option[Exception] = None - - // We create a new thread that will block inside `mesosDriver.run` - // until the scheduler exists - new Thread(Utils.getFormattedClassName(this) + "-mesos-driver") { - setDaemon(true) - override def run() { - try { - mesosDriver = newDriver - val ret = mesosDriver.run() - logInfo("driver.run() returned with code " + ret) - if (ret != null && ret.equals(Status.DRIVER_ABORTED)) { - error = Some(new SparkException("Error starting driver, DRIVER_ABORTED")) - markErr() - } - } catch { - case e: Exception => - logError("driver.run() failed", e) - error = Some(e) - markErr() - } - } - }.start() - - registerLatch.await() - - // propagate any error to the calling thread. This ensures that SparkContext creation fails - // without leaving a broken context that won't be able to schedule any tasks - error.foreach(throw _) - } - } - - def getResource(res: JList[Resource], name: String): Double = { - // A resource can have multiple values in the offer since it can either be from - // a specific role or wildcard. - res.asScala.filter(_.getName == name).map(_.getScalar.getValue).sum - } - - /** - * Transforms a range resource to a list of ranges - * - * @param res the mesos resource list - * @param name the name of the resource - * @return the list of ranges returned - */ - protected def getRangeResource(res: JList[Resource], name: String): List[(Long, Long)] = { - // A resource can have multiple values in the offer since it can either be from - // a specific role or wildcard. - res.asScala.filter(_.getName == name).flatMap(_.getRanges.getRangeList.asScala - .map(r => (r.getBegin, r.getEnd)).toList).toList - } - - /** - * Signal that the scheduler has registered with Mesos. - */ - protected def markRegistered(): Unit = { - registerLatch.countDown() - } - - protected def markErr(): Unit = { - registerLatch.countDown() - } - - def createResource(name: String, amount: Double, role: Option[String] = None): Resource = { - val builder = Resource.newBuilder() - .setName(name) - .setType(Value.Type.SCALAR) - .setScalar(Value.Scalar.newBuilder().setValue(amount).build()) - - role.foreach { r => builder.setRole(r) } - - builder.build() - } - - /** - * Partition the existing set of resources into two groups, those remaining to be - * scheduled and those requested to be used for a new task. - * - * @param resources The full list of available resources - * @param resourceName The name of the resource to take from the available resources - * @param amountToUse The amount of resources to take from the available resources - * @return The remaining resources list and the used resources list. - */ - def partitionResources( - resources: JList[Resource], - resourceName: String, - amountToUse: Double): (List[Resource], List[Resource]) = { - var remain = amountToUse - var requestedResources = new ArrayBuffer[Resource] - val remainingResources = resources.asScala.map { - case r => - if (remain > 0 && - r.getType == Value.Type.SCALAR && - r.getScalar.getValue > 0.0 && - r.getName == resourceName) { - val usage = Math.min(remain, r.getScalar.getValue) - requestedResources += createResource(resourceName, usage, Some(r.getRole)) - remain -= usage - createResource(resourceName, r.getScalar.getValue - usage, Some(r.getRole)) - } else { - r - } - } - - // Filter any resource that has depleted. - val filteredResources = - remainingResources.filter(r => r.getType != Value.Type.SCALAR || r.getScalar.getValue > 0.0) - - (filteredResources.toList, requestedResources.toList) - } - - /** Helper method to get the key,value-set pair for a Mesos Attribute protobuf */ - protected def getAttribute(attr: Attribute): (String, Set[String]) = { - (attr.getName, attr.getText.getValue.split(',').toSet) - } - - - /** Build a Mesos resource protobuf object */ - protected def createResource(resourceName: String, quantity: Double): Protos.Resource = { - Resource.newBuilder() - .setName(resourceName) - .setType(Value.Type.SCALAR) - .setScalar(Value.Scalar.newBuilder().setValue(quantity).build()) - .build() - } - - /** - * Converts the attributes from the resource offer into a Map of name -> Attribute Value - * The attribute values are the mesos attribute types and they are - * - * @param offerAttributes the attributes offered - * @return - */ - protected def toAttributeMap(offerAttributes: JList[Attribute]): Map[String, GeneratedMessage] = { - offerAttributes.asScala.map { attr => - val attrValue = attr.getType match { - case Value.Type.SCALAR => attr.getScalar - case Value.Type.RANGES => attr.getRanges - case Value.Type.SET => attr.getSet - case Value.Type.TEXT => attr.getText - } - (attr.getName, attrValue) - }.toMap - } - - - /** - * Match the requirements (if any) to the offer attributes. - * if attribute requirements are not specified - return true - * else if attribute is defined and no values are given, simple attribute presence is performed - * else if attribute name and value is specified, subset match is performed on slave attributes - */ - def matchesAttributeRequirements( - slaveOfferConstraints: Map[String, Set[String]], - offerAttributes: Map[String, GeneratedMessage]): Boolean = { - slaveOfferConstraints.forall { - // offer has the required attribute and subsumes the required values for that attribute - case (name, requiredValues) => - offerAttributes.get(name) match { - case None => false - case Some(_) if requiredValues.isEmpty => true // empty value matches presence - case Some(scalarValue: Value.Scalar) => - // check if provided values is less than equal to the offered values - requiredValues.map(_.toDouble).exists(_ <= scalarValue.getValue) - case Some(rangeValue: Value.Range) => - val offerRange = rangeValue.getBegin to rangeValue.getEnd - // Check if there is some required value that is between the ranges specified - // Note: We only support the ability to specify discrete values, in the future - // we may expand it to subsume ranges specified with a XX..YY value or something - // similar to that. - requiredValues.map(_.toLong).exists(offerRange.contains(_)) - case Some(offeredValue: Value.Set) => - // check if the specified required values is a subset of offered set - requiredValues.subsetOf(offeredValue.getItemList.asScala.toSet) - case Some(textValue: Value.Text) => - // check if the specified value is equal, if multiple values are specified - // we succeed if any of them match. - requiredValues.contains(textValue.getValue) - } - } - } - - /** - * Parses the attributes constraints provided to spark and build a matching data struct: - * Map[<attribute-name>, Set[values-to-match]] - * The constraints are specified as ';' separated key-value pairs where keys and values - * are separated by ':'. The ':' implies equality (for singular values) and "is one of" for - * multiple values (comma separated). For example: - * {{{ - * parseConstraintString("os:centos7;zone:us-east-1a,us-east-1b") - * // would result in - * <code> - * Map( - * "os" -> Set("centos7"), - * "zone": -> Set("us-east-1a", "us-east-1b") - * ) - * }}} - * - * Mesos documentation: http://mesos.apache.org/documentation/attributes-resources/ - * https://github.com/apache/mesos/blob/master/src/common/values.cpp - * https://github.com/apache/mesos/blob/master/src/common/attributes.cpp - * - * @param constraintsVal constaints string consisting of ';' separated key-value pairs (separated - * by ':') - * @return Map of constraints to match resources offers. - */ - def parseConstraintString(constraintsVal: String): Map[String, Set[String]] = { - /* - Based on mesos docs: - attributes : attribute ( ";" attribute )* - attribute : labelString ":" ( labelString | "," )+ - labelString : [a-zA-Z0-9_/.-] - */ - val splitter = Splitter.on(';').trimResults().withKeyValueSeparator(':') - // kv splitter - if (constraintsVal.isEmpty) { - Map() - } else { - try { - splitter.split(constraintsVal).asScala.toMap.mapValues(v => - if (v == null || v.isEmpty) { - Set[String]() - } else { - v.split(',').toSet - } - ) - } catch { - case NonFatal(e) => - throw new IllegalArgumentException(s"Bad constraint string: $constraintsVal", e) - } - } - } - - // These defaults copied from YARN - private val MEMORY_OVERHEAD_FRACTION = 0.10 - private val MEMORY_OVERHEAD_MINIMUM = 384 - - /** - * Return the amount of memory to allocate to each executor, taking into account - * container overheads. - * - * @param sc SparkContext to use to get `spark.mesos.executor.memoryOverhead` value - * @return memory requirement as (0.1 * <memoryOverhead>) or MEMORY_OVERHEAD_MINIMUM - * (whichever is larger) - */ - def executorMemory(sc: SparkContext): Int = { - sc.conf.getInt("spark.mesos.executor.memoryOverhead", - math.max(MEMORY_OVERHEAD_FRACTION * sc.executorMemory, MEMORY_OVERHEAD_MINIMUM).toInt) + - sc.executorMemory - } - - def setupUris(uris: String, - builder: CommandInfo.Builder, - useFetcherCache: Boolean = false): Unit = { - uris.split(",").foreach { uri => - builder.addUris(CommandInfo.URI.newBuilder().setValue(uri.trim()).setCache(useFetcherCache)) - } - } - - protected def getRejectOfferDurationForUnmetConstraints(sc: SparkContext): Long = { - sc.conf.getTimeAsSeconds("spark.mesos.rejectOfferDurationForUnmetConstraints", "120s") - } - - protected def getRejectOfferDurationForReachedMaxCores(sc: SparkContext): Long = { - sc.conf.getTimeAsSeconds("spark.mesos.rejectOfferDurationForReachedMaxCores", "120s") - } - - /** - * Checks executor ports if they are within some range of the offered list of ports ranges, - * - * @param conf the Spark Config - * @param ports the list of ports to check - * @return true if ports are within range false otherwise - */ - protected def checkPorts(conf: SparkConf, ports: List[(Long, Long)]): Boolean = { - - def checkIfInRange(port: Long, ps: List[(Long, Long)]): Boolean = { - ps.exists{case (rangeStart, rangeEnd) => rangeStart <= port & rangeEnd >= port } - } - - val portsToCheck = nonZeroPortValuesFromConfig(conf) - val withinRange = portsToCheck.forall(p => checkIfInRange(p, ports)) - // make sure we have enough ports to allocate per offer - val enoughPorts = - ports.map{case (rangeStart, rangeEnd) => rangeEnd - rangeStart + 1}.sum >= portsToCheck.size - enoughPorts && withinRange - } - - /** - * Partitions port resources. - * - * @param requestedPorts non-zero ports to assign - * @param offeredResources the resources offered - * @return resources left, port resources to be used. - */ - def partitionPortResources(requestedPorts: List[Long], offeredResources: List[Resource]) - : (List[Resource], List[Resource]) = { - if (requestedPorts.isEmpty) { - (offeredResources, List[Resource]()) - } else { - // partition port offers - val (resourcesWithoutPorts, portResources) = filterPortResources(offeredResources) - - val portsAndRoles = requestedPorts. - map(x => (x, findPortAndGetAssignedRangeRole(x, portResources))) - - val assignedPortResources = createResourcesFromPorts(portsAndRoles) - - // ignore non-assigned port resources, they will be declined implicitly by mesos - // no need for splitting port resources. - (resourcesWithoutPorts, assignedPortResources) - } - } - - val managedPortNames = List("spark.executor.port", BLOCK_MANAGER_PORT.key) - - /** - * The values of the non-zero ports to be used by the executor process. - * @param conf the spark config to use - * @return the ono-zero values of the ports - */ - def nonZeroPortValuesFromConfig(conf: SparkConf): List[Long] = { - managedPortNames.map(conf.getLong(_, 0)).filter( _ != 0) - } - - /** Creates a mesos resource for a specific port number. */ - private def createResourcesFromPorts(portsAndRoles: List[(Long, String)]) : List[Resource] = { - portsAndRoles.flatMap{ case (port, role) => - createMesosPortResource(List((port, port)), Some(role))} - } - - /** Helper to create mesos resources for specific port ranges. */ - private def createMesosPortResource( - ranges: List[(Long, Long)], - role: Option[String] = None): List[Resource] = { - ranges.map { case (rangeStart, rangeEnd) => - val rangeValue = Value.Range.newBuilder() - .setBegin(rangeStart) - .setEnd(rangeEnd) - val builder = Resource.newBuilder() - .setName("ports") - .setType(Value.Type.RANGES) - .setRanges(Value.Ranges.newBuilder().addRange(rangeValue)) - role.foreach(r => builder.setRole(r)) - builder.build() - } - } - - /** - * Helper to assign a port to an offered range and get the latter's role - * info to use it later on. - */ - private def findPortAndGetAssignedRangeRole(port: Long, portResources: List[Resource]) - : String = { - - val ranges = portResources. - map(resource => - (resource.getRole, resource.getRanges.getRangeList.asScala - .map(r => (r.getBegin, r.getEnd)).toList)) - - val rangePortRole = ranges - .find { case (role, rangeList) => rangeList - .exists{ case (rangeStart, rangeEnd) => rangeStart <= port & rangeEnd >= port}} - // this is safe since we have previously checked about the ranges (see checkPorts method) - rangePortRole.map{ case (role, rangeList) => role}.get - } - - /** Retrieves the port resources from a list of mesos offered resources */ - private def filterPortResources(resources: List[Resource]): (List[Resource], List[Resource]) = { - resources.partition { r => !(r.getType == Value.Type.RANGES && r.getName == "ports") } - } - - /** - * spark.mesos.driver.frameworkId is set by the cluster dispatcher to correlate driver - * submissions with frameworkIDs. However, this causes issues when a driver process launches - * more than one framework (more than one SparkContext(, because they all try to register with - * the same frameworkID. To enforce that only the first driver registers with the configured - * framework ID, the driver calls this method after the first registration. - */ - def unsetFrameworkID(sc: SparkContext) { - sc.conf.remove("spark.mesos.driver.frameworkId") - System.clearProperty("spark.mesos.driver.frameworkId") - } - - def mesosToTaskState(state: MesosTaskState): TaskState.TaskState = state match { - case MesosTaskState.TASK_STAGING | MesosTaskState.TASK_STARTING => TaskState.LAUNCHING - case MesosTaskState.TASK_RUNNING | MesosTaskState.TASK_KILLING => TaskState.RUNNING - case MesosTaskState.TASK_FINISHED => TaskState.FINISHED - case MesosTaskState.TASK_FAILED => TaskState.FAILED - case MesosTaskState.TASK_KILLED => TaskState.KILLED - case MesosTaskState.TASK_LOST | MesosTaskState.TASK_ERROR => TaskState.LOST - } - - def taskStateToMesos(state: TaskState.TaskState): MesosTaskState = state match { - case TaskState.LAUNCHING => MesosTaskState.TASK_STARTING - case TaskState.RUNNING => MesosTaskState.TASK_RUNNING - case TaskState.FINISHED => MesosTaskState.TASK_FINISHED - case TaskState.FAILED => MesosTaskState.TASK_FAILED - case TaskState.KILLED => MesosTaskState.TASK_KILLED - case TaskState.LOST => MesosTaskState.TASK_LOST - } -}
http://git-wip-us.apache.org/repos/asf/spark/blob/81e5619c/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosTaskLaunchData.scala ---------------------------------------------------------------------- diff --git a/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosTaskLaunchData.scala b/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosTaskLaunchData.scala deleted file mode 100644 index 8370b61..0000000 --- a/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosTaskLaunchData.scala +++ /dev/null @@ -1,51 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.scheduler.cluster.mesos - -import java.nio.ByteBuffer - -import org.apache.mesos.protobuf.ByteString - -import org.apache.spark.internal.Logging - -/** - * Wrapper for serializing the data sent when launching Mesos tasks. - */ -private[spark] case class MesosTaskLaunchData( - serializedTask: ByteBuffer, - attemptNumber: Int) extends Logging { - - def toByteString: ByteString = { - val dataBuffer = ByteBuffer.allocate(4 + serializedTask.limit) - dataBuffer.putInt(attemptNumber) - dataBuffer.put(serializedTask) - dataBuffer.rewind - logDebug(s"ByteBuffer size: [${dataBuffer.remaining}]") - ByteString.copyFrom(dataBuffer) - } -} - -private[spark] object MesosTaskLaunchData extends Logging { - def fromByteString(byteString: ByteString): MesosTaskLaunchData = { - val byteBuffer = byteString.asReadOnlyByteBuffer() - logDebug(s"ByteBuffer size: [${byteBuffer.remaining}]") - val attemptNumber = byteBuffer.getInt // updates the position by 4 bytes - val serializedTask = byteBuffer.slice() // subsequence starting at the current position - MesosTaskLaunchData(serializedTask, attemptNumber) - } -} http://git-wip-us.apache.org/repos/asf/spark/blob/81e5619c/mesos/src/test/scala/org/apache/spark/deploy/mesos/MesosClusterDispatcherArgumentsSuite.scala ---------------------------------------------------------------------- diff --git a/mesos/src/test/scala/org/apache/spark/deploy/mesos/MesosClusterDispatcherArgumentsSuite.scala b/mesos/src/test/scala/org/apache/spark/deploy/mesos/MesosClusterDispatcherArgumentsSuite.scala deleted file mode 100644 index 33e7d69..0000000 --- a/mesos/src/test/scala/org/apache/spark/deploy/mesos/MesosClusterDispatcherArgumentsSuite.scala +++ /dev/null @@ -1,63 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.deploy.mesos - -import org.apache.spark.{SparkConf, SparkFunSuite} -import org.apache.spark.deploy.TestPrematureExit - -class MesosClusterDispatcherArgumentsSuite extends SparkFunSuite - with TestPrematureExit { - - test("test if spark config args are passed sucessfully") { - val args = Array[String]("--master", "mesos://localhost:5050", "--conf", "key1=value1", - "--conf", "spark.mesos.key2=value2", "--verbose") - val conf = new SparkConf() - new MesosClusterDispatcherArguments(args, conf) - - assert(conf.getOption("key1").isEmpty) - assert(conf.get("spark.mesos.key2") == "value2") - } - - test("test non conf settings") { - val masterUrl = "mesos://localhost:5050" - val port = "1212" - val zookeeperUrl = "zk://localhost:2181" - val host = "localhost" - val webUiPort = "2323" - val name = "myFramework" - - val args1 = Array("--master", masterUrl, "--verbose", "--name", name) - val args2 = Array("-p", port, "-h", host, "-z", zookeeperUrl) - val args3 = Array("--webui-port", webUiPort) - - val args = args1 ++ args2 ++ args3 - val conf = new SparkConf() - val mesosDispClusterArgs = new MesosClusterDispatcherArguments(args, conf) - - assert(mesosDispClusterArgs.verbose) - assert(mesosDispClusterArgs.confProperties.isEmpty) - assert(mesosDispClusterArgs.host == host) - assert(Option(mesosDispClusterArgs.masterUrl).isDefined) - assert(mesosDispClusterArgs.masterUrl == masterUrl.stripPrefix("mesos://")) - assert(Option(mesosDispClusterArgs.zookeeperUrl).isDefined) - assert(mesosDispClusterArgs.zookeeperUrl == Some(zookeeperUrl)) - assert(mesosDispClusterArgs.name == name) - assert(mesosDispClusterArgs.webUiPort == webUiPort.toInt) - assert(mesosDispClusterArgs.port == port.toInt) - } -} http://git-wip-us.apache.org/repos/asf/spark/blob/81e5619c/mesos/src/test/scala/org/apache/spark/deploy/mesos/MesosClusterDispatcherSuite.scala ---------------------------------------------------------------------- diff --git a/mesos/src/test/scala/org/apache/spark/deploy/mesos/MesosClusterDispatcherSuite.scala b/mesos/src/test/scala/org/apache/spark/deploy/mesos/MesosClusterDispatcherSuite.scala deleted file mode 100644 index 7484e3b..0000000 --- a/mesos/src/test/scala/org/apache/spark/deploy/mesos/MesosClusterDispatcherSuite.scala +++ /dev/null @@ -1,40 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.deploy.mesos - -import org.apache.spark.SparkFunSuite -import org.apache.spark.deploy.TestPrematureExit - -class MesosClusterDispatcherSuite extends SparkFunSuite - with TestPrematureExit{ - - test("prints usage on empty input") { - testPrematureExit(Array[String](), - "Usage: MesosClusterDispatcher", MesosClusterDispatcher) - } - - test("prints usage with only --help") { - testPrematureExit(Array("--help"), - "Usage: MesosClusterDispatcher", MesosClusterDispatcher) - } - - test("prints error with unrecognized options") { - testPrematureExit(Array("--blarg"), "Unrecognized option: '--blarg'", MesosClusterDispatcher) - testPrematureExit(Array("-bleg"), "Unrecognized option: '-bleg'", MesosClusterDispatcher) - } -} http://git-wip-us.apache.org/repos/asf/spark/blob/81e5619c/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterManagerSuite.scala ---------------------------------------------------------------------- diff --git a/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterManagerSuite.scala b/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterManagerSuite.scala deleted file mode 100644 index a558554..0000000 --- a/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterManagerSuite.scala +++ /dev/null @@ -1,56 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.scheduler.cluster.mesos - -import org.apache.spark._ -import org.apache.spark.internal.config._ - -class MesosClusterManagerSuite extends SparkFunSuite with LocalSparkContext { - def testURL(masterURL: String, expectedClass: Class[_], coarse: Boolean) { - val conf = new SparkConf().set("spark.mesos.coarse", coarse.toString) - sc = new SparkContext("local", "test", conf) - val clusterManager = new MesosClusterManager() - - assert(clusterManager.canCreate(masterURL)) - val taskScheduler = clusterManager.createTaskScheduler(sc, masterURL) - val sched = clusterManager.createSchedulerBackend(sc, masterURL, taskScheduler) - assert(sched.getClass === expectedClass) - } - - test("mesos fine-grained") { - testURL("mesos://localhost:1234", classOf[MesosFineGrainedSchedulerBackend], coarse = false) - } - - test("mesos coarse-grained") { - testURL("mesos://localhost:1234", classOf[MesosCoarseGrainedSchedulerBackend], coarse = true) - } - - test("mesos with zookeeper") { - testURL("mesos://zk://localhost:1234,localhost:2345", - classOf[MesosFineGrainedSchedulerBackend], - coarse = false) - } - - test("mesos with i/o encryption throws error") { - val se = intercept[SparkException] { - val conf = new SparkConf().setAppName("test").set(IO_ENCRYPTION_ENABLED, true) - sc = new SparkContext("mesos", "test", conf) - } - assert(se.getCause().isInstanceOf[IllegalArgumentException]) - } -} http://git-wip-us.apache.org/repos/asf/spark/blob/81e5619c/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterSchedulerSuite.scala ---------------------------------------------------------------------- diff --git a/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterSchedulerSuite.scala b/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterSchedulerSuite.scala deleted file mode 100644 index 74e5ce2..0000000 --- a/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterSchedulerSuite.scala +++ /dev/null @@ -1,239 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.scheduler.cluster.mesos - -import java.util.{Collection, Collections, Date} - -import scala.collection.JavaConverters._ - -import org.apache.mesos.Protos._ -import org.apache.mesos.Protos.Value.{Scalar, Type} -import org.apache.mesos.SchedulerDriver -import org.mockito.{ArgumentCaptor, Matchers} -import org.mockito.Mockito._ -import org.scalatest.mock.MockitoSugar - -import org.apache.spark.{LocalSparkContext, SparkConf, SparkFunSuite} -import org.apache.spark.deploy.Command -import org.apache.spark.deploy.mesos.MesosDriverDescription - -class MesosClusterSchedulerSuite extends SparkFunSuite with LocalSparkContext with MockitoSugar { - - private val command = new Command("mainClass", Seq("arg"), Map(), Seq(), Seq(), Seq()) - private var driver: SchedulerDriver = _ - private var scheduler: MesosClusterScheduler = _ - - private def setScheduler(sparkConfVars: Map[String, String] = null): Unit = { - val conf = new SparkConf() - conf.setMaster("mesos://localhost:5050") - conf.setAppName("spark mesos") - - if (sparkConfVars != null) { - conf.setAll(sparkConfVars) - } - - driver = mock[SchedulerDriver] - scheduler = new MesosClusterScheduler( - new BlackHoleMesosClusterPersistenceEngineFactory, conf) { - override def start(): Unit = { ready = true } - } - scheduler.start() - } - - test("can queue drivers") { - setScheduler() - - val response = scheduler.submitDriver( - new MesosDriverDescription("d1", "jar", 1000, 1, true, - command, Map[String, String](), "s1", new Date())) - assert(response.success) - val response2 = - scheduler.submitDriver(new MesosDriverDescription( - "d1", "jar", 1000, 1, true, command, Map[String, String](), "s2", new Date())) - assert(response2.success) - val state = scheduler.getSchedulerState() - val queuedDrivers = state.queuedDrivers.toList - assert(queuedDrivers(0).submissionId == response.submissionId) - assert(queuedDrivers(1).submissionId == response2.submissionId) - } - - test("can kill queued drivers") { - setScheduler() - - val response = scheduler.submitDriver( - new MesosDriverDescription("d1", "jar", 1000, 1, true, - command, Map[String, String](), "s1", new Date())) - assert(response.success) - val killResponse = scheduler.killDriver(response.submissionId) - assert(killResponse.success) - val state = scheduler.getSchedulerState() - assert(state.queuedDrivers.isEmpty) - } - - test("can handle multiple roles") { - setScheduler() - - val driver = mock[SchedulerDriver] - val response = scheduler.submitDriver( - new MesosDriverDescription("d1", "jar", 1200, 1.5, true, - command, - Map(("spark.mesos.executor.home", "test"), ("spark.app.name", "test")), - "s1", - new Date())) - assert(response.success) - val offer = Offer.newBuilder() - .addResources( - Resource.newBuilder().setRole("*") - .setScalar(Scalar.newBuilder().setValue(1).build()).setName("cpus").setType(Type.SCALAR)) - .addResources( - Resource.newBuilder().setRole("*") - .setScalar(Scalar.newBuilder().setValue(1000).build()) - .setName("mem") - .setType(Type.SCALAR)) - .addResources( - Resource.newBuilder().setRole("role2") - .setScalar(Scalar.newBuilder().setValue(1).build()).setName("cpus").setType(Type.SCALAR)) - .addResources( - Resource.newBuilder().setRole("role2") - .setScalar(Scalar.newBuilder().setValue(500).build()).setName("mem").setType(Type.SCALAR)) - .setId(OfferID.newBuilder().setValue("o1").build()) - .setFrameworkId(FrameworkID.newBuilder().setValue("f1").build()) - .setSlaveId(SlaveID.newBuilder().setValue("s1").build()) - .setHostname("host1") - .build() - - val capture = ArgumentCaptor.forClass(classOf[Collection[TaskInfo]]) - - when( - driver.launchTasks( - Matchers.eq(Collections.singleton(offer.getId)), - capture.capture()) - ).thenReturn(Status.valueOf(1)) - - scheduler.resourceOffers(driver, Collections.singletonList(offer)) - - val taskInfos = capture.getValue - assert(taskInfos.size() == 1) - val taskInfo = taskInfos.iterator().next() - val resources = taskInfo.getResourcesList - assert(scheduler.getResource(resources, "cpus") == 1.5) - assert(scheduler.getResource(resources, "mem") == 1200) - val resourcesSeq: Seq[Resource] = resources.asScala - val cpus = resourcesSeq.filter(_.getName.equals("cpus")).toList - assert(cpus.size == 2) - assert(cpus.exists(_.getRole().equals("role2"))) - assert(cpus.exists(_.getRole().equals("*"))) - val mem = resourcesSeq.filter(_.getName.equals("mem")).toList - assert(mem.size == 2) - assert(mem.exists(_.getRole().equals("role2"))) - assert(mem.exists(_.getRole().equals("*"))) - - verify(driver, times(1)).launchTasks( - Matchers.eq(Collections.singleton(offer.getId)), - capture.capture() - ) - } - - test("escapes commandline args for the shell") { - setScheduler() - - val conf = new SparkConf() - conf.setMaster("mesos://localhost:5050") - conf.setAppName("spark mesos") - val scheduler = new MesosClusterScheduler( - new BlackHoleMesosClusterPersistenceEngineFactory, conf) { - override def start(): Unit = { ready = true } - } - val escape = scheduler.shellEscape _ - def wrapped(str: String): String = "\"" + str + "\"" - - // Wrapped in quotes - assert(escape("'should be left untouched'") === "'should be left untouched'") - assert(escape("\"should be left untouched\"") === "\"should be left untouched\"") - - // Harmless - assert(escape("") === "") - assert(escape("harmless") === "harmless") - assert(escape("har-m.l3ss") === "har-m.l3ss") - - // Special Chars escape - assert(escape("should escape this \" quote") === wrapped("should escape this \\\" quote")) - assert(escape("shouldescape\"quote") === wrapped("shouldescape\\\"quote")) - assert(escape("should escape this $ dollar") === wrapped("should escape this \\$ dollar")) - assert(escape("should escape this ` backtick") === wrapped("should escape this \\` backtick")) - assert(escape("""should escape this \ backslash""") - === wrapped("""should escape this \\ backslash""")) - assert(escape("""\"?""") === wrapped("""\\\"?""")) - - - // Special Chars no escape only wrap - List(" ", "'", "<", ">", "&", "|", "?", "*", ";", "!", "#", "(", ")").foreach(char => { - assert(escape(s"onlywrap${char}this") === wrapped(s"onlywrap${char}this")) - }) - } - - test("supports spark.mesos.driverEnv.*") { - setScheduler() - - val mem = 1000 - val cpu = 1 - - val response = scheduler.submitDriver( - new MesosDriverDescription("d1", "jar", mem, cpu, true, - command, - Map("spark.mesos.executor.home" -> "test", - "spark.app.name" -> "test", - "spark.mesos.driverEnv.TEST_ENV" -> "TEST_VAL"), - "s1", - new Date())) - assert(response.success) - - val offer = Utils.createOffer("o1", "s1", mem, cpu) - scheduler.resourceOffers(driver, List(offer).asJava) - val tasks = Utils.verifyTaskLaunched(driver, "o1") - val env = tasks.head.getCommand.getEnvironment.getVariablesList.asScala.map(v => - (v.getName, v.getValue)).toMap - assert(env.getOrElse("TEST_ENV", null) == "TEST_VAL") - } - - test("supports spark.mesos.network.name") { - setScheduler() - - val mem = 1000 - val cpu = 1 - - val response = scheduler.submitDriver( - new MesosDriverDescription("d1", "jar", mem, cpu, true, - command, - Map("spark.mesos.executor.home" -> "test", - "spark.app.name" -> "test", - "spark.mesos.network.name" -> "test-network-name"), - "s1", - new Date())) - - assert(response.success) - - val offer = Utils.createOffer("o1", "s1", mem, cpu) - scheduler.resourceOffers(driver, List(offer).asJava) - - val launchedTasks = Utils.verifyTaskLaunched(driver, "o1") - val networkInfos = launchedTasks.head.getContainer.getNetworkInfosList - assert(networkInfos.size == 1) - assert(networkInfos.get(0).getName == "test-network-name") - } -} http://git-wip-us.apache.org/repos/asf/spark/blob/81e5619c/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackendSuite.scala ---------------------------------------------------------------------- diff --git a/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackendSuite.scala b/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackendSuite.scala deleted file mode 100644 index a674da4..0000000 --- a/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackendSuite.scala +++ /dev/null @@ -1,601 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.scheduler.cluster.mesos - -import java.util.concurrent.TimeUnit - -import scala.collection.JavaConverters._ -import scala.collection.mutable.ArrayBuffer -import scala.concurrent.duration._ -import scala.concurrent.Promise -import scala.reflect.ClassTag - -import org.apache.mesos.{Protos, Scheduler, SchedulerDriver} -import org.apache.mesos.Protos._ -import org.mockito.Matchers -import org.mockito.Matchers._ -import org.mockito.Mockito._ -import org.scalatest.concurrent.ScalaFutures -import org.scalatest.mock.MockitoSugar -import org.scalatest.BeforeAndAfter - -import org.apache.spark.{LocalSparkContext, SecurityManager, SparkConf, SparkContext, SparkFunSuite} -import org.apache.spark.internal.config._ -import org.apache.spark.network.shuffle.mesos.MesosExternalShuffleClient -import org.apache.spark.rpc.RpcEndpointRef -import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages.RemoveExecutor -import org.apache.spark.scheduler.TaskSchedulerImpl -import org.apache.spark.scheduler.cluster.mesos.Utils._ - -class MesosCoarseGrainedSchedulerBackendSuite extends SparkFunSuite - with LocalSparkContext - with MockitoSugar - with BeforeAndAfter - with ScalaFutures { - - private var sparkConf: SparkConf = _ - private var driver: SchedulerDriver = _ - private var taskScheduler: TaskSchedulerImpl = _ - private var backend: MesosCoarseGrainedSchedulerBackend = _ - private var externalShuffleClient: MesosExternalShuffleClient = _ - private var driverEndpoint: RpcEndpointRef = _ - @volatile private var stopCalled = false - - // All 'requests' to the scheduler run immediately on the same thread, so - // demand that all futures have their value available immediately. - implicit override val patienceConfig = PatienceConfig(timeout = Duration(0, TimeUnit.SECONDS)) - - test("mesos supports killing and limiting executors") { - setBackend() - sparkConf.set("spark.driver.host", "driverHost") - sparkConf.set("spark.driver.port", "1234") - - val minMem = backend.executorMemory(sc) - val minCpu = 4 - val offers = List(Resources(minMem, minCpu)) - - // launches a task on a valid offer - offerResources(offers) - verifyTaskLaunched(driver, "o1") - - // kills executors - assert(backend.doRequestTotalExecutors(0).futureValue) - assert(backend.doKillExecutors(Seq("0")).futureValue) - val taskID0 = createTaskId("0") - verify(driver, times(1)).killTask(taskID0) - - // doesn't launch a new task when requested executors == 0 - offerResources(offers, 2) - verifyDeclinedOffer(driver, createOfferId("o2")) - - // Launches a new task when requested executors is positive - backend.doRequestTotalExecutors(2) - offerResources(offers, 2) - verifyTaskLaunched(driver, "o2") - } - - test("mesos supports killing and relaunching tasks with executors") { - setBackend() - - // launches a task on a valid offer - val minMem = backend.executorMemory(sc) + 1024 - val minCpu = 4 - val offer1 = Resources(minMem, minCpu) - val offer2 = Resources(minMem, 1) - offerResources(List(offer1, offer2)) - verifyTaskLaunched(driver, "o1") - - // accounts for a killed task - val status = createTaskStatus("0", "s1", TaskState.TASK_KILLED) - backend.statusUpdate(driver, status) - verify(driver, times(1)).reviveOffers() - - // Launches a new task on a valid offer from the same slave - offerResources(List(offer2)) - verifyTaskLaunched(driver, "o2") - } - - test("mesos supports spark.executor.cores") { - val executorCores = 4 - setBackend(Map("spark.executor.cores" -> executorCores.toString)) - - val executorMemory = backend.executorMemory(sc) - val offers = List(Resources(executorMemory * 2, executorCores + 1)) - offerResources(offers) - - val taskInfos = verifyTaskLaunched(driver, "o1") - assert(taskInfos.length == 1) - - val cpus = backend.getResource(taskInfos.head.getResourcesList, "cpus") - assert(cpus == executorCores) - } - - test("mesos supports unset spark.executor.cores") { - setBackend() - - val executorMemory = backend.executorMemory(sc) - val offerCores = 10 - offerResources(List(Resources(executorMemory * 2, offerCores))) - - val taskInfos = verifyTaskLaunched(driver, "o1") - assert(taskInfos.length == 1) - - val cpus = backend.getResource(taskInfos.head.getResourcesList, "cpus") - assert(cpus == offerCores) - } - - test("mesos does not acquire more than spark.cores.max") { - val maxCores = 10 - setBackend(Map("spark.cores.max" -> maxCores.toString)) - - val executorMemory = backend.executorMemory(sc) - offerResources(List(Resources(executorMemory, maxCores + 1))) - - val taskInfos = verifyTaskLaunched(driver, "o1") - assert(taskInfos.length == 1) - - val cpus = backend.getResource(taskInfos.head.getResourcesList, "cpus") - assert(cpus == maxCores) - } - - test("mesos does not acquire gpus if not specified") { - setBackend() - - val executorMemory = backend.executorMemory(sc) - offerResources(List(Resources(executorMemory, 1, 1))) - - val taskInfos = verifyTaskLaunched(driver, "o1") - assert(taskInfos.length == 1) - - val gpus = backend.getResource(taskInfos.head.getResourcesList, "gpus") - assert(gpus == 0.0) - } - - - test("mesos does not acquire more than spark.mesos.gpus.max") { - val maxGpus = 5 - setBackend(Map("spark.mesos.gpus.max" -> maxGpus.toString)) - - val executorMemory = backend.executorMemory(sc) - offerResources(List(Resources(executorMemory, 1, maxGpus + 1))) - - val taskInfos = verifyTaskLaunched(driver, "o1") - assert(taskInfos.length == 1) - - val gpus = backend.getResource(taskInfos.head.getResourcesList, "gpus") - assert(gpus == maxGpus) - } - - - test("mesos declines offers that violate attribute constraints") { - setBackend(Map("spark.mesos.constraints" -> "x:true")) - offerResources(List(Resources(backend.executorMemory(sc), 4))) - verifyDeclinedOffer(driver, createOfferId("o1"), true) - } - - test("mesos declines offers with a filter when reached spark.cores.max") { - val maxCores = 3 - setBackend(Map("spark.cores.max" -> maxCores.toString)) - - val executorMemory = backend.executorMemory(sc) - offerResources(List( - Resources(executorMemory, maxCores + 1), - Resources(executorMemory, maxCores + 1))) - - verifyTaskLaunched(driver, "o1") - verifyDeclinedOffer(driver, createOfferId("o2"), true) - } - - test("mesos assigns tasks round-robin on offers") { - val executorCores = 4 - val maxCores = executorCores * 2 - setBackend(Map("spark.executor.cores" -> executorCores.toString, - "spark.cores.max" -> maxCores.toString)) - - val executorMemory = backend.executorMemory(sc) - offerResources(List( - Resources(executorMemory * 2, executorCores * 2), - Resources(executorMemory * 2, executorCores * 2))) - - verifyTaskLaunched(driver, "o1") - verifyTaskLaunched(driver, "o2") - } - - test("mesos creates multiple executors on a single slave") { - val executorCores = 4 - setBackend(Map("spark.executor.cores" -> executorCores.toString)) - - // offer with room for two executors - val executorMemory = backend.executorMemory(sc) - offerResources(List(Resources(executorMemory * 2, executorCores * 2))) - - // verify two executors were started on a single offer - val taskInfos = verifyTaskLaunched(driver, "o1") - assert(taskInfos.length == 2) - } - - test("mesos doesn't register twice with the same shuffle service") { - setBackend(Map("spark.shuffle.service.enabled" -> "true")) - val (mem, cpu) = (backend.executorMemory(sc), 4) - - val offer1 = createOffer("o1", "s1", mem, cpu) - backend.resourceOffers(driver, List(offer1).asJava) - verifyTaskLaunched(driver, "o1") - - val offer2 = createOffer("o2", "s1", mem, cpu) - backend.resourceOffers(driver, List(offer2).asJava) - verifyTaskLaunched(driver, "o2") - - val status1 = createTaskStatus("0", "s1", TaskState.TASK_RUNNING) - backend.statusUpdate(driver, status1) - - val status2 = createTaskStatus("1", "s1", TaskState.TASK_RUNNING) - backend.statusUpdate(driver, status2) - verify(externalShuffleClient, times(1)) - .registerDriverWithShuffleService(anyString, anyInt, anyLong, anyLong) - } - - test("Port offer decline when there is no appropriate range") { - setBackend(Map(BLOCK_MANAGER_PORT.key -> "30100")) - val offeredPorts = (31100L, 31200L) - val (mem, cpu) = (backend.executorMemory(sc), 4) - - val offer1 = createOffer("o1", "s1", mem, cpu, Some(offeredPorts)) - backend.resourceOffers(driver, List(offer1).asJava) - verify(driver, times(1)).declineOffer(offer1.getId) - } - - test("Port offer accepted when ephemeral ports are used") { - setBackend() - val offeredPorts = (31100L, 31200L) - val (mem, cpu) = (backend.executorMemory(sc), 4) - - val offer1 = createOffer("o1", "s1", mem, cpu, Some(offeredPorts)) - backend.resourceOffers(driver, List(offer1).asJava) - verifyTaskLaunched(driver, "o1") - } - - test("Port offer accepted with user defined port numbers") { - val port = 30100 - setBackend(Map(BLOCK_MANAGER_PORT.key -> s"$port")) - val offeredPorts = (30000L, 31000L) - val (mem, cpu) = (backend.executorMemory(sc), 4) - - val offer1 = createOffer("o1", "s1", mem, cpu, Some(offeredPorts)) - backend.resourceOffers(driver, List(offer1).asJava) - val taskInfo = verifyTaskLaunched(driver, "o1") - - val taskPortResources = taskInfo.head.getResourcesList.asScala. - find(r => r.getType == Value.Type.RANGES && r.getName == "ports") - - val isPortInOffer = (r: Resource) => { - r.getRanges().getRangeList - .asScala.exists(range => range.getBegin == port && range.getEnd == port) - } - assert(taskPortResources.exists(isPortInOffer)) - } - - test("mesos kills an executor when told") { - setBackend() - - val (mem, cpu) = (backend.executorMemory(sc), 4) - - val offer1 = createOffer("o1", "s1", mem, cpu) - backend.resourceOffers(driver, List(offer1).asJava) - verifyTaskLaunched(driver, "o1") - - backend.doKillExecutors(List("0")) - verify(driver, times(1)).killTask(createTaskId("0")) - } - - test("weburi is set in created scheduler driver") { - setBackend() - val taskScheduler = mock[TaskSchedulerImpl] - when(taskScheduler.sc).thenReturn(sc) - val driver = mock[SchedulerDriver] - when(driver.start()).thenReturn(Protos.Status.DRIVER_RUNNING) - val securityManager = mock[SecurityManager] - - val backend = new MesosCoarseGrainedSchedulerBackend( - taskScheduler, sc, "master", securityManager) { - override protected def createSchedulerDriver( - masterUrl: String, - scheduler: Scheduler, - sparkUser: String, - appName: String, - conf: SparkConf, - webuiUrl: Option[String] = None, - checkpoint: Option[Boolean] = None, - failoverTimeout: Option[Double] = None, - frameworkId: Option[String] = None): SchedulerDriver = { - markRegistered() - assert(webuiUrl.isDefined) - assert(webuiUrl.get.equals("http://webui")) - driver - } - } - - backend.start() - } - - test("honors unset spark.mesos.containerizer") { - setBackend(Map("spark.mesos.executor.docker.image" -> "test")) - - val (mem, cpu) = (backend.executorMemory(sc), 4) - - val offer1 = createOffer("o1", "s1", mem, cpu) - backend.resourceOffers(driver, List(offer1).asJava) - - val taskInfos = verifyTaskLaunched(driver, "o1") - assert(taskInfos.head.getContainer.getType == ContainerInfo.Type.DOCKER) - } - - test("honors spark.mesos.containerizer=\"mesos\"") { - setBackend(Map( - "spark.mesos.executor.docker.image" -> "test", - "spark.mesos.containerizer" -> "mesos")) - - val (mem, cpu) = (backend.executorMemory(sc), 4) - - val offer1 = createOffer("o1", "s1", mem, cpu) - backend.resourceOffers(driver, List(offer1).asJava) - - val taskInfos = verifyTaskLaunched(driver, "o1") - assert(taskInfos.head.getContainer.getType == ContainerInfo.Type.MESOS) - } - - test("docker settings are reflected in created tasks") { - setBackend(Map( - "spark.mesos.executor.docker.image" -> "some_image", - "spark.mesos.executor.docker.forcePullImage" -> "true", - "spark.mesos.executor.docker.volumes" -> "/host_vol:/container_vol:ro", - "spark.mesos.executor.docker.portmaps" -> "8080:80:tcp" - )) - - val (mem, cpu) = (backend.executorMemory(sc), 4) - - val offer1 = createOffer("o1", "s1", mem, cpu) - backend.resourceOffers(driver, List(offer1).asJava) - - val launchedTasks = verifyTaskLaunched(driver, "o1") - assert(launchedTasks.size == 1) - - val containerInfo = launchedTasks.head.getContainer - assert(containerInfo.getType == ContainerInfo.Type.DOCKER) - - val volumes = containerInfo.getVolumesList.asScala - assert(volumes.size == 1) - - val volume = volumes.head - assert(volume.getHostPath == "/host_vol") - assert(volume.getContainerPath == "/container_vol") - assert(volume.getMode == Volume.Mode.RO) - - val dockerInfo = containerInfo.getDocker - - val portMappings = dockerInfo.getPortMappingsList.asScala - assert(portMappings.size == 1) - - val portMapping = portMappings.head - assert(portMapping.getHostPort == 8080) - assert(portMapping.getContainerPort == 80) - assert(portMapping.getProtocol == "tcp") - } - - test("force-pull-image option is disabled by default") { - setBackend(Map( - "spark.mesos.executor.docker.image" -> "some_image" - )) - - val (mem, cpu) = (backend.executorMemory(sc), 4) - - val offer1 = createOffer("o1", "s1", mem, cpu) - backend.resourceOffers(driver, List(offer1).asJava) - - val launchedTasks = verifyTaskLaunched(driver, "o1") - assert(launchedTasks.size == 1) - - val containerInfo = launchedTasks.head.getContainer - assert(containerInfo.getType == ContainerInfo.Type.DOCKER) - - val dockerInfo = containerInfo.getDocker - - assert(dockerInfo.getImage == "some_image") - assert(!dockerInfo.getForcePullImage) - } - - test("Do not call removeExecutor() after backend is stopped") { - setBackend() - - // launches a task on a valid offer - val offers = List(Resources(backend.executorMemory(sc), 1)) - offerResources(offers) - verifyTaskLaunched(driver, "o1") - - // launches a thread simulating status update - val statusUpdateThread = new Thread { - override def run(): Unit = { - while (!stopCalled) { - Thread.sleep(100) - } - - val status = createTaskStatus("0", "s1", TaskState.TASK_FINISHED) - backend.statusUpdate(driver, status) - } - }.start - - backend.stop() - // Any method of the backend involving sending messages to the driver endpoint should not - // be called after the backend is stopped. - verify(driverEndpoint, never()).askWithRetry(isA(classOf[RemoveExecutor]))(any[ClassTag[_]]) - } - - test("mesos supports spark.executor.uri") { - val url = "spark.spark.spark.com" - setBackend(Map( - "spark.executor.uri" -> url - ), false) - - val (mem, cpu) = (backend.executorMemory(sc), 4) - - val offer1 = createOffer("o1", "s1", mem, cpu) - backend.resourceOffers(driver, List(offer1).asJava) - - val launchedTasks = verifyTaskLaunched(driver, "o1") - assert(launchedTasks.head.getCommand.getUrisList.asScala(0).getValue == url) - } - - test("mesos supports setting fetcher cache") { - val url = "spark.spark.spark.com" - setBackend(Map( - "spark.mesos.fetcherCache.enable" -> "true", - "spark.executor.uri" -> url - ), false) - val offers = List(Resources(backend.executorMemory(sc), 1)) - offerResources(offers) - val launchedTasks = verifyTaskLaunched(driver, "o1") - val uris = launchedTasks.head.getCommand.getUrisList - assert(uris.size() == 1) - assert(uris.asScala.head.getCache) - } - - test("mesos supports disabling fetcher cache") { - val url = "spark.spark.spark.com" - setBackend(Map( - "spark.mesos.fetcherCache.enable" -> "false", - "spark.executor.uri" -> url - ), false) - val offers = List(Resources(backend.executorMemory(sc), 1)) - offerResources(offers) - val launchedTasks = verifyTaskLaunched(driver, "o1") - val uris = launchedTasks.head.getCommand.getUrisList - assert(uris.size() == 1) - assert(!uris.asScala.head.getCache) - } - - test("mesos supports spark.mesos.network.name") { - setBackend(Map( - "spark.mesos.network.name" -> "test-network-name" - )) - - val (mem, cpu) = (backend.executorMemory(sc), 4) - - val offer1 = createOffer("o1", "s1", mem, cpu) - backend.resourceOffers(driver, List(offer1).asJava) - - val launchedTasks = verifyTaskLaunched(driver, "o1") - val networkInfos = launchedTasks.head.getContainer.getNetworkInfosList - assert(networkInfos.size == 1) - assert(networkInfos.get(0).getName == "test-network-name") - } - - private case class Resources(mem: Int, cpus: Int, gpus: Int = 0) - - private def verifyDeclinedOffer(driver: SchedulerDriver, - offerId: OfferID, - filter: Boolean = false): Unit = { - if (filter) { - verify(driver, times(1)).declineOffer(Matchers.eq(offerId), anyObject[Filters]) - } else { - verify(driver, times(1)).declineOffer(Matchers.eq(offerId)) - } - } - - private def offerResources(offers: List[Resources], startId: Int = 1): Unit = { - val mesosOffers = offers.zipWithIndex.map {case (offer, i) => - createOffer(s"o${i + startId}", s"s${i + startId}", offer.mem, offer.cpus, None, offer.gpus)} - - backend.resourceOffers(driver, mesosOffers.asJava) - } - - private def createTaskStatus(taskId: String, slaveId: String, state: TaskState): TaskStatus = { - TaskStatus.newBuilder() - .setTaskId(TaskID.newBuilder().setValue(taskId).build()) - .setSlaveId(SlaveID.newBuilder().setValue(slaveId).build()) - .setState(state) - .build - } - - private def createSchedulerBackend( - taskScheduler: TaskSchedulerImpl, - driver: SchedulerDriver, - shuffleClient: MesosExternalShuffleClient, - endpoint: RpcEndpointRef): MesosCoarseGrainedSchedulerBackend = { - val securityManager = mock[SecurityManager] - - val backend = new MesosCoarseGrainedSchedulerBackend( - taskScheduler, sc, "master", securityManager) { - override protected def createSchedulerDriver( - masterUrl: String, - scheduler: Scheduler, - sparkUser: String, - appName: String, - conf: SparkConf, - webuiUrl: Option[String] = None, - checkpoint: Option[Boolean] = None, - failoverTimeout: Option[Double] = None, - frameworkId: Option[String] = None): SchedulerDriver = driver - - override protected def getShuffleClient(): MesosExternalShuffleClient = shuffleClient - - override protected def createDriverEndpointRef( - properties: ArrayBuffer[(String, String)]): RpcEndpointRef = endpoint - - // override to avoid race condition with the driver thread on `mesosDriver` - override def startScheduler(newDriver: SchedulerDriver): Unit = { - mesosDriver = newDriver - } - - override def stopExecutors(): Unit = { - stopCalled = true - } - - markRegistered() - } - backend.start() - backend - } - - private def setBackend(sparkConfVars: Map[String, String] = null, - setHome: Boolean = true) { - sparkConf = (new SparkConf) - .setMaster("local[*]") - .setAppName("test-mesos-dynamic-alloc") - .set("spark.mesos.driver.webui.url", "http://webui") - - if (setHome) { - sparkConf.setSparkHome("/path") - } - - if (sparkConfVars != null) { - sparkConf.setAll(sparkConfVars) - } - - sc = new SparkContext(sparkConf) - - driver = mock[SchedulerDriver] - when(driver.start()).thenReturn(Protos.Status.DRIVER_RUNNING) - taskScheduler = mock[TaskSchedulerImpl] - when(taskScheduler.sc).thenReturn(sc) - externalShuffleClient = mock[MesosExternalShuffleClient] - driverEndpoint = mock[RpcEndpointRef] - when(driverEndpoint.ask(any())(any())).thenReturn(Promise().future) - - backend = createSchedulerBackend(taskScheduler, driver, externalShuffleClient, driverEndpoint) - } -} http://git-wip-us.apache.org/repos/asf/spark/blob/81e5619c/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosFineGrainedSchedulerBackendSuite.scala ---------------------------------------------------------------------- diff --git a/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosFineGrainedSchedulerBackendSuite.scala b/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosFineGrainedSchedulerBackendSuite.scala deleted file mode 100644 index 1d7a86f..0000000 --- a/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosFineGrainedSchedulerBackendSuite.scala +++ /dev/null @@ -1,385 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.scheduler.cluster.mesos - -import java.nio.ByteBuffer -import java.util.Arrays -import java.util.Collection -import java.util.Collections - -import scala.collection.JavaConverters._ -import scala.collection.mutable -import scala.collection.mutable.ArrayBuffer - -import org.apache.mesos.{Protos, Scheduler, SchedulerDriver} -import org.apache.mesos.Protos._ -import org.apache.mesos.Protos.Value.Scalar -import org.mockito.{ArgumentCaptor, Matchers} -import org.mockito.Matchers._ -import org.mockito.Mockito._ -import org.scalatest.mock.MockitoSugar - -import org.apache.spark.{LocalSparkContext, SparkConf, SparkContext, SparkFunSuite} -import org.apache.spark.executor.MesosExecutorBackend -import org.apache.spark.scheduler.{LiveListenerBus, SparkListenerExecutorAdded, - TaskDescription, TaskSchedulerImpl, WorkerOffer} -import org.apache.spark.scheduler.cluster.ExecutorInfo - -class MesosFineGrainedSchedulerBackendSuite - extends SparkFunSuite with LocalSparkContext with MockitoSugar { - - test("weburi is set in created scheduler driver") { - val conf = new SparkConf - conf.set("spark.mesos.driver.webui.url", "http://webui") - conf.set("spark.app.name", "name1") - - val sc = mock[SparkContext] - when(sc.conf).thenReturn(conf) - when(sc.sparkUser).thenReturn("sparkUser1") - when(sc.appName).thenReturn("appName1") - - val taskScheduler = mock[TaskSchedulerImpl] - val driver = mock[SchedulerDriver] - when(driver.start()).thenReturn(Protos.Status.DRIVER_RUNNING) - - val backend = new MesosFineGrainedSchedulerBackend(taskScheduler, sc, "master") { - override protected def createSchedulerDriver( - masterUrl: String, - scheduler: Scheduler, - sparkUser: String, - appName: String, - conf: SparkConf, - webuiUrl: Option[String] = None, - checkpoint: Option[Boolean] = None, - failoverTimeout: Option[Double] = None, - frameworkId: Option[String] = None): SchedulerDriver = { - markRegistered() - assert(webuiUrl.isDefined) - assert(webuiUrl.get.equals("http://webui")) - driver - } - } - - backend.start() - } - - test("Use configured mesosExecutor.cores for ExecutorInfo") { - val mesosExecutorCores = 3 - val conf = new SparkConf - conf.set("spark.mesos.mesosExecutor.cores", mesosExecutorCores.toString) - - val listenerBus = mock[LiveListenerBus] - listenerBus.post( - SparkListenerExecutorAdded(anyLong, "s1", new ExecutorInfo("host1", 2, Map.empty))) - - val sc = mock[SparkContext] - when(sc.getSparkHome()).thenReturn(Option("/spark-home")) - - when(sc.conf).thenReturn(conf) - when(sc.executorEnvs).thenReturn(new mutable.HashMap[String, String]) - when(sc.executorMemory).thenReturn(100) - when(sc.listenerBus).thenReturn(listenerBus) - val taskScheduler = mock[TaskSchedulerImpl] - when(taskScheduler.CPUS_PER_TASK).thenReturn(2) - - val mesosSchedulerBackend = new MesosFineGrainedSchedulerBackend(taskScheduler, sc, "master") - - val resources = Arrays.asList( - mesosSchedulerBackend.createResource("cpus", 4), - mesosSchedulerBackend.createResource("mem", 1024)) - // uri is null. - val (executorInfo, _) = mesosSchedulerBackend.createExecutorInfo(resources, "test-id") - val executorResources = executorInfo.getResourcesList - val cpus = executorResources.asScala.find(_.getName.equals("cpus")).get.getScalar.getValue - - assert(cpus === mesosExecutorCores) - } - - test("check spark-class location correctly") { - val conf = new SparkConf - conf.set("spark.mesos.executor.home", "/mesos-home") - - val listenerBus = mock[LiveListenerBus] - listenerBus.post( - SparkListenerExecutorAdded(anyLong, "s1", new ExecutorInfo("host1", 2, Map.empty))) - - val sc = mock[SparkContext] - when(sc.getSparkHome()).thenReturn(Option("/spark-home")) - - when(sc.conf).thenReturn(conf) - when(sc.executorEnvs).thenReturn(new mutable.HashMap[String, String]) - when(sc.executorMemory).thenReturn(100) - when(sc.listenerBus).thenReturn(listenerBus) - val taskScheduler = mock[TaskSchedulerImpl] - when(taskScheduler.CPUS_PER_TASK).thenReturn(2) - - val mesosSchedulerBackend = new MesosFineGrainedSchedulerBackend(taskScheduler, sc, "master") - - val resources = Arrays.asList( - mesosSchedulerBackend.createResource("cpus", 4), - mesosSchedulerBackend.createResource("mem", 1024)) - // uri is null. - val (executorInfo, _) = mesosSchedulerBackend.createExecutorInfo(resources, "test-id") - assert(executorInfo.getCommand.getValue === - s" /mesos-home/bin/spark-class ${classOf[MesosExecutorBackend].getName}") - - // uri exists. - conf.set("spark.executor.uri", "hdfs:///test-app-1.0.0.tgz") - val (executorInfo1, _) = mesosSchedulerBackend.createExecutorInfo(resources, "test-id") - assert(executorInfo1.getCommand.getValue === - s"cd test-app-1*; ./bin/spark-class ${classOf[MesosExecutorBackend].getName}") - } - - test("spark docker properties correctly populate the DockerInfo message") { - val taskScheduler = mock[TaskSchedulerImpl] - - val conf = new SparkConf() - .set("spark.mesos.executor.docker.image", "spark/mock") - .set("spark.mesos.executor.docker.forcePullImage", "true") - .set("spark.mesos.executor.docker.volumes", "/a,/b:/b,/c:/c:rw,/d:ro,/e:/e:ro") - .set("spark.mesos.executor.docker.portmaps", "80:8080,53:53:tcp") - - val listenerBus = mock[LiveListenerBus] - listenerBus.post( - SparkListenerExecutorAdded(anyLong, "s1", new ExecutorInfo("host1", 2, Map.empty))) - - val sc = mock[SparkContext] - when(sc.executorMemory).thenReturn(100) - when(sc.getSparkHome()).thenReturn(Option("/spark-home")) - when(sc.executorEnvs).thenReturn(new mutable.HashMap[String, String]) - when(sc.conf).thenReturn(conf) - when(sc.listenerBus).thenReturn(listenerBus) - - val backend = new MesosFineGrainedSchedulerBackend(taskScheduler, sc, "master") - - val (execInfo, _) = backend.createExecutorInfo( - Arrays.asList(backend.createResource("cpus", 4)), "mockExecutor") - assert(execInfo.getContainer.getDocker.getImage.equals("spark/mock")) - assert(execInfo.getContainer.getDocker.getForcePullImage.equals(true)) - val portmaps = execInfo.getContainer.getDocker.getPortMappingsList - assert(portmaps.get(0).getHostPort.equals(80)) - assert(portmaps.get(0).getContainerPort.equals(8080)) - assert(portmaps.get(0).getProtocol.equals("tcp")) - assert(portmaps.get(1).getHostPort.equals(53)) - assert(portmaps.get(1).getContainerPort.equals(53)) - assert(portmaps.get(1).getProtocol.equals("tcp")) - val volumes = execInfo.getContainer.getVolumesList - assert(volumes.get(0).getContainerPath.equals("/a")) - assert(volumes.get(0).getMode.equals(Volume.Mode.RW)) - assert(volumes.get(1).getContainerPath.equals("/b")) - assert(volumes.get(1).getHostPath.equals("/b")) - assert(volumes.get(1).getMode.equals(Volume.Mode.RW)) - assert(volumes.get(2).getContainerPath.equals("/c")) - assert(volumes.get(2).getHostPath.equals("/c")) - assert(volumes.get(2).getMode.equals(Volume.Mode.RW)) - assert(volumes.get(3).getContainerPath.equals("/d")) - assert(volumes.get(3).getMode.equals(Volume.Mode.RO)) - assert(volumes.get(4).getContainerPath.equals("/e")) - assert(volumes.get(4).getHostPath.equals("/e")) - assert(volumes.get(4).getMode.equals(Volume.Mode.RO)) - } - - test("mesos resource offers result in launching tasks") { - def createOffer(id: Int, mem: Int, cpu: Int): Offer = { - val builder = Offer.newBuilder() - builder.addResourcesBuilder() - .setName("mem") - .setType(Value.Type.SCALAR) - .setScalar(Scalar.newBuilder().setValue(mem)) - builder.addResourcesBuilder() - .setName("cpus") - .setType(Value.Type.SCALAR) - .setScalar(Scalar.newBuilder().setValue(cpu)) - builder.setId(OfferID.newBuilder().setValue(s"o${id.toString}").build()) - .setFrameworkId(FrameworkID.newBuilder().setValue("f1")) - .setSlaveId(SlaveID.newBuilder().setValue(s"s${id.toString}")) - .setHostname(s"host${id.toString}").build() - } - - val driver = mock[SchedulerDriver] - val taskScheduler = mock[TaskSchedulerImpl] - - val listenerBus = mock[LiveListenerBus] - listenerBus.post( - SparkListenerExecutorAdded(anyLong, "s1", new ExecutorInfo("host1", 2, Map.empty))) - - val sc = mock[SparkContext] - when(sc.executorMemory).thenReturn(100) - when(sc.getSparkHome()).thenReturn(Option("/path")) - when(sc.executorEnvs).thenReturn(new mutable.HashMap[String, String]) - when(sc.conf).thenReturn(new SparkConf) - when(sc.listenerBus).thenReturn(listenerBus) - - val backend = new MesosFineGrainedSchedulerBackend(taskScheduler, sc, "master") - - val minMem = backend.executorMemory(sc) - val minCpu = 4 - - val mesosOffers = new java.util.ArrayList[Offer] - mesosOffers.add(createOffer(1, minMem, minCpu)) - mesosOffers.add(createOffer(2, minMem - 1, minCpu)) - mesosOffers.add(createOffer(3, minMem, minCpu)) - - val expectedWorkerOffers = new ArrayBuffer[WorkerOffer](2) - expectedWorkerOffers += new WorkerOffer( - mesosOffers.get(0).getSlaveId.getValue, - mesosOffers.get(0).getHostname, - (minCpu - backend.mesosExecutorCores).toInt - ) - expectedWorkerOffers += new WorkerOffer( - mesosOffers.get(2).getSlaveId.getValue, - mesosOffers.get(2).getHostname, - (minCpu - backend.mesosExecutorCores).toInt - ) - val taskDesc = new TaskDescription(1L, 0, "s1", "n1", 0, ByteBuffer.wrap(new Array[Byte](0))) - when(taskScheduler.resourceOffers(expectedWorkerOffers)).thenReturn(Seq(Seq(taskDesc))) - when(taskScheduler.CPUS_PER_TASK).thenReturn(2) - - val capture = ArgumentCaptor.forClass(classOf[Collection[TaskInfo]]) - when( - driver.launchTasks( - Matchers.eq(Collections.singleton(mesosOffers.get(0).getId)), - capture.capture(), - any(classOf[Filters]) - ) - ).thenReturn(Status.valueOf(1)) - when(driver.declineOffer(mesosOffers.get(1).getId)).thenReturn(Status.valueOf(1)) - when(driver.declineOffer(mesosOffers.get(2).getId)).thenReturn(Status.valueOf(1)) - - backend.resourceOffers(driver, mesosOffers) - - verify(driver, times(1)).launchTasks( - Matchers.eq(Collections.singleton(mesosOffers.get(0).getId)), - capture.capture(), - any(classOf[Filters]) - ) - verify(driver, times(1)).declineOffer(mesosOffers.get(1).getId) - verify(driver, times(1)).declineOffer(mesosOffers.get(2).getId) - assert(capture.getValue.size() === 1) - val taskInfo = capture.getValue.iterator().next() - assert(taskInfo.getName.equals("n1")) - val cpus = taskInfo.getResourcesList.get(0) - assert(cpus.getName.equals("cpus")) - assert(cpus.getScalar.getValue.equals(2.0)) - assert(taskInfo.getSlaveId.getValue.equals("s1")) - - // Unwanted resources offered on an existing node. Make sure they are declined - val mesosOffers2 = new java.util.ArrayList[Offer] - mesosOffers2.add(createOffer(1, minMem, minCpu)) - reset(taskScheduler) - reset(driver) - when(taskScheduler.resourceOffers(any(classOf[IndexedSeq[WorkerOffer]]))).thenReturn(Seq(Seq())) - when(taskScheduler.CPUS_PER_TASK).thenReturn(2) - when(driver.declineOffer(mesosOffers2.get(0).getId)).thenReturn(Status.valueOf(1)) - - backend.resourceOffers(driver, mesosOffers2) - verify(driver, times(1)).declineOffer(mesosOffers2.get(0).getId) - } - - test("can handle multiple roles") { - val driver = mock[SchedulerDriver] - val taskScheduler = mock[TaskSchedulerImpl] - - val listenerBus = mock[LiveListenerBus] - listenerBus.post( - SparkListenerExecutorAdded(anyLong, "s1", new ExecutorInfo("host1", 2, Map.empty))) - - val sc = mock[SparkContext] - when(sc.executorMemory).thenReturn(100) - when(sc.getSparkHome()).thenReturn(Option("/path")) - when(sc.executorEnvs).thenReturn(new mutable.HashMap[String, String]) - when(sc.conf).thenReturn(new SparkConf) - when(sc.listenerBus).thenReturn(listenerBus) - - val id = 1 - val builder = Offer.newBuilder() - builder.addResourcesBuilder() - .setName("mem") - .setType(Value.Type.SCALAR) - .setRole("prod") - .setScalar(Scalar.newBuilder().setValue(500)) - builder.addResourcesBuilder() - .setName("cpus") - .setRole("prod") - .setType(Value.Type.SCALAR) - .setScalar(Scalar.newBuilder().setValue(1)) - builder.addResourcesBuilder() - .setName("mem") - .setRole("dev") - .setType(Value.Type.SCALAR) - .setScalar(Scalar.newBuilder().setValue(600)) - builder.addResourcesBuilder() - .setName("cpus") - .setRole("dev") - .setType(Value.Type.SCALAR) - .setScalar(Scalar.newBuilder().setValue(2)) - val offer = builder.setId(OfferID.newBuilder().setValue(s"o${id.toString}").build()) - .setFrameworkId(FrameworkID.newBuilder().setValue("f1")) - .setSlaveId(SlaveID.newBuilder().setValue(s"s${id.toString}")) - .setHostname(s"host${id.toString}").build() - - val mesosOffers = new java.util.ArrayList[Offer] - mesosOffers.add(offer) - - val backend = new MesosFineGrainedSchedulerBackend(taskScheduler, sc, "master") - - val expectedWorkerOffers = new ArrayBuffer[WorkerOffer](1) - expectedWorkerOffers += new WorkerOffer( - mesosOffers.get(0).getSlaveId.getValue, - mesosOffers.get(0).getHostname, - 2 // Deducting 1 for executor - ) - - val taskDesc = new TaskDescription(1L, 0, "s1", "n1", 0, ByteBuffer.wrap(new Array[Byte](0))) - when(taskScheduler.resourceOffers(expectedWorkerOffers)).thenReturn(Seq(Seq(taskDesc))) - when(taskScheduler.CPUS_PER_TASK).thenReturn(1) - - val capture = ArgumentCaptor.forClass(classOf[Collection[TaskInfo]]) - when( - driver.launchTasks( - Matchers.eq(Collections.singleton(mesosOffers.get(0).getId)), - capture.capture(), - any(classOf[Filters]) - ) - ).thenReturn(Status.valueOf(1)) - - backend.resourceOffers(driver, mesosOffers) - - verify(driver, times(1)).launchTasks( - Matchers.eq(Collections.singleton(mesosOffers.get(0).getId)), - capture.capture(), - any(classOf[Filters]) - ) - - assert(capture.getValue.size() === 1) - val taskInfo = capture.getValue.iterator().next() - assert(taskInfo.getName.equals("n1")) - assert(taskInfo.getResourcesCount === 1) - val cpusDev = taskInfo.getResourcesList.get(0) - assert(cpusDev.getName.equals("cpus")) - assert(cpusDev.getScalar.getValue.equals(1.0)) - assert(cpusDev.getRole.equals("dev")) - val executorResources = taskInfo.getExecutor.getResourcesList.asScala - assert(executorResources.exists { r => - r.getName.equals("mem") && r.getScalar.getValue.equals(484.0) && r.getRole.equals("prod") - }) - assert(executorResources.exists { r => - r.getName.equals("cpus") && r.getScalar.getValue.equals(1.0) && r.getRole.equals("prod") - }) - } -} --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
