[SPARK-16967] move mesos to module

## What changes were proposed in this pull request?
Move Mesos code into a mvn module

## How was this patch tested?

unit tests
manually submitting a client mode and cluster mode job
spark/mesos integration test suite

Author: Michael Gummelt <[email protected]>

Closes #14637 from mgummelt/mesos-module.

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/8e5475be
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/8e5475be
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/8e5475be

Branch: refs/heads/master
Commit: 8e5475be3c9a620f18f6712631b093464a7d0ee7
Parents: c0949dc
Author: Michael Gummelt <[email protected]>
Authored: Fri Aug 26 12:25:22 2016 -0700
Committer: Marcelo Vanzin <[email protected]>
Committed: Fri Aug 26 12:25:22 2016 -0700

----------------------------------------------------------------------
 .travis.yml                                     | 2 +-
 assembly/pom.xml                                | 10 +
 core/pom.xml                                    | 5 -
 .../scala/org/apache/spark/SparkContext.scala   | 18 +-
 .../main/scala/org/apache/spark/TaskState.scala | 20 -
 .../deploy/mesos/MesosClusterDispatcher.scala   | 115 ---
 .../mesos/MesosClusterDispatcherArguments.scala | 109 ---
 .../deploy/mesos/MesosDriverDescription.scala   | 70 --
 .../mesos/MesosExternalShuffleService.scala     | 130 ----
 .../spark/deploy/mesos/ui/DriverPage.scala      | 179 -----
 .../deploy/mesos/ui/MesosClusterPage.scala      | 135 ----
 .../spark/deploy/mesos/ui/MesosClusterUI.scala  | 49 --
 .../deploy/rest/mesos/MesosRestServer.scala     | 157 ----
 .../spark/executor/MesosExecutorBackend.scala   | 130 ----
 .../mesos/MesosClusterPersistenceEngine.scala   | 134 ----
 .../cluster/mesos/MesosClusterScheduler.scala   | 745 -------------------
 .../mesos/MesosClusterSchedulerSource.scala     | 40 -
 .../MesosCoarseGrainedSchedulerBackend.scala    | 642 ----------------
 .../MesosFineGrainedSchedulerBackend.scala      | 451 -----------
 .../mesos/MesosSchedulerBackendUtil.scala       | 165 ----
 .../cluster/mesos/MesosSchedulerUtils.scala     | 494 ------------
 .../cluster/mesos/MesosTaskLaunchData.scala     | 51 --
 .../SparkContextSchedulerCreationSuite.scala    | 28 -
 .../mesos/MesosClusterSchedulerSuite.scala      | 213 ------
 ...esosCoarseGrainedSchedulerBackendSuite.scala | 517 -------------
 .../MesosFineGrainedSchedulerBackendSuite.scala | 385 ----------
 .../mesos/MesosSchedulerUtilsSuite.scala        | 255 -------
 .../mesos/MesosTaskLaunchDataSuite.scala        | 36 -
 .../spark/scheduler/cluster/mesos/Utils.scala   | 85 ---
 dev/create-release/release-build.sh             | 15 +-
 dev/lint-java                                   | 2 +-
 dev/mima                                        | 2 +-
 dev/scalastyle                                  | 1 +
 dev/sparktestsupport/modules.py                 | 7 +
 dev/test-dependencies.sh                        | 2 +-
 docs/building-spark.md                          | 24 +-
 mesos/pom.xml                                   | 109 +++
 ...pache.spark.scheduler.ExternalClusterManager | 1 +
 .../deploy/mesos/MesosClusterDispatcher.scala   | 115 +++
 .../mesos/MesosClusterDispatcherArguments.scala | 109 +++
 .../deploy/mesos/MesosDriverDescription.scala   | 70 ++
 .../mesos/MesosExternalShuffleService.scala     | 130 ++++
 .../spark/deploy/mesos/ui/DriverPage.scala      | 179 +++++
 .../deploy/mesos/ui/MesosClusterPage.scala      | 135 ++++
 .../spark/deploy/mesos/ui/MesosClusterUI.scala  | 49 ++
 .../deploy/rest/mesos/MesosRestServer.scala     | 157 ++++
 .../spark/executor/MesosExecutorBackend.scala   | 131 ++++
 .../cluster/mesos/MesosClusterManager.scala     | 60 ++
 .../mesos/MesosClusterPersistenceEngine.scala   | 134 ++++
 .../cluster/mesos/MesosClusterScheduler.scala   | 745 +++++++++++++++++++
 .../mesos/MesosClusterSchedulerSource.scala     | 40 +
 .../MesosCoarseGrainedSchedulerBackend.scala    | 642 ++++++++++++++++
 .../MesosFineGrainedSchedulerBackend.scala      | 451 +++++++++++
 .../mesos/MesosSchedulerBackendUtil.scala       | 165 ++++
 .../cluster/mesos/MesosSchedulerUtils.scala     | 514 +++++++++++++
 .../cluster/mesos/MesosTaskLaunchData.scala     | 51 ++
 .../mesos/MesosClusterManagerSuite.scala        | 47 ++
 .../mesos/MesosClusterSchedulerSuite.scala      | 213 ++++++
 ...esosCoarseGrainedSchedulerBackendSuite.scala | 517 +++++++++++++
 .../MesosFineGrainedSchedulerBackendSuite.scala | 385 ++++++++++
 .../mesos/MesosSchedulerUtilsSuite.scala        | 255 +++++++
 .../mesos/MesosTaskLaunchDataSuite.scala        | 36 +
 .../spark/scheduler/cluster/mesos/Utils.scala   | 85 +++
 pom.xml                                         | 21 +-
 project/MimaExcludes.scala                      | 4 +-
 project/SparkBuild.scala                        | 4 +-
 66 files changed, 5582 insertions(+), 5395 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/8e5475be/.travis.yml
----------------------------------------------------------------------
diff --git a/.travis.yml b/.travis.yml
index c16f763..8739849 100644
--- a/.travis.yml
+++ b/.travis.yml
@@ -44,7 +44,7 @@ notifications:
 # 5. Run maven install before running lint-java.
 install:
   - export MAVEN_SKIP_RC=1
-  - build/mvn -T 4 -q -DskipTests -Pyarn -Phadoop-2.3 -Pkinesis-asl -Phive -Phive-thriftserver install
+  - build/mvn -T 4 -q -DskipTests -Pmesos -Pyarn -Phadoop-2.3 -Pkinesis-asl -Phive -Phive-thriftserver install

 # 6. Run lint-java.
 script:

http://git-wip-us.apache.org/repos/asf/spark/blob/8e5475be/assembly/pom.xml
----------------------------------------------------------------------
diff --git a/assembly/pom.xml b/assembly/pom.xml
index 971a62f..ec243ea 100644
--- a/assembly/pom.xml
+++ b/assembly/pom.xml
@@ -139,6 +139,16 @@
       </dependencies>
     </profile>
     <profile>
+      <id>mesos</id>
+      <dependencies>
+        <dependency>
+          <groupId>org.apache.spark</groupId>
+          <artifactId>spark-mesos_${scala.binary.version}</artifactId>
+          <version>${project.version}</version>
+        </dependency>
+      </dependencies>
+    </profile>
+    <profile>
       <id>hive</id>
       <dependencies>
         <dependency>

http://git-wip-us.apache.org/repos/asf/spark/blob/8e5475be/core/pom.xml
----------------------------------------------------------------------
diff --git a/core/pom.xml b/core/pom.xml
index ab6c3ce..c04cf7e 100644
--- a/core/pom.xml
+++ b/core/pom.xml
@@ -216,11 +216,6 @@
       <artifactId>jersey-container-servlet-core</artifactId>
     </dependency>
     <dependency>
-      <groupId>org.apache.mesos</groupId>
-      <artifactId>mesos</artifactId>
-      <classifier>${mesos.classifier}</classifier>
-    </dependency>
-    <dependency>
       <groupId>io.netty</groupId>
       <artifactId>netty-all</artifactId>
     </dependency>
http://git-wip-us.apache.org/repos/asf/spark/blob/8e5475be/core/src/main/scala/org/apache/spark/SparkContext.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index 2eaeab1..08d6343 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -42,7 +42,6 @@ import org.apache.hadoop.mapred.{FileInputFormat, InputFormat, JobConf, Sequence
   TextInputFormat}
 import org.apache.hadoop.mapreduce.{InputFormat => NewInputFormat, Job => NewHadoopJob}
 import org.apache.hadoop.mapreduce.lib.input.{FileInputFormat => NewFileInputFormat}
-import org.apache.mesos.MesosNativeLibrary

 import org.apache.spark.annotation.DeveloperApi
 import org.apache.spark.broadcast.Broadcast
@@ -56,7 +55,6 @@ import org.apache.spark.rdd._
 import org.apache.spark.rpc.RpcEndpointRef
 import org.apache.spark.scheduler._
 import org.apache.spark.scheduler.cluster.{CoarseGrainedSchedulerBackend, StandaloneSchedulerBackend}
-import org.apache.spark.scheduler.cluster.mesos.{MesosCoarseGrainedSchedulerBackend, MesosFineGrainedSchedulerBackend}
 import org.apache.spark.scheduler.local.LocalSchedulerBackend
 import org.apache.spark.storage._
 import org.apache.spark.storage.BlockManagerMessages.TriggerThreadDump
@@ -2512,18 +2510,6 @@ object SparkContext extends Logging {
       }
       (backend, scheduler)

-    case MESOS_REGEX(mesosUrl) =>
-      MesosNativeLibrary.load()
-      val scheduler = new TaskSchedulerImpl(sc)
-      val coarseGrained = sc.conf.getBoolean("spark.mesos.coarse", defaultValue = true)
-      val backend = if (coarseGrained) {
-        new MesosCoarseGrainedSchedulerBackend(scheduler, sc, mesosUrl, sc.env.securityManager)
-      } else {
-        new MesosFineGrainedSchedulerBackend(scheduler, sc, mesosUrl)
-      }
-      scheduler.initialize(backend)
-      (backend, scheduler)
-
     case masterUrl =>
       val cm = getClusterManager(masterUrl) match {
         case Some(clusterMgr) => clusterMgr
@@ -2545,7 +2531,7 @@ object SparkContext extends Logging {
   private def getClusterManager(url: String): Option[ExternalClusterManager] = {
     val loader = Utils.getContextOrSparkClassLoader
     val serviceLoaders =
-      ServiceLoader.load(classOf[ExternalClusterManager], loader).asScala.filter(_.canCreate(url))
+      ServiceLoader.load(classOf[ExternalClusterManager], loader).asScala.filter(_.canCreate(url))
     if (serviceLoaders.size > 1) {
       throw new SparkException(s"Multiple Cluster Managers ($serviceLoaders) registered " +
         s"for the url $url:")
@@ -2566,8 +2552,6 @@ private object SparkMasterRegex {
   val LOCAL_CLUSTER_REGEX = """local-cluster\[\s*([0-9]+)\s*,\s*([0-9]+)\s*,\s*([0-9]+)\s*]""".r
   // Regular expression for connecting to Spark deploy clusters
   val SPARK_REGEX = """spark://(.*)""".r
-  // Regular expression for connection to Mesos cluster by mesos:// or mesos://zk:// url
-  val MESOS_REGEX = """mesos://(.*)""".r
 }

 /**
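With the hardcoded `MESOS_REGEX` case gone, `mesos://` URLs now resolve through the same `ServiceLoader` path shown in `getClusterManager` above. The new module registers a `MesosClusterManager` (60 lines in the diffstat; its body is not reproduced in this message), so the following is only a plausible sketch of the shape such a plug-in must take against the `ExternalClusterManager` trait, mirroring the deleted `SparkContext` case:

```scala
package org.apache.spark.scheduler.cluster.mesos

import org.apache.spark.SparkContext
import org.apache.spark.scheduler.{ExternalClusterManager, SchedulerBackend, TaskScheduler, TaskSchedulerImpl}

/**
 * Sketch only: a ServiceLoader-discoverable cluster manager. The real class is
 * registered through the one-line resource file
 * META-INF/services/org.apache.spark.scheduler.ExternalClusterManager
 * (visible in the diffstat), which holds its fully qualified class name.
 */
private[spark] class MesosClusterManager extends ExternalClusterManager {

  override def canCreate(masterURL: String): Boolean =
    masterURL.startsWith("mesos")

  override def createTaskScheduler(sc: SparkContext, masterURL: String): TaskScheduler =
    new TaskSchedulerImpl(sc)

  override def createSchedulerBackend(
      sc: SparkContext,
      masterURL: String,
      scheduler: TaskScheduler): SchedulerBackend = {
    // Mirrors the deleted SparkContext case: choose coarse- or fine-grained mode.
    val mesosUrl = masterURL.stripPrefix("mesos://")
    val coarse = sc.conf.getBoolean("spark.mesos.coarse", defaultValue = true)
    if (coarse) {
      new MesosCoarseGrainedSchedulerBackend(
        scheduler.asInstanceOf[TaskSchedulerImpl], sc, mesosUrl, sc.env.securityManager)
    } else {
      new MesosFineGrainedSchedulerBackend(
        scheduler.asInstanceOf[TaskSchedulerImpl], sc, mesosUrl)
    }
  }

  override def initialize(scheduler: TaskScheduler, backend: SchedulerBackend): Unit =
    scheduler.asInstanceOf[TaskSchedulerImpl].initialize(backend)
}
```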
http://git-wip-us.apache.org/repos/asf/spark/blob/8e5475be/core/src/main/scala/org/apache/spark/TaskState.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/TaskState.scala b/core/src/main/scala/org/apache/spark/TaskState.scala
index d232fae..cbace7b 100644
--- a/core/src/main/scala/org/apache/spark/TaskState.scala
+++ b/core/src/main/scala/org/apache/spark/TaskState.scala
@@ -17,8 +17,6 @@

 package org.apache.spark

-import org.apache.mesos.Protos.{TaskState => MesosTaskState}
-
 private[spark] object TaskState extends Enumeration {

   val LAUNCHING, RUNNING, FINISHED, FAILED, KILLED, LOST = Value
@@ -30,22 +28,4 @@ private[spark] object TaskState extends Enumeration {
   def isFailed(state: TaskState): Boolean = (LOST == state) || (FAILED == state)

   def isFinished(state: TaskState): Boolean = FINISHED_STATES.contains(state)
-
-  def toMesos(state: TaskState): MesosTaskState = state match {
-    case LAUNCHING => MesosTaskState.TASK_STARTING
-    case RUNNING => MesosTaskState.TASK_RUNNING
-    case FINISHED => MesosTaskState.TASK_FINISHED
-    case FAILED => MesosTaskState.TASK_FAILED
-    case KILLED => MesosTaskState.TASK_KILLED
-    case LOST => MesosTaskState.TASK_LOST
-  }
-
-  def fromMesos(mesosState: MesosTaskState): TaskState = mesosState match {
-    case MesosTaskState.TASK_STAGING | MesosTaskState.TASK_STARTING => LAUNCHING
-    case MesosTaskState.TASK_RUNNING | MesosTaskState.TASK_KILLING => RUNNING
-    case MesosTaskState.TASK_FINISHED => FINISHED
-    case MesosTaskState.TASK_FAILED => FAILED
-    case MesosTaskState.TASK_KILLED => KILLED
-    case MesosTaskState.TASK_LOST | MesosTaskState.TASK_ERROR => LOST
-  }
 }
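The `toMesos`/`fromMesos` conversions deleted here move into the mesos module (the +20 lines on `MesosSchedulerUtils.scala` in the diffstat are consistent with that). One property worth keeping in mind when reviewing the move: the mapping is many-to-one from Mesos to Spark, so it only round-trips in one direction. A small standalone restatement (not the relocated code itself) that makes the asymmetry visible:

```scala
import org.apache.mesos.Protos.{TaskState => MesosTaskState}

// Restates the deleted fromMesos mapping with plain strings: eight Mesos
// states collapse onto six Spark states, so toMesos cannot invert it for
// TASK_STAGING, TASK_KILLING, or TASK_ERROR. The Spark -> Mesos -> Spark
// direction, by contrast, is the identity for every Spark state.
def fromMesos(s: MesosTaskState): String = s match {
  case MesosTaskState.TASK_STAGING | MesosTaskState.TASK_STARTING => "LAUNCHING"
  case MesosTaskState.TASK_RUNNING | MesosTaskState.TASK_KILLING => "RUNNING"
  case MesosTaskState.TASK_FINISHED => "FINISHED"
  case MesosTaskState.TASK_FAILED => "FAILED"
  case MesosTaskState.TASK_KILLED => "KILLED"
  case MesosTaskState.TASK_LOST | MesosTaskState.TASK_ERROR => "LOST"
  case other => sys.error(s"Unexpected Mesos state: $other")
}
```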
http://git-wip-us.apache.org/repos/asf/spark/blob/8e5475be/core/src/main/scala/org/apache/spark/deploy/mesos/MesosClusterDispatcher.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/mesos/MesosClusterDispatcher.scala b/core/src/main/scala/org/apache/spark/deploy/mesos/MesosClusterDispatcher.scala
deleted file mode 100644
index 73b6ca3..0000000
--- a/core/src/main/scala/org/apache/spark/deploy/mesos/MesosClusterDispatcher.scala
+++ /dev/null
@@ -1,115 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.deploy.mesos
-
-import java.util.concurrent.CountDownLatch
-
-import org.apache.spark.{SecurityManager, SparkConf}
-import org.apache.spark.deploy.mesos.ui.MesosClusterUI
-import org.apache.spark.deploy.rest.mesos.MesosRestServer
-import org.apache.spark.internal.Logging
-import org.apache.spark.scheduler.cluster.mesos._
-import org.apache.spark.util.{ShutdownHookManager, Utils}
-
-/*
- * A dispatcher that is responsible for managing and launching drivers, and is intended to be
- * used for Mesos cluster mode. The dispatcher is a long-running process started by the user in
- * the cluster independently of Spark applications.
- * It contains a [[MesosRestServer]] that listens for requests to submit drivers and a
- * [[MesosClusterScheduler]] that processes these requests by negotiating with the Mesos master
- * for resources.
- *
- * A typical new driver lifecycle is the following:
- * - Driver submitted via spark-submit talking to the [[MesosRestServer]]
- * - [[MesosRestServer]] queues the driver request to [[MesosClusterScheduler]]
- * - [[MesosClusterScheduler]] gets resource offers and launches the drivers that are in queue
- *
- * This dispatcher supports both Mesos fine-grain or coarse-grain mode as the mode is configurable
- * per driver launched.
- * This class is needed since Mesos doesn't manage frameworks, so the dispatcher acts as
- * a daemon to launch drivers as Mesos frameworks upon request. The dispatcher is also started and
- * stopped by sbin/start-mesos-dispatcher and sbin/stop-mesos-dispatcher respectively.
- */
-private[mesos] class MesosClusterDispatcher(
-    args: MesosClusterDispatcherArguments,
-    conf: SparkConf)
-  extends Logging {
-
-  private val publicAddress = Option(conf.getenv("SPARK_PUBLIC_DNS")).getOrElse(args.host)
-  private val recoveryMode = conf.get("spark.deploy.recoveryMode", "NONE").toUpperCase()
-  logInfo("Recovery mode in Mesos dispatcher set to: " + recoveryMode)
-
-  private val engineFactory = recoveryMode match {
-    case "NONE" => new BlackHoleMesosClusterPersistenceEngineFactory
-    case "ZOOKEEPER" => new ZookeeperMesosClusterPersistenceEngineFactory(conf)
-    case _ => throw new IllegalArgumentException("Unsupported recovery mode: " + recoveryMode)
-  }
-
-  private val scheduler = new MesosClusterScheduler(engineFactory, conf)
-
-  private val server = new MesosRestServer(args.host, args.port, conf, scheduler)
-  private val webUi = new MesosClusterUI(
-    new SecurityManager(conf),
-    args.webUiPort,
-    conf,
-    publicAddress,
-    scheduler)
-
-  private val shutdownLatch = new CountDownLatch(1)
-
-  def start(): Unit = {
-    webUi.bind()
-    scheduler.frameworkUrl = conf.get("spark.mesos.dispatcher.webui.url", webUi.activeWebUiUrl)
-    scheduler.start()
-    server.start()
-  }
-
-  def awaitShutdown(): Unit = {
-    shutdownLatch.await()
-  }
-
-  def stop(): Unit = {
-    webUi.stop()
-    server.stop()
-    scheduler.stop()
-    shutdownLatch.countDown()
-  }
-}
-
-private[mesos] object MesosClusterDispatcher extends Logging {
-  def main(args: Array[String]) {
-    Utils.initDaemon(log)
-    val conf = new SparkConf
-    val dispatcherArgs = new MesosClusterDispatcherArguments(args, conf)
-    conf.setMaster(dispatcherArgs.masterUrl)
-    conf.setAppName(dispatcherArgs.name)
-    dispatcherArgs.zookeeperUrl.foreach { z =>
-      conf.set("spark.deploy.recoveryMode", "ZOOKEEPER")
-      conf.set("spark.deploy.zookeeper.url", z)
-    }
-    val dispatcher = new MesosClusterDispatcher(dispatcherArgs, conf)
-    dispatcher.start()
-    logDebug("Adding shutdown hook") // force eager creation of logger
-    ShutdownHookManager.addShutdownHook { () =>
-      logInfo("Shutdown hook is shutting down dispatcher")
-      dispatcher.stop()
-      dispatcher.awaitShutdown()
-    }
-    dispatcher.awaitShutdown()
-  }
-}
http://git-wip-us.apache.org/repos/asf/spark/blob/8e5475be/core/src/main/scala/org/apache/spark/deploy/mesos/MesosClusterDispatcherArguments.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/mesos/MesosClusterDispatcherArguments.scala b/core/src/main/scala/org/apache/spark/deploy/mesos/MesosClusterDispatcherArguments.scala
deleted file mode 100644
index 11e1344..0000000
--- a/core/src/main/scala/org/apache/spark/deploy/mesos/MesosClusterDispatcherArguments.scala
+++ /dev/null
@@ -1,109 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.deploy.mesos
-
-import scala.annotation.tailrec
-
-import org.apache.spark.SparkConf
-import org.apache.spark.util.{IntParam, Utils}
-
-
-private[mesos] class MesosClusterDispatcherArguments(args: Array[String], conf: SparkConf) {
-  var host = Utils.localHostName()
-  var port = 7077
-  var name = "Spark Cluster"
-  var webUiPort = 8081
-  var masterUrl: String = _
-  var zookeeperUrl: Option[String] = None
-  var propertiesFile: String = _
-
-  parse(args.toList)
-
-  propertiesFile = Utils.loadDefaultSparkProperties(conf, propertiesFile)
-
-  @tailrec
-  private def parse(args: List[String]): Unit = args match {
-    case ("--host" | "-h") :: value :: tail =>
-      Utils.checkHost(value, "Please use hostname " + value)
-      host = value
-      parse(tail)
-
-    case ("--port" | "-p") :: IntParam(value) :: tail =>
-      port = value
-      parse(tail)
-
-    case ("--webui-port") :: IntParam(value) :: tail =>
-      webUiPort = value
-      parse(tail)
-
-    case ("--zk" | "-z") :: value :: tail =>
-      zookeeperUrl = Some(value)
-      parse(tail)
-
-    case ("--master" | "-m") :: value :: tail =>
-      if (!value.startsWith("mesos://")) {
-        // scalastyle:off println
-        System.err.println("Cluster dispatcher only supports mesos (uri begins with mesos://)")
-        // scalastyle:on println
-        System.exit(1)
-      }
-      masterUrl = value.stripPrefix("mesos://")
-      parse(tail)
-
-    case ("--name") :: value :: tail =>
-      name = value
-      parse(tail)
-
-    case ("--properties-file") :: value :: tail =>
-      propertiesFile = value
-      parse(tail)
-
-    case ("--help") :: tail =>
-      printUsageAndExit(0)
-
-    case Nil =>
-      if (masterUrl == null) {
-        // scalastyle:off println
-        System.err.println("--master is required")
-        // scalastyle:on println
-        printUsageAndExit(1)
-      }
-
-    case _ =>
-      printUsageAndExit(1)
-  }
-
-  private def printUsageAndExit(exitCode: Int): Unit = {
-    // scalastyle:off println
-    System.err.println(
-      "Usage: MesosClusterDispatcher [options]\n" +
-        "\n" +
-        "Options:\n" +
-        "  -h HOST, --host HOST    Hostname to listen on\n" +
-        "  -p PORT, --port PORT    Port to listen on (default: 7077)\n" +
-        "  --webui-port WEBUI_PORT WebUI Port to listen on (default: 8081)\n" +
-        "  --name NAME             Framework name to show in Mesos UI\n" +
-        "  -m --master MASTER      URI for connecting to Mesos master\n" +
-        "  -z --zk ZOOKEEPER       Comma delimited URLs for connecting to \n" +
-        "                          Zookeeper for persistence\n" +
-        "  --properties-file FILE  Path to a custom Spark properties file.\n" +
-        "                          Default is conf/spark-defaults.conf.")
-    // scalastyle:on println
-    System.exit(exitCode)
-  }
-}
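For reference, the parser above is what `sbin/start-mesos-dispatcher.sh` feeds. A minimal sketch of the parsed result, usable from within the `org.apache.spark.deploy.mesos` package since the class is `private[mesos]`; the host and ZooKeeper values are illustrative only:

```scala
import org.apache.spark.SparkConf
import org.apache.spark.deploy.mesos.MesosClusterDispatcherArguments

// Illustrative flags, as the start script would pass them.
val conf = new SparkConf
val args = new MesosClusterDispatcherArguments(
  Array("--master", "mesos://zk://zk1:2181/mesos", "--port", "7077", "--webui-port", "8081"),
  conf)

assert(args.masterUrl == "zk://zk1:2181/mesos") // the "mesos://" prefix is stripped
assert(args.port == 7077 && args.webUiPort == 8081)
```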
http://git-wip-us.apache.org/repos/asf/spark/blob/8e5475be/core/src/main/scala/org/apache/spark/deploy/mesos/MesosDriverDescription.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/mesos/MesosDriverDescription.scala b/core/src/main/scala/org/apache/spark/deploy/mesos/MesosDriverDescription.scala
deleted file mode 100644
index d4c7022..0000000
--- a/core/src/main/scala/org/apache/spark/deploy/mesos/MesosDriverDescription.scala
+++ /dev/null
@@ -1,70 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.deploy.mesos
-
-import java.util.Date
-
-import org.apache.spark.SparkConf
-import org.apache.spark.deploy.Command
-import org.apache.spark.scheduler.cluster.mesos.MesosClusterRetryState
-
-/**
- * Describes a Spark driver that is submitted from the
- * [[org.apache.spark.deploy.rest.mesos.MesosRestServer]], to be launched by
- * [[org.apache.spark.scheduler.cluster.mesos.MesosClusterScheduler]].
- * @param jarUrl URL to the application jar
- * @param mem Amount of memory for the driver
- * @param cores Number of cores for the driver
- * @param supervise Supervise the driver for long running app
- * @param command The command to launch the driver.
- * @param schedulerProperties Extra properties to pass the Mesos scheduler
- */
-private[spark] class MesosDriverDescription(
-    val name: String,
-    val jarUrl: String,
-    val mem: Int,
-    val cores: Double,
-    val supervise: Boolean,
-    val command: Command,
-    schedulerProperties: Map[String, String],
-    val submissionId: String,
-    val submissionDate: Date,
-    val retryState: Option[MesosClusterRetryState] = None)
-  extends Serializable {
-
-  val conf = new SparkConf(false)
-  schedulerProperties.foreach {case (k, v) => conf.set(k, v)}
-
-  def copy(
-      name: String = name,
-      jarUrl: String = jarUrl,
-      mem: Int = mem,
-      cores: Double = cores,
-      supervise: Boolean = supervise,
-      command: Command = command,
-      schedulerProperties: SparkConf = conf,
-      submissionId: String = submissionId,
-      submissionDate: Date = submissionDate,
-      retryState: Option[MesosClusterRetryState] = retryState): MesosDriverDescription = {
-
-    new MesosDriverDescription(name, jarUrl, mem, cores, supervise, command, conf.getAll.toMap,
-      submissionId, submissionDate, retryState)
-  }
-
-  override def toString: String = s"MesosDriverDescription (${command.mainClass})"
-}
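A hedged sketch of constructing the description above, roughly as `MesosRestServer` (further down) does when a submission arrives; the main class, jar URL, and property values are all hypothetical:

```scala
import java.util.Date

import org.apache.spark.deploy.Command
import org.apache.spark.deploy.mesos.MesosDriverDescription

// Illustrative values; in practice MesosSubmitRequestServlet derives these
// from the REST submission's sparkProperties (see MesosRestServer below).
val command = Command(
  "org.example.MyApp",                        // hypothetical main class
  Seq("arg1"), Map.empty, Seq.empty, Seq.empty, Seq.empty)

val desc = new MesosDriverDescription(
  name = "MyApp",
  jarUrl = "http://example.com/my-app.jar",   // hypothetical jar location
  mem = 1024,                                 // MB
  cores = 1.0,
  supervise = false,
  command = command,
  schedulerProperties = Map("spark.mesos.executor.home" -> "/opt/spark"),
  submissionId = "driver-20160826122522-0001",
  submissionDate = new Date())
```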
http://git-wip-us.apache.org/repos/asf/spark/blob/8e5475be/core/src/main/scala/org/apache/spark/deploy/mesos/MesosExternalShuffleService.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/mesos/MesosExternalShuffleService.scala b/core/src/main/scala/org/apache/spark/deploy/mesos/MesosExternalShuffleService.scala
deleted file mode 100644
index 6b297c4..0000000
--- a/core/src/main/scala/org/apache/spark/deploy/mesos/MesosExternalShuffleService.scala
+++ /dev/null
@@ -1,130 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.deploy.mesos
-
-import java.nio.ByteBuffer
-import java.util.concurrent.{ConcurrentHashMap, TimeUnit}
-
-import scala.collection.JavaConverters._
-
-import org.apache.spark.{SecurityManager, SparkConf}
-import org.apache.spark.deploy.ExternalShuffleService
-import org.apache.spark.internal.Logging
-import org.apache.spark.network.client.{RpcResponseCallback, TransportClient}
-import org.apache.spark.network.shuffle.ExternalShuffleBlockHandler
-import org.apache.spark.network.shuffle.protocol.BlockTransferMessage
-import org.apache.spark.network.shuffle.protocol.mesos.{RegisterDriver, ShuffleServiceHeartbeat}
-import org.apache.spark.network.util.TransportConf
-import org.apache.spark.util.ThreadUtils
-
-/**
- * An RPC endpoint that receives registration requests from Spark drivers running on Mesos.
- * It detects driver termination and calls the cleanup callback to [[ExternalShuffleService]].
- */
-private[mesos] class MesosExternalShuffleBlockHandler(
-    transportConf: TransportConf,
-    cleanerIntervalS: Long)
-  extends ExternalShuffleBlockHandler(transportConf, null) with Logging {
-
-  ThreadUtils.newDaemonSingleThreadScheduledExecutor("shuffle-cleaner-watcher")
-    .scheduleAtFixedRate(new CleanerThread(), 0, cleanerIntervalS, TimeUnit.SECONDS)
-
-  // Stores a map of app id to app state (timeout value and last heartbeat)
-  private val connectedApps = new ConcurrentHashMap[String, AppState]()
-
-  protected override def handleMessage(
-      message: BlockTransferMessage,
-      client: TransportClient,
-      callback: RpcResponseCallback): Unit = {
-    message match {
-      case RegisterDriverParam(appId, appState) =>
-        val address = client.getSocketAddress
-        val timeout = appState.heartbeatTimeout
-        logInfo(s"Received registration request from app $appId (remote address $address, " +
-          s"heartbeat timeout $timeout ms).")
-        if (connectedApps.containsKey(appId)) {
-          logWarning(s"Received a registration request from app $appId, but it was already " +
-            s"registered")
-        }
-        connectedApps.put(appId, appState)
-        callback.onSuccess(ByteBuffer.allocate(0))
-      case Heartbeat(appId) =>
-        val address = client.getSocketAddress
-        Option(connectedApps.get(appId)) match {
-          case Some(existingAppState) =>
-            logTrace(s"Received ShuffleServiceHeartbeat from app '$appId' (remote " +
-              s"address $address).")
-            existingAppState.lastHeartbeat = System.nanoTime()
-          case None =>
-            logWarning(s"Received ShuffleServiceHeartbeat from an unknown app (remote " +
-              s"address $address, appId '$appId').")
-        }
-      case _ => super.handleMessage(message, client, callback)
-    }
-  }
-
-  /** An extractor object for matching [[RegisterDriver]] message. */
-  private object RegisterDriverParam {
-    def unapply(r: RegisterDriver): Option[(String, AppState)] =
-      Some((r.getAppId, new AppState(r.getHeartbeatTimeoutMs, System.nanoTime())))
-  }
-
-  private object Heartbeat {
-    def unapply(h: ShuffleServiceHeartbeat): Option[String] = Some(h.getAppId)
-  }
-
-  private class AppState(val heartbeatTimeout: Long, @volatile var lastHeartbeat: Long)
-
-  private class CleanerThread extends Runnable {
-    override def run(): Unit = {
-      val now = System.nanoTime()
-      connectedApps.asScala.foreach { case (appId, appState) =>
-        if (now - appState.lastHeartbeat > appState.heartbeatTimeout * 1000 * 1000) {
Removing shuffle files.") - connectedApps.remove(appId) - applicationRemoved(appId, true) - } - } - } - } -} - -/** - * A wrapper of [[ExternalShuffleService]] that provides an additional endpoint for drivers - * to associate with. This allows the shuffle service to detect when a driver is terminated - * and can clean up the associated shuffle files. - */ -private[mesos] class MesosExternalShuffleService(conf: SparkConf, securityManager: SecurityManager) - extends ExternalShuffleService(conf, securityManager) { - - protected override def newShuffleBlockHandler( - conf: TransportConf): ExternalShuffleBlockHandler = { - val cleanerIntervalS = this.conf.getTimeAsSeconds("spark.shuffle.cleaner.interval", "30s") - new MesosExternalShuffleBlockHandler(conf, cleanerIntervalS) - } -} - -private[spark] object MesosExternalShuffleService extends Logging { - - def main(args: Array[String]): Unit = { - ExternalShuffleService.main(args, - (conf: SparkConf, sm: SecurityManager) => new MesosExternalShuffleService(conf, sm)) - } -} - - http://git-wip-us.apache.org/repos/asf/spark/blob/8e5475be/core/src/main/scala/org/apache/spark/deploy/mesos/ui/DriverPage.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/deploy/mesos/ui/DriverPage.scala b/core/src/main/scala/org/apache/spark/deploy/mesos/ui/DriverPage.scala deleted file mode 100644 index cd98110..0000000 --- a/core/src/main/scala/org/apache/spark/deploy/mesos/ui/DriverPage.scala +++ /dev/null @@ -1,179 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
http://git-wip-us.apache.org/repos/asf/spark/blob/8e5475be/core/src/main/scala/org/apache/spark/deploy/mesos/ui/DriverPage.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/mesos/ui/DriverPage.scala b/core/src/main/scala/org/apache/spark/deploy/mesos/ui/DriverPage.scala
deleted file mode 100644
index cd98110..0000000
--- a/core/src/main/scala/org/apache/spark/deploy/mesos/ui/DriverPage.scala
+++ /dev/null
@@ -1,179 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.deploy.mesos.ui
-
-import javax.servlet.http.HttpServletRequest
-
-import scala.xml.Node
-
-import org.apache.spark.deploy.Command
-import org.apache.spark.deploy.mesos.MesosDriverDescription
-import org.apache.spark.scheduler.cluster.mesos.{MesosClusterRetryState, MesosClusterSubmissionState}
-import org.apache.spark.ui.{UIUtils, WebUIPage}
-
-private[ui] class DriverPage(parent: MesosClusterUI) extends WebUIPage("driver") {
-
-  override def render(request: HttpServletRequest): Seq[Node] = {
-    val driverId = request.getParameter("id")
-    require(driverId != null && driverId.nonEmpty, "Missing id parameter")
-
-    val state = parent.scheduler.getDriverState(driverId)
-    if (state.isEmpty) {
-      val content =
-        <div>
-          <p>Cannot find driver {driverId}</p>
-        </div>
-      return UIUtils.basicSparkPage(content, s"Details for Job $driverId")
-    }
-    val driverState = state.get
-    val driverHeaders = Seq("Driver property", "Value")
-    val schedulerHeaders = Seq("Scheduler property", "Value")
-    val commandEnvHeaders = Seq("Command environment variable", "Value")
-    val launchedHeaders = Seq("Launched property", "Value")
-    val commandHeaders = Seq("Command property", "Value")
-    val retryHeaders = Seq("Last failed status", "Next retry time", "Retry count")
-    val driverDescription = Iterable.apply(driverState.description)
-    val submissionState = Iterable.apply(driverState.submissionState)
-    val command = Iterable.apply(driverState.description.command)
-    val schedulerProperties = Iterable.apply(driverState.description.conf.getAll.toMap)
-    val commandEnv = Iterable.apply(driverState.description.command.environment)
-    val driverTable =
-      UIUtils.listingTable(driverHeaders, driverRow, driverDescription)
-    val commandTable =
-      UIUtils.listingTable(commandHeaders, commandRow, command)
-    val commandEnvTable =
-      UIUtils.listingTable(commandEnvHeaders, propertiesRow, commandEnv)
-    val schedulerTable =
-      UIUtils.listingTable(schedulerHeaders, propertiesRow, schedulerProperties)
-    val launchedTable =
-      UIUtils.listingTable(launchedHeaders, launchedRow, submissionState)
-    val retryTable =
-      UIUtils.listingTable(
-        retryHeaders, retryRow, Iterable.apply(driverState.description.retryState))
-    val content =
-      <p>Driver state information for driver id {driverId}</p>
-        <a href={UIUtils.prependBaseUri("/")}>Back to Drivers</a>
-        <div class="row-fluid">
-          <div class="span12">
-            <h4>Driver state: {driverState.state}</h4>
-            <h4>Driver properties</h4>
-            {driverTable}
-            <h4>Driver command</h4>
-            {commandTable}
-            <h4>Driver command environment</h4>
-            {commandEnvTable}
-            <h4>Scheduler properties</h4>
-            {schedulerTable}
-            <h4>Launched state</h4>
-            {launchedTable}
-            <h4>Retry state</h4>
-            {retryTable}
-          </div>
-        </div>;
-
-    UIUtils.basicSparkPage(content, s"Details for Job $driverId")
-  }
-
-  private def launchedRow(submissionState: Option[MesosClusterSubmissionState]): Seq[Node] = {
-    submissionState.map { state =>
-      <tr>
-        <td>Mesos Slave ID</td>
-        <td>{state.slaveId.getValue}</td>
-      </tr>
-      <tr>
-        <td>Mesos Task ID</td>
-        <td>{state.taskId.getValue}</td>
-      </tr>
-      <tr>
-        <td>Launch Time</td>
-        <td>{state.startDate}</td>
-      </tr>
-      <tr>
-        <td>Finish Time</td>
-        <td>{state.finishDate.map(_.toString).getOrElse("")}</td>
-      </tr>
-      <tr>
-        <td>Last Task Status</td>
-        <td>{state.mesosTaskStatus.map(_.toString).getOrElse("")}</td>
-      </tr>
-    }.getOrElse(Seq[Node]())
-  }
-
-  private def propertiesRow(properties: collection.Map[String, String]): Seq[Node] = {
-    properties.map { case (k, v) =>
-      <tr>
-        <td>{k}</td><td>{v}</td>
-      </tr>
-    }.toSeq
-  }
-
-  private def commandRow(command: Command): Seq[Node] = {
-    <tr>
-      <td>Main class</td><td>{command.mainClass}</td>
-    </tr>
-    <tr>
-      <td>Arguments</td><td>{command.arguments.mkString(" ")}</td>
-    </tr>
-    <tr>
-      <td>Class path entries</td><td>{command.classPathEntries.mkString(" ")}</td>
-    </tr>
-    <tr>
-      <td>Java options</td><td>{command.javaOpts.mkString((" "))}</td>
-    </tr>
-    <tr>
-      <td>Library path entries</td><td>{command.libraryPathEntries.mkString((" "))}</td>
-    </tr>
-  }
-
-  private def driverRow(driver: MesosDriverDescription): Seq[Node] = {
-    <tr>
-      <td>Name</td><td>{driver.name}</td>
-    </tr>
-    <tr>
-      <td>Id</td><td>{driver.submissionId}</td>
-    </tr>
-    <tr>
-      <td>Cores</td><td>{driver.cores}</td>
-    </tr>
-    <tr>
-      <td>Memory</td><td>{driver.mem}</td>
-    </tr>
-    <tr>
-      <td>Submitted</td><td>{driver.submissionDate}</td>
-    </tr>
-    <tr>
-      <td>Supervise</td><td>{driver.supervise}</td>
-    </tr>
-  }
-
-  private def retryRow(retryState: Option[MesosClusterRetryState]): Seq[Node] = {
-    retryState.map { state =>
-      <tr>
-        <td>
-          {state.lastFailureStatus}
-        </td>
-        <td>
-          {state.nextRetry}
-        </td>
-        <td>
-          {state.retries}
-        </td>
-      </tr>
-    }.getOrElse(Seq[Node]())
-  }
-}
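Both dispatcher pages follow the same `UIUtils.listingTable(headers, rowFn, data)` pattern: a header row plus a per-item renderer returning XML nodes. A minimal self-contained sketch of that pattern — a stand-in, not Spark's `private[spark]` `listingTable`, which takes additional parameters:

```scala
import scala.xml.Node

// Simplified stand-in for UIUtils.listingTable: render headers once, then
// apply the row function to each item and splice the results into <tbody>.
def listingTable[T](headers: Seq[String], row: T => Seq[Node], data: Iterable[T]): Seq[Node] =
  <table>
    <thead><tr>{headers.map(h => <th>{h}</th>)}</tr></thead>
    <tbody>{data.flatMap(row)}</tbody>
  </table>

// Hypothetical usage with (id, state) pairs.
val table = listingTable(
  Seq("Driver ID", "State"),
  (r: (String, String)) => <tr><td>{r._1}</td><td>{r._2}</td></tr>,
  Seq(("driver-0001", "RUNNING")))
```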
http://git-wip-us.apache.org/repos/asf/spark/blob/8e5475be/core/src/main/scala/org/apache/spark/deploy/mesos/ui/MesosClusterPage.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/mesos/ui/MesosClusterPage.scala b/core/src/main/scala/org/apache/spark/deploy/mesos/ui/MesosClusterPage.scala
deleted file mode 100644
index 8dcbdaa..0000000
--- a/core/src/main/scala/org/apache/spark/deploy/mesos/ui/MesosClusterPage.scala
+++ /dev/null
@@ -1,135 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.deploy.mesos.ui
-
-import javax.servlet.http.HttpServletRequest
-
-import scala.xml.Node
-
-import org.apache.mesos.Protos.TaskStatus
-
-import org.apache.spark.deploy.mesos.MesosDriverDescription
-import org.apache.spark.scheduler.cluster.mesos.MesosClusterSubmissionState
-import org.apache.spark.ui.{UIUtils, WebUIPage}
-
-private[mesos] class MesosClusterPage(parent: MesosClusterUI) extends WebUIPage("") {
-  private val historyServerURL = parent.conf.getOption("spark.mesos.dispatcher.historyServer.url")
-
-  def render(request: HttpServletRequest): Seq[Node] = {
-    val state = parent.scheduler.getSchedulerState()
-
-    val driverHeader = Seq("Driver ID")
-    val historyHeader = historyServerURL.map(url => Seq("History")).getOrElse(Nil)
-    val submissionHeader = Seq("Submit Date", "Main Class", "Driver Resources")
-
-    val queuedHeaders = driverHeader ++ submissionHeader
-    val driverHeaders = driverHeader ++ historyHeader ++ submissionHeader ++
-      Seq("Start Date", "Mesos Slave ID", "State")
-    val retryHeaders = Seq("Driver ID", "Submit Date", "Description") ++
-      Seq("Last Failed Status", "Next Retry Time", "Attempt Count")
-    val queuedTable = UIUtils.listingTable(queuedHeaders, queuedRow, state.queuedDrivers)
-    val launchedTable = UIUtils.listingTable(driverHeaders, driverRow, state.launchedDrivers)
-    val finishedTable = UIUtils.listingTable(driverHeaders, driverRow, state.finishedDrivers)
-    val retryTable = UIUtils.listingTable(retryHeaders, retryRow, state.pendingRetryDrivers)
-    val content =
-      <p>Mesos Framework ID: {state.frameworkId}</p>
-      <div class="row-fluid">
-        <div class="span12">
-          <h4>Queued Drivers:</h4>
-          {queuedTable}
-          <h4>Launched Drivers:</h4>
-          {launchedTable}
-          <h4>Finished Drivers:</h4>
-          {finishedTable}
-          <h4>Supervise drivers waiting for retry:</h4>
-          {retryTable}
-        </div>
-      </div>;
-    UIUtils.basicSparkPage(content, "Spark Drivers for Mesos cluster")
-  }
-
-  private def queuedRow(submission: MesosDriverDescription): Seq[Node] = {
-    val id = submission.submissionId
-    <tr>
-      <td><a href={s"driver?id=$id"}>{id}</a></td>
-      <td>{submission.submissionDate}</td>
-      <td>{submission.command.mainClass}</td>
-      <td>cpus: {submission.cores}, mem: {submission.mem}</td>
-    </tr>
-  }
-
-  private def driverRow(state: MesosClusterSubmissionState): Seq[Node] = {
-    val id = state.driverDescription.submissionId
-
-    val historyCol = if (historyServerURL.isDefined) {
-      <td>
-        <a href={s"${historyServerURL.get}/history/${state.frameworkId}"}>
-          {state.frameworkId}
-        </a>
-      </td>
-    } else Nil
-
-    <tr>
-      <td><a href={s"driver?id=$id"}>{id}</a></td>
-      {historyCol}
-      <td>{state.driverDescription.submissionDate}</td>
-      <td>{state.driverDescription.command.mainClass}</td>
-      <td>cpus: {state.driverDescription.cores}, mem: {state.driverDescription.mem}</td>
-      <td>{state.startDate}</td>
-      <td>{state.slaveId.getValue}</td>
-      <td>{stateString(state.mesosTaskStatus)}</td>
-    </tr>
-  }
-
-  private def retryRow(submission: MesosDriverDescription): Seq[Node] = {
-    val id = submission.submissionId
-    <tr>
-      <td><a href={s"driver?id=$id"}>{id}</a></td>
-      <td>{submission.submissionDate}</td>
-      <td>{submission.command.mainClass}</td>
-      <td>{submission.retryState.get.lastFailureStatus}</td>
-      <td>{submission.retryState.get.nextRetry}</td>
-      <td>{submission.retryState.get.retries}</td>
-    </tr>
-  }
-
-  private def stateString(status: Option[TaskStatus]): String = {
-    if (status.isEmpty) {
-      return ""
-    }
-    val sb = new StringBuilder
-    val s = status.get
-    sb.append(s"State: ${s.getState}")
-    if (status.get.hasMessage) {
-      sb.append(s", Message: ${s.getMessage}")
-    }
-    if (status.get.hasHealthy) {
-      sb.append(s", Healthy: ${s.getHealthy}")
-    }
-    if (status.get.hasSource) {
-      sb.append(s", Source: ${s.getSource}")
-    }
-    if (status.get.hasReason) {
-      sb.append(s", Reason: ${s.getReason}")
-    }
-    if (status.get.hasTimestamp) {
-      sb.append(s", Time: ${s.getTimestamp}")
-    }
-    sb.toString()
-  }
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/8e5475be/core/src/main/scala/org/apache/spark/deploy/mesos/ui/MesosClusterUI.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/mesos/ui/MesosClusterUI.scala b/core/src/main/scala/org/apache/spark/deploy/mesos/ui/MesosClusterUI.scala
deleted file mode 100644
index 6049789..0000000
--- a/core/src/main/scala/org/apache/spark/deploy/mesos/ui/MesosClusterUI.scala
+++ /dev/null
@@ -1,49 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.deploy.mesos.ui
-
-import org.apache.spark.{SecurityManager, SparkConf}
-import org.apache.spark.scheduler.cluster.mesos.MesosClusterScheduler
-import org.apache.spark.ui.{SparkUI, WebUI}
-import org.apache.spark.ui.JettyUtils._
-
-/**
- * UI that displays driver results from the [[org.apache.spark.deploy.mesos.MesosClusterDispatcher]]
- */
-private[spark] class MesosClusterUI(
-    securityManager: SecurityManager,
-    port: Int,
-    val conf: SparkConf,
-    dispatcherPublicAddress: String,
-    val scheduler: MesosClusterScheduler)
-  extends WebUI(securityManager, securityManager.getSSLOptions("mesos"), port, conf) {
-
-  initialize()
-
-  def activeWebUiUrl: String = "http://" + dispatcherPublicAddress + ":" + boundPort
-
-  override def initialize() {
-    attachPage(new MesosClusterPage(this))
-    attachPage(new DriverPage(this))
-    attachHandler(createStaticHandler(MesosClusterUI.STATIC_RESOURCE_DIR, "/static"))
-  }
-}
-
-private object MesosClusterUI {
-  val STATIC_RESOURCE_DIR = SparkUI.STATIC_RESOURCE_DIR
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/8e5475be/core/src/main/scala/org/apache/spark/deploy/rest/mesos/MesosRestServer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/rest/mesos/MesosRestServer.scala b/core/src/main/scala/org/apache/spark/deploy/rest/mesos/MesosRestServer.scala
deleted file mode 100644
index 3b96488..0000000
--- a/core/src/main/scala/org/apache/spark/deploy/rest/mesos/MesosRestServer.scala
+++ /dev/null
@@ -1,157 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.deploy.rest.mesos
-
-import java.io.File
-import java.text.SimpleDateFormat
-import java.util.Date
-import java.util.concurrent.atomic.AtomicLong
-import javax.servlet.http.HttpServletResponse
-
-import org.apache.spark.{SPARK_VERSION => sparkVersion, SparkConf}
-import org.apache.spark.deploy.Command
-import org.apache.spark.deploy.mesos.MesosDriverDescription
-import org.apache.spark.deploy.rest._
-import org.apache.spark.scheduler.cluster.mesos.MesosClusterScheduler
-import org.apache.spark.util.Utils
-
-/**
- * A server that responds to requests submitted by the [[RestSubmissionClient]].
- * All requests are forwarded to
- * [[org.apache.spark.scheduler.cluster.mesos.MesosClusterScheduler]].
- * This is intended to be used in Mesos cluster mode only.
- * For more details about the REST submission please refer to [[RestSubmissionServer]] javadocs.
- */
-private[spark] class MesosRestServer(
-    host: String,
-    requestedPort: Int,
-    masterConf: SparkConf,
-    scheduler: MesosClusterScheduler)
-  extends RestSubmissionServer(host, requestedPort, masterConf) {
-
-  protected override val submitRequestServlet =
-    new MesosSubmitRequestServlet(scheduler, masterConf)
-  protected override val killRequestServlet =
-    new MesosKillRequestServlet(scheduler, masterConf)
-  protected override val statusRequestServlet =
-    new MesosStatusRequestServlet(scheduler, masterConf)
-}
-
-private[mesos] class MesosSubmitRequestServlet(
-    scheduler: MesosClusterScheduler,
-    conf: SparkConf)
-  extends SubmitRequestServlet {
-
-  private val DEFAULT_SUPERVISE = false
-  private val DEFAULT_MEMORY = Utils.DEFAULT_DRIVER_MEM_MB // mb
-  private val DEFAULT_CORES = 1.0
-
-  private val nextDriverNumber = new AtomicLong(0)
-  private def createDateFormat = new SimpleDateFormat("yyyyMMddHHmmss") // For application IDs
-  private def newDriverId(submitDate: Date): String = {
-    "driver-%s-%04d".format(
-      createDateFormat.format(submitDate), nextDriverNumber.incrementAndGet())
-  }
-
-  /**
-   * Build a driver description from the fields specified in the submit request.
-   *
-   * This involves constructing a command that launches a mesos framework for the job.
-   * This does not currently consider fields used by python applications since python
-   * is not supported in mesos cluster mode yet.
-   */
-  private def buildDriverDescription(request: CreateSubmissionRequest): MesosDriverDescription = {
-    // Required fields, including the main class because python is not yet supported
-    val appResource = Option(request.appResource).getOrElse {
-      throw new SubmitRestMissingFieldException("Application jar is missing.")
-    }
-    val mainClass = Option(request.mainClass).getOrElse {
-      throw new SubmitRestMissingFieldException("Main class is missing.")
-    }
-
-    // Optional fields
-    val sparkProperties = request.sparkProperties
-    val driverExtraJavaOptions = sparkProperties.get("spark.driver.extraJavaOptions")
-    val driverExtraClassPath = sparkProperties.get("spark.driver.extraClassPath")
-    val driverExtraLibraryPath = sparkProperties.get("spark.driver.extraLibraryPath")
-    val superviseDriver = sparkProperties.get("spark.driver.supervise")
-    val driverMemory = sparkProperties.get("spark.driver.memory")
-    val driverCores = sparkProperties.get("spark.driver.cores")
-    val appArgs = request.appArgs
-    val environmentVariables = request.environmentVariables
-    val name = request.sparkProperties.getOrElse("spark.app.name", mainClass)
-
-    // Construct driver description
-    val conf = new SparkConf(false).setAll(sparkProperties)
-    val extraClassPath = driverExtraClassPath.toSeq.flatMap(_.split(File.pathSeparator))
-    val extraLibraryPath = driverExtraLibraryPath.toSeq.flatMap(_.split(File.pathSeparator))
-    val extraJavaOpts = driverExtraJavaOptions.map(Utils.splitCommandString).getOrElse(Seq.empty)
-    val sparkJavaOpts = Utils.sparkJavaOpts(conf)
-    val javaOpts = sparkJavaOpts ++ extraJavaOpts
-    val command = new Command(
-      mainClass, appArgs, environmentVariables, extraClassPath, extraLibraryPath, javaOpts)
-    val actualSuperviseDriver = superviseDriver.map(_.toBoolean).getOrElse(DEFAULT_SUPERVISE)
-    val actualDriverMemory = driverMemory.map(Utils.memoryStringToMb).getOrElse(DEFAULT_MEMORY)
-    val actualDriverCores = driverCores.map(_.toDouble).getOrElse(DEFAULT_CORES)
-    val submitDate = new Date()
-    val submissionId = newDriverId(submitDate)
-
-    new MesosDriverDescription(
-      name, appResource, actualDriverMemory, actualDriverCores, actualSuperviseDriver,
-      command, request.sparkProperties, submissionId, submitDate)
-  }
-
-  protected override def handleSubmit(
-      requestMessageJson: String,
-      requestMessage: SubmitRestProtocolMessage,
-      responseServlet: HttpServletResponse): SubmitRestProtocolResponse = {
-    requestMessage match {
-      case submitRequest: CreateSubmissionRequest =>
-        val driverDescription = buildDriverDescription(submitRequest)
-        val s = scheduler.submitDriver(driverDescription)
-        s.serverSparkVersion = sparkVersion
-        val unknownFields = findUnknownFields(requestMessageJson, requestMessage)
-        if (unknownFields.nonEmpty) {
-          // If there are fields that the server does not know about, warn the client
-          s.unknownFields = unknownFields
-        }
-        s
-      case unexpected =>
-        responseServlet.setStatus(HttpServletResponse.SC_BAD_REQUEST)
-        handleError(s"Received message of unexpected type ${unexpected.messageType}.")
-    }
-  }
-}
-
-private[mesos] class MesosKillRequestServlet(scheduler: MesosClusterScheduler, conf: SparkConf)
-  extends KillRequestServlet {
-  protected override def handleKill(submissionId: String): KillSubmissionResponse = {
-    val k = scheduler.killDriver(submissionId)
-    k.serverSparkVersion = sparkVersion
-    k
-  }
-}
-
-private[mesos] class MesosStatusRequestServlet(scheduler: MesosClusterScheduler, conf: SparkConf)
-  extends StatusRequestServlet {
-  protected override def handleStatus(submissionId: String): SubmissionStatusResponse = {
-    val d = scheduler.getDriverStatus(submissionId)
-    d.serverSparkVersion = sparkVersion
-    d
-  }
-}
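For clarity, the ID scheme and defaults encoded in the servlet above: `newDriverId` timestamps with second precision plus a 4-digit counter, and submissions that omit the relevant properties fall back to 1.0 cores, the default driver memory, and no supervision. A small standalone restatement of the ID format (dates illustrative):

```scala
import java.text.SimpleDateFormat
import java.util.Date
import java.util.concurrent.atomic.AtomicLong

// Mirrors MesosSubmitRequestServlet.newDriverId: "driver-" + yyyyMMddHHmmss + counter.
val nextDriverNumber = new AtomicLong(0)
def newDriverId(submitDate: Date): String =
  "driver-%s-%04d".format(
    new SimpleDateFormat("yyyyMMddHHmmss").format(submitDate),
    nextDriverNumber.incrementAndGet())

// e.g. "driver-20160826122522-0001" for the first submission in that second.
println(newDriverId(new Date()))
```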
http://git-wip-us.apache.org/repos/asf/spark/blob/8e5475be/core/src/main/scala/org/apache/spark/executor/MesosExecutorBackend.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/executor/MesosExecutorBackend.scala b/core/src/main/scala/org/apache/spark/executor/MesosExecutorBackend.scala
deleted file mode 100644
index 680cfb7..0000000
--- a/core/src/main/scala/org/apache/spark/executor/MesosExecutorBackend.scala
+++ /dev/null
@@ -1,130 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.executor
-
-import java.nio.ByteBuffer
-
-import scala.collection.JavaConverters._
-
-import org.apache.mesos.{Executor => MesosExecutor, ExecutorDriver, MesosExecutorDriver}
-import org.apache.mesos.Protos.{TaskStatus => MesosTaskStatus, _}
-import org.apache.mesos.protobuf.ByteString
-
-import org.apache.spark.{SparkConf, SparkEnv, TaskState}
-import org.apache.spark.TaskState.TaskState
-import org.apache.spark.deploy.SparkHadoopUtil
-import org.apache.spark.internal.Logging
-import org.apache.spark.scheduler.cluster.mesos.MesosTaskLaunchData
-import org.apache.spark.util.Utils
-
-private[spark] class MesosExecutorBackend
-  extends MesosExecutor
-  with ExecutorBackend
-  with Logging {
-
-  var executor: Executor = null
-  var driver: ExecutorDriver = null
-
-  override def statusUpdate(taskId: Long, state: TaskState, data: ByteBuffer) {
-    val mesosTaskId = TaskID.newBuilder().setValue(taskId.toString).build()
-    driver.sendStatusUpdate(MesosTaskStatus.newBuilder()
-      .setTaskId(mesosTaskId)
-      .setState(TaskState.toMesos(state))
-      .setData(ByteString.copyFrom(data))
-      .build())
-  }
-
-  override def registered(
-      driver: ExecutorDriver,
-      executorInfo: ExecutorInfo,
-      frameworkInfo: FrameworkInfo,
-      slaveInfo: SlaveInfo) {
-
-    // Get num cores for this task from ExecutorInfo, created in MesosSchedulerBackend.
-    val cpusPerTask = executorInfo.getResourcesList.asScala
-      .find(_.getName == "cpus")
-      .map(_.getScalar.getValue.toInt)
-      .getOrElse(0)
-    val executorId = executorInfo.getExecutorId.getValue
-
-    logInfo(s"Registered with Mesos as executor ID $executorId with $cpusPerTask cpus")
-    this.driver = driver
-    // Set a context class loader to be picked up by the serializer. Without this call
-    // the serializer would default to the null class loader, and fail to find Spark classes
-    // See SPARK-10986.
-    Thread.currentThread().setContextClassLoader(this.getClass.getClassLoader)
-
-    val properties = Utils.deserialize[Array[(String, String)]](executorInfo.getData.toByteArray) ++
-      Seq[(String, String)](("spark.app.id", frameworkInfo.getId.getValue))
-    val conf = new SparkConf(loadDefaults = true).setAll(properties)
-    val port = conf.getInt("spark.executor.port", 0)
-    val env = SparkEnv.createExecutorEnv(
-      conf, executorId, slaveInfo.getHostname, port, cpusPerTask, isLocal = false)
-
-    executor = new Executor(
-      executorId,
-      slaveInfo.getHostname,
-      env)
-  }
-
-  override def launchTask(d: ExecutorDriver, taskInfo: TaskInfo) {
-    val taskId = taskInfo.getTaskId.getValue.toLong
-    val taskData = MesosTaskLaunchData.fromByteString(taskInfo.getData)
-    if (executor == null) {
-      logError("Received launchTask but executor was null")
-    } else {
-      SparkHadoopUtil.get.runAsSparkUser { () =>
-        executor.launchTask(this, taskId = taskId, attemptNumber = taskData.attemptNumber,
-          taskInfo.getName, taskData.serializedTask)
-      }
-    }
-  }
-
-  override def error(d: ExecutorDriver, message: String) {
-    logError("Error from Mesos: " + message)
-  }
-
-  override def killTask(d: ExecutorDriver, t: TaskID) {
-    if (executor == null) {
-      logError("Received KillTask but executor was null")
-    } else {
-      // TODO: Determine the 'interruptOnCancel' property set for the given job.
-      executor.killTask(t.getValue.toLong, interruptThread = false)
-    }
-  }
-
-  override def reregistered(d: ExecutorDriver, p2: SlaveInfo) {}
-
-  override def disconnected(d: ExecutorDriver) {}
-
-  override def frameworkMessage(d: ExecutorDriver, data: Array[Byte]) {}
-
-  override def shutdown(d: ExecutorDriver) {}
-}
-
-/**
- * Entry point for Mesos executor.
- */
-private[spark] object MesosExecutorBackend extends Logging {
-  def main(args: Array[String]) {
-    Utils.initDaemon(log)
-    // Create a new Executor and start it running
-    val runner = new MesosExecutorBackend()
-    new MesosExecutorDriver(runner).run()
-  }
-}
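`launchTask` above unpacks a `MesosTaskLaunchData` from the TaskInfo payload. That class also moves to the module (51 lines in the diffstat) and pairs the serialized task with its attempt number. A hedged sketch of the encoding this implies — an illustration, not the actual implementation, which also converts to and from Mesos `ByteString`:

```scala
import java.nio.ByteBuffer

// Plausible shape of the launch payload: a 4-byte attempt number prepended
// to the serialized task bytes.
case class TaskLaunchDataSketch(serializedTask: ByteBuffer, attemptNumber: Int) {
  def toByteBuffer: ByteBuffer = {
    val bytes = ByteBuffer.allocate(4 + serializedTask.remaining())
    bytes.putInt(attemptNumber)   // header
    bytes.put(serializedTask)     // task body
    bytes.rewind()
    bytes
  }
}

object TaskLaunchDataSketch {
  def fromByteBuffer(buf: ByteBuffer): TaskLaunchDataSketch = {
    val attemptNumber = buf.getInt() // consumes the header, leaves the task bytes
    TaskLaunchDataSketch(buf.slice(), attemptNumber)
  }
}
```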
http://git-wip-us.apache.org/repos/asf/spark/blob/8e5475be/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterPersistenceEngine.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterPersistenceEngine.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterPersistenceEngine.scala
deleted file mode 100644
index 61ab3e8..0000000
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterPersistenceEngine.scala
+++ /dev/null
@@ -1,134 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.scheduler.cluster.mesos
-
-import scala.collection.JavaConverters._
-
-import org.apache.curator.framework.CuratorFramework
-import org.apache.zookeeper.CreateMode
-import org.apache.zookeeper.KeeperException.NoNodeException
-
-import org.apache.spark.SparkConf
-import org.apache.spark.deploy.SparkCuratorUtil
-import org.apache.spark.internal.Logging
-import org.apache.spark.util.Utils
-
-/**
- * Persistence engine factory that is responsible for creating new persistence engines
- * to store Mesos cluster mode state.
- */
-private[spark] abstract class MesosClusterPersistenceEngineFactory(conf: SparkConf) {
-  def createEngine(path: String): MesosClusterPersistenceEngine
-}
-
-/**
- * Mesos cluster persistence engine is responsible for persisting Mesos cluster mode
- * specific state, so that on failover all the state can be recovered and the scheduler
- * can resume managing the drivers.
- */
-private[spark] trait MesosClusterPersistenceEngine {
-  def persist(name: String, obj: Object): Unit
-  def expunge(name: String): Unit
-  def fetch[T](name: String): Option[T]
-  def fetchAll[T](): Iterable[T]
-}
-
-/**
- * Zookeeper backed persistence engine factory.
- * All Zk engines created from this factory shares the same Zookeeper client, so
- * all of them reuses the same connection pool.
- */
-private[spark] class ZookeeperMesosClusterPersistenceEngineFactory(conf: SparkConf)
-  extends MesosClusterPersistenceEngineFactory(conf) with Logging {
-
-  lazy val zk = SparkCuratorUtil.newClient(conf)
-
-  def createEngine(path: String): MesosClusterPersistenceEngine = {
-    new ZookeeperMesosClusterPersistenceEngine(path, zk, conf)
-  }
-}
-
-/**
- * Black hole persistence engine factory that creates black hole
- * persistence engines, which stores nothing.
- */
-private[spark] class BlackHoleMesosClusterPersistenceEngineFactory
-  extends MesosClusterPersistenceEngineFactory(null) {
-  def createEngine(path: String): MesosClusterPersistenceEngine = {
-    new BlackHoleMesosClusterPersistenceEngine
-  }
-}
-
-/**
- * Black hole persistence engine that stores nothing.
- */
-private[spark] class BlackHoleMesosClusterPersistenceEngine extends MesosClusterPersistenceEngine {
-  override def persist(name: String, obj: Object): Unit = {}
-  override def fetch[T](name: String): Option[T] = None
-  override def expunge(name: String): Unit = {}
-  override def fetchAll[T](): Iterable[T] = Iterable.empty[T]
-}
-
-/**
- * Zookeeper based Mesos cluster persistence engine, that stores cluster mode state
- * into Zookeeper. Each engine object is operating under one folder in Zookeeper, but
- * reuses a shared Zookeeper client.
- */
-private[spark] class ZookeeperMesosClusterPersistenceEngine(
-    baseDir: String,
-    zk: CuratorFramework,
-    conf: SparkConf)
-  extends MesosClusterPersistenceEngine with Logging {
-  private val WORKING_DIR =
-    conf.get("spark.deploy.zookeeper.dir", "/spark_mesos_dispatcher") + "/" + baseDir
-
-  SparkCuratorUtil.mkdir(zk, WORKING_DIR)
-
-  def path(name: String): String = {
-    WORKING_DIR + "/" + name
-  }
-
-  override def expunge(name: String): Unit = {
-    zk.delete().forPath(path(name))
-  }
-
-  override def persist(name: String, obj: Object): Unit = {
-    val serialized = Utils.serialize(obj)
-    val zkPath = path(name)
-    zk.create().withMode(CreateMode.PERSISTENT).forPath(zkPath, serialized)
-  }
-
-  override def fetch[T](name: String): Option[T] = {
-    val zkPath = path(name)
-
-    try {
-      val fileData = zk.getData().forPath(zkPath)
-      Some(Utils.deserialize[T](fileData))
-    } catch {
-      case e: NoNodeException => None
-      case e: Exception =>
-        logWarning("Exception while reading persisted file, deleting", e)
-        zk.delete().forPath(zkPath)
-        None
-    }
-  }
-
-  override def fetchAll[T](): Iterable[T] = {
-    zk.getChildren.forPath(WORKING_DIR).asScala.flatMap(fetch[T])
-  }
-}
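Finally, a short hedged sketch of how the persistence abstraction above is used: the cluster scheduler persists driver state under a name and recovers it after a dispatcher failover, with the black-hole engine standing in when recovery is disabled. Names and values are illustrative, and the classes are `private[spark]`, so this assumes code in an `org.apache.spark` package:

```scala
import org.apache.spark.scheduler.cluster.mesos.{BlackHoleMesosClusterPersistenceEngine, MesosClusterPersistenceEngine}

// With recovery mode NONE the dispatcher gets a black-hole engine: persist()
// is a no-op, so nothing is recovered after a restart.
val engine: MesosClusterPersistenceEngine = new BlackHoleMesosClusterPersistenceEngine

engine.persist("driver-20160826122522-0001", "some-serializable-state")
assert(engine.fetch[String]("driver-20160826122522-0001").isEmpty)
assert(engine.fetchAll[String]().isEmpty)

// A ZookeeperMesosClusterPersistenceEngine created for baseDir "launchedDrivers"
// would instead store each entry under
// /spark_mesos_dispatcher/launchedDrivers/<name> (the spark.deploy.zookeeper.dir default).
```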
