http://git-wip-us.apache.org/repos/asf/spark/blob/8e5475be/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala
deleted file mode 100644
index 1bbede1..0000000
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala
+++ /dev/null
@@ -1,494 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.scheduler.cluster.mesos
-
-import java.util.{List => JList}
-import java.util.concurrent.CountDownLatch
-
-import scala.collection.JavaConverters._
-import scala.collection.mutable.ArrayBuffer
-import scala.util.control.NonFatal
-
-import com.google.common.base.Splitter
-import org.apache.mesos.{MesosSchedulerDriver, Protos, Scheduler, SchedulerDriver}
-import org.apache.mesos.Protos._
-import org.apache.mesos.protobuf.{ByteString, GeneratedMessage}
-
-import org.apache.spark.{SparkConf, SparkContext, SparkException}
-import org.apache.spark.internal.Logging
-import org.apache.spark.util.Utils
-
-
-/**
- * Shared trait for implementing a Mesos Scheduler. This holds common state and
- * helper methods the Mesos scheduler will use.
- */
-private[mesos] trait MesosSchedulerUtils extends Logging {
-  // Lock used to wait for scheduler to be registered
-  private final val registerLatch = new CountDownLatch(1)
-
-  // Driver for talking to Mesos
-  protected var mesosDriver: SchedulerDriver = null
-
-  /**
-   * Creates a new MesosSchedulerDriver that communicates to the Mesos master.
-   *
-   * @param masterUrl The url to connect to Mesos master
-   * @param scheduler the scheduler class to receive scheduler callbacks
-   * @param sparkUser User to impersonate with when running tasks
-   * @param appName The framework name to display on the Mesos UI
-   * @param conf Spark configuration
-   * @param webuiUrl The WebUI url to link from Mesos UI
-   * @param checkpoint Option to checkpoint tasks for failover
-   * @param failoverTimeout Duration for which the Mesos master expects the scheduler to reconnect after a disconnect
-   * @param frameworkId The id of the new framework
-   */
-  protected def createSchedulerDriver(
-      masterUrl: String,
-      scheduler: Scheduler,
-      sparkUser: String,
-      appName: String,
-      conf: SparkConf,
-      webuiUrl: Option[String] = None,
-      checkpoint: Option[Boolean] = None,
-      failoverTimeout: Option[Double] = None,
-      frameworkId: Option[String] = None): SchedulerDriver = {
-    val fwInfoBuilder = FrameworkInfo.newBuilder().setUser(sparkUser).setName(appName)
-    val credBuilder = Credential.newBuilder()
-    webuiUrl.foreach { url => fwInfoBuilder.setWebuiUrl(url) }
-    checkpoint.foreach { checkpoint => fwInfoBuilder.setCheckpoint(checkpoint) }
-    failoverTimeout.foreach { timeout => fwInfoBuilder.setFailoverTimeout(timeout) }
-    frameworkId.foreach { id =>
-      fwInfoBuilder.setId(FrameworkID.newBuilder().setValue(id).build())
-    }
-    conf.getOption("spark.mesos.principal").foreach { principal =>
-      fwInfoBuilder.setPrincipal(principal)
-      credBuilder.setPrincipal(principal)
-    }
-    conf.getOption("spark.mesos.secret").foreach { secret =>
-      credBuilder.setSecret(secret)
-    }
-    if (credBuilder.hasSecret && !fwInfoBuilder.hasPrincipal) {
-      throw new SparkException(
-        "spark.mesos.principal must be configured when spark.mesos.secret is 
set")
-    }
-    conf.getOption("spark.mesos.role").foreach { role =>
-      fwInfoBuilder.setRole(role)
-    }
-    if (credBuilder.hasPrincipal) {
-      new MesosSchedulerDriver(
-        scheduler, fwInfoBuilder.build(), masterUrl, credBuilder.build())
-    } else {
-      new MesosSchedulerDriver(scheduler, fwInfoBuilder.build(), masterUrl)
-    }
-  }
-
-  /**
-   * Starts the MesosSchedulerDriver and stores the current running driver to this new instance.
-   * This driver is expected to not be running.
-   * This method returns only after the scheduler has registered with Mesos.
-   */
-  def startScheduler(newDriver: SchedulerDriver): Unit = {
-    synchronized {
-      if (mesosDriver != null) {
-        registerLatch.await()
-        return
-      }
-      @volatile
-      var error: Option[Exception] = None
-
-      // We create a new thread that will block inside `mesosDriver.run`
-      // until the scheduler exits
-      new Thread(Utils.getFormattedClassName(this) + "-mesos-driver") {
-        setDaemon(true)
-        override def run() {
-          try {
-            mesosDriver = newDriver
-            val ret = mesosDriver.run()
-            logInfo("driver.run() returned with code " + ret)
-            if (ret != null && ret.equals(Status.DRIVER_ABORTED)) {
-              error = Some(new SparkException("Error starting driver, DRIVER_ABORTED"))
-              markErr()
-            }
-          } catch {
-            case e: Exception =>
-              logError("driver.run() failed", e)
-              error = Some(e)
-              markErr()
-          }
-        }
-      }.start()
-
-      registerLatch.await()
-
-      // propagate any error to the calling thread. This ensures that SparkContext creation fails
-      // without leaving a broken context that won't be able to schedule any tasks
-      error.foreach(throw _)
-    }
-  }
-
-  def getResource(res: JList[Resource], name: String): Double = {
-    // A resource can have multiple values in the offer since it can either be from
-    // a specific role or wildcard.
-    res.asScala.filter(_.getName == name).map(_.getScalar.getValue).sum
-  }
-
-  /**
-   * Transforms a range resource to a list of ranges
-   *
-   * @param res the mesos resource list
-   * @param name the name of the resource
-   * @return the list of ranges returned
-   */
-  protected def getRangeResource(res: JList[Resource], name: String): List[(Long, Long)] = {
-    // A resource can have multiple values in the offer since it can either be from
-    // a specific role or wildcard.
-    res.asScala.filter(_.getName == name).flatMap(_.getRanges.getRangeList.asScala
-      .map(r => (r.getBegin, r.getEnd)).toList).toList
-  }
-
-  /**
-   * Signal that the scheduler has registered with Mesos.
-   */
-  protected def markRegistered(): Unit = {
-    registerLatch.countDown()
-  }
-
-  protected def markErr(): Unit = {
-    registerLatch.countDown()
-  }
-
-  def createResource(name: String, amount: Double, role: Option[String] = None): Resource = {
-    val builder = Resource.newBuilder()
-      .setName(name)
-      .setType(Value.Type.SCALAR)
-      .setScalar(Value.Scalar.newBuilder().setValue(amount).build())
-
-    role.foreach { r => builder.setRole(r) }
-
-    builder.build()
-  }
-
-  /**
-   * Partition the existing set of resources into two groups, those remaining to be
-   * scheduled and those requested to be used for a new task.
-   *
-   * @param resources The full list of available resources
-   * @param resourceName The name of the resource to take from the available resources
-   * @param amountToUse The amount of resources to take from the available resources
-   * @return The remaining resources list and the used resources list.
-   */
-  def partitionResources(
-      resources: JList[Resource],
-      resourceName: String,
-      amountToUse: Double): (List[Resource], List[Resource]) = {
-    var remain = amountToUse
-    var requestedResources = new ArrayBuffer[Resource]
-    val remainingResources = resources.asScala.map {
-      case r =>
-        if (remain > 0 &&
-          r.getType == Value.Type.SCALAR &&
-          r.getScalar.getValue > 0.0 &&
-          r.getName == resourceName) {
-          val usage = Math.min(remain, r.getScalar.getValue)
-          requestedResources += createResource(resourceName, usage, Some(r.getRole))
-          remain -= usage
-          createResource(resourceName, r.getScalar.getValue - usage, Some(r.getRole))
-        } else {
-          r
-        }
-    }
-
-    // Filter out any resource that has been depleted.
-    val filteredResources =
-      remainingResources.filter(r => r.getType != Value.Type.SCALAR || r.getScalar.getValue > 0.0)
-
-    (filteredResources.toList, requestedResources.toList)
-  }
-
-  /** Helper method to get the key,value-set pair for a Mesos Attribute protobuf */
-  protected def getAttribute(attr: Attribute): (String, Set[String]) = {
-    (attr.getName, attr.getText.getValue.split(',').toSet)
-  }
-
-
-  /** Build a Mesos resource protobuf object */
-  protected def createResource(resourceName: String, quantity: Double): Protos.Resource = {
-    Resource.newBuilder()
-      .setName(resourceName)
-      .setType(Value.Type.SCALAR)
-      .setScalar(Value.Scalar.newBuilder().setValue(quantity).build())
-      .build()
-  }
-
-  /**
-   * Converts the attributes from the resource offer into a Map of name -> Attribute Value.
-   * The attribute values are the Mesos attribute types: scalar, ranges, set or text.
-   *
-   * @param offerAttributes the attributes offered
-   * @return a map from attribute name to its Mesos protobuf value
-   */
-  protected def toAttributeMap(offerAttributes: JList[Attribute]): Map[String, GeneratedMessage] = {
-    offerAttributes.asScala.map { attr =>
-      val attrValue = attr.getType match {
-        case Value.Type.SCALAR => attr.getScalar
-        case Value.Type.RANGES => attr.getRanges
-        case Value.Type.SET => attr.getSet
-        case Value.Type.TEXT => attr.getText
-      }
-      (attr.getName, attrValue)
-    }.toMap
-  }
-
-
-  /**
-   * Match the requirements (if any) to the offer attributes.
-   * If attribute requirements are not specified, return true.
-   * Else if an attribute is defined with no values given, a simple attribute presence check is performed.
-   * Else if an attribute name and values are specified, a subset match is performed on the slave attributes.
-   */
-  def matchesAttributeRequirements(
-      slaveOfferConstraints: Map[String, Set[String]],
-      offerAttributes: Map[String, GeneratedMessage]): Boolean = {
-    slaveOfferConstraints.forall {
-      // offer has the required attribute and subsumes the required values for that attribute
-      case (name, requiredValues) =>
-        offerAttributes.get(name) match {
-          case None => false
-          case Some(_) if requiredValues.isEmpty => true // empty value matches presence
-          case Some(scalarValue: Value.Scalar) =>
-            // check if any provided value is less than or equal to the offered value
-            requiredValues.map(_.toDouble).exists(_ <= scalarValue.getValue)
-          case Some(rangeValue: Value.Range) =>
-            val offerRange = rangeValue.getBegin to rangeValue.getEnd
-            // Check if there is some required value that falls within the specified range
-            // Note: We only support the ability to specify discrete values, in the future
-            // we may expand it to subsume ranges specified with a XX..YY value or something
-            // similar to that.
-            requiredValues.map(_.toLong).exists(offerRange.contains(_))
-          case Some(offeredValue: Value.Set) =>
-            // check if the specified required values are a subset of the offered set
-            requiredValues.subsetOf(offeredValue.getItemList.asScala.toSet)
-          case Some(textValue: Value.Text) =>
-            // check if the specified value is equal, if multiple values are specified
-            // we succeed if any of them match.
-            requiredValues.contains(textValue.getValue)
-        }
-    }
-  }
-
-  /**
-   * Parses the attribute constraints provided to Spark and builds a matching data structure:
-   *  Map[<attribute-name>, Set[values-to-match]]
-   *  The constraints are specified as ';' separated key-value pairs where keys and values
-   *  are separated by ':'. The ':' implies equality (for singular values) and "is one of" for
-   *  multiple values (comma separated). For example:
-   *  {{{
-   *  parseConstraintString("os:centos7;zone:us-east-1a,us-east-1b")
-   *  // would result in
-   *  <code>
-   *  Map(
-   *    "os" -> Set("centos7"),
-   *    "zone":   -> Set("us-east-1a", "us-east-1b")
-   *  )
-   *  }}}
-   *
-   *  Mesos documentation: http://mesos.apache.org/documentation/attributes-resources/
-   *                       https://github.com/apache/mesos/blob/master/src/common/values.cpp
-   *                       https://github.com/apache/mesos/blob/master/src/common/attributes.cpp
-   *
-   * @param constraintsVal constraints string consisting of ';' separated key-value pairs (separated
-   *                       by ':')
-   * @return  Map of constraints to match against resource offers.
-   */
-  def parseConstraintString(constraintsVal: String): Map[String, Set[String]] = {
-    /*
-      Based on mesos docs:
-      attributes : attribute ( ";" attribute )*
-      attribute : labelString ":" ( labelString | "," )+
-      labelString : [a-zA-Z0-9_/.-]
-    */
-    val splitter = Splitter.on(';').trimResults().withKeyValueSeparator(':')
-    // kv splitter
-    if (constraintsVal.isEmpty) {
-      Map()
-    } else {
-      try {
-        splitter.split(constraintsVal).asScala.toMap.mapValues(v =>
-          if (v == null || v.isEmpty) {
-            Set[String]()
-          } else {
-            v.split(',').toSet
-          }
-        )
-      } catch {
-        case NonFatal(e) =>
-          throw new IllegalArgumentException(s"Bad constraint string: $constraintsVal", e)
-      }
-    }
-  }
-
-  // These defaults copied from YARN
-  private val MEMORY_OVERHEAD_FRACTION = 0.10
-  private val MEMORY_OVERHEAD_MINIMUM = 384
-
-  /**
-   * Return the amount of memory to allocate to each executor, taking into account
-   * container overheads.
-   *
-   * @param sc SparkContext to use to get `spark.mesos.executor.memoryOverhead` value
-   * @return total memory requirement: executor memory plus an overhead of
-   *         max(0.1 * executorMemory, MEMORY_OVERHEAD_MINIMUM), unless overridden
-   */
-  def executorMemory(sc: SparkContext): Int = {
-    sc.conf.getInt("spark.mesos.executor.memoryOverhead",
-      math.max(MEMORY_OVERHEAD_FRACTION * sc.executorMemory, MEMORY_OVERHEAD_MINIMUM).toInt) +
-      sc.executorMemory
-  }
-
-  def setupUris(uris: String, builder: CommandInfo.Builder): Unit = {
-    uris.split(",").foreach { uri =>
-      builder.addUris(CommandInfo.URI.newBuilder().setValue(uri.trim()))
-    }
-  }
-
-  protected def getRejectOfferDurationForUnmetConstraints(sc: SparkContext): Long = {
-    sc.conf.getTimeAsSeconds("spark.mesos.rejectOfferDurationForUnmetConstraints", "120s")
-  }
-
-  protected def getRejectOfferDurationForReachedMaxCores(sc: SparkContext): Long = {
-    sc.conf.getTimeAsSeconds("spark.mesos.rejectOfferDurationForReachedMaxCores", "120s")
-  }
-
-  /**
-   * Checks whether the executor ports configured for Spark fall within the offered port ranges.
-   *
-   * @param conf the Spark Config
-   * @param ports the list of offered port ranges to check against
-   * @return true if the ports are within range, false otherwise
-   */
-  protected def checkPorts(conf: SparkConf, ports: List[(Long, Long)]): Boolean = {
-
-    def checkIfInRange(port: Long, ps: List[(Long, Long)]): Boolean = {
-      ps.exists{case (rangeStart, rangeEnd) => rangeStart <= port & rangeEnd >= port }
-    }
-
-    val portsToCheck = nonZeroPortValuesFromConfig(conf)
-    val withinRange = portsToCheck.forall(p => checkIfInRange(p, ports))
-    // make sure we have enough ports to allocate per offer
-    val enoughPorts =
-    ports.map{case (rangeStart, rangeEnd) => rangeEnd - rangeStart + 1}.sum >= portsToCheck.size
-    enoughPorts && withinRange
-  }
-
-  /**
-   * Partitions port resources.
-   *
-   * @param requestedPorts non-zero ports to assign
-   * @param offeredResources the resources offered
-   * @return resources left, port resources to be used.
-   */
-  def partitionPortResources(requestedPorts: List[Long], offeredResources: List[Resource])
-    : (List[Resource], List[Resource]) = {
-    if (requestedPorts.isEmpty) {
-      (offeredResources, List[Resource]())
-    } else {
-      // partition port offers
-      val (resourcesWithoutPorts, portResources) = filterPortResources(offeredResources)
-
-      val portsAndRoles = requestedPorts.
-        map(x => (x, findPortAndGetAssignedRangeRole(x, portResources)))
-
-      val assignedPortResources = createResourcesFromPorts(portsAndRoles)
-
-      // ignore non-assigned port resources, they will be declined implicitly by mesos
-      // no need for splitting port resources.
-      (resourcesWithoutPorts, assignedPortResources)
-    }
-  }
-
-  val managedPortNames = List("spark.executor.port", "spark.blockManager.port")
-
-  /**
-   * The values of the non-zero ports to be used by the executor process.
-   * @param conf the spark config to use
-   * @return the non-zero values of the ports
-   */
-  def nonZeroPortValuesFromConfig(conf: SparkConf): List[Long] = {
-    managedPortNames.map(conf.getLong(_, 0)).filter( _ != 0)
-  }
-
-  /** Creates a mesos resource for a specific port number. */
-  private def createResourcesFromPorts(portsAndRoles: List[(Long, String)]) : List[Resource] = {
-    portsAndRoles.flatMap{ case (port, role) =>
-      createMesosPortResource(List((port, port)), Some(role))}
-  }
-
-  /** Helper to create mesos resources for specific port ranges. */
-  private def createMesosPortResource(
-      ranges: List[(Long, Long)],
-      role: Option[String] = None): List[Resource] = {
-    ranges.map { case (rangeStart, rangeEnd) =>
-      val rangeValue = Value.Range.newBuilder()
-        .setBegin(rangeStart)
-        .setEnd(rangeEnd)
-      val builder = Resource.newBuilder()
-        .setName("ports")
-        .setType(Value.Type.RANGES)
-        .setRanges(Value.Ranges.newBuilder().addRange(rangeValue))
-      role.foreach(r => builder.setRole(r))
-      builder.build()
-    }
-  }
-
- /**
-  * Helper to assign a port to an offered range and get the latter's role
-  * info to use it later on.
-  */
-  private def findPortAndGetAssignedRangeRole(port: Long, portResources: List[Resource])
-    : String = {
-
-    val ranges = portResources.
-      map(resource =>
-        (resource.getRole, resource.getRanges.getRangeList.asScala
-          .map(r => (r.getBegin, r.getEnd)).toList))
-
-    val rangePortRole = ranges
-      .find { case (role, rangeList) => rangeList
-        .exists{ case (rangeStart, rangeEnd) => rangeStart <= port & rangeEnd >= port}}
-    // this is safe since we have previously checked the ranges (see the checkPorts method)
-    rangePortRole.map{ case (role, rangeList) => role}.get
-  }
-
-  /** Retrieves the port resources from a list of mesos offered resources */
-  private def filterPortResources(resources: List[Resource]): (List[Resource], List[Resource]) = {
-    resources.partition { r => !(r.getType == Value.Type.RANGES && r.getName == "ports") }
-  }
-
-  /**
-   * spark.mesos.driver.frameworkId is set by the cluster dispatcher to correlate driver
-   * submissions with frameworkIDs.  However, this causes issues when a driver process launches
-   * more than one framework (more than one SparkContext), because they all try to register with
-   * the same frameworkID.  To enforce that only the first driver registers with the configured
-   * framework ID, the driver calls this method after the first registration.
-   */
-  def unsetFrameworkID(sc: SparkContext) {
-    sc.conf.remove("spark.mesos.driver.frameworkId")
-    System.clearProperty("spark.mesos.driver.frameworkId")
-  }
-}
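For reference, a minimal sketch (not part of this patch) of the constraint-string semantics documented for parseConstraintString above; it assumes Guava's Splitter on the classpath, as the removed file does:

    import scala.collection.JavaConverters._
    import com.google.common.base.Splitter

    object ConstraintParsingSketch {
      // "os:centos7;zone:us-east-1a,us-east-1b" becomes
      // Map("os" -> Set("centos7"), "zone" -> Set("us-east-1a", "us-east-1b"));
      // an empty value set means the attribute must merely be present.
      def parse(constraints: String): Map[String, Set[String]] = {
        if (constraints.isEmpty) {
          Map()
        } else {
          val splitter = Splitter.on(';').trimResults().withKeyValueSeparator(':')
          splitter.split(constraints).asScala.toMap.mapValues { v =>
            if (v == null || v.isEmpty) Set.empty[String] else v.split(',').toSet
          }.toMap
        }
      }

      def main(args: Array[String]): Unit = {
        println(parse("os:centos7;zone:us-east-1a,us-east-1b"))
      }
    }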

http://git-wip-us.apache.org/repos/asf/spark/blob/8e5475be/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosTaskLaunchData.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosTaskLaunchData.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosTaskLaunchData.scala
deleted file mode 100644
index 8370b61..0000000
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosTaskLaunchData.scala
+++ /dev/null
@@ -1,51 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.scheduler.cluster.mesos
-
-import java.nio.ByteBuffer
-
-import org.apache.mesos.protobuf.ByteString
-
-import org.apache.spark.internal.Logging
-
-/**
- * Wrapper for serializing the data sent when launching Mesos tasks.
- */
-private[spark] case class MesosTaskLaunchData(
-  serializedTask: ByteBuffer,
-  attemptNumber: Int) extends Logging {
-
-  def toByteString: ByteString = {
-    val dataBuffer = ByteBuffer.allocate(4 + serializedTask.limit)
-    dataBuffer.putInt(attemptNumber)
-    dataBuffer.put(serializedTask)
-    dataBuffer.rewind
-    logDebug(s"ByteBuffer size: [${dataBuffer.remaining}]")
-    ByteString.copyFrom(dataBuffer)
-  }
-}
-
-private[spark] object MesosTaskLaunchData extends Logging {
-  def fromByteString(byteString: ByteString): MesosTaskLaunchData = {
-    val byteBuffer = byteString.asReadOnlyByteBuffer()
-    logDebug(s"ByteBuffer size: [${byteBuffer.remaining}]")
-    val attemptNumber = byteBuffer.getInt // updates the position by 4 bytes
-    val serializedTask = byteBuffer.slice() // subsequence starting at the current position
-    MesosTaskLaunchData(serializedTask, attemptNumber)
-  }
-}
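A short sketch (not from the patch) of the wire layout used by the removed MesosTaskLaunchData: a 4-byte attempt number followed by the serialized task bytes, which plain java.nio can round-trip:

    import java.nio.ByteBuffer

    object TaskLaunchDataLayoutSketch {
      def encode(serializedTask: ByteBuffer, attemptNumber: Int): ByteBuffer = {
        val buf = ByteBuffer.allocate(4 + serializedTask.limit)
        buf.putInt(attemptNumber)   // first 4 bytes: attempt number
        buf.put(serializedTask)     // remainder: the serialized task
        buf.rewind()
        buf
      }

      def decode(buf: ByteBuffer): (ByteBuffer, Int) = {
        val attemptNumber = buf.getInt   // advances the position by 4 bytes
        (buf.slice(), attemptNumber)     // slice() keeps only the task payload
      }

      def main(args: Array[String]): Unit = {
        val payload = ByteBuffer.wrap("task-bytes".getBytes("UTF-8"))
        val (task, attempt) = decode(encode(payload, 3))
        assert(attempt == 3 && task.remaining == payload.capacity)
      }
    }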

http://git-wip-us.apache.org/repos/asf/spark/blob/8e5475be/core/src/test/scala/org/apache/spark/SparkContextSchedulerCreationSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/SparkContextSchedulerCreationSuite.scala b/core/src/test/scala/org/apache/spark/SparkContextSchedulerCreationSuite.scala
index 7d75a93..f8938df 100644
--- a/core/src/test/scala/org/apache/spark/SparkContextSchedulerCreationSuite.scala
+++ b/core/src/test/scala/org/apache/spark/SparkContextSchedulerCreationSuite.scala
@@ -22,7 +22,6 @@ import org.scalatest.PrivateMethodTester
 import org.apache.spark.internal.Logging
 import org.apache.spark.scheduler.{SchedulerBackend, TaskScheduler, TaskSchedulerImpl}
 import org.apache.spark.scheduler.cluster.StandaloneSchedulerBackend
-import org.apache.spark.scheduler.cluster.mesos.{MesosCoarseGrainedSchedulerBackend, MesosFineGrainedSchedulerBackend}
 import org.apache.spark.scheduler.local.LocalSchedulerBackend
 
 
@@ -130,31 +129,4 @@ class SparkContextSchedulerCreationSuite
       case _ => fail()
     }
   }
-
-  def testMesos(master: String, expectedClass: Class[_], coarse: Boolean) {
-    val conf = new SparkConf().set("spark.mesos.coarse", coarse.toString)
-    try {
-      val sched = createTaskScheduler(master, "client", conf)
-      assert(sched.backend.getClass === expectedClass)
-    } catch {
-      case e: UnsatisfiedLinkError =>
-        assert(e.getMessage.contains("mesos"))
-        logWarning("Mesos not available, could not test actual Mesos scheduler 
creation")
-      case e: Throwable => fail(e)
-    }
-  }
-
-  test("mesos fine-grained") {
-    testMesos("mesos://localhost:1234", 
classOf[MesosFineGrainedSchedulerBackend], coarse = false)
-  }
-
-  test("mesos coarse-grained") {
-    testMesos("mesos://localhost:1234", 
classOf[MesosCoarseGrainedSchedulerBackend], coarse = true)
-  }
-
-  test("mesos with zookeeper") {
-    testMesos("mesos://zk://localhost:1234,localhost:2345",
-      classOf[MesosFineGrainedSchedulerBackend], coarse = false)
-  }
-
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/8e5475be/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterSchedulerSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterSchedulerSuite.scala
deleted file mode 100644
index 87d9080..0000000
--- a/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterSchedulerSuite.scala
+++ /dev/null
@@ -1,213 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.scheduler.cluster.mesos
-
-import java.util.{Collection, Collections, Date}
-
-import scala.collection.JavaConverters._
-
-import org.apache.mesos.Protos._
-import org.apache.mesos.Protos.Value.{Scalar, Type}
-import org.apache.mesos.SchedulerDriver
-import org.mockito.{ArgumentCaptor, Matchers}
-import org.mockito.Mockito._
-import org.scalatest.mock.MockitoSugar
-
-import org.apache.spark.{LocalSparkContext, SparkConf, SparkFunSuite}
-import org.apache.spark.deploy.Command
-import org.apache.spark.deploy.mesos.MesosDriverDescription
-
-class MesosClusterSchedulerSuite extends SparkFunSuite with LocalSparkContext with MockitoSugar {
-
-  private val command = new Command("mainClass", Seq("arg"), Map(), Seq(), Seq(), Seq())
-  private var driver: SchedulerDriver = _
-  private var scheduler: MesosClusterScheduler = _
-
-  private def setScheduler(sparkConfVars: Map[String, String] = null): Unit = {
-    val conf = new SparkConf()
-    conf.setMaster("mesos://localhost:5050")
-    conf.setAppName("spark mesos")
-
-    if (sparkConfVars != null) {
-      conf.setAll(sparkConfVars)
-    }
-
-    driver = mock[SchedulerDriver]
-    scheduler = new MesosClusterScheduler(
-      new BlackHoleMesosClusterPersistenceEngineFactory, conf) {
-      override def start(): Unit = { ready = true }
-    }
-    scheduler.start()
-  }
-
-  test("can queue drivers") {
-    setScheduler()
-
-    val response = scheduler.submitDriver(
-      new MesosDriverDescription("d1", "jar", 1000, 1, true,
-        command, Map[String, String](), "s1", new Date()))
-    assert(response.success)
-    val response2 =
-      scheduler.submitDriver(new MesosDriverDescription(
-        "d1", "jar", 1000, 1, true, command, Map[String, String](), "s2", new 
Date()))
-    assert(response2.success)
-    val state = scheduler.getSchedulerState()
-    val queuedDrivers = state.queuedDrivers.toList
-    assert(queuedDrivers(0).submissionId == response.submissionId)
-    assert(queuedDrivers(1).submissionId == response2.submissionId)
-  }
-
-  test("can kill queued drivers") {
-    setScheduler()
-
-    val response = scheduler.submitDriver(
-        new MesosDriverDescription("d1", "jar", 1000, 1, true,
-          command, Map[String, String](), "s1", new Date()))
-    assert(response.success)
-    val killResponse = scheduler.killDriver(response.submissionId)
-    assert(killResponse.success)
-    val state = scheduler.getSchedulerState()
-    assert(state.queuedDrivers.isEmpty)
-  }
-
-  test("can handle multiple roles") {
-    setScheduler()
-
-    val driver = mock[SchedulerDriver]
-    val response = scheduler.submitDriver(
-      new MesosDriverDescription("d1", "jar", 1200, 1.5, true,
-        command,
-        Map(("spark.mesos.executor.home", "test"), ("spark.app.name", "test")),
-        "s1",
-        new Date()))
-    assert(response.success)
-    val offer = Offer.newBuilder()
-      .addResources(
-        Resource.newBuilder().setRole("*")
-          .setScalar(Scalar.newBuilder().setValue(1).build()).setName("cpus").setType(Type.SCALAR))
-      .addResources(
-        Resource.newBuilder().setRole("*")
-          .setScalar(Scalar.newBuilder().setValue(1000).build())
-          .setName("mem")
-          .setType(Type.SCALAR))
-      .addResources(
-        Resource.newBuilder().setRole("role2")
-          .setScalar(Scalar.newBuilder().setValue(1).build()).setName("cpus").setType(Type.SCALAR))
-      .addResources(
-        Resource.newBuilder().setRole("role2")
-          .setScalar(Scalar.newBuilder().setValue(500).build()).setName("mem").setType(Type.SCALAR))
-      .setId(OfferID.newBuilder().setValue("o1").build())
-      .setFrameworkId(FrameworkID.newBuilder().setValue("f1").build())
-      .setSlaveId(SlaveID.newBuilder().setValue("s1").build())
-      .setHostname("host1")
-      .build()
-
-    val capture = ArgumentCaptor.forClass(classOf[Collection[TaskInfo]])
-
-    when(
-      driver.launchTasks(
-        Matchers.eq(Collections.singleton(offer.getId)),
-        capture.capture())
-    ).thenReturn(Status.valueOf(1))
-
-    scheduler.resourceOffers(driver, Collections.singletonList(offer))
-
-    val taskInfos = capture.getValue
-    assert(taskInfos.size() == 1)
-    val taskInfo = taskInfos.iterator().next()
-    val resources = taskInfo.getResourcesList
-    assert(scheduler.getResource(resources, "cpus") == 1.5)
-    assert(scheduler.getResource(resources, "mem") == 1200)
-    val resourcesSeq: Seq[Resource] = resources.asScala
-    val cpus = resourcesSeq.filter(_.getName.equals("cpus")).toList
-    assert(cpus.size == 2)
-    assert(cpus.exists(_.getRole().equals("role2")))
-    assert(cpus.exists(_.getRole().equals("*")))
-    val mem = resourcesSeq.filter(_.getName.equals("mem")).toList
-    assert(mem.size == 2)
-    assert(mem.exists(_.getRole().equals("role2")))
-    assert(mem.exists(_.getRole().equals("*")))
-
-    verify(driver, times(1)).launchTasks(
-      Matchers.eq(Collections.singleton(offer.getId)),
-      capture.capture()
-    )
-  }
-
-  test("escapes commandline args for the shell") {
-    setScheduler()
-
-    val conf = new SparkConf()
-    conf.setMaster("mesos://localhost:5050")
-    conf.setAppName("spark mesos")
-    val scheduler = new MesosClusterScheduler(
-      new BlackHoleMesosClusterPersistenceEngineFactory, conf) {
-      override def start(): Unit = { ready = true }
-    }
-    val escape = scheduler.shellEscape _
-    def wrapped(str: String): String = "\"" + str + "\""
-
-    // Wrapped in quotes
-    assert(escape("'should be left untouched'") === "'should be left 
untouched'")
-    assert(escape("\"should be left untouched\"") === "\"should be left 
untouched\"")
-
-    // Harmless
-    assert(escape("") === "")
-    assert(escape("harmless") === "harmless")
-    assert(escape("har-m.l3ss") === "har-m.l3ss")
-
-    // Special Chars escape
-    assert(escape("should escape this \" quote") === wrapped("should escape 
this \\\" quote"))
-    assert(escape("shouldescape\"quote") === wrapped("shouldescape\\\"quote"))
-    assert(escape("should escape this $ dollar") === wrapped("should escape 
this \\$ dollar"))
-    assert(escape("should escape this ` backtick") === wrapped("should escape 
this \\` backtick"))
-    assert(escape("""should escape this \ backslash""")
-      === wrapped("""should escape this \\ backslash"""))
-    assert(escape("""\"?""") === wrapped("""\\\"?"""))
-
-
-    // Special Chars no escape only wrap
-    List(" ", "'", "<", ">", "&", "|", "?", "*", ";", "!", "#", "(", 
")").foreach(char => {
-      assert(escape(s"onlywrap${char}this") === 
wrapped(s"onlywrap${char}this"))
-    })
-  }
-
-  test("supports spark.mesos.driverEnv.*") {
-    setScheduler()
-
-    val mem = 1000
-    val cpu = 1
-
-    val response = scheduler.submitDriver(
-      new MesosDriverDescription("d1", "jar", mem, cpu, true,
-        command,
-        Map("spark.mesos.executor.home" -> "test",
-          "spark.app.name" -> "test",
-          "spark.mesos.driverEnv.TEST_ENV" -> "TEST_VAL"),
-        "s1",
-        new Date()))
-    assert(response.success)
-
-    val offer = Utils.createOffer("o1", "s1", mem, cpu)
-    scheduler.resourceOffers(driver, List(offer).asJava)
-    val tasks = Utils.verifyTaskLaunched(driver, "o1")
-    val env = tasks.head.getCommand.getEnvironment.getVariablesList.asScala.map(v =>
-      (v.getName, v.getValue)).toMap
-    assert(env.getOrElse("TEST_ENV", null) == "TEST_VAL")
-  }
-}
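The "escapes commandline args for the shell" test above encodes the escaping rules the removed cluster scheduler applied to driver command lines. A rough sketch consistent with those assertions (shellEscape itself is not shown in this diff, so this is an assumption, not the removed implementation):

    object ShellEscapeSketch {
      private val escapeChars = Set('"', '$', '`', '\\')
      private val wrapOnlyChars = Set(' ', '\'', '<', '>', '&', '|', '?', '*', ';', '!', '#', '(', ')')

      def shellEscape(value: String): String = {
        val alreadyQuoted = value.length >= 2 &&
          ((value.head == '\'' && value.last == '\'') || (value.head == '"' && value.last == '"'))
        if (value.isEmpty || alreadyQuoted) {
          value                                   // quoted strings and "" pass through untouched
        } else if (value.exists(escapeChars)) {
          // backslash-escape ", $, ` and \ and wrap the whole string in double quotes
          "\"" + value.flatMap(c => if (escapeChars(c)) s"\\$c" else c.toString) + "\""
        } else if (value.exists(wrapOnlyChars)) {
          "\"" + value + "\""                     // other shell metacharacters: wrap only
        } else {
          value                                   // harmless strings stay as-is
        }
      }
    }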

http://git-wip-us.apache.org/repos/asf/spark/blob/8e5475be/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackendSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackendSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackendSuite.scala
deleted file mode 100644
index c063797..0000000
--- a/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackendSuite.scala
+++ /dev/null
@@ -1,517 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.scheduler.cluster.mesos
-
-import scala.collection.JavaConverters._
-import scala.collection.mutable.ArrayBuffer
-import scala.reflect.ClassTag
-
-import org.apache.mesos.{Protos, Scheduler, SchedulerDriver}
-import org.apache.mesos.Protos._
-import org.mockito.Matchers
-import org.mockito.Matchers._
-import org.mockito.Mockito._
-import org.scalatest.mock.MockitoSugar
-import org.scalatest.BeforeAndAfter
-
-import org.apache.spark.{LocalSparkContext, SecurityManager, SparkConf, SparkContext, SparkFunSuite}
-import org.apache.spark.network.shuffle.mesos.MesosExternalShuffleClient
-import org.apache.spark.rpc.RpcEndpointRef
-import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages.RemoveExecutor
-import org.apache.spark.scheduler.TaskSchedulerImpl
-import org.apache.spark.scheduler.cluster.mesos.Utils._
-
-class MesosCoarseGrainedSchedulerBackendSuite extends SparkFunSuite
-    with LocalSparkContext
-    with MockitoSugar
-    with BeforeAndAfter {
-
-  private var sparkConf: SparkConf = _
-  private var driver: SchedulerDriver = _
-  private var taskScheduler: TaskSchedulerImpl = _
-  private var backend: MesosCoarseGrainedSchedulerBackend = _
-  private var externalShuffleClient: MesosExternalShuffleClient = _
-  private var driverEndpoint: RpcEndpointRef = _
-  @volatile private var stopCalled = false
-
-  test("mesos supports killing and limiting executors") {
-    setBackend()
-    sparkConf.set("spark.driver.host", "driverHost")
-    sparkConf.set("spark.driver.port", "1234")
-
-    val minMem = backend.executorMemory(sc)
-    val minCpu = 4
-    val offers = List((minMem, minCpu))
-
-    // launches a task on a valid offer
-    offerResources(offers)
-    verifyTaskLaunched(driver, "o1")
-
-    // kills executors
-    backend.doRequestTotalExecutors(0)
-    assert(backend.doKillExecutors(Seq("0")))
-    val taskID0 = createTaskId("0")
-    verify(driver, times(1)).killTask(taskID0)
-
-    // doesn't launch a new task when requested executors == 0
-    offerResources(offers, 2)
-    verifyDeclinedOffer(driver, createOfferId("o2"))
-
-    // Launches a new task when requested executors is positive
-    backend.doRequestTotalExecutors(2)
-    offerResources(offers, 2)
-    verifyTaskLaunched(driver, "o2")
-  }
-
-  test("mesos supports killing and relaunching tasks with executors") {
-    setBackend()
-
-    // launches a task on a valid offer
-    val minMem = backend.executorMemory(sc) + 1024
-    val minCpu = 4
-    val offer1 = (minMem, minCpu)
-    val offer2 = (minMem, 1)
-    offerResources(List(offer1, offer2))
-    verifyTaskLaunched(driver, "o1")
-
-    // accounts for a killed task
-    val status = createTaskStatus("0", "s1", TaskState.TASK_KILLED)
-    backend.statusUpdate(driver, status)
-    verify(driver, times(1)).reviveOffers()
-
-    // Launches a new task on a valid offer from the same slave
-    offerResources(List(offer2))
-    verifyTaskLaunched(driver, "o2")
-  }
-
-  test("mesos supports spark.executor.cores") {
-    val executorCores = 4
-    setBackend(Map("spark.executor.cores" -> executorCores.toString))
-
-    val executorMemory = backend.executorMemory(sc)
-    val offers = List((executorMemory * 2, executorCores + 1))
-    offerResources(offers)
-
-    val taskInfos = verifyTaskLaunched(driver, "o1")
-    assert(taskInfos.length == 1)
-
-    val cpus = backend.getResource(taskInfos.head.getResourcesList, "cpus")
-    assert(cpus == executorCores)
-  }
-
-  test("mesos supports unset spark.executor.cores") {
-    setBackend()
-
-    val executorMemory = backend.executorMemory(sc)
-    val offerCores = 10
-    offerResources(List((executorMemory * 2, offerCores)))
-
-    val taskInfos = verifyTaskLaunched(driver, "o1")
-    assert(taskInfos.length == 1)
-
-    val cpus = backend.getResource(taskInfos.head.getResourcesList, "cpus")
-    assert(cpus == offerCores)
-  }
-
-  test("mesos does not acquire more than spark.cores.max") {
-    val maxCores = 10
-    setBackend(Map("spark.cores.max" -> maxCores.toString))
-
-    val executorMemory = backend.executorMemory(sc)
-    offerResources(List((executorMemory, maxCores + 1)))
-
-    val taskInfos = verifyTaskLaunched(driver, "o1")
-    assert(taskInfos.length == 1)
-
-    val cpus = backend.getResource(taskInfos.head.getResourcesList, "cpus")
-    assert(cpus == maxCores)
-  }
-
-  test("mesos declines offers that violate attribute constraints") {
-    setBackend(Map("spark.mesos.constraints" -> "x:true"))
-    offerResources(List((backend.executorMemory(sc), 4)))
-    verifyDeclinedOffer(driver, createOfferId("o1"), true)
-  }
-
-  test("mesos declines offers with a filter when reached spark.cores.max") {
-    val maxCores = 3
-    setBackend(Map("spark.cores.max" -> maxCores.toString))
-
-    val executorMemory = backend.executorMemory(sc)
-    offerResources(List(
-      (executorMemory, maxCores + 1),
-      (executorMemory, maxCores + 1)))
-
-    verifyTaskLaunched(driver, "o1")
-    verifyDeclinedOffer(driver, createOfferId("o2"), true)
-  }
-
-  test("mesos assigns tasks round-robin on offers") {
-    val executorCores = 4
-    val maxCores = executorCores * 2
-    setBackend(Map("spark.executor.cores" -> executorCores.toString,
-      "spark.cores.max" -> maxCores.toString))
-
-    val executorMemory = backend.executorMemory(sc)
-    offerResources(List(
-      (executorMemory * 2, executorCores * 2),
-      (executorMemory * 2, executorCores * 2)))
-
-    verifyTaskLaunched(driver, "o1")
-    verifyTaskLaunched(driver, "o2")
-  }
-
-  test("mesos creates multiple executors on a single slave") {
-    val executorCores = 4
-    setBackend(Map("spark.executor.cores" -> executorCores.toString))
-
-    // offer with room for two executors
-    val executorMemory = backend.executorMemory(sc)
-    offerResources(List((executorMemory * 2, executorCores * 2)))
-
-    // verify two executors were started on a single offer
-    val taskInfos = verifyTaskLaunched(driver, "o1")
-    assert(taskInfos.length == 2)
-  }
-
-  test("mesos doesn't register twice with the same shuffle service") {
-    setBackend(Map("spark.shuffle.service.enabled" -> "true"))
-    val (mem, cpu) = (backend.executorMemory(sc), 4)
-
-    val offer1 = createOffer("o1", "s1", mem, cpu)
-    backend.resourceOffers(driver, List(offer1).asJava)
-    verifyTaskLaunched(driver, "o1")
-
-    val offer2 = createOffer("o2", "s1", mem, cpu)
-    backend.resourceOffers(driver, List(offer2).asJava)
-    verifyTaskLaunched(driver, "o2")
-
-    val status1 = createTaskStatus("0", "s1", TaskState.TASK_RUNNING)
-    backend.statusUpdate(driver, status1)
-
-    val status2 = createTaskStatus("1", "s1", TaskState.TASK_RUNNING)
-    backend.statusUpdate(driver, status2)
-    verify(externalShuffleClient, times(1))
-      .registerDriverWithShuffleService(anyString, anyInt, anyLong, anyLong)
-  }
-
-  test("Port offer decline when there is no appropriate range") {
-    setBackend(Map("spark.blockManager.port" -> "30100"))
-    val offeredPorts = (31100L, 31200L)
-    val (mem, cpu) = (backend.executorMemory(sc), 4)
-
-    val offer1 = createOffer("o1", "s1", mem, cpu, Some(offeredPorts))
-    backend.resourceOffers(driver, List(offer1).asJava)
-    verify(driver, times(1)).declineOffer(offer1.getId)
-  }
-
-  test("Port offer accepted when ephemeral ports are used") {
-    setBackend()
-    val offeredPorts = (31100L, 31200L)
-    val (mem, cpu) = (backend.executorMemory(sc), 4)
-
-    val offer1 = createOffer("o1", "s1", mem, cpu, Some(offeredPorts))
-    backend.resourceOffers(driver, List(offer1).asJava)
-    verifyTaskLaunched(driver, "o1")
-  }
-
-  test("Port offer accepted with user defined port numbers") {
-    val port = 30100
-    setBackend(Map("spark.blockManager.port" -> s"$port"))
-    val offeredPorts = (30000L, 31000L)
-    val (mem, cpu) = (backend.executorMemory(sc), 4)
-
-    val offer1 = createOffer("o1", "s1", mem, cpu, Some(offeredPorts))
-    backend.resourceOffers(driver, List(offer1).asJava)
-    val taskInfo = verifyTaskLaunched(driver, "o1")
-
-    val taskPortResources = taskInfo.head.getResourcesList.asScala.
-    find(r => r.getType == Value.Type.RANGES && r.getName == "ports")
-
-    val isPortInOffer = (r: Resource) => {
-      r.getRanges().getRangeList
-        .asScala.exists(range => range.getBegin == port && range.getEnd == port)
-    }
-    assert(taskPortResources.exists(isPortInOffer))
-  }
-
-  test("mesos kills an executor when told") {
-    setBackend()
-
-    val (mem, cpu) = (backend.executorMemory(sc), 4)
-
-    val offer1 = createOffer("o1", "s1", mem, cpu)
-    backend.resourceOffers(driver, List(offer1).asJava)
-    verifyTaskLaunched(driver, "o1")
-
-    backend.doKillExecutors(List("0"))
-    verify(driver, times(1)).killTask(createTaskId("0"))
-  }
-
-  test("weburi is set in created scheduler driver") {
-    setBackend()
-    val taskScheduler = mock[TaskSchedulerImpl]
-    when(taskScheduler.sc).thenReturn(sc)
-    val driver = mock[SchedulerDriver]
-    when(driver.start()).thenReturn(Protos.Status.DRIVER_RUNNING)
-    val securityManager = mock[SecurityManager]
-
-    val backend = new MesosCoarseGrainedSchedulerBackend(
-        taskScheduler, sc, "master", securityManager) {
-      override protected def createSchedulerDriver(
-        masterUrl: String,
-        scheduler: Scheduler,
-        sparkUser: String,
-        appName: String,
-        conf: SparkConf,
-        webuiUrl: Option[String] = None,
-        checkpoint: Option[Boolean] = None,
-        failoverTimeout: Option[Double] = None,
-        frameworkId: Option[String] = None): SchedulerDriver = {
-        markRegistered()
-        assert(webuiUrl.isDefined)
-        assert(webuiUrl.get.equals("http://webui";))
-        driver
-      }
-    }
-
-    backend.start()
-  }
-
-  test("honors unset spark.mesos.containerizer") {
-    setBackend(Map("spark.mesos.executor.docker.image" -> "test"))
-
-    val (mem, cpu) = (backend.executorMemory(sc), 4)
-
-    val offer1 = createOffer("o1", "s1", mem, cpu)
-    backend.resourceOffers(driver, List(offer1).asJava)
-
-    val taskInfos = verifyTaskLaunched(driver, "o1")
-    assert(taskInfos.head.getContainer.getType == ContainerInfo.Type.DOCKER)
-  }
-
-  test("honors spark.mesos.containerizer=\"mesos\"") {
-    setBackend(Map(
-      "spark.mesos.executor.docker.image" -> "test",
-      "spark.mesos.containerizer" -> "mesos"))
-
-    val (mem, cpu) = (backend.executorMemory(sc), 4)
-
-    val offer1 = createOffer("o1", "s1", mem, cpu)
-    backend.resourceOffers(driver, List(offer1).asJava)
-
-    val taskInfos = verifyTaskLaunched(driver, "o1")
-    assert(taskInfos.head.getContainer.getType == ContainerInfo.Type.MESOS)
-  }
-
-  test("docker settings are reflected in created tasks") {
-    setBackend(Map(
-      "spark.mesos.executor.docker.image" -> "some_image",
-      "spark.mesos.executor.docker.forcePullImage" -> "true",
-      "spark.mesos.executor.docker.volumes" -> "/host_vol:/container_vol:ro",
-      "spark.mesos.executor.docker.portmaps" -> "8080:80:tcp"
-    ))
-
-    val (mem, cpu) = (backend.executorMemory(sc), 4)
-
-    val offer1 = createOffer("o1", "s1", mem, cpu)
-    backend.resourceOffers(driver, List(offer1).asJava)
-
-    val launchedTasks = verifyTaskLaunched(driver, "o1")
-    assert(launchedTasks.size == 1)
-
-    val containerInfo = launchedTasks.head.getContainer
-    assert(containerInfo.getType == ContainerInfo.Type.DOCKER)
-
-    val volumes = containerInfo.getVolumesList.asScala
-    assert(volumes.size == 1)
-
-    val volume = volumes.head
-    assert(volume.getHostPath == "/host_vol")
-    assert(volume.getContainerPath == "/container_vol")
-    assert(volume.getMode == Volume.Mode.RO)
-
-    val dockerInfo = containerInfo.getDocker
-
-    assert(dockerInfo.getImage == "some_image")
-    assert(dockerInfo.getForcePullImage)
-
-    val portMappings = dockerInfo.getPortMappingsList.asScala
-    assert(portMappings.size == 1)
-
-    val portMapping = portMappings.head
-    assert(portMapping.getHostPort == 8080)
-    assert(portMapping.getContainerPort == 80)
-    assert(portMapping.getProtocol == "tcp")
-  }
-
-  test("force-pull-image option is disabled by default") {
-    setBackend(Map(
-      "spark.mesos.executor.docker.image" -> "some_image"
-    ))
-
-    val (mem, cpu) = (backend.executorMemory(sc), 4)
-
-    val offer1 = createOffer("o1", "s1", mem, cpu)
-    backend.resourceOffers(driver, List(offer1).asJava)
-
-    val launchedTasks = verifyTaskLaunched(driver, "o1")
-    assert(launchedTasks.size == 1)
-
-    val containerInfo = launchedTasks.head.getContainer
-    assert(containerInfo.getType == ContainerInfo.Type.DOCKER)
-
-    val dockerInfo = containerInfo.getDocker
-
-    assert(dockerInfo.getImage == "some_image")
-    assert(!dockerInfo.getForcePullImage)
-  }
-
-  test("Do not call removeExecutor() after backend is stopped") {
-    setBackend()
-
-    // launches a task on a valid offer
-    val offers = List((backend.executorMemory(sc), 1))
-    offerResources(offers)
-    verifyTaskLaunched(driver, "o1")
-
-    // launches a thread simulating status update
-    val statusUpdateThread = new Thread {
-      override def run(): Unit = {
-        while (!stopCalled) {
-          Thread.sleep(100)
-        }
-
-        val status = createTaskStatus("0", "s1", TaskState.TASK_FINISHED)
-        backend.statusUpdate(driver, status)
-      }
-    }.start
-
-    backend.stop()
-    // Any method of the backend involving sending messages to the driver endpoint should not
-    // be called after the backend is stopped.
-    verify(driverEndpoint, never()).askWithRetry(isA(classOf[RemoveExecutor]))(any[ClassTag[_]])
-  }
-
-  test("mesos supports spark.executor.uri") {
-    val url = "spark.spark.spark.com"
-    setBackend(Map(
-      "spark.executor.uri" -> url
-    ), false)
-
-    val (mem, cpu) = (backend.executorMemory(sc), 4)
-
-    val offer1 = createOffer("o1", "s1", mem, cpu)
-    backend.resourceOffers(driver, List(offer1).asJava)
-
-    val launchedTasks = verifyTaskLaunched(driver, "o1")
-    assert(launchedTasks.head.getCommand.getUrisList.asScala(0).getValue == url)
-  }
-
-  private def verifyDeclinedOffer(driver: SchedulerDriver,
-      offerId: OfferID,
-      filter: Boolean = false): Unit = {
-    if (filter) {
-      verify(driver, times(1)).declineOffer(Matchers.eq(offerId), anyObject[Filters])
-    } else {
-      verify(driver, times(1)).declineOffer(Matchers.eq(offerId))
-    }
-  }
-
-  private def offerResources(offers: List[(Int, Int)], startId: Int = 1): Unit = {
-    val mesosOffers = offers.zipWithIndex.map {case (offer, i) =>
-      createOffer(s"o${i + startId}", s"s${i + startId}", offer._1, offer._2)}
-
-    backend.resourceOffers(driver, mesosOffers.asJava)
-  }
-
-  private def createTaskStatus(taskId: String, slaveId: String, state: TaskState): TaskStatus = {
-    TaskStatus.newBuilder()
-      .setTaskId(TaskID.newBuilder().setValue(taskId).build())
-      .setSlaveId(SlaveID.newBuilder().setValue(slaveId).build())
-      .setState(state)
-      .build
-  }
-
-  private def createSchedulerBackend(
-      taskScheduler: TaskSchedulerImpl,
-      driver: SchedulerDriver,
-      shuffleClient: MesosExternalShuffleClient,
-      endpoint: RpcEndpointRef): MesosCoarseGrainedSchedulerBackend = {
-    val securityManager = mock[SecurityManager]
-
-    val backend = new MesosCoarseGrainedSchedulerBackend(
-        taskScheduler, sc, "master", securityManager) {
-      override protected def createSchedulerDriver(
-          masterUrl: String,
-          scheduler: Scheduler,
-          sparkUser: String,
-          appName: String,
-          conf: SparkConf,
-          webuiUrl: Option[String] = None,
-          checkpoint: Option[Boolean] = None,
-          failoverTimeout: Option[Double] = None,
-          frameworkId: Option[String] = None): SchedulerDriver = driver
-
-      override protected def getShuffleClient(): MesosExternalShuffleClient = shuffleClient
-
-      override protected def createDriverEndpointRef(
-          properties: ArrayBuffer[(String, String)]): RpcEndpointRef = endpoint
-
-      // override to avoid race condition with the driver thread on `mesosDriver`
-      override def startScheduler(newDriver: SchedulerDriver): Unit = {
-        mesosDriver = newDriver
-      }
-
-      override def stopExecutors(): Unit = {
-        stopCalled = true
-      }
-
-      markRegistered()
-    }
-    backend.start()
-    backend
-  }
-
-  private def setBackend(sparkConfVars: Map[String, String] = null,
-      setHome: Boolean = true) {
-    sparkConf = (new SparkConf)
-      .setMaster("local[*]")
-      .setAppName("test-mesos-dynamic-alloc")
-      .set("spark.mesos.driver.webui.url", "http://webui";)
-
-    if (setHome) {
-      sparkConf.setSparkHome("/path")
-    }
-
-    if (sparkConfVars != null) {
-      sparkConf.setAll(sparkConfVars)
-    }
-
-    sc = new SparkContext(sparkConf)
-
-    driver = mock[SchedulerDriver]
-    when(driver.start()).thenReturn(Protos.Status.DRIVER_RUNNING)
-    taskScheduler = mock[TaskSchedulerImpl]
-    when(taskScheduler.sc).thenReturn(sc)
-    externalShuffleClient = mock[MesosExternalShuffleClient]
-    driverEndpoint = mock[RpcEndpointRef]
-
-    backend = createSchedulerBackend(taskScheduler, driver, externalShuffleClient, driverEndpoint)
-  }
-}
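Several tests above size their offers with backend.executorMemory(sc). The arithmetic behind that call (from the executorMemory method in the removed MesosSchedulerUtils above) adds a container overhead to the executor memory; a small sketch of the same formula, with the 10% / 384 MB defaults copied from YARN:

    object ExecutorMemorySketch {
      private val MEMORY_OVERHEAD_FRACTION = 0.10
      private val MEMORY_OVERHEAD_MINIMUM = 384  // MB

      // total MB requested per executor = executor memory + overhead,
      // where the overhead defaults to max(0.1 * executorMemory, 384) unless
      // spark.mesos.executor.memoryOverhead overrides it.
      def executorMemory(executorMemoryMb: Int, overheadOverrideMb: Option[Int] = None): Int = {
        val defaultOverhead =
          math.max(MEMORY_OVERHEAD_FRACTION * executorMemoryMb, MEMORY_OVERHEAD_MINIMUM).toInt
        executorMemoryMb + overheadOverrideMb.getOrElse(defaultOverhead)
      }

      def main(args: Array[String]): Unit = {
        assert(executorMemory(1024) == 1024 + 384)  // 10% of 1024 is below the 384 MB minimum
        assert(executorMemory(8192) == 8192 + 819)  // 10% of 8192 = 819.2, truncated to 819
      }
    }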

http://git-wip-us.apache.org/repos/asf/spark/blob/8e5475be/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosFineGrainedSchedulerBackendSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosFineGrainedSchedulerBackendSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosFineGrainedSchedulerBackendSuite.scala
deleted file mode 100644
index fcf39f6..0000000
--- a/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosFineGrainedSchedulerBackendSuite.scala
+++ /dev/null
@@ -1,385 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.scheduler.cluster.mesos
-
-import java.nio.ByteBuffer
-import java.util.Arrays
-import java.util.Collection
-import java.util.Collections
-
-import scala.collection.JavaConverters._
-import scala.collection.mutable
-import scala.collection.mutable.ArrayBuffer
-
-import org.apache.mesos.{Protos, Scheduler, SchedulerDriver}
-import org.apache.mesos.Protos._
-import org.apache.mesos.Protos.Value.Scalar
-import org.mockito.{ArgumentCaptor, Matchers}
-import org.mockito.Matchers._
-import org.mockito.Mockito._
-import org.scalatest.mock.MockitoSugar
-
-import org.apache.spark.{LocalSparkContext, SparkConf, SparkContext, SparkFunSuite}
-import org.apache.spark.executor.MesosExecutorBackend
-import org.apache.spark.scheduler.{LiveListenerBus, SparkListenerExecutorAdded,
-  TaskDescription, TaskSchedulerImpl, WorkerOffer}
-import org.apache.spark.scheduler.cluster.ExecutorInfo
-
-class MesosFineGrainedSchedulerBackendSuite
-  extends SparkFunSuite with LocalSparkContext with MockitoSugar {
-
-  test("weburi is set in created scheduler driver") {
-    val conf = new SparkConf
-    conf.set("spark.mesos.driver.webui.url", "http://webui";)
-    conf.set("spark.app.name", "name1")
-
-    val sc = mock[SparkContext]
-    when(sc.conf).thenReturn(conf)
-    when(sc.sparkUser).thenReturn("sparkUser1")
-    when(sc.appName).thenReturn("appName1")
-
-    val taskScheduler = mock[TaskSchedulerImpl]
-    val driver = mock[SchedulerDriver]
-    when(driver.start()).thenReturn(Protos.Status.DRIVER_RUNNING)
-
-    val backend = new MesosFineGrainedSchedulerBackend(taskScheduler, sc, "master") {
-      override protected def createSchedulerDriver(
-        masterUrl: String,
-        scheduler: Scheduler,
-        sparkUser: String,
-        appName: String,
-        conf: SparkConf,
-        webuiUrl: Option[String] = None,
-        checkpoint: Option[Boolean] = None,
-        failoverTimeout: Option[Double] = None,
-        frameworkId: Option[String] = None): SchedulerDriver = {
-        markRegistered()
-        assert(webuiUrl.isDefined)
-        assert(webuiUrl.get.equals("http://webui";))
-        driver
-      }
-    }
-
-    backend.start()
-  }
-
-  test("Use configured mesosExecutor.cores for ExecutorInfo") {
-    val mesosExecutorCores = 3
-    val conf = new SparkConf
-    conf.set("spark.mesos.mesosExecutor.cores", mesosExecutorCores.toString)
-
-    val listenerBus = mock[LiveListenerBus]
-    listenerBus.post(
-      SparkListenerExecutorAdded(anyLong, "s1", new ExecutorInfo("host1", 2, Map.empty)))
-
-    val sc = mock[SparkContext]
-    when(sc.getSparkHome()).thenReturn(Option("/spark-home"))
-
-    when(sc.conf).thenReturn(conf)
-    when(sc.executorEnvs).thenReturn(new mutable.HashMap[String, String])
-    when(sc.executorMemory).thenReturn(100)
-    when(sc.listenerBus).thenReturn(listenerBus)
-    val taskScheduler = mock[TaskSchedulerImpl]
-    when(taskScheduler.CPUS_PER_TASK).thenReturn(2)
-
-    val mesosSchedulerBackend = new MesosFineGrainedSchedulerBackend(taskScheduler, sc, "master")
-
-    val resources = Arrays.asList(
-      mesosSchedulerBackend.createResource("cpus", 4),
-      mesosSchedulerBackend.createResource("mem", 1024))
-    // uri is null.
-    val (executorInfo, _) = mesosSchedulerBackend.createExecutorInfo(resources, "test-id")
-    val executorResources = executorInfo.getResourcesList
-    val cpus = executorResources.asScala.find(_.getName.equals("cpus")).get.getScalar.getValue
-
-    assert(cpus === mesosExecutorCores)
-  }
-
-  test("check spark-class location correctly") {
-    val conf = new SparkConf
-    conf.set("spark.mesos.executor.home", "/mesos-home")
-
-    val listenerBus = mock[LiveListenerBus]
-    listenerBus.post(
-      SparkListenerExecutorAdded(anyLong, "s1", new ExecutorInfo("host1", 2, Map.empty)))
-
-    val sc = mock[SparkContext]
-    when(sc.getSparkHome()).thenReturn(Option("/spark-home"))
-
-    when(sc.conf).thenReturn(conf)
-    when(sc.executorEnvs).thenReturn(new mutable.HashMap[String, String])
-    when(sc.executorMemory).thenReturn(100)
-    when(sc.listenerBus).thenReturn(listenerBus)
-    val taskScheduler = mock[TaskSchedulerImpl]
-    when(taskScheduler.CPUS_PER_TASK).thenReturn(2)
-
-    val mesosSchedulerBackend = new MesosFineGrainedSchedulerBackend(taskScheduler, sc, "master")
-
-    val resources = Arrays.asList(
-      mesosSchedulerBackend.createResource("cpus", 4),
-      mesosSchedulerBackend.createResource("mem", 1024))
-    // uri is null.
-    val (executorInfo, _) = mesosSchedulerBackend.createExecutorInfo(resources, "test-id")
-    assert(executorInfo.getCommand.getValue ===
-      s" /mesos-home/bin/spark-class ${classOf[MesosExecutorBackend].getName}")
-
-    // uri exists.
-    conf.set("spark.executor.uri", "hdfs:///test-app-1.0.0.tgz")
-    val (executorInfo1, _) = mesosSchedulerBackend.createExecutorInfo(resources, "test-id")
-    assert(executorInfo1.getCommand.getValue ===
-      s"cd test-app-1*;  ./bin/spark-class 
${classOf[MesosExecutorBackend].getName}")
-  }
-
-  test("spark docker properties correctly populate the DockerInfo message") {
-    val taskScheduler = mock[TaskSchedulerImpl]
-
-    val conf = new SparkConf()
-      .set("spark.mesos.executor.docker.image", "spark/mock")
-      .set("spark.mesos.executor.docker.forcePullImage", "true")
-      .set("spark.mesos.executor.docker.volumes", 
"/a,/b:/b,/c:/c:rw,/d:ro,/e:/e:ro")
-      .set("spark.mesos.executor.docker.portmaps", "80:8080,53:53:tcp")
-
-    val listenerBus = mock[LiveListenerBus]
-    listenerBus.post(
-      SparkListenerExecutorAdded(anyLong, "s1", new ExecutorInfo("host1", 2, Map.empty)))
-
-    val sc = mock[SparkContext]
-    when(sc.executorMemory).thenReturn(100)
-    when(sc.getSparkHome()).thenReturn(Option("/spark-home"))
-    when(sc.executorEnvs).thenReturn(new mutable.HashMap[String, String])
-    when(sc.conf).thenReturn(conf)
-    when(sc.listenerBus).thenReturn(listenerBus)
-
-    val backend = new MesosFineGrainedSchedulerBackend(taskScheduler, sc, "master")
-
-    val (execInfo, _) = backend.createExecutorInfo(
-      Arrays.asList(backend.createResource("cpus", 4)), "mockExecutor")
-    assert(execInfo.getContainer.getDocker.getImage.equals("spark/mock"))
-    assert(execInfo.getContainer.getDocker.getForcePullImage.equals(true))
-    val portmaps = execInfo.getContainer.getDocker.getPortMappingsList
-    assert(portmaps.get(0).getHostPort.equals(80))
-    assert(portmaps.get(0).getContainerPort.equals(8080))
-    assert(portmaps.get(0).getProtocol.equals("tcp"))
-    assert(portmaps.get(1).getHostPort.equals(53))
-    assert(portmaps.get(1).getContainerPort.equals(53))
-    assert(portmaps.get(1).getProtocol.equals("tcp"))
-    val volumes = execInfo.getContainer.getVolumesList
-    assert(volumes.get(0).getContainerPath.equals("/a"))
-    assert(volumes.get(0).getMode.equals(Volume.Mode.RW))
-    assert(volumes.get(1).getContainerPath.equals("/b"))
-    assert(volumes.get(1).getHostPath.equals("/b"))
-    assert(volumes.get(1).getMode.equals(Volume.Mode.RW))
-    assert(volumes.get(2).getContainerPath.equals("/c"))
-    assert(volumes.get(2).getHostPath.equals("/c"))
-    assert(volumes.get(2).getMode.equals(Volume.Mode.RW))
-    assert(volumes.get(3).getContainerPath.equals("/d"))
-    assert(volumes.get(3).getMode.equals(Volume.Mode.RO))
-    assert(volumes.get(4).getContainerPath.equals("/e"))
-    assert(volumes.get(4).getHostPath.equals("/e"))
-    assert(volumes.get(4).getMode.equals(Volume.Mode.RO))
-  }
-
-  test("mesos resource offers result in launching tasks") {
-    def createOffer(id: Int, mem: Int, cpu: Int): Offer = {
-      val builder = Offer.newBuilder()
-      builder.addResourcesBuilder()
-        .setName("mem")
-        .setType(Value.Type.SCALAR)
-        .setScalar(Scalar.newBuilder().setValue(mem))
-      builder.addResourcesBuilder()
-        .setName("cpus")
-        .setType(Value.Type.SCALAR)
-        .setScalar(Scalar.newBuilder().setValue(cpu))
-      builder.setId(OfferID.newBuilder().setValue(s"o${id.toString}").build())
-        .setFrameworkId(FrameworkID.newBuilder().setValue("f1"))
-        .setSlaveId(SlaveID.newBuilder().setValue(s"s${id.toString}"))
-        .setHostname(s"host${id.toString}").build()
-    }
-
-    val driver = mock[SchedulerDriver]
-    val taskScheduler = mock[TaskSchedulerImpl]
-
-    val listenerBus = mock[LiveListenerBus]
-    listenerBus.post(
-      SparkListenerExecutorAdded(anyLong, "s1", new ExecutorInfo("host1", 2, Map.empty)))
-
-    val sc = mock[SparkContext]
-    when(sc.executorMemory).thenReturn(100)
-    when(sc.getSparkHome()).thenReturn(Option("/path"))
-    when(sc.executorEnvs).thenReturn(new mutable.HashMap[String, String])
-    when(sc.conf).thenReturn(new SparkConf)
-    when(sc.listenerBus).thenReturn(listenerBus)
-
-    val backend = new MesosFineGrainedSchedulerBackend(taskScheduler, sc, "master")
-
-    val minMem = backend.executorMemory(sc)
-    val minCpu = 4
-
-    val mesosOffers = new java.util.ArrayList[Offer]
-    mesosOffers.add(createOffer(1, minMem, minCpu))
-    mesosOffers.add(createOffer(2, minMem - 1, minCpu))
-    mesosOffers.add(createOffer(3, minMem, minCpu))
-
-    val expectedWorkerOffers = new ArrayBuffer[WorkerOffer](2)
-    expectedWorkerOffers.append(new WorkerOffer(
-      mesosOffers.get(0).getSlaveId.getValue,
-      mesosOffers.get(0).getHostname,
-      (minCpu - backend.mesosExecutorCores).toInt
-    ))
-    expectedWorkerOffers.append(new WorkerOffer(
-      mesosOffers.get(2).getSlaveId.getValue,
-      mesosOffers.get(2).getHostname,
-      (minCpu - backend.mesosExecutorCores).toInt
-    ))
-    val taskDesc = new TaskDescription(1L, 0, "s1", "n1", 0, ByteBuffer.wrap(new Array[Byte](0)))
-    when(taskScheduler.resourceOffers(expectedWorkerOffers)).thenReturn(Seq(Seq(taskDesc)))
-    when(taskScheduler.CPUS_PER_TASK).thenReturn(2)
-
-    val capture = ArgumentCaptor.forClass(classOf[Collection[TaskInfo]])
-    when(
-      driver.launchTasks(
-        Matchers.eq(Collections.singleton(mesosOffers.get(0).getId)),
-        capture.capture(),
-        any(classOf[Filters])
-      )
-    ).thenReturn(Status.valueOf(1))
-    when(driver.declineOffer(mesosOffers.get(1).getId)).thenReturn(Status.valueOf(1))
-    when(driver.declineOffer(mesosOffers.get(2).getId)).thenReturn(Status.valueOf(1))
-
-    backend.resourceOffers(driver, mesosOffers)
-
-    verify(driver, times(1)).launchTasks(
-      Matchers.eq(Collections.singleton(mesosOffers.get(0).getId)),
-      capture.capture(),
-      any(classOf[Filters])
-    )
-    verify(driver, times(1)).declineOffer(mesosOffers.get(1).getId)
-    verify(driver, times(1)).declineOffer(mesosOffers.get(2).getId)
-    assert(capture.getValue.size() === 1)
-    val taskInfo = capture.getValue.iterator().next()
-    assert(taskInfo.getName.equals("n1"))
-    val cpus = taskInfo.getResourcesList.get(0)
-    assert(cpus.getName.equals("cpus"))
-    assert(cpus.getScalar.getValue.equals(2.0))
-    assert(taskInfo.getSlaveId.getValue.equals("s1"))
-
-    // Unwanted resources offered on an existing node. Make sure they are declined
-    val mesosOffers2 = new java.util.ArrayList[Offer]
-    mesosOffers2.add(createOffer(1, minMem, minCpu))
-    reset(taskScheduler)
-    reset(driver)
-    when(taskScheduler.resourceOffers(any(classOf[Seq[WorkerOffer]]))).thenReturn(Seq(Seq()))
-    when(taskScheduler.CPUS_PER_TASK).thenReturn(2)
-    when(driver.declineOffer(mesosOffers2.get(0).getId)).thenReturn(Status.valueOf(1))
-
-    backend.resourceOffers(driver, mesosOffers2)
-    verify(driver, times(1)).declineOffer(mesosOffers2.get(0).getId)
-  }
-
-  test("can handle multiple roles") {
-    val driver = mock[SchedulerDriver]
-    val taskScheduler = mock[TaskSchedulerImpl]
-
-    val listenerBus = mock[LiveListenerBus]
-    listenerBus.post(
-      SparkListenerExecutorAdded(anyLong, "s1", new ExecutorInfo("host1", 2, Map.empty)))
-
-    val sc = mock[SparkContext]
-    when(sc.executorMemory).thenReturn(100)
-    when(sc.getSparkHome()).thenReturn(Option("/path"))
-    when(sc.executorEnvs).thenReturn(new mutable.HashMap[String, String])
-    when(sc.conf).thenReturn(new SparkConf)
-    when(sc.listenerBus).thenReturn(listenerBus)
-
-    val id = 1
-    val builder = Offer.newBuilder()
-    builder.addResourcesBuilder()
-      .setName("mem")
-      .setType(Value.Type.SCALAR)
-      .setRole("prod")
-      .setScalar(Scalar.newBuilder().setValue(500))
-    builder.addResourcesBuilder()
-      .setName("cpus")
-      .setRole("prod")
-      .setType(Value.Type.SCALAR)
-      .setScalar(Scalar.newBuilder().setValue(1))
-    builder.addResourcesBuilder()
-      .setName("mem")
-      .setRole("dev")
-      .setType(Value.Type.SCALAR)
-      .setScalar(Scalar.newBuilder().setValue(600))
-    builder.addResourcesBuilder()
-      .setName("cpus")
-      .setRole("dev")
-      .setType(Value.Type.SCALAR)
-      .setScalar(Scalar.newBuilder().setValue(2))
-    val offer = builder.setId(OfferID.newBuilder().setValue(s"o${id.toString}").build())
-      .setFrameworkId(FrameworkID.newBuilder().setValue("f1"))
-      .setSlaveId(SlaveID.newBuilder().setValue(s"s${id.toString}"))
-      .setHostname(s"host${id.toString}").build()
-
-    val mesosOffers = new java.util.ArrayList[Offer]
-    mesosOffers.add(offer)
-
-    val backend = new MesosFineGrainedSchedulerBackend(taskScheduler, sc, "master")
-
-    val expectedWorkerOffers = new ArrayBuffer[WorkerOffer](1)
-    expectedWorkerOffers.append(new WorkerOffer(
-      mesosOffers.get(0).getSlaveId.getValue,
-      mesosOffers.get(0).getHostname,
-      2 // Deducting 1 for executor
-    ))
-
-    val taskDesc = new TaskDescription(1L, 0, "s1", "n1", 0, ByteBuffer.wrap(new Array[Byte](0)))
-    when(taskScheduler.resourceOffers(expectedWorkerOffers)).thenReturn(Seq(Seq(taskDesc)))
-    when(taskScheduler.CPUS_PER_TASK).thenReturn(1)
-
-    val capture = ArgumentCaptor.forClass(classOf[Collection[TaskInfo]])
-    when(
-      driver.launchTasks(
-        Matchers.eq(Collections.singleton(mesosOffers.get(0).getId)),
-        capture.capture(),
-        any(classOf[Filters])
-      )
-    ).thenReturn(Status.valueOf(1))
-
-    backend.resourceOffers(driver, mesosOffers)
-
-    verify(driver, times(1)).launchTasks(
-      Matchers.eq(Collections.singleton(mesosOffers.get(0).getId)),
-      capture.capture(),
-      any(classOf[Filters])
-    )
-
-    assert(capture.getValue.size() === 1)
-    val taskInfo = capture.getValue.iterator().next()
-    assert(taskInfo.getName.equals("n1"))
-    assert(taskInfo.getResourcesCount === 1)
-    val cpusDev = taskInfo.getResourcesList.get(0)
-    assert(cpusDev.getName.equals("cpus"))
-    assert(cpusDev.getScalar.getValue.equals(1.0))
-    assert(cpusDev.getRole.equals("dev"))
-    val executorResources = taskInfo.getExecutor.getResourcesList.asScala
-    assert(executorResources.exists { r =>
-      r.getName.equals("mem") && r.getScalar.getValue.equals(484.0) && 
r.getRole.equals("prod")
-    })
-    assert(executorResources.exists { r =>
-      r.getName.equals("cpus") && r.getScalar.getValue.equals(1.0) && 
r.getRole.equals("prod")
-    })
-  }
-}
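
Note on the Docker test above: every property name and value format it checks
(image, forcePullImage, and the comma-separated volumes and portmaps strings)
is taken directly from the suite. As a rough sketch only, with a placeholder
master URL, class and jar, a job using the same settings could be submitted as:

  ./bin/spark-submit \
    --master mesos://mesos-master:5050 \
    --conf spark.mesos.executor.docker.image=spark/mock \
    --conf spark.mesos.executor.docker.forcePullImage=true \
    --conf spark.mesos.executor.docker.volumes=/a,/b:/b,/c:/c:rw,/d:ro,/e:/e:ro \
    --conf spark.mesos.executor.docker.portmaps=80:8080,53:53:tcp \
    --class org.example.MyApp my-app.jar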

http://git-wip-us.apache.org/repos/asf/spark/blob/8e5475be/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtilsSuite.scala
----------------------------------------------------------------------
diff --git 
a/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtilsSuite.scala
 
b/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtilsSuite.scala
deleted file mode 100644
index e3d7949..0000000
--- 
a/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtilsSuite.scala
+++ /dev/null
@@ -1,255 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.scheduler.cluster.mesos
-
-import scala.collection.JavaConverters._
-import scala.language.reflectiveCalls
-
-import org.apache.mesos.Protos.{Resource, Value}
-import org.mockito.Mockito._
-import org.scalatest._
-import org.scalatest.mock.MockitoSugar
-
-import org.apache.spark.{SparkConf, SparkContext, SparkFunSuite}
-
-class MesosSchedulerUtilsSuite extends SparkFunSuite with Matchers with MockitoSugar {
-
-  // scalastyle:off structural.type
-  // this is the documented way of generating fixtures in scalatest
-  def fixture: Object {val sc: SparkContext; val sparkConf: SparkConf} = new {
-    val sparkConf = new SparkConf
-    val sc = mock[SparkContext]
-    when(sc.conf).thenReturn(sparkConf)
-  }
-
-  private def createTestPortResource(range: (Long, Long), role: Option[String] = None): Resource = {
-    val rangeValue = Value.Range.newBuilder()
-    rangeValue.setBegin(range._1)
-    rangeValue.setEnd(range._2)
-    val builder = Resource.newBuilder()
-      .setName("ports")
-      .setType(Value.Type.RANGES)
-      .setRanges(Value.Ranges.newBuilder().addRange(rangeValue))
-
-    role.foreach { r => builder.setRole(r) }
-    builder.build()
-  }
-
-  private def rangesResourcesToTuple(resources: List[Resource]): List[(Long, Long)] = {
-    resources.flatMap{resource => resource.getRanges.getRangeList
-      .asScala.map(range => (range.getBegin, range.getEnd))}
-  }
-
-  def arePortsEqual(array1: Array[(Long, Long)], array2: Array[(Long, Long)])
-    : Boolean = {
-    array1.sortBy(identity).deep == array2.sortBy(identity).deep
-  }
-
-  def arePortsEqual(array1: Array[Long], array2: Array[Long])
-    : Boolean = {
-    array1.sortBy(identity).deep == array2.sortBy(identity).deep
-  }
-
-  def getRangesFromResources(resources: List[Resource]): List[(Long, Long)] = {
-    resources.flatMap{ resource =>
-      resource.getRanges.getRangeList.asScala.toList.map{
-        range => (range.getBegin, range.getEnd)}}
-  }
-
-  val utils = new MesosSchedulerUtils { }
-  // scalastyle:on structural.type
-
-  test("use at-least minimum overhead") {
-    val f = fixture
-    when(f.sc.executorMemory).thenReturn(512)
-    utils.executorMemory(f.sc) shouldBe 896
-  }
-
-  test("use overhead if it is greater than minimum value") {
-    val f = fixture
-    when(f.sc.executorMemory).thenReturn(4096)
-    utils.executorMemory(f.sc) shouldBe 4505
-  }
-
-  test("use spark.mesos.executor.memoryOverhead (if set)") {
-    val f = fixture
-    when(f.sc.executorMemory).thenReturn(1024)
-    f.sparkConf.set("spark.mesos.executor.memoryOverhead", "512")
-    utils.executorMemory(f.sc) shouldBe 1536
-  }
-
-  test("parse a non-empty constraint string correctly") {
-    val expectedMap = Map(
-      "os" -> Set("centos7"),
-      "zone" -> Set("us-east-1a", "us-east-1b")
-    )
-    utils.parseConstraintString("os:centos7;zone:us-east-1a,us-east-1b") should be (expectedMap)
-  }
-
-  test("parse an empty constraint string correctly") {
-    utils.parseConstraintString("") shouldBe Map()
-  }
-
-  test("throw an exception when the input is malformed") {
-    an[IllegalArgumentException] should be thrownBy
-      utils.parseConstraintString("os;zone:us-east")
-  }
-
-  test("empty values for attributes' constraints matches all values") {
-    val constraintsStr = "os:"
-    val parsedConstraints = utils.parseConstraintString(constraintsStr)
-
-    parsedConstraints shouldBe Map("os" -> Set())
-
-    val zoneSet = Value.Set.newBuilder().addItem("us-east-1a").addItem("us-east-1b").build()
-    val noOsOffer = Map("zone" -> zoneSet)
-    val centosOffer = Map("os" -> 
Value.Text.newBuilder().setValue("centos").build())
-    val ubuntuOffer = Map("os" -> 
Value.Text.newBuilder().setValue("ubuntu").build())
-
-    utils.matchesAttributeRequirements(parsedConstraints, noOsOffer) shouldBe false
-    utils.matchesAttributeRequirements(parsedConstraints, centosOffer) shouldBe true
-    utils.matchesAttributeRequirements(parsedConstraints, ubuntuOffer) shouldBe true
-  }
-
-  test("subset match is performed for set attributes") {
-    val supersetConstraint = Map(
-      "os" -> Value.Text.newBuilder().setValue("ubuntu").build(),
-      "zone" -> Value.Set.newBuilder()
-        .addItem("us-east-1a")
-        .addItem("us-east-1b")
-        .addItem("us-east-1c")
-        .build())
-
-    val zoneConstraintStr = "os:;zone:us-east-1a,us-east-1c"
-    val parsedConstraints = utils.parseConstraintString(zoneConstraintStr)
-
-    utils.matchesAttributeRequirements(parsedConstraints, supersetConstraint) shouldBe true
-  }
-
-  test("less than equal match is performed on scalar attributes") {
-    val offerAttribs = Map("gpus" -> 
Value.Scalar.newBuilder().setValue(3).build())
-
-    val ltConstraint = utils.parseConstraintString("gpus:2")
-    val eqConstraint = utils.parseConstraintString("gpus:3")
-    val gtConstraint = utils.parseConstraintString("gpus:4")
-
-    utils.matchesAttributeRequirements(ltConstraint, offerAttribs) shouldBe true
-    utils.matchesAttributeRequirements(eqConstraint, offerAttribs) shouldBe true
-    utils.matchesAttributeRequirements(gtConstraint, offerAttribs) shouldBe false
-  }
-
-  test("contains match is performed for range attributes") {
-    val offerAttribs = Map("ports" -> 
Value.Range.newBuilder().setBegin(7000).setEnd(8000).build())
-    val ltConstraint = utils.parseConstraintString("ports:6000")
-    val eqConstraint = utils.parseConstraintString("ports:7500")
-    val gtConstraint = utils.parseConstraintString("ports:8002")
-    val multiConstraint = utils.parseConstraintString("ports:5000,7500,8300")
-
-    utils.matchesAttributeRequirements(ltConstraint, offerAttribs) shouldBe false
-    utils.matchesAttributeRequirements(eqConstraint, offerAttribs) shouldBe true
-    utils.matchesAttributeRequirements(gtConstraint, offerAttribs) shouldBe false
-    utils.matchesAttributeRequirements(multiConstraint, offerAttribs) shouldBe true
-  }
-
-  test("equality match is performed for text attributes") {
-    val offerAttribs = Map("os" -> 
Value.Text.newBuilder().setValue("centos7").build())
-
-    val trueConstraint = utils.parseConstraintString("os:centos7")
-    val falseConstraint = utils.parseConstraintString("os:ubuntu")
-
-    utils.matchesAttributeRequirements(trueConstraint, offerAttribs) shouldBe true
-    utils.matchesAttributeRequirements(falseConstraint, offerAttribs) shouldBe false
-  }
-
-  test("Port reservation is done correctly with user specified ports only") {
-    val conf = new SparkConf()
-    conf.set("spark.executor.port", "3000" )
-    conf.set("spark.blockManager.port", "4000")
-    val portResource = createTestPortResource((3000, 5000), Some("my_role"))
-
-    val (resourcesLeft, resourcesToBeUsed) = utils
-      .partitionPortResources(List(3000, 4000), List(portResource))
-    resourcesToBeUsed.length shouldBe 2
-
-    val portsToUse = getRangesFromResources(resourcesToBeUsed).map{r => r._1}.toArray
-
-    portsToUse.length shouldBe 2
-    arePortsEqual(portsToUse, Array(3000L, 4000L)) shouldBe true
-
-    val portRangesToBeUsed = rangesResourcesToTuple(resourcesToBeUsed)
-
-    val expectedUSed = Array((3000L, 3000L), (4000L, 4000L))
-
-    arePortsEqual(portRangesToBeUsed.toArray, expectedUSed) shouldBe true
-  }
-
-  test("Port reservation is done correctly with some user specified ports 
(spark.executor.port)") {
-    val conf = new SparkConf()
-    conf.set("spark.executor.port", "3100" )
-    val portResource = createTestPortResource((3000, 5000), Some("my_role"))
-
-    val (resourcesLeft, resourcesToBeUsed) = utils
-      .partitionPortResources(List(3100), List(portResource))
-
-    val portsToUse = getRangesFromResources(resourcesToBeUsed).map{r => r._1}
-
-    portsToUse.length shouldBe 1
-    portsToUse.contains(3100) shouldBe true
-  }
-
-  test("Port reservation is done correctly with all random ports") {
-    val conf = new SparkConf()
-    val portResource = createTestPortResource((3000L, 5000L), Some("my_role"))
-
-    val (resourcesLeft, resourcesToBeUsed) = utils
-      .partitionPortResources(List(), List(portResource))
-    val portsToUse = getRangesFromResources(resourcesToBeUsed).map{r => r._1}
-
-    portsToUse.isEmpty shouldBe true
-  }
-
-  test("Port reservation is done correctly with user specified ports only - 
multiple ranges") {
-    val conf = new SparkConf()
-    conf.set("spark.executor.port", "2100" )
-    conf.set("spark.blockManager.port", "4000")
-    val portResourceList = List(createTestPortResource((3000, 5000), Some("my_role")),
-      createTestPortResource((2000, 2500), Some("other_role")))
-    val (resourcesLeft, resourcesToBeUsed) = utils
-      .partitionPortResources(List(2100, 4000), portResourceList)
-    val portsToUse = getRangesFromResources(resourcesToBeUsed).map{r => r._1}
-
-    portsToUse.length shouldBe 2
-    val portsRangesLeft = rangesResourcesToTuple(resourcesLeft)
-    val portRangesToBeUsed = rangesResourcesToTuple(resourcesToBeUsed)
-
-    val expectedUsed = Array((2100L, 2100L), (4000L, 4000L))
-
-    arePortsEqual(portsToUse.toArray, Array(2100L, 4000L)) shouldBe true
-    arePortsEqual(portRangesToBeUsed.toArray, expectedUsed) shouldBe true
-  }
-
-  test("Port reservation is done correctly with all random ports - multiple 
ranges") {
-    val conf = new SparkConf()
-    val portResourceList = List(createTestPortResource((3000, 5000), Some("my_role")),
-      createTestPortResource((2000, 2500), Some("other_role")))
-    val (resourcesLeft, resourcesToBeUsed) = utils
-      .partitionPortResources(List(), portResourceList)
-    val portsToUse = getRangesFromResources(resourcesToBeUsed).map{r => r._1}
-    portsToUse.isEmpty shouldBe true
-  }
-}
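
The constraint tests above pin down the attribute syntax: semicolon-separated
attribute:value pairs, comma-separated alternative values, and an empty value
matching any offer that carries the attribute. Assuming the usual
spark.mesos.constraints setting is what carries this string (that setting is
not part of this diff), a submission would pass it roughly as follows, with a
placeholder master URL, class and jar:

  ./bin/spark-submit \
    --master mesos://mesos-master:5050 \
    --conf spark.mesos.constraints="os:centos7;zone:us-east-1a,us-east-1b" \
    --class org.example.MyApp my-app.jar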

http://git-wip-us.apache.org/repos/asf/spark/blob/8e5475be/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosTaskLaunchDataSuite.scala
----------------------------------------------------------------------
diff --git 
a/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosTaskLaunchDataSuite.scala
 
b/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosTaskLaunchDataSuite.scala
deleted file mode 100644
index 5a81bb3..0000000
--- 
a/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosTaskLaunchDataSuite.scala
+++ /dev/null
@@ -1,36 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.scheduler.cluster.mesos
-
-import java.nio.ByteBuffer
-
-import org.apache.spark.SparkFunSuite
-
-class MesosTaskLaunchDataSuite extends SparkFunSuite {
-  test("serialize and deserialize data must be same") {
-    val serializedTask = ByteBuffer.allocate(40)
-    (Range(100, 110).map(serializedTask.putInt(_)))
-    serializedTask.rewind
-    val attemptNumber = 100
-    val byteString = MesosTaskLaunchData(serializedTask, attemptNumber).toByteString
-    serializedTask.rewind
-    val mesosTaskLaunchData = MesosTaskLaunchData.fromByteString(byteString)
-    assert(mesosTaskLaunchData.attemptNumber == attemptNumber)
-    assert(mesosTaskLaunchData.serializedTask.equals(serializedTask))
-  }
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/8e5475be/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/Utils.scala
----------------------------------------------------------------------
diff --git 
a/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/Utils.scala 
b/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/Utils.scala
deleted file mode 100644
index fa9406f..0000000
--- a/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/Utils.scala
+++ /dev/null
@@ -1,85 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.scheduler.cluster.mesos
-
-import java.util.Collections
-
-import scala.collection.JavaConverters._
-
-import org.apache.mesos.Protos._
-import org.apache.mesos.Protos.Value.{Range => MesosRange, Ranges, Scalar}
-import org.apache.mesos.SchedulerDriver
-import org.mockito.{ArgumentCaptor, Matchers}
-import org.mockito.Mockito._
-
-object Utils {
-  def createOffer(
-      offerId: String,
-      slaveId: String,
-      mem: Int,
-      cpu: Int,
-      ports: Option[(Long, Long)] = None): Offer = {
-    val builder = Offer.newBuilder()
-    builder.addResourcesBuilder()
-      .setName("mem")
-      .setType(Value.Type.SCALAR)
-      .setScalar(Scalar.newBuilder().setValue(mem))
-    builder.addResourcesBuilder()
-      .setName("cpus")
-      .setType(Value.Type.SCALAR)
-      .setScalar(Scalar.newBuilder().setValue(cpu))
-    ports.foreach { resourcePorts =>
-      builder.addResourcesBuilder()
-        .setName("ports")
-        .setType(Value.Type.RANGES)
-        .setRanges(Ranges.newBuilder().addRange(MesosRange.newBuilder()
-          .setBegin(resourcePorts._1).setEnd(resourcePorts._2).build()))
-    }
-    builder.setId(createOfferId(offerId))
-      .setFrameworkId(FrameworkID.newBuilder()
-        .setValue("f1"))
-      .setSlaveId(SlaveID.newBuilder().setValue(slaveId))
-      .setHostname(s"host${slaveId}")
-      .build()
-  }
-
-  def verifyTaskLaunched(driver: SchedulerDriver, offerId: String): List[TaskInfo] = {
-    val captor = ArgumentCaptor.forClass(classOf[java.util.Collection[TaskInfo]])
-    verify(driver, times(1)).launchTasks(
-      Matchers.eq(Collections.singleton(createOfferId(offerId))),
-      captor.capture())
-    captor.getValue.asScala.toList
-  }
-
-  def createOfferId(offerId: String): OfferID = {
-    OfferID.newBuilder().setValue(offerId).build()
-  }
-
-  def createSlaveId(slaveId: String): SlaveID = {
-    SlaveID.newBuilder().setValue(slaveId).build()
-  }
-
-  def createExecutorId(executorId: String): ExecutorID = {
-    ExecutorID.newBuilder().setValue(executorId).build()
-  }
-
-  def createTaskId(taskId: String): TaskID = {
-    TaskID.newBuilder().setValue(taskId).build()
-  }
-}
-

http://git-wip-us.apache.org/repos/asf/spark/blob/8e5475be/dev/create-release/release-build.sh
----------------------------------------------------------------------
diff --git a/dev/create-release/release-build.sh 
b/dev/create-release/release-build.sh
index 2833dc7..96f9b57 100755
--- a/dev/create-release/release-build.sh
+++ b/dev/create-release/release-build.sh
@@ -80,7 +80,7 @@ NEXUS_PROFILE=d63f592e7eac0 # Profile for Spark staging uploads
 BASE_DIR=$(pwd)
 
 MVN="build/mvn --force"
-PUBLISH_PROFILES="-Pyarn -Phive -Phive-thriftserver -Phadoop-2.2"
+PUBLISH_PROFILES="-Pmesos -Pyarn -Phive -Phive-thriftserver -Phadoop-2.2"
 PUBLISH_PROFILES="$PUBLISH_PROFILES -Pspark-ganglia-lgpl -Pkinesis-asl"
 
 rm -rf spark
@@ -186,12 +186,13 @@ if [[ "$1" == "package" ]]; then
 
   # We increment the Zinc port each time to avoid OOM's and other craziness if multiple builds
   # share the same Zinc server.
-  make_binary_release "hadoop2.3" "-Psparkr -Phadoop-2.3 -Phive 
-Phive-thriftserver -Pyarn" "3033" &
-  make_binary_release "hadoop2.4" "-Psparkr -Phadoop-2.4 -Phive 
-Phive-thriftserver -Pyarn" "3034" &
-  make_binary_release "hadoop2.6" "-Psparkr -Phadoop-2.6 -Phive 
-Phive-thriftserver -Pyarn" "3035" &
-  make_binary_release "hadoop2.7" "-Psparkr -Phadoop-2.7 -Phive 
-Phive-thriftserver -Pyarn" "3036" &
-  make_binary_release "hadoop2.4-without-hive" "-Psparkr -Phadoop-2.4 -Pyarn" 
"3037" &
-  make_binary_release "without-hadoop" "-Psparkr -Phadoop-provided -Pyarn" 
"3038" &
+  FLAGS="-Psparkr -Phive -Phive-thriftserver -Pyarn -Pmesos"
+  make_binary_release "hadoop2.3" "-Phadoop2.3 $FLAGS" "3033" &
+  make_binary_release "hadoop2.4" "-Phadoop2.4 $FLAGS" "3034" &
+  make_binary_release "hadoop2.6" "-Phadoop2.6 $FLAGS" "3035" &
+  make_binary_release "hadoop2.7" "-Phadoop2.7 $FLAGS" "3036" &
+  make_binary_release "hadoop2.4-without-hive" "-Psparkr -Phadoop-2.4 -Pyarn 
-Pmesos" "3037" &
+  make_binary_release "without-hadoop" "-Psparkr -Phadoop-provided -Pyarn 
-Pmesos" "3038" &
   wait
   rm -rf spark-$SPARK_VERSION-bin-*/
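
With the change above, both the artifacts published to Nexus and every binary
package are built with the Mesos profile enabled. A quick local smoke test of
the new publish profile set (a sketch only; it assumes a plain checkout and
skips tests) could be:

  ./build/mvn -DskipTests -Pmesos -Pyarn -Phive -Phive-thriftserver -Phadoop-2.2 clean install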
 

http://git-wip-us.apache.org/repos/asf/spark/blob/8e5475be/dev/lint-java
----------------------------------------------------------------------
diff --git a/dev/lint-java b/dev/lint-java
index fe8ab83..c2e8053 100755
--- a/dev/lint-java
+++ b/dev/lint-java
@@ -20,7 +20,7 @@
 SCRIPT_DIR="$( cd "$( dirname "$0" )" && pwd )"
 SPARK_ROOT_DIR="$(dirname $SCRIPT_DIR)"
 
-ERRORS=$($SCRIPT_DIR/../build/mvn -Pkinesis-asl -Pyarn -Phive -Phive-thriftserver checkstyle:check | grep ERROR)
+ERRORS=$($SCRIPT_DIR/../build/mvn -Pkinesis-asl -Pmesos -Pyarn -Phive -Phive-thriftserver checkstyle:check | grep ERROR)
 
 if test ! -z "$ERRORS"; then
     echo -e "Checkstyle checks failed at following occurrences:\n$ERRORS"

