http://git-wip-us.apache.org/repos/asf/spark/blob/8e5475be/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala deleted file mode 100644 index bb6f6b3..0000000 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala +++ /dev/null @@ -1,745 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.scheduler.cluster.mesos - -import java.io.File -import java.util.{Collections, Date, List => JList} - -import scala.collection.JavaConverters._ -import scala.collection.mutable -import scala.collection.mutable.ArrayBuffer - -import org.apache.mesos.{Scheduler, SchedulerDriver} -import org.apache.mesos.Protos.{TaskState => MesosTaskState, _} -import org.apache.mesos.Protos.Environment.Variable -import org.apache.mesos.Protos.TaskStatus.Reason - -import org.apache.spark.{SecurityManager, SparkConf, SparkException, TaskState} -import org.apache.spark.deploy.mesos.MesosDriverDescription -import org.apache.spark.deploy.rest.{CreateSubmissionResponse, KillSubmissionResponse, SubmissionStatusResponse} -import org.apache.spark.metrics.MetricsSystem -import org.apache.spark.util.Utils - -/** - * Tracks the current state of a Mesos Task that runs a Spark driver. - * @param driverDescription Submitted driver description from - * [[org.apache.spark.deploy.rest.mesos.MesosRestServer]] - * @param taskId Mesos TaskID generated for the task - * @param slaveId Slave ID that the task is assigned to - * @param mesosTaskStatus The last known task status update. - * @param startDate The date the task was launched - * @param finishDate The date the task finished - * @param frameworkId Mesos framework ID the task registers with - */ -private[spark] class MesosClusterSubmissionState( - val driverDescription: MesosDriverDescription, - val taskId: TaskID, - val slaveId: SlaveID, - var mesosTaskStatus: Option[TaskStatus], - var startDate: Date, - var finishDate: Option[Date], - val frameworkId: String) - extends Serializable { - - def copy(): MesosClusterSubmissionState = { - new MesosClusterSubmissionState( - driverDescription, taskId, slaveId, mesosTaskStatus, startDate, finishDate, frameworkId) - } -} - -/** - * Tracks the retry state of a driver, which includes the next time it should be scheduled - * and necessary information to do exponential backoff. - * This class is not thread-safe, and we expect the caller to handle synchronizing state. - * - * @param lastFailureStatus Last Task status when it failed. - * @param retries Number of times it has been retried. - * @param nextRetry Time at which it should be retried next - * @param waitTime The amount of time driver is scheduled to wait until next retry. - */ -private[spark] class MesosClusterRetryState( - val lastFailureStatus: TaskStatus, - val retries: Int, - val nextRetry: Date, - val waitTime: Int) extends Serializable { - def copy(): MesosClusterRetryState = - new MesosClusterRetryState(lastFailureStatus, retries, nextRetry, waitTime) -} - -/** - * The full state of the cluster scheduler, currently being used for displaying - * information on the UI. - * - * @param frameworkId Mesos Framework id for the cluster scheduler. - * @param masterUrl The Mesos master url - * @param queuedDrivers All drivers queued to be launched - * @param launchedDrivers All launched or running drivers - * @param finishedDrivers All terminated drivers - * @param pendingRetryDrivers All drivers pending to be retried - */ -private[spark] class MesosClusterSchedulerState( - val frameworkId: String, - val masterUrl: Option[String], - val queuedDrivers: Iterable[MesosDriverDescription], - val launchedDrivers: Iterable[MesosClusterSubmissionState], - val finishedDrivers: Iterable[MesosClusterSubmissionState], - val pendingRetryDrivers: Iterable[MesosDriverDescription]) - -/** - * The full state of a Mesos driver, that is being used to display driver information on the UI. - */ -private[spark] class MesosDriverState( - val state: String, - val description: MesosDriverDescription, - val submissionState: Option[MesosClusterSubmissionState] = None) - -/** - * A Mesos scheduler that is responsible for launching submitted Spark drivers in cluster mode - * as Mesos tasks in a Mesos cluster. - * All drivers are launched asynchronously by the framework, which will eventually be launched - * by one of the slaves in the cluster. The results of the driver will be stored in slave's task - * sandbox which is accessible by visiting the Mesos UI. - * This scheduler supports recovery by persisting all its state and performs task reconciliation - * on recover, which gets all the latest state for all the drivers from Mesos master. - */ -private[spark] class MesosClusterScheduler( - engineFactory: MesosClusterPersistenceEngineFactory, - conf: SparkConf) - extends Scheduler with MesosSchedulerUtils { - var frameworkUrl: String = _ - private val metricsSystem = - MetricsSystem.createMetricsSystem("mesos_cluster", conf, new SecurityManager(conf)) - private val master = conf.get("spark.master") - private val appName = conf.get("spark.app.name") - private val queuedCapacity = conf.getInt("spark.mesos.maxDrivers", 200) - private val retainedDrivers = conf.getInt("spark.mesos.retainedDrivers", 200) - private val maxRetryWaitTime = conf.getInt("spark.mesos.cluster.retry.wait.max", 60) // 1 minute - private val schedulerState = engineFactory.createEngine("scheduler") - private val stateLock = new Object() - private val finishedDrivers = - new mutable.ArrayBuffer[MesosClusterSubmissionState](retainedDrivers) - private var frameworkId: String = null - // Holds all the launched drivers and current launch state, keyed by driver id. - private val launchedDrivers = new mutable.HashMap[String, MesosClusterSubmissionState]() - // Holds a map of driver id to expected slave id that is passed to Mesos for reconciliation. - // All drivers that are loaded after failover are added here, as we need get the latest - // state of the tasks from Mesos. - private val pendingRecover = new mutable.HashMap[String, SlaveID]() - // Stores all the submitted drivers that hasn't been launched. - private val queuedDrivers = new ArrayBuffer[MesosDriverDescription]() - // All supervised drivers that are waiting to retry after termination. - private val pendingRetryDrivers = new ArrayBuffer[MesosDriverDescription]() - private val queuedDriversState = engineFactory.createEngine("driverQueue") - private val launchedDriversState = engineFactory.createEngine("launchedDrivers") - private val pendingRetryDriversState = engineFactory.createEngine("retryList") - // Flag to mark if the scheduler is ready to be called, which is until the scheduler - // is registered with Mesos master. - @volatile protected var ready = false - private var masterInfo: Option[MasterInfo] = None - - def submitDriver(desc: MesosDriverDescription): CreateSubmissionResponse = { - val c = new CreateSubmissionResponse - if (!ready) { - c.success = false - c.message = "Scheduler is not ready to take requests" - return c - } - - stateLock.synchronized { - if (isQueueFull()) { - c.success = false - c.message = "Already reached maximum submission size" - return c - } - c.submissionId = desc.submissionId - queuedDriversState.persist(desc.submissionId, desc) - queuedDrivers += desc - c.success = true - } - c - } - - def killDriver(submissionId: String): KillSubmissionResponse = { - val k = new KillSubmissionResponse - if (!ready) { - k.success = false - k.message = "Scheduler is not ready to take requests" - return k - } - k.submissionId = submissionId - stateLock.synchronized { - // We look for the requested driver in the following places: - // 1. Check if submission is running or launched. - // 2. Check if it's still queued. - // 3. Check if it's in the retry list. - // 4. Check if it has already completed. - if (launchedDrivers.contains(submissionId)) { - val task = launchedDrivers(submissionId) - mesosDriver.killTask(task.taskId) - k.success = true - k.message = "Killing running driver" - } else if (removeFromQueuedDrivers(submissionId)) { - k.success = true - k.message = "Removed driver while it's still pending" - } else if (removeFromPendingRetryDrivers(submissionId)) { - k.success = true - k.message = "Removed driver while it's being retried" - } else if (finishedDrivers.exists(_.driverDescription.submissionId.equals(submissionId))) { - k.success = false - k.message = "Driver already terminated" - } else { - k.success = false - k.message = "Cannot find driver" - } - } - k - } - - def getDriverStatus(submissionId: String): SubmissionStatusResponse = { - val s = new SubmissionStatusResponse - if (!ready) { - s.success = false - s.message = "Scheduler is not ready to take requests" - return s - } - s.submissionId = submissionId - stateLock.synchronized { - if (queuedDrivers.exists(_.submissionId.equals(submissionId))) { - s.success = true - s.driverState = "QUEUED" - } else if (launchedDrivers.contains(submissionId)) { - s.success = true - s.driverState = "RUNNING" - launchedDrivers(submissionId).mesosTaskStatus.foreach(state => s.message = state.toString) - } else if (finishedDrivers.exists(_.driverDescription.submissionId.equals(submissionId))) { - s.success = true - s.driverState = "FINISHED" - finishedDrivers - .find(d => d.driverDescription.submissionId.equals(submissionId)).get.mesosTaskStatus - .foreach(state => s.message = state.toString) - } else if (pendingRetryDrivers.exists(_.submissionId.equals(submissionId))) { - val status = pendingRetryDrivers.find(_.submissionId.equals(submissionId)) - .get.retryState.get.lastFailureStatus - s.success = true - s.driverState = "RETRYING" - s.message = status.toString - } else { - s.success = false - s.driverState = "NOT_FOUND" - } - } - s - } - - /** - * Gets the driver state to be displayed on the Web UI. - */ - def getDriverState(submissionId: String): Option[MesosDriverState] = { - stateLock.synchronized { - queuedDrivers.find(_.submissionId.equals(submissionId)) - .map(d => new MesosDriverState("QUEUED", d)) - .orElse(launchedDrivers.get(submissionId) - .map(d => new MesosDriverState("RUNNING", d.driverDescription, Some(d)))) - .orElse(finishedDrivers.find(_.driverDescription.submissionId.equals(submissionId)) - .map(d => new MesosDriverState("FINISHED", d.driverDescription, Some(d)))) - .orElse(pendingRetryDrivers.find(_.submissionId.equals(submissionId)) - .map(d => new MesosDriverState("RETRYING", d))) - } - } - - private def isQueueFull(): Boolean = launchedDrivers.size >= queuedCapacity - - /** - * Recover scheduler state that is persisted. - * We still need to do task reconciliation to be up to date of the latest task states - * as it might have changed while the scheduler is failing over. - */ - private def recoverState(): Unit = { - stateLock.synchronized { - launchedDriversState.fetchAll[MesosClusterSubmissionState]().foreach { state => - launchedDrivers(state.taskId.getValue) = state - pendingRecover(state.taskId.getValue) = state.slaveId - } - queuedDriversState.fetchAll[MesosDriverDescription]().foreach(d => queuedDrivers += d) - // There is potential timing issue where a queued driver might have been launched - // but the scheduler shuts down before the queued driver was able to be removed - // from the queue. We try to mitigate this issue by walking through all queued drivers - // and remove if they're already launched. - queuedDrivers - .filter(d => launchedDrivers.contains(d.submissionId)) - .foreach(d => removeFromQueuedDrivers(d.submissionId)) - pendingRetryDriversState.fetchAll[MesosDriverDescription]() - .foreach(s => pendingRetryDrivers += s) - // TODO: Consider storing finished drivers so we can show them on the UI after - // failover. For now we clear the history on each recovery. - finishedDrivers.clear() - } - } - - /** - * Starts the cluster scheduler and wait until the scheduler is registered. - * This also marks the scheduler to be ready for requests. - */ - def start(): Unit = { - // TODO: Implement leader election to make sure only one framework running in the cluster. - val fwId = schedulerState.fetch[String]("frameworkId") - fwId.foreach { id => - frameworkId = id - } - recoverState() - metricsSystem.registerSource(new MesosClusterSchedulerSource(this)) - metricsSystem.start() - val driver = createSchedulerDriver( - master, - MesosClusterScheduler.this, - Utils.getCurrentUserName(), - appName, - conf, - Some(frameworkUrl), - Some(true), - Some(Integer.MAX_VALUE), - fwId) - - startScheduler(driver) - ready = true - } - - def stop(): Unit = { - ready = false - metricsSystem.report() - metricsSystem.stop() - mesosDriver.stop(true) - } - - override def registered( - driver: SchedulerDriver, - newFrameworkId: FrameworkID, - masterInfo: MasterInfo): Unit = { - logInfo("Registered as framework ID " + newFrameworkId.getValue) - if (newFrameworkId.getValue != frameworkId) { - frameworkId = newFrameworkId.getValue - schedulerState.persist("frameworkId", frameworkId) - } - markRegistered() - - stateLock.synchronized { - this.masterInfo = Some(masterInfo) - if (!pendingRecover.isEmpty) { - // Start task reconciliation if we need to recover. - val statuses = pendingRecover.collect { - case (taskId, slaveId) => - val newStatus = TaskStatus.newBuilder() - .setTaskId(TaskID.newBuilder().setValue(taskId).build()) - .setSlaveId(slaveId) - .setState(MesosTaskState.TASK_STAGING) - .build() - launchedDrivers.get(taskId).map(_.mesosTaskStatus.getOrElse(newStatus)) - .getOrElse(newStatus) - } - // TODO: Page the status updates to avoid trying to reconcile - // a large amount of tasks at once. - driver.reconcileTasks(statuses.toSeq.asJava) - } - } - } - - private def getDriverExecutorURI(desc: MesosDriverDescription): Option[String] = { - desc.conf.getOption("spark.executor.uri") - .orElse(desc.command.environment.get("SPARK_EXECUTOR_URI")) - } - - private def getDriverFrameworkID(desc: MesosDriverDescription): String = { - s"${frameworkId}-${desc.submissionId}" - } - - private def adjust[A, B](m: collection.Map[A, B], k: A, default: B)(f: B => B) = { - m.updated(k, f(m.getOrElse(k, default))) - } - - private def getDriverEnvironment(desc: MesosDriverDescription): Environment = { - // TODO(mgummelt): Don't do this here. This should be passed as a --conf - val commandEnv = adjust(desc.command.environment, "SPARK_SUBMIT_OPTS", "")( - v => s"$v -Dspark.mesos.driver.frameworkId=${getDriverFrameworkID(desc)}" - ) - - val env = desc.conf.getAllWithPrefix("spark.mesos.driverEnv.") ++ commandEnv - - val envBuilder = Environment.newBuilder() - env.foreach { case (k, v) => - envBuilder.addVariables(Variable.newBuilder().setName(k).setValue(v)) - } - envBuilder.build() - } - - private def getDriverUris(desc: MesosDriverDescription): List[CommandInfo.URI] = { - val confUris = List(conf.getOption("spark.mesos.uris"), - desc.conf.getOption("spark.mesos.uris"), - desc.conf.getOption("spark.submit.pyFiles")).flatMap( - _.map(_.split(",").map(_.trim)) - ).flatten - - val jarUrl = desc.jarUrl.stripPrefix("file:").stripPrefix("local:") - - ((jarUrl :: confUris) ++ getDriverExecutorURI(desc).toList).map(uri => - CommandInfo.URI.newBuilder().setValue(uri.trim()).build()) - } - - private def getDriverCommandValue(desc: MesosDriverDescription): String = { - val dockerDefined = desc.conf.contains("spark.mesos.executor.docker.image") - val executorUri = getDriverExecutorURI(desc) - // Gets the path to run spark-submit, and the path to the Mesos sandbox. - val (executable, sandboxPath) = if (dockerDefined) { - // Application jar is automatically downloaded in the mounted sandbox by Mesos, - // and the path to the mounted volume is stored in $MESOS_SANDBOX env variable. - ("./bin/spark-submit", "$MESOS_SANDBOX") - } else if (executorUri.isDefined) { - val folderBasename = executorUri.get.split('/').last.split('.').head - - val entries = conf.getOption("spark.executor.extraLibraryPath") - .map(path => Seq(path) ++ desc.command.libraryPathEntries) - .getOrElse(desc.command.libraryPathEntries) - - val prefixEnv = if (!entries.isEmpty) Utils.libraryPathEnvPrefix(entries) else "" - - val cmdExecutable = s"cd $folderBasename*; $prefixEnv bin/spark-submit" - // Sandbox path points to the parent folder as we chdir into the folderBasename. - (cmdExecutable, "..") - } else { - val executorSparkHome = desc.conf.getOption("spark.mesos.executor.home") - .orElse(conf.getOption("spark.home")) - .orElse(Option(System.getenv("SPARK_HOME"))) - .getOrElse { - throw new SparkException("Executor Spark home `spark.mesos.executor.home` is not set!") - } - val cmdExecutable = new File(executorSparkHome, "./bin/spark-submit").getPath - // Sandbox points to the current directory by default with Mesos. - (cmdExecutable, ".") - } - val cmdOptions = generateCmdOption(desc, sandboxPath).mkString(" ") - val primaryResource = new File(sandboxPath, desc.jarUrl.split("/").last).toString() - val appArguments = desc.command.arguments.mkString(" ") - - s"$executable $cmdOptions $primaryResource $appArguments" - } - - private def buildDriverCommand(desc: MesosDriverDescription): CommandInfo = { - val builder = CommandInfo.newBuilder() - builder.setValue(getDriverCommandValue(desc)) - builder.setEnvironment(getDriverEnvironment(desc)) - builder.addAllUris(getDriverUris(desc).asJava) - builder.build() - } - - private def generateCmdOption(desc: MesosDriverDescription, sandboxPath: String): Seq[String] = { - var options = Seq( - "--name", desc.conf.get("spark.app.name"), - "--master", s"mesos://${conf.get("spark.master")}", - "--driver-cores", desc.cores.toString, - "--driver-memory", s"${desc.mem}M") - - // Assume empty main class means we're running python - if (!desc.command.mainClass.equals("")) { - options ++= Seq("--class", desc.command.mainClass) - } - - desc.conf.getOption("spark.executor.memory").foreach { v => - options ++= Seq("--executor-memory", v) - } - desc.conf.getOption("spark.cores.max").foreach { v => - options ++= Seq("--total-executor-cores", v) - } - desc.conf.getOption("spark.submit.pyFiles").foreach { pyFiles => - val formattedFiles = pyFiles.split(",") - .map { path => new File(sandboxPath, path.split("/").last).toString() } - .mkString(",") - options ++= Seq("--py-files", formattedFiles) - } - - // --conf - val replicatedOptionsBlacklist = Set( - "spark.jars", // Avoids duplicate classes in classpath - "spark.submit.deployMode", // this would be set to `cluster`, but we need client - "spark.master" // this contains the address of the dispatcher, not master - ) - val defaultConf = conf.getAllWithPrefix("spark.mesos.dispatcher.driverDefault.").toMap - val driverConf = desc.conf.getAll - .filter { case (key, _) => !replicatedOptionsBlacklist.contains(key) } - .toMap - (defaultConf ++ driverConf).foreach { case (key, value) => - options ++= Seq("--conf", s"$key=${shellEscape(value)}") } - - options - } - - /** - * Escape args for Unix-like shells, unless already quoted by the user. - * Based on: http://www.gnu.org/software/bash/manual/html_node/Double-Quotes.html - * and http://www.grymoire.com/Unix/Quote.html - * - * @param value argument - * @return escaped argument - */ - private[scheduler] def shellEscape(value: String): String = { - val WrappedInQuotes = """^(".+"|'.+')$""".r - val ShellSpecialChars = (""".*([ '<>&|\?\*;!#\\(\)"$`]).*""").r - value match { - case WrappedInQuotes(c) => value // The user quoted his args, don't touch it! - case ShellSpecialChars(c) => "\"" + value.replaceAll("""(["`\$\\])""", """\\$1""") + "\"" - case _: String => value // Don't touch harmless strings - } - } - - private class ResourceOffer( - val offerId: OfferID, - val slaveId: SlaveID, - var resources: JList[Resource]) { - override def toString(): String = { - s"Offer id: ${offerId}, resources: ${resources}" - } - } - - private def createTaskInfo(desc: MesosDriverDescription, offer: ResourceOffer): TaskInfo = { - val taskId = TaskID.newBuilder().setValue(desc.submissionId).build() - - val (remainingResources, cpuResourcesToUse) = - partitionResources(offer.resources, "cpus", desc.cores) - val (finalResources, memResourcesToUse) = - partitionResources(remainingResources.asJava, "mem", desc.mem) - offer.resources = finalResources.asJava - - val appName = desc.conf.get("spark.app.name") - val taskInfo = TaskInfo.newBuilder() - .setTaskId(taskId) - .setName(s"Driver for ${appName}") - .setSlaveId(offer.slaveId) - .setCommand(buildDriverCommand(desc)) - .addAllResources(cpuResourcesToUse.asJava) - .addAllResources(memResourcesToUse.asJava) - - desc.conf.getOption("spark.mesos.executor.docker.image").foreach { image => - MesosSchedulerBackendUtil.setupContainerBuilderDockerInfo(image, - desc.conf, - taskInfo.getContainerBuilder) - } - - taskInfo.build - } - - /** - * This method takes all the possible candidates and attempt to schedule them with Mesos offers. - * Every time a new task is scheduled, the afterLaunchCallback is called to perform post scheduled - * logic on each task. - */ - private def scheduleTasks( - candidates: Seq[MesosDriverDescription], - afterLaunchCallback: (String) => Boolean, - currentOffers: List[ResourceOffer], - tasks: mutable.HashMap[OfferID, ArrayBuffer[TaskInfo]]): Unit = { - for (submission <- candidates) { - val driverCpu = submission.cores - val driverMem = submission.mem - logTrace(s"Finding offer to launch driver with cpu: $driverCpu, mem: $driverMem") - val offerOption = currentOffers.find { o => - getResource(o.resources, "cpus") >= driverCpu && - getResource(o.resources, "mem") >= driverMem - } - if (offerOption.isEmpty) { - logDebug(s"Unable to find offer to launch driver id: ${submission.submissionId}, " + - s"cpu: $driverCpu, mem: $driverMem") - } else { - val offer = offerOption.get - val queuedTasks = tasks.getOrElseUpdate(offer.offerId, new ArrayBuffer[TaskInfo]) - val task = createTaskInfo(submission, offer) - queuedTasks += task - logTrace(s"Using offer ${offer.offerId.getValue} to launch driver " + - submission.submissionId) - val newState = new MesosClusterSubmissionState(submission, task.getTaskId, offer.slaveId, - None, new Date(), None, getDriverFrameworkID(submission)) - launchedDrivers(submission.submissionId) = newState - launchedDriversState.persist(submission.submissionId, newState) - afterLaunchCallback(submission.submissionId) - } - } - } - - override def resourceOffers(driver: SchedulerDriver, offers: JList[Offer]): Unit = { - logTrace(s"Received offers from Mesos: \n${offers.asScala.mkString("\n")}") - val tasks = new mutable.HashMap[OfferID, ArrayBuffer[TaskInfo]]() - val currentTime = new Date() - - val currentOffers = offers.asScala.map { - o => new ResourceOffer(o.getId, o.getSlaveId, o.getResourcesList) - }.toList - - stateLock.synchronized { - // We first schedule all the supervised drivers that are ready to retry. - // This list will be empty if none of the drivers are marked as supervise. - val driversToRetry = pendingRetryDrivers.filter { d => - d.retryState.get.nextRetry.before(currentTime) - } - - scheduleTasks( - copyBuffer(driversToRetry), - removeFromPendingRetryDrivers, - currentOffers, - tasks) - - // Then we walk through the queued drivers and try to schedule them. - scheduleTasks( - copyBuffer(queuedDrivers), - removeFromQueuedDrivers, - currentOffers, - tasks) - } - tasks.foreach { case (offerId, taskInfos) => - driver.launchTasks(Collections.singleton(offerId), taskInfos.asJava) - } - - for (o <- currentOffers if !tasks.contains(o.offerId)) { - driver.declineOffer(o.offerId) - } - } - - private def copyBuffer( - buffer: ArrayBuffer[MesosDriverDescription]): ArrayBuffer[MesosDriverDescription] = { - val newBuffer = new ArrayBuffer[MesosDriverDescription](buffer.size) - buffer.copyToBuffer(newBuffer) - newBuffer - } - - def getSchedulerState(): MesosClusterSchedulerState = { - stateLock.synchronized { - new MesosClusterSchedulerState( - frameworkId, - masterInfo.map(m => s"http://${m.getIp}:${m.getPort}"), - copyBuffer(queuedDrivers), - launchedDrivers.values.map(_.copy()).toList, - finishedDrivers.map(_.copy()).toList, - copyBuffer(pendingRetryDrivers)) - } - } - - override def offerRescinded(driver: SchedulerDriver, offerId: OfferID): Unit = {} - override def disconnected(driver: SchedulerDriver): Unit = {} - override def reregistered(driver: SchedulerDriver, masterInfo: MasterInfo): Unit = { - logInfo(s"Framework re-registered with master ${masterInfo.getId}") - } - override def slaveLost(driver: SchedulerDriver, slaveId: SlaveID): Unit = {} - override def error(driver: SchedulerDriver, error: String): Unit = { - logError("Error received: " + error) - markErr() - } - - /** - * Check if the task state is a recoverable state that we can relaunch the task. - * Task state like TASK_ERROR are not relaunchable state since it wasn't able - * to be validated by Mesos. - */ - private def shouldRelaunch(state: MesosTaskState): Boolean = { - state == MesosTaskState.TASK_FAILED || - state == MesosTaskState.TASK_KILLED || - state == MesosTaskState.TASK_LOST - } - - override def statusUpdate(driver: SchedulerDriver, status: TaskStatus): Unit = { - val taskId = status.getTaskId.getValue - stateLock.synchronized { - if (launchedDrivers.contains(taskId)) { - if (status.getReason == Reason.REASON_RECONCILIATION && - !pendingRecover.contains(taskId)) { - // Task has already received update and no longer requires reconciliation. - return - } - val state = launchedDrivers(taskId) - // Check if the driver is supervise enabled and can be relaunched. - if (state.driverDescription.supervise && shouldRelaunch(status.getState)) { - removeFromLaunchedDrivers(taskId) - state.finishDate = Some(new Date()) - val retryState: Option[MesosClusterRetryState] = state.driverDescription.retryState - val (retries, waitTimeSec) = retryState - .map { rs => (rs.retries + 1, Math.min(maxRetryWaitTime, rs.waitTime * 2)) } - .getOrElse{ (1, 1) } - val nextRetry = new Date(new Date().getTime + waitTimeSec * 1000L) - - val newDriverDescription = state.driverDescription.copy( - retryState = Some(new MesosClusterRetryState(status, retries, nextRetry, waitTimeSec))) - pendingRetryDrivers += newDriverDescription - pendingRetryDriversState.persist(taskId, newDriverDescription) - } else if (TaskState.isFinished(TaskState.fromMesos(status.getState))) { - removeFromLaunchedDrivers(taskId) - state.finishDate = Some(new Date()) - if (finishedDrivers.size >= retainedDrivers) { - val toRemove = math.max(retainedDrivers / 10, 1) - finishedDrivers.trimStart(toRemove) - } - finishedDrivers += state - } - state.mesosTaskStatus = Option(status) - } else { - logError(s"Unable to find driver $taskId in status update") - } - } - } - - override def frameworkMessage( - driver: SchedulerDriver, - executorId: ExecutorID, - slaveId: SlaveID, - message: Array[Byte]): Unit = {} - - override def executorLost( - driver: SchedulerDriver, - executorId: ExecutorID, - slaveId: SlaveID, - status: Int): Unit = {} - - private def removeFromQueuedDrivers(id: String): Boolean = { - val index = queuedDrivers.indexWhere(_.submissionId.equals(id)) - if (index != -1) { - queuedDrivers.remove(index) - queuedDriversState.expunge(id) - true - } else { - false - } - } - - private def removeFromLaunchedDrivers(id: String): Boolean = { - if (launchedDrivers.remove(id).isDefined) { - launchedDriversState.expunge(id) - true - } else { - false - } - } - - private def removeFromPendingRetryDrivers(id: String): Boolean = { - val index = pendingRetryDrivers.indexWhere(_.submissionId.equals(id)) - if (index != -1) { - pendingRetryDrivers.remove(index) - pendingRetryDriversState.expunge(id) - true - } else { - false - } - } - - def getQueuedDriversSize: Int = queuedDrivers.size - def getLaunchedDriversSize: Int = launchedDrivers.size - def getPendingRetryDriversSize: Int = pendingRetryDrivers.size -}
http://git-wip-us.apache.org/repos/asf/spark/blob/8e5475be/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterSchedulerSource.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterSchedulerSource.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterSchedulerSource.scala deleted file mode 100644 index 1fe9497..0000000 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterSchedulerSource.scala +++ /dev/null @@ -1,40 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.scheduler.cluster.mesos - -import com.codahale.metrics.{Gauge, MetricRegistry} - -import org.apache.spark.metrics.source.Source - -private[mesos] class MesosClusterSchedulerSource(scheduler: MesosClusterScheduler) - extends Source { - override def sourceName: String = "mesos_cluster" - override def metricRegistry: MetricRegistry = new MetricRegistry() - - metricRegistry.register(MetricRegistry.name("waitingDrivers"), new Gauge[Int] { - override def getValue: Int = scheduler.getQueuedDriversSize - }) - - metricRegistry.register(MetricRegistry.name("launchedDrivers"), new Gauge[Int] { - override def getValue: Int = scheduler.getLaunchedDriversSize - }) - - metricRegistry.register(MetricRegistry.name("retryDrivers"), new Gauge[Int] { - override def getValue: Int = scheduler.getPendingRetryDriversSize - }) -} http://git-wip-us.apache.org/repos/asf/spark/blob/8e5475be/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala deleted file mode 100644 index 6b9313e..0000000 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala +++ /dev/null @@ -1,642 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.scheduler.cluster.mesos - -import java.io.File -import java.util.{Collections, List => JList} -import java.util.concurrent.locks.ReentrantLock - -import scala.collection.JavaConverters._ -import scala.collection.mutable - -import org.apache.mesos.Protos.{TaskInfo => MesosTaskInfo, _} - -import org.apache.spark.{SecurityManager, SparkContext, SparkException, TaskState} -import org.apache.spark.network.netty.SparkTransportConf -import org.apache.spark.network.shuffle.mesos.MesosExternalShuffleClient -import org.apache.spark.rpc.RpcEndpointAddress -import org.apache.spark.scheduler.{SlaveLost, TaskSchedulerImpl} -import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend -import org.apache.spark.util.Utils - -/** - * A SchedulerBackend that runs tasks on Mesos, but uses "coarse-grained" tasks, where it holds - * onto each Mesos node for the duration of the Spark job instead of relinquishing cores whenever - * a task is done. It launches Spark tasks within the coarse-grained Mesos tasks using the - * CoarseGrainedSchedulerBackend mechanism. This class is useful for lower and more predictable - * latency. - * - * Unfortunately this has a bit of duplication from [[MesosFineGrainedSchedulerBackend]], - * but it seems hard to remove this. - */ -private[spark] class MesosCoarseGrainedSchedulerBackend( - scheduler: TaskSchedulerImpl, - sc: SparkContext, - master: String, - securityManager: SecurityManager) - extends CoarseGrainedSchedulerBackend(scheduler, sc.env.rpcEnv) - with org.apache.mesos.Scheduler - with MesosSchedulerUtils { - - val MAX_SLAVE_FAILURES = 2 // Blacklist a slave after this many failures - - // Maximum number of cores to acquire (TODO: we'll need more flexible controls here) - val maxCores = conf.get("spark.cores.max", Int.MaxValue.toString).toInt - - private[this] val shutdownTimeoutMS = - conf.getTimeAsMs("spark.mesos.coarse.shutdownTimeout", "10s") - .ensuring(_ >= 0, "spark.mesos.coarse.shutdownTimeout must be >= 0") - - // Synchronization protected by stateLock - private[this] var stopCalled: Boolean = false - - // If shuffle service is enabled, the Spark driver will register with the shuffle service. - // This is for cleaning up shuffle files reliably. - private val shuffleServiceEnabled = conf.getBoolean("spark.shuffle.service.enabled", false) - - // Cores we have acquired with each Mesos task ID - val coresByTaskId = new mutable.HashMap[String, Int] - var totalCoresAcquired = 0 - - // SlaveID -> Slave - // This map accumulates entries for the duration of the job. Slaves are never deleted, because - // we need to maintain e.g. failure state and connection state. - private val slaves = new mutable.HashMap[String, Slave] - - /** - * The total number of executors we aim to have. Undefined when not using dynamic allocation. - * Initially set to 0 when using dynamic allocation, the executor allocation manager will send - * the real initial limit later. - */ - private var executorLimitOption: Option[Int] = { - if (Utils.isDynamicAllocationEnabled(conf)) { - Some(0) - } else { - None - } - } - - /** - * Return the current executor limit, which may be [[Int.MaxValue]] - * before properly initialized. - */ - private[mesos] def executorLimit: Int = executorLimitOption.getOrElse(Int.MaxValue) - - // private lock object protecting mutable state above. Using the intrinsic lock - // may lead to deadlocks since the superclass might also try to lock - private val stateLock = new ReentrantLock - - val extraCoresPerExecutor = conf.getInt("spark.mesos.extra.cores", 0) - - // Offer constraints - private val slaveOfferConstraints = - parseConstraintString(sc.conf.get("spark.mesos.constraints", "")) - - // Reject offers with mismatched constraints in seconds - private val rejectOfferDurationForUnmetConstraints = - getRejectOfferDurationForUnmetConstraints(sc) - - // Reject offers when we reached the maximum number of cores for this framework - private val rejectOfferDurationForReachedMaxCores = - getRejectOfferDurationForReachedMaxCores(sc) - - // A client for talking to the external shuffle service - private val mesosExternalShuffleClient: Option[MesosExternalShuffleClient] = { - if (shuffleServiceEnabled) { - Some(getShuffleClient()) - } else { - None - } - } - - // This method is factored out for testability - protected def getShuffleClient(): MesosExternalShuffleClient = { - new MesosExternalShuffleClient( - SparkTransportConf.fromSparkConf(conf, "shuffle"), - securityManager, - securityManager.isAuthenticationEnabled(), - securityManager.isSaslEncryptionEnabled()) - } - - var nextMesosTaskId = 0 - - @volatile var appId: String = _ - - def newMesosTaskId(): String = { - val id = nextMesosTaskId - nextMesosTaskId += 1 - id.toString - } - - override def start() { - super.start() - val driver = createSchedulerDriver( - master, - MesosCoarseGrainedSchedulerBackend.this, - sc.sparkUser, - sc.appName, - sc.conf, - sc.conf.getOption("spark.mesos.driver.webui.url").orElse(sc.ui.map(_.appUIAddress)), - None, - None, - sc.conf.getOption("spark.mesos.driver.frameworkId") - ) - - unsetFrameworkID(sc) - startScheduler(driver) - } - - def createCommand(offer: Offer, numCores: Int, taskId: String): CommandInfo = { - val environment = Environment.newBuilder() - val extraClassPath = conf.getOption("spark.executor.extraClassPath") - extraClassPath.foreach { cp => - environment.addVariables( - Environment.Variable.newBuilder().setName("SPARK_CLASSPATH").setValue(cp).build()) - } - val extraJavaOpts = conf.get("spark.executor.extraJavaOptions", "") - - // Set the environment variable through a command prefix - // to append to the existing value of the variable - val prefixEnv = conf.getOption("spark.executor.extraLibraryPath").map { p => - Utils.libraryPathEnvPrefix(Seq(p)) - }.getOrElse("") - - environment.addVariables( - Environment.Variable.newBuilder() - .setName("SPARK_EXECUTOR_OPTS") - .setValue(extraJavaOpts) - .build()) - - sc.executorEnvs.foreach { case (key, value) => - environment.addVariables(Environment.Variable.newBuilder() - .setName(key) - .setValue(value) - .build()) - } - val command = CommandInfo.newBuilder() - .setEnvironment(environment) - - val uri = conf.getOption("spark.executor.uri") - .orElse(Option(System.getenv("SPARK_EXECUTOR_URI"))) - - if (uri.isEmpty) { - val executorSparkHome = conf.getOption("spark.mesos.executor.home") - .orElse(sc.getSparkHome()) - .getOrElse { - throw new SparkException("Executor Spark home `spark.mesos.executor.home` is not set!") - } - val runScript = new File(executorSparkHome, "./bin/spark-class").getPath - command.setValue( - "%s \"%s\" org.apache.spark.executor.CoarseGrainedExecutorBackend" - .format(prefixEnv, runScript) + - s" --driver-url $driverURL" + - s" --executor-id $taskId" + - s" --hostname ${offer.getHostname}" + - s" --cores $numCores" + - s" --app-id $appId") - } else { - // Grab everything to the first '.'. We'll use that and '*' to - // glob the directory "correctly". - val basename = uri.get.split('/').last.split('.').head - command.setValue( - s"cd $basename*; $prefixEnv " + - "./bin/spark-class org.apache.spark.executor.CoarseGrainedExecutorBackend" + - s" --driver-url $driverURL" + - s" --executor-id $taskId" + - s" --hostname ${offer.getHostname}" + - s" --cores $numCores" + - s" --app-id $appId") - command.addUris(CommandInfo.URI.newBuilder().setValue(uri.get)) - } - - conf.getOption("spark.mesos.uris").foreach(setupUris(_, command)) - - command.build() - } - - protected def driverURL: String = { - if (conf.contains("spark.testing")) { - "driverURL" - } else { - RpcEndpointAddress( - conf.get("spark.driver.host"), - conf.get("spark.driver.port").toInt, - CoarseGrainedSchedulerBackend.ENDPOINT_NAME).toString - } - } - - override def offerRescinded(d: org.apache.mesos.SchedulerDriver, o: OfferID) {} - - override def registered( - d: org.apache.mesos.SchedulerDriver, frameworkId: FrameworkID, masterInfo: MasterInfo) { - appId = frameworkId.getValue - mesosExternalShuffleClient.foreach(_.init(appId)) - markRegistered() - } - - override def sufficientResourcesRegistered(): Boolean = { - totalCoresAcquired >= maxCores * minRegisteredRatio - } - - override def disconnected(d: org.apache.mesos.SchedulerDriver) {} - - override def reregistered(d: org.apache.mesos.SchedulerDriver, masterInfo: MasterInfo) {} - - /** - * Method called by Mesos to offer resources on slaves. We respond by launching an executor, - * unless we've already launched more than we wanted to. - */ - override def resourceOffers(d: org.apache.mesos.SchedulerDriver, offers: JList[Offer]) { - stateLock.synchronized { - if (stopCalled) { - logDebug("Ignoring offers during shutdown") - // Driver should simply return a stopped status on race - // condition between this.stop() and completing here - offers.asScala.map(_.getId).foreach(d.declineOffer) - return - } - - logDebug(s"Received ${offers.size} resource offers.") - - val (matchedOffers, unmatchedOffers) = offers.asScala.partition { offer => - val offerAttributes = toAttributeMap(offer.getAttributesList) - matchesAttributeRequirements(slaveOfferConstraints, offerAttributes) - } - - declineUnmatchedOffers(d, unmatchedOffers) - handleMatchedOffers(d, matchedOffers) - } - } - - private def declineUnmatchedOffers( - d: org.apache.mesos.SchedulerDriver, offers: mutable.Buffer[Offer]): Unit = { - offers.foreach { offer => - declineOffer(d, offer, Some("unmet constraints"), - Some(rejectOfferDurationForUnmetConstraints)) - } - } - - private def declineOffer( - d: org.apache.mesos.SchedulerDriver, - offer: Offer, - reason: Option[String] = None, - refuseSeconds: Option[Long] = None): Unit = { - - val id = offer.getId.getValue - val offerAttributes = toAttributeMap(offer.getAttributesList) - val mem = getResource(offer.getResourcesList, "mem") - val cpus = getResource(offer.getResourcesList, "cpus") - val ports = getRangeResource(offer.getResourcesList, "ports") - - logDebug(s"Declining offer: $id with attributes: $offerAttributes mem: $mem" + - s" cpu: $cpus port: $ports for $refuseSeconds seconds" + - reason.map(r => s" (reason: $r)").getOrElse("")) - - refuseSeconds match { - case Some(seconds) => - val filters = Filters.newBuilder().setRefuseSeconds(seconds).build() - d.declineOffer(offer.getId, filters) - case _ => d.declineOffer(offer.getId) - } - } - - /** - * Launches executors on accepted offers, and declines unused offers. Executors are launched - * round-robin on offers. - * - * @param d SchedulerDriver - * @param offers Mesos offers that match attribute constraints - */ - private def handleMatchedOffers( - d: org.apache.mesos.SchedulerDriver, offers: mutable.Buffer[Offer]): Unit = { - val tasks = buildMesosTasks(offers) - for (offer <- offers) { - val offerAttributes = toAttributeMap(offer.getAttributesList) - val offerMem = getResource(offer.getResourcesList, "mem") - val offerCpus = getResource(offer.getResourcesList, "cpus") - val offerPorts = getRangeResource(offer.getResourcesList, "ports") - val id = offer.getId.getValue - - if (tasks.contains(offer.getId)) { // accept - val offerTasks = tasks(offer.getId) - - logDebug(s"Accepting offer: $id with attributes: $offerAttributes " + - s"mem: $offerMem cpu: $offerCpus ports: $offerPorts." + - s" Launching ${offerTasks.size} Mesos tasks.") - - for (task <- offerTasks) { - val taskId = task.getTaskId - val mem = getResource(task.getResourcesList, "mem") - val cpus = getResource(task.getResourcesList, "cpus") - val ports = getRangeResource(task.getResourcesList, "ports").mkString(",") - - logDebug(s"Launching Mesos task: ${taskId.getValue} with mem: $mem cpu: $cpus" + - s" ports: $ports") - } - - d.launchTasks( - Collections.singleton(offer.getId), - offerTasks.asJava) - } else if (totalCoresAcquired >= maxCores) { - // Reject an offer for a configurable amount of time to avoid starving other frameworks - declineOffer(d, offer, Some("reached spark.cores.max"), - Some(rejectOfferDurationForReachedMaxCores)) - } else { - declineOffer(d, offer) - } - } - } - - /** - * Returns a map from OfferIDs to the tasks to launch on those offers. In order to maximize - * per-task memory and IO, tasks are round-robin assigned to offers. - * - * @param offers Mesos offers that match attribute constraints - * @return A map from OfferID to a list of Mesos tasks to launch on that offer - */ - private def buildMesosTasks(offers: mutable.Buffer[Offer]): Map[OfferID, List[MesosTaskInfo]] = { - // offerID -> tasks - val tasks = new mutable.HashMap[OfferID, List[MesosTaskInfo]].withDefaultValue(Nil) - - // offerID -> resources - val remainingResources = mutable.Map(offers.map(offer => - (offer.getId.getValue, offer.getResourcesList)): _*) - - var launchTasks = true - - // TODO(mgummelt): combine offers for a single slave - // - // round-robin create executors on the available offers - while (launchTasks) { - launchTasks = false - - for (offer <- offers) { - val slaveId = offer.getSlaveId.getValue - val offerId = offer.getId.getValue - val resources = remainingResources(offerId) - - if (canLaunchTask(slaveId, resources)) { - // Create a task - launchTasks = true - val taskId = newMesosTaskId() - val offerCPUs = getResource(resources, "cpus").toInt - - val taskCPUs = executorCores(offerCPUs) - val taskMemory = executorMemory(sc) - - slaves.getOrElseUpdate(slaveId, new Slave(offer.getHostname)).taskIDs.add(taskId) - - val (resourcesLeft, resourcesToUse) = - partitionTaskResources(resources, taskCPUs, taskMemory) - - val taskBuilder = MesosTaskInfo.newBuilder() - .setTaskId(TaskID.newBuilder().setValue(taskId.toString).build()) - .setSlaveId(offer.getSlaveId) - .setCommand(createCommand(offer, taskCPUs + extraCoresPerExecutor, taskId)) - .setName("Task " + taskId) - - taskBuilder.addAllResources(resourcesToUse.asJava) - - sc.conf.getOption("spark.mesos.executor.docker.image").foreach { image => - MesosSchedulerBackendUtil.setupContainerBuilderDockerInfo( - image, - sc.conf, - taskBuilder.getContainerBuilder - ) - } - - tasks(offer.getId) ::= taskBuilder.build() - remainingResources(offerId) = resourcesLeft.asJava - totalCoresAcquired += taskCPUs - coresByTaskId(taskId) = taskCPUs - } - } - } - tasks.toMap - } - - /** Extracts task needed resources from a list of available resources. */ - private def partitionTaskResources(resources: JList[Resource], taskCPUs: Int, taskMemory: Int) - : (List[Resource], List[Resource]) = { - - // partition cpus & mem - val (afterCPUResources, cpuResourcesToUse) = partitionResources(resources, "cpus", taskCPUs) - val (afterMemResources, memResourcesToUse) = - partitionResources(afterCPUResources.asJava, "mem", taskMemory) - - // If user specifies port numbers in SparkConfig then consecutive tasks will not be launched - // on the same host. This essentially means one executor per host. - // TODO: handle network isolator case - val (nonPortResources, portResourcesToUse) = - partitionPortResources(nonZeroPortValuesFromConfig(sc.conf), afterMemResources) - - (nonPortResources, cpuResourcesToUse ++ memResourcesToUse ++ portResourcesToUse) - } - - private def canLaunchTask(slaveId: String, resources: JList[Resource]): Boolean = { - val offerMem = getResource(resources, "mem") - val offerCPUs = getResource(resources, "cpus").toInt - val cpus = executorCores(offerCPUs) - val mem = executorMemory(sc) - val ports = getRangeResource(resources, "ports") - val meetsPortRequirements = checkPorts(sc.conf, ports) - - cpus > 0 && - cpus <= offerCPUs && - cpus + totalCoresAcquired <= maxCores && - mem <= offerMem && - numExecutors() < executorLimit && - slaves.get(slaveId).map(_.taskFailures).getOrElse(0) < MAX_SLAVE_FAILURES && - meetsPortRequirements - } - - private def executorCores(offerCPUs: Int): Int = { - sc.conf.getInt("spark.executor.cores", - math.min(offerCPUs, maxCores - totalCoresAcquired)) - } - - override def statusUpdate(d: org.apache.mesos.SchedulerDriver, status: TaskStatus) { - val taskId = status.getTaskId.getValue - val slaveId = status.getSlaveId.getValue - val state = TaskState.fromMesos(status.getState) - - logInfo(s"Mesos task $taskId is now ${status.getState}") - - stateLock.synchronized { - val slave = slaves(slaveId) - - // If the shuffle service is enabled, have the driver register with each one of the - // shuffle services. This allows the shuffle services to clean up state associated with - // this application when the driver exits. There is currently not a great way to detect - // this through Mesos, since the shuffle services are set up independently. - if (state.equals(TaskState.RUNNING) && - shuffleServiceEnabled && - !slave.shuffleRegistered) { - assume(mesosExternalShuffleClient.isDefined, - "External shuffle client was not instantiated even though shuffle service is enabled.") - // TODO: Remove this and allow the MesosExternalShuffleService to detect - // framework termination when new Mesos Framework HTTP API is available. - val externalShufflePort = conf.getInt("spark.shuffle.service.port", 7337) - - logDebug(s"Connecting to shuffle service on slave $slaveId, " + - s"host ${slave.hostname}, port $externalShufflePort for app ${conf.getAppId}") - - mesosExternalShuffleClient.get - .registerDriverWithShuffleService( - slave.hostname, - externalShufflePort, - sc.conf.getTimeAsMs("spark.storage.blockManagerSlaveTimeoutMs", - s"${sc.conf.getTimeAsMs("spark.network.timeout", "120s")}ms"), - sc.conf.getTimeAsMs("spark.executor.heartbeatInterval", "10s")) - slave.shuffleRegistered = true - } - - if (TaskState.isFinished(state)) { - // Remove the cores we have remembered for this task, if it's in the hashmap - for (cores <- coresByTaskId.get(taskId)) { - totalCoresAcquired -= cores - coresByTaskId -= taskId - } - // If it was a failure, mark the slave as failed for blacklisting purposes - if (TaskState.isFailed(state)) { - slave.taskFailures += 1 - - if (slave.taskFailures >= MAX_SLAVE_FAILURES) { - logInfo(s"Blacklisting Mesos slave $slaveId due to too many failures; " + - "is Spark installed on it?") - } - } - executorTerminated(d, slaveId, taskId, s"Executor finished with state $state") - // In case we'd rejected everything before but have now lost a node - d.reviveOffers() - } - } - } - - override def error(d: org.apache.mesos.SchedulerDriver, message: String) { - logError(s"Mesos error: $message") - scheduler.error(message) - } - - override def stop() { - // Make sure we're not launching tasks during shutdown - stateLock.synchronized { - if (stopCalled) { - logWarning("Stop called multiple times, ignoring") - return - } - stopCalled = true - super.stop() - } - - // Wait for executors to report done, or else mesosDriver.stop() will forcefully kill them. - // See SPARK-12330 - val startTime = System.nanoTime() - - // slaveIdsWithExecutors has no memory barrier, so this is eventually consistent - while (numExecutors() > 0 && - System.nanoTime() - startTime < shutdownTimeoutMS * 1000L * 1000L) { - Thread.sleep(100) - } - - if (numExecutors() > 0) { - logWarning(s"Timed out waiting for ${numExecutors()} remaining executors " - + s"to terminate within $shutdownTimeoutMS ms. This may leave temporary files " - + "on the mesos nodes.") - } - - // Close the mesos external shuffle client if used - mesosExternalShuffleClient.foreach(_.close()) - - if (mesosDriver != null) { - mesosDriver.stop() - } - } - - override def frameworkMessage( - d: org.apache.mesos.SchedulerDriver, e: ExecutorID, s: SlaveID, b: Array[Byte]) {} - - /** - * Called when a slave is lost or a Mesos task finished. Updates local view on - * what tasks are running. It also notifies the driver that an executor was removed. - */ - private def executorTerminated( - d: org.apache.mesos.SchedulerDriver, - slaveId: String, - taskId: String, - reason: String): Unit = { - stateLock.synchronized { - // Do not call removeExecutor() after this scheduler backend was stopped because - // removeExecutor() internally will send a message to the driver endpoint but - // the driver endpoint is not available now, otherwise an exception will be thrown. - if (!stopCalled) { - removeExecutor(taskId, SlaveLost(reason)) - } - slaves(slaveId).taskIDs.remove(taskId) - } - } - - override def slaveLost(d: org.apache.mesos.SchedulerDriver, slaveId: SlaveID): Unit = { - logInfo(s"Mesos slave lost: ${slaveId.getValue}") - } - - override def executorLost( - d: org.apache.mesos.SchedulerDriver, e: ExecutorID, s: SlaveID, status: Int): Unit = { - logInfo("Mesos executor lost: %s".format(e.getValue)) - } - - override def applicationId(): String = - Option(appId).getOrElse { - logWarning("Application ID is not initialized yet.") - super.applicationId - } - - override def doRequestTotalExecutors(requestedTotal: Int): Boolean = { - // We don't truly know if we can fulfill the full amount of executors - // since at coarse grain it depends on the amount of slaves available. - logInfo("Capping the total amount of executors to " + requestedTotal) - executorLimitOption = Some(requestedTotal) - true - } - - override def doKillExecutors(executorIds: Seq[String]): Boolean = { - if (mesosDriver == null) { - logWarning("Asked to kill executors before the Mesos driver was started.") - false - } else { - for (executorId <- executorIds) { - val taskId = TaskID.newBuilder().setValue(executorId).build() - mesosDriver.killTask(taskId) - } - // no need to adjust `executorLimitOption` since the AllocationManager already communicated - // the desired limit through a call to `doRequestTotalExecutors`. - // See [[o.a.s.scheduler.cluster.CoarseGrainedSchedulerBackend.killExecutors]] - true - } - } - - private def numExecutors(): Int = { - slaves.values.map(_.taskIDs.size).sum - } -} - -private class Slave(val hostname: String) { - val taskIDs = new mutable.HashSet[String]() - var taskFailures = 0 - var shuffleRegistered = false -} http://git-wip-us.apache.org/repos/asf/spark/blob/8e5475be/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosFineGrainedSchedulerBackend.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosFineGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosFineGrainedSchedulerBackend.scala deleted file mode 100644 index f1e48fa..0000000 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosFineGrainedSchedulerBackend.scala +++ /dev/null @@ -1,451 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.scheduler.cluster.mesos - -import java.io.File -import java.util.{ArrayList => JArrayList, Collections, List => JList} - -import scala.collection.JavaConverters._ -import scala.collection.mutable.{HashMap, HashSet} - -import org.apache.mesos.Protos.{ExecutorInfo => MesosExecutorInfo, TaskInfo => MesosTaskInfo, _} -import org.apache.mesos.protobuf.ByteString - -import org.apache.spark.{SparkContext, SparkException, TaskState} -import org.apache.spark.executor.MesosExecutorBackend -import org.apache.spark.scheduler._ -import org.apache.spark.scheduler.cluster.ExecutorInfo -import org.apache.spark.util.Utils - -/** - * A SchedulerBackend for running fine-grained tasks on Mesos. Each Spark task is mapped to a - * separate Mesos task, allowing multiple applications to share cluster nodes both in space (tasks - * from multiple apps can run on different cores) and in time (a core can switch ownership). - */ -private[spark] class MesosFineGrainedSchedulerBackend( - scheduler: TaskSchedulerImpl, - sc: SparkContext, - master: String) - extends SchedulerBackend - with org.apache.mesos.Scheduler - with MesosSchedulerUtils { - - // Stores the slave ids that has launched a Mesos executor. - val slaveIdToExecutorInfo = new HashMap[String, MesosExecutorInfo] - val taskIdToSlaveId = new HashMap[Long, String] - - // An ExecutorInfo for our tasks - var execArgs: Array[Byte] = null - - var classLoader: ClassLoader = null - - // The listener bus to publish executor added/removed events. - val listenerBus = sc.listenerBus - - private[mesos] val mesosExecutorCores = sc.conf.getDouble("spark.mesos.mesosExecutor.cores", 1) - - // Offer constraints - private[this] val slaveOfferConstraints = - parseConstraintString(sc.conf.get("spark.mesos.constraints", "")) - - // reject offers with mismatched constraints in seconds - private val rejectOfferDurationForUnmetConstraints = - getRejectOfferDurationForUnmetConstraints(sc) - - @volatile var appId: String = _ - - override def start() { - classLoader = Thread.currentThread.getContextClassLoader - val driver = createSchedulerDriver( - master, - MesosFineGrainedSchedulerBackend.this, - sc.sparkUser, - sc.appName, - sc.conf, - sc.conf.getOption("spark.mesos.driver.webui.url").orElse(sc.ui.map(_.appUIAddress)), - Option.empty, - Option.empty, - sc.conf.getOption("spark.mesos.driver.frameworkId") - ) - - unsetFrameworkID(sc) - startScheduler(driver) - } - - /** - * Creates a MesosExecutorInfo that is used to launch a Mesos executor. - * @param availableResources Available resources that is offered by Mesos - * @param execId The executor id to assign to this new executor. - * @return A tuple of the new mesos executor info and the remaining available resources. - */ - def createExecutorInfo( - availableResources: JList[Resource], - execId: String): (MesosExecutorInfo, JList[Resource]) = { - val executorSparkHome = sc.conf.getOption("spark.mesos.executor.home") - .orElse(sc.getSparkHome()) // Fall back to driver Spark home for backward compatibility - .getOrElse { - throw new SparkException("Executor Spark home `spark.mesos.executor.home` is not set!") - } - val environment = Environment.newBuilder() - sc.conf.getOption("spark.executor.extraClassPath").foreach { cp => - environment.addVariables( - Environment.Variable.newBuilder().setName("SPARK_CLASSPATH").setValue(cp).build()) - } - val extraJavaOpts = sc.conf.getOption("spark.executor.extraJavaOptions").getOrElse("") - - val prefixEnv = sc.conf.getOption("spark.executor.extraLibraryPath").map { p => - Utils.libraryPathEnvPrefix(Seq(p)) - }.getOrElse("") - - environment.addVariables( - Environment.Variable.newBuilder() - .setName("SPARK_EXECUTOR_OPTS") - .setValue(extraJavaOpts) - .build()) - sc.executorEnvs.foreach { case (key, value) => - environment.addVariables(Environment.Variable.newBuilder() - .setName(key) - .setValue(value) - .build()) - } - val command = CommandInfo.newBuilder() - .setEnvironment(environment) - val uri = sc.conf.getOption("spark.executor.uri") - .orElse(Option(System.getenv("SPARK_EXECUTOR_URI"))) - - val executorBackendName = classOf[MesosExecutorBackend].getName - if (uri.isEmpty) { - val executorPath = new File(executorSparkHome, "/bin/spark-class").getPath - command.setValue(s"$prefixEnv $executorPath $executorBackendName") - } else { - // Grab everything to the first '.'. We'll use that and '*' to - // glob the directory "correctly". - val basename = uri.get.split('/').last.split('.').head - command.setValue(s"cd ${basename}*; $prefixEnv ./bin/spark-class $executorBackendName") - command.addUris(CommandInfo.URI.newBuilder().setValue(uri.get)) - } - val builder = MesosExecutorInfo.newBuilder() - val (resourcesAfterCpu, usedCpuResources) = - partitionResources(availableResources, "cpus", mesosExecutorCores) - val (resourcesAfterMem, usedMemResources) = - partitionResources(resourcesAfterCpu.asJava, "mem", executorMemory(sc)) - - builder.addAllResources(usedCpuResources.asJava) - builder.addAllResources(usedMemResources.asJava) - - sc.conf.getOption("spark.mesos.uris").foreach(setupUris(_, command)) - - val executorInfo = builder - .setExecutorId(ExecutorID.newBuilder().setValue(execId).build()) - .setCommand(command) - .setData(ByteString.copyFrom(createExecArg())) - - sc.conf.getOption("spark.mesos.executor.docker.image").foreach { image => - MesosSchedulerBackendUtil.setupContainerBuilderDockerInfo( - image, - sc.conf, - executorInfo.getContainerBuilder() - ) - } - - (executorInfo.build(), resourcesAfterMem.asJava) - } - - /** - * Create and serialize the executor argument to pass to Mesos. Our executor arg is an array - * containing all the spark.* system properties in the form of (String, String) pairs. - */ - private def createExecArg(): Array[Byte] = { - if (execArgs == null) { - val props = new HashMap[String, String] - for ((key, value) <- sc.conf.getAll) { - props(key) = value - } - // Serialize the map as an array of (String, String) pairs - execArgs = Utils.serialize(props.toArray) - } - execArgs - } - - override def offerRescinded(d: org.apache.mesos.SchedulerDriver, o: OfferID) {} - - override def registered( - d: org.apache.mesos.SchedulerDriver, frameworkId: FrameworkID, masterInfo: MasterInfo) { - inClassLoader() { - appId = frameworkId.getValue - logInfo("Registered as framework ID " + appId) - markRegistered() - } - } - - private def inClassLoader()(fun: => Unit) = { - val oldClassLoader = Thread.currentThread.getContextClassLoader - Thread.currentThread.setContextClassLoader(classLoader) - try { - fun - } finally { - Thread.currentThread.setContextClassLoader(oldClassLoader) - } - } - - override def disconnected(d: org.apache.mesos.SchedulerDriver) {} - - override def reregistered(d: org.apache.mesos.SchedulerDriver, masterInfo: MasterInfo) {} - - private def getTasksSummary(tasks: JArrayList[MesosTaskInfo]): String = { - val builder = new StringBuilder - tasks.asScala.foreach { t => - builder.append("Task id: ").append(t.getTaskId.getValue).append("\n") - .append("Slave id: ").append(t.getSlaveId.getValue).append("\n") - .append("Task resources: ").append(t.getResourcesList).append("\n") - .append("Executor resources: ").append(t.getExecutor.getResourcesList) - .append("---------------------------------------------\n") - } - builder.toString() - } - - /** - * Method called by Mesos to offer resources on slaves. We respond by asking our active task sets - * for tasks in order of priority. We fill each node with tasks in a round-robin manner so that - * tasks are balanced across the cluster. - */ - override def resourceOffers(d: org.apache.mesos.SchedulerDriver, offers: JList[Offer]) { - inClassLoader() { - // Fail first on offers with unmet constraints - val (offersMatchingConstraints, offersNotMatchingConstraints) = - offers.asScala.partition { o => - val offerAttributes = toAttributeMap(o.getAttributesList) - val meetsConstraints = - matchesAttributeRequirements(slaveOfferConstraints, offerAttributes) - - // add some debug messaging - if (!meetsConstraints) { - val id = o.getId.getValue - logDebug(s"Declining offer: $id with attributes: $offerAttributes") - } - - meetsConstraints - } - - // These offers do not meet constraints. We don't need to see them again. - // Decline the offer for a long period of time. - offersNotMatchingConstraints.foreach { o => - d.declineOffer(o.getId, Filters.newBuilder() - .setRefuseSeconds(rejectOfferDurationForUnmetConstraints).build()) - } - - // Of the matching constraints, see which ones give us enough memory and cores - val (usableOffers, unUsableOffers) = offersMatchingConstraints.partition { o => - val mem = getResource(o.getResourcesList, "mem") - val cpus = getResource(o.getResourcesList, "cpus") - val slaveId = o.getSlaveId.getValue - val offerAttributes = toAttributeMap(o.getAttributesList) - - // check offers for - // 1. Memory requirements - // 2. CPU requirements - need at least 1 for executor, 1 for task - val meetsMemoryRequirements = mem >= executorMemory(sc) - val meetsCPURequirements = cpus >= (mesosExecutorCores + scheduler.CPUS_PER_TASK) - val meetsRequirements = - (meetsMemoryRequirements && meetsCPURequirements) || - (slaveIdToExecutorInfo.contains(slaveId) && cpus >= scheduler.CPUS_PER_TASK) - val debugstr = if (meetsRequirements) "Accepting" else "Declining" - logDebug(s"$debugstr offer: ${o.getId.getValue} with attributes: " - + s"$offerAttributes mem: $mem cpu: $cpus") - - meetsRequirements - } - - // Decline offers we ruled out immediately - unUsableOffers.foreach(o => d.declineOffer(o.getId)) - - val workerOffers = usableOffers.map { o => - val cpus = if (slaveIdToExecutorInfo.contains(o.getSlaveId.getValue)) { - getResource(o.getResourcesList, "cpus").toInt - } else { - // If the Mesos executor has not been started on this slave yet, set aside a few - // cores for the Mesos executor by offering fewer cores to the Spark executor - (getResource(o.getResourcesList, "cpus") - mesosExecutorCores).toInt - } - new WorkerOffer( - o.getSlaveId.getValue, - o.getHostname, - cpus) - } - - val slaveIdToOffer = usableOffers.map(o => o.getSlaveId.getValue -> o).toMap - val slaveIdToWorkerOffer = workerOffers.map(o => o.executorId -> o).toMap - val slaveIdToResources = new HashMap[String, JList[Resource]]() - usableOffers.foreach { o => - slaveIdToResources(o.getSlaveId.getValue) = o.getResourcesList - } - - val mesosTasks = new HashMap[String, JArrayList[MesosTaskInfo]] - - val slavesIdsOfAcceptedOffers = HashSet[String]() - - // Call into the TaskSchedulerImpl - val acceptedOffers = scheduler.resourceOffers(workerOffers).filter(!_.isEmpty) - acceptedOffers - .foreach { offer => - offer.foreach { taskDesc => - val slaveId = taskDesc.executorId - slavesIdsOfAcceptedOffers += slaveId - taskIdToSlaveId(taskDesc.taskId) = slaveId - val (mesosTask, remainingResources) = createMesosTask( - taskDesc, - slaveIdToResources(slaveId), - slaveId) - mesosTasks.getOrElseUpdate(slaveId, new JArrayList[MesosTaskInfo]) - .add(mesosTask) - slaveIdToResources(slaveId) = remainingResources - } - } - - // Reply to the offers - val filters = Filters.newBuilder().setRefuseSeconds(1).build() // TODO: lower timeout? - - mesosTasks.foreach { case (slaveId, tasks) => - slaveIdToWorkerOffer.get(slaveId).foreach(o => - listenerBus.post(SparkListenerExecutorAdded(System.currentTimeMillis(), slaveId, - // TODO: Add support for log urls for Mesos - new ExecutorInfo(o.host, o.cores, Map.empty))) - ) - logTrace(s"Launching Mesos tasks on slave '$slaveId', tasks:\n${getTasksSummary(tasks)}") - d.launchTasks(Collections.singleton(slaveIdToOffer(slaveId).getId), tasks, filters) - } - - // Decline offers that weren't used - // NOTE: This logic assumes that we only get a single offer for each host in a given batch - for (o <- usableOffers if !slavesIdsOfAcceptedOffers.contains(o.getSlaveId.getValue)) { - d.declineOffer(o.getId) - } - } - } - - /** Turn a Spark TaskDescription into a Mesos task and also resources unused by the task */ - def createMesosTask( - task: TaskDescription, - resources: JList[Resource], - slaveId: String): (MesosTaskInfo, JList[Resource]) = { - val taskId = TaskID.newBuilder().setValue(task.taskId.toString).build() - val (executorInfo, remainingResources) = if (slaveIdToExecutorInfo.contains(slaveId)) { - (slaveIdToExecutorInfo(slaveId), resources) - } else { - createExecutorInfo(resources, slaveId) - } - slaveIdToExecutorInfo(slaveId) = executorInfo - val (finalResources, cpuResources) = - partitionResources(remainingResources, "cpus", scheduler.CPUS_PER_TASK) - val taskInfo = MesosTaskInfo.newBuilder() - .setTaskId(taskId) - .setSlaveId(SlaveID.newBuilder().setValue(slaveId).build()) - .setExecutor(executorInfo) - .setName(task.name) - .addAllResources(cpuResources.asJava) - .setData(MesosTaskLaunchData(task.serializedTask, task.attemptNumber).toByteString) - .build() - (taskInfo, finalResources.asJava) - } - - override def statusUpdate(d: org.apache.mesos.SchedulerDriver, status: TaskStatus) { - inClassLoader() { - val tid = status.getTaskId.getValue.toLong - val state = TaskState.fromMesos(status.getState) - synchronized { - if (TaskState.isFailed(TaskState.fromMesos(status.getState)) - && taskIdToSlaveId.contains(tid)) { - // We lost the executor on this slave, so remember that it's gone - removeExecutor(taskIdToSlaveId(tid), "Lost executor") - } - if (TaskState.isFinished(state)) { - taskIdToSlaveId.remove(tid) - } - } - scheduler.statusUpdate(tid, state, status.getData.asReadOnlyByteBuffer) - } - } - - override def error(d: org.apache.mesos.SchedulerDriver, message: String) { - inClassLoader() { - logError("Mesos error: " + message) - markErr() - scheduler.error(message) - } - } - - override def stop() { - if (mesosDriver != null) { - mesosDriver.stop() - } - } - - override def reviveOffers() { - mesosDriver.reviveOffers() - } - - override def frameworkMessage( - d: org.apache.mesos.SchedulerDriver, e: ExecutorID, s: SlaveID, b: Array[Byte]) {} - - /** - * Remove executor associated with slaveId in a thread safe manner. - */ - private def removeExecutor(slaveId: String, reason: String) = { - synchronized { - listenerBus.post(SparkListenerExecutorRemoved(System.currentTimeMillis(), slaveId, reason)) - slaveIdToExecutorInfo -= slaveId - } - } - - private def recordSlaveLost( - d: org.apache.mesos.SchedulerDriver, slaveId: SlaveID, reason: ExecutorLossReason) { - inClassLoader() { - logInfo("Mesos slave lost: " + slaveId.getValue) - removeExecutor(slaveId.getValue, reason.toString) - scheduler.executorLost(slaveId.getValue, reason) - } - } - - override def slaveLost(d: org.apache.mesos.SchedulerDriver, slaveId: SlaveID) { - recordSlaveLost(d, slaveId, SlaveLost()) - } - - override def executorLost( - d: org.apache.mesos.SchedulerDriver, executorId: ExecutorID, slaveId: SlaveID, status: Int) { - logInfo("Executor lost: %s, marking slave %s as lost".format(executorId.getValue, - slaveId.getValue)) - recordSlaveLost(d, slaveId, ExecutorExited(status, exitCausedByApp = true)) - } - - override def killTask(taskId: Long, executorId: String, interruptThread: Boolean): Unit = { - mesosDriver.killTask( - TaskID.newBuilder() - .setValue(taskId.toString).build() - ) - } - - // TODO: query Mesos for number of cores - override def defaultParallelism(): Int = sc.conf.getInt("spark.default.parallelism", 8) - - override def applicationId(): String = - Option(appId).getOrElse { - logWarning("Application ID is not initialized yet.") - super.applicationId - } - -} http://git-wip-us.apache.org/repos/asf/spark/blob/8e5475be/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackendUtil.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackendUtil.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackendUtil.scala deleted file mode 100644 index 3fe0674..0000000 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackendUtil.scala +++ /dev/null @@ -1,165 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.scheduler.cluster.mesos - -import org.apache.mesos.Protos.{ContainerInfo, Image, Volume} -import org.apache.mesos.Protos.ContainerInfo.DockerInfo - -import org.apache.spark.{SparkConf, SparkException} -import org.apache.spark.internal.Logging - -/** - * A collection of utility functions which can be used by both the - * MesosSchedulerBackend and the [[MesosFineGrainedSchedulerBackend]]. - */ -private[mesos] object MesosSchedulerBackendUtil extends Logging { - /** - * Parse a comma-delimited list of volume specs, each of which - * takes the form [host-dir:]container-dir[:rw|:ro]. - */ - def parseVolumesSpec(volumes: String): List[Volume] = { - volumes.split(",").map(_.split(":")).flatMap { spec => - val vol: Volume.Builder = Volume - .newBuilder() - .setMode(Volume.Mode.RW) - spec match { - case Array(container_path) => - Some(vol.setContainerPath(container_path)) - case Array(container_path, "rw") => - Some(vol.setContainerPath(container_path)) - case Array(container_path, "ro") => - Some(vol.setContainerPath(container_path) - .setMode(Volume.Mode.RO)) - case Array(host_path, container_path) => - Some(vol.setContainerPath(container_path) - .setHostPath(host_path)) - case Array(host_path, container_path, "rw") => - Some(vol.setContainerPath(container_path) - .setHostPath(host_path)) - case Array(host_path, container_path, "ro") => - Some(vol.setContainerPath(container_path) - .setHostPath(host_path) - .setMode(Volume.Mode.RO)) - case spec => - logWarning(s"Unable to parse volume specs: $volumes. " - + "Expected form: \"[host-dir:]container-dir[:rw|:ro](, ...)\"") - None - } - } - .map { _.build() } - .toList - } - - /** - * Parse a comma-delimited list of port mapping specs, each of which - * takes the form host_port:container_port[:udp|:tcp] - * - * Note: - * the docker form is [ip:]host_port:container_port, but the DockerInfo - * message has no field for 'ip', and instead has a 'protocol' field. - * Docker itself only appears to support TCP, so this alternative form - * anticipates the expansion of the docker form to allow for a protocol - * and leaves open the chance for mesos to begin to accept an 'ip' field - */ - def parsePortMappingsSpec(portmaps: String): List[DockerInfo.PortMapping] = { - portmaps.split(",").map(_.split(":")).flatMap { spec: Array[String] => - val portmap: DockerInfo.PortMapping.Builder = DockerInfo.PortMapping - .newBuilder() - .setProtocol("tcp") - spec match { - case Array(host_port, container_port) => - Some(portmap.setHostPort(host_port.toInt) - .setContainerPort(container_port.toInt)) - case Array(host_port, container_port, protocol) => - Some(portmap.setHostPort(host_port.toInt) - .setContainerPort(container_port.toInt) - .setProtocol(protocol)) - case spec => - logWarning(s"Unable to parse port mapping specs: $portmaps. " - + "Expected form: \"host_port:container_port[:udp|:tcp](, ...)\"") - None - } - } - .map { _.build() } - .toList - } - - /** - * Construct a DockerInfo structure and insert it into a ContainerInfo - */ - def addDockerInfo( - container: ContainerInfo.Builder, - image: String, - containerizer: String, - forcePullImage: Boolean = false, - volumes: Option[List[Volume]] = None, - portmaps: Option[List[ContainerInfo.DockerInfo.PortMapping]] = None): Unit = { - - containerizer match { - case "docker" => - container.setType(ContainerInfo.Type.DOCKER) - val docker = ContainerInfo.DockerInfo.newBuilder() - .setImage(image) - .setForcePullImage(forcePullImage) - // TODO (mgummelt): Remove this. Portmaps have no effect, - // as we don't support bridge networking. - portmaps.foreach(_.foreach(docker.addPortMappings)) - container.setDocker(docker) - case "mesos" => - container.setType(ContainerInfo.Type.MESOS) - val imageProto = Image.newBuilder() - .setType(Image.Type.DOCKER) - .setDocker(Image.Docker.newBuilder().setName(image)) - .setCached(!forcePullImage) - container.setMesos(ContainerInfo.MesosInfo.newBuilder().setImage(imageProto)) - case _ => - throw new SparkException( - "spark.mesos.containerizer must be one of {\"docker\", \"mesos\"}") - } - - volumes.foreach(_.foreach(container.addVolumes)) - } - - /** - * Setup a docker containerizer from MesosDriverDescription scheduler properties - */ - def setupContainerBuilderDockerInfo( - imageName: String, - conf: SparkConf, - builder: ContainerInfo.Builder): Unit = { - val forcePullImage = conf - .getOption("spark.mesos.executor.docker.forcePullImage") - .exists(_.equals("true")) - val volumes = conf - .getOption("spark.mesos.executor.docker.volumes") - .map(parseVolumesSpec) - val portmaps = conf - .getOption("spark.mesos.executor.docker.portmaps") - .map(parsePortMappingsSpec) - - val containerizer = conf.get("spark.mesos.containerizer", "docker") - addDockerInfo( - builder, - imageName, - containerizer, - forcePullImage = forcePullImage, - volumes = volumes, - portmaps = portmaps) - logDebug("setupContainerDockerInfo: using docker image: " + imageName) - } -} --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
