http://git-wip-us.apache.org/repos/asf/spark/blob/8e5475be/dev/mima
----------------------------------------------------------------------
diff --git a/dev/mima b/dev/mima
index c355349..11c4af2 100755
--- a/dev/mima
+++ b/dev/mima
@@ -24,7 +24,7 @@ set -e
 FWDIR="$(cd "`dirname "$0"`"/..; pwd)"
 cd "$FWDIR"
 
-SPARK_PROFILES="-Pyarn -Pspark-ganglia-lgpl -Pkinesis-asl -Phive-thriftserver -Phive"
+SPARK_PROFILES="-Pmesos -Pyarn -Pspark-ganglia-lgpl -Pkinesis-asl -Phive-thriftserver -Phive"
 
 TOOLS_CLASSPATH="$(build/sbt -DcopyDependencies=false "export tools/fullClasspath" | tail -n1)"
 OLD_DEPS_CLASSPATH="$(build/sbt -DcopyDependencies=false $SPARK_PROFILES "export oldDeps/fullClasspath" | tail -n1)"
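The -Pmesos profile added here, and threaded through the remaining dev scripts, test modules, and docs below, is what pulls the new mesos/ module into the build. At runtime the module is selected purely through the master URL: the META-INF/services registration and MesosClusterManager added later in this commit let SparkContext resolve mesos:// masters via the ExternalClusterManager loader. The following is a minimal, hypothetical application-side sketch (the master address host:5050 is a placeholder), assuming a -Pmesos build on the driver classpath:

    import org.apache.spark.{SparkConf, SparkContext}

    object MesosMasterUrlSketch {
      def main(args: Array[String]): Unit = {
        val conf = new SparkConf()
          .setAppName("mesos-cluster-manager-sketch")
          .setMaster("mesos://host:5050")     // matched by MesosClusterManager.canCreate
          .set("spark.mesos.coarse", "true")  // coarse-grained backend; already the default

        // SparkContext discovers MesosClusterManager through the ExternalClusterManager
        // service registration and builds the corresponding scheduler backend.
        val sc = new SparkContext(conf)
        val evens = sc.parallelize(1 to 1000).filter(_ % 2 == 0).count()
        println(s"counted $evens even numbers on Mesos executors")
        sc.stop()
      }
    }
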
http://git-wip-us.apache.org/repos/asf/spark/blob/8e5475be/dev/scalastyle ---------------------------------------------------------------------- diff --git a/dev/scalastyle b/dev/scalastyle index 8fd3604..f3dec83 100755 --- a/dev/scalastyle +++ b/dev/scalastyle @@ -22,6 +22,7 @@ ERRORS=$(echo -e "q\n" \ | build/sbt \ -Pkinesis-asl \ + -Pmesos \ -Pyarn \ -Phive \ -Phive-thriftserver \ http://git-wip-us.apache.org/repos/asf/spark/blob/8e5475be/dev/sparktestsupport/modules.py ---------------------------------------------------------------------- diff --git a/dev/sparktestsupport/modules.py b/dev/sparktestsupport/modules.py index ce57257..f2aa241 100644 --- a/dev/sparktestsupport/modules.py +++ b/dev/sparktestsupport/modules.py @@ -458,6 +458,13 @@ yarn = Module( ] ) +mesos = Module( + name="mesos", + dependencies=[], + source_file_regexes=["mesos/"], + sbt_test_goals=["mesos/test"] +) + # The root module is a dummy module which is used to run all of the tests. # No other modules should directly depend on this module. root = Module( http://git-wip-us.apache.org/repos/asf/spark/blob/8e5475be/dev/test-dependencies.sh ---------------------------------------------------------------------- diff --git a/dev/test-dependencies.sh b/dev/test-dependencies.sh index 28e3d4d..4014f42 100755 --- a/dev/test-dependencies.sh +++ b/dev/test-dependencies.sh @@ -29,7 +29,7 @@ export LC_ALL=C # TODO: This would be much nicer to do in SBT, once SBT supports Maven-style resolution. # NOTE: These should match those in the release publishing script -HADOOP2_MODULE_PROFILES="-Phive-thriftserver -Pyarn -Phive" +HADOOP2_MODULE_PROFILES="-Phive-thriftserver -Pmesos -Pyarn -Phive" MVN="build/mvn" HADOOP_PROFILES=( hadoop-2.2 http://git-wip-us.apache.org/repos/asf/spark/blob/8e5475be/docs/building-spark.md ---------------------------------------------------------------------- diff --git a/docs/building-spark.md b/docs/building-spark.md index 2c987cf..6908fc1 100644 --- a/docs/building-spark.md +++ b/docs/building-spark.md @@ -50,7 +50,7 @@ To create a Spark distribution like those distributed by the to be runnable, use `./dev/make-distribution.sh` in the project root directory. It can be configured with Maven profile settings and so on like the direct Maven build. Example: - ./dev/make-distribution.sh --name custom-spark --tgz -Psparkr -Phadoop-2.4 -Phive -Phive-thriftserver -Pyarn + ./dev/make-distribution.sh --name custom-spark --tgz -Psparkr -Phadoop-2.4 -Phive -Phive-thriftserver -Pmesos -Pyarn For more information on usage, run `./dev/make-distribution.sh --help` @@ -105,13 +105,17 @@ By default Spark will build with Hive 1.2.1 bindings. ## Packaging without Hadoop Dependencies for YARN -The assembly directory produced by `mvn package` will, by default, include all of Spark's -dependencies, including Hadoop and some of its ecosystem projects. On YARN deployments, this -causes multiple versions of these to appear on executor classpaths: the version packaged in +The assembly directory produced by `mvn package` will, by default, include all of Spark's +dependencies, including Hadoop and some of its ecosystem projects. On YARN deployments, this +causes multiple versions of these to appear on executor classpaths: the version packaged in the Spark assembly and the version on each node, included with `yarn.application.classpath`. 
-The `hadoop-provided` profile builds the assembly without including Hadoop-ecosystem projects, +The `hadoop-provided` profile builds the assembly without including Hadoop-ecosystem projects, like ZooKeeper and Hadoop itself. +## Building with Mesos support + + ./build/mvn -Pmesos -DskipTests clean package + ## Building for Scala 2.10 To produce a Spark package compiled with Scala 2.10, use the `-Dscala-2.10` property: @@ -263,17 +267,17 @@ The run-tests script also can be limited to a specific Python version or a speci ## Running R Tests -To run the SparkR tests you will need to install the R package `testthat` -(run `install.packages(testthat)` from R shell). You can run just the SparkR tests using +To run the SparkR tests you will need to install the R package `testthat` +(run `install.packages(testthat)` from R shell). You can run just the SparkR tests using the command: ./R/run-tests.sh ## Running Docker-based Integration Test Suites -In order to run Docker integration tests, you have to install the `docker` engine on your box. -The instructions for installation can be found at [the Docker site](https://docs.docker.com/engine/installation/). -Once installed, the `docker` service needs to be started, if not already running. +In order to run Docker integration tests, you have to install the `docker` engine on your box. +The instructions for installation can be found at [the Docker site](https://docs.docker.com/engine/installation/). +Once installed, the `docker` service needs to be started, if not already running. On Linux, this can be done by `sudo service docker start`. ./build/mvn install -DskipTests http://git-wip-us.apache.org/repos/asf/spark/blob/8e5475be/mesos/pom.xml ---------------------------------------------------------------------- diff --git a/mesos/pom.xml b/mesos/pom.xml new file mode 100644 index 0000000..57cc26a --- /dev/null +++ b/mesos/pom.xml @@ -0,0 +1,109 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + ~ Licensed to the Apache Software Foundation (ASF) under one or more + ~ contributor license agreements. See the NOTICE file distributed with + ~ this work for additional information regarding copyright ownership. + ~ The ASF licenses this file to You under the Apache License, Version 2.0 + ~ (the "License"); you may not use this file except in compliance with + ~ the License. You may obtain a copy of the License at + ~ + ~ http://www.apache.org/licenses/LICENSE-2.0 + ~ + ~ Unless required by applicable law or agreed to in writing, software + ~ distributed under the License is distributed on an "AS IS" BASIS, + ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + ~ See the License for the specific language governing permissions and + ~ limitations under the License. 
+ --> +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <modelVersion>4.0.0</modelVersion> + <parent> + <groupId>org.apache.spark</groupId> + <artifactId>spark-parent_2.11</artifactId> + <version>2.1.0-SNAPSHOT</version> + <relativePath>../pom.xml</relativePath> + </parent> + + <artifactId>spark-mesos_2.11</artifactId> + <packaging>jar</packaging> + <name>Spark Project Mesos</name> + <properties> + <sbt.project.name>mesos</sbt.project.name> + <mesos.version>1.0.0</mesos.version> + <mesos.classifier>shaded-protobuf</mesos.classifier> + </properties> + + <dependencies> + <dependency> + <groupId>org.apache.spark</groupId> + <artifactId>spark-core_${scala.binary.version}</artifactId> + <version>${project.version}</version> + </dependency> + + <dependency> + <groupId>org.apache.spark</groupId> + <artifactId>spark-core_${scala.binary.version}</artifactId> + <version>${project.version}</version> + <type>test-jar</type> + <scope>test</scope> + </dependency> + + <dependency> + <groupId>org.apache.mesos</groupId> + <artifactId>mesos</artifactId> + <version>${mesos.version}</version> + <classifier>${mesos.classifier}</classifier> + <exclusions> + <exclusion> + <groupId>com.google.protobuf</groupId> + <artifactId>protobuf-java</artifactId> + </exclusion> + </exclusions> + </dependency> + + <dependency> + <groupId>org.mockito</groupId> + <artifactId>mockito-core</artifactId> + <scope>test</scope> + </dependency> + + <!-- Explicitly depend on shaded dependencies from the parent, since shaded deps aren't transitive --> + <dependency> + <groupId>com.google.guava</groupId> + <artifactId>guava</artifactId> + </dependency> + <dependency> + <groupId>org.eclipse.jetty</groupId> + <artifactId>jetty-server</artifactId> + </dependency> + <dependency> + <groupId>org.eclipse.jetty</groupId> + <artifactId>jetty-plus</artifactId> + </dependency> + <dependency> + <groupId>org.eclipse.jetty</groupId> + <artifactId>jetty-util</artifactId> + </dependency> + <dependency> + <groupId>org.eclipse.jetty</groupId> + <artifactId>jetty-http</artifactId> + </dependency> + <dependency> + <groupId>org.eclipse.jetty</groupId> + <artifactId>jetty-servlet</artifactId> + </dependency> + <dependency> + <groupId>org.eclipse.jetty</groupId> + <artifactId>jetty-servlets</artifactId> + </dependency> + <!-- End of shaded deps. 
--> + + </dependencies> + + + <build> + <outputDirectory>target/scala-${scala.binary.version}/classes</outputDirectory> + <testOutputDirectory>target/scala-${scala.binary.version}/test-classes</testOutputDirectory> + </build> + +</project> http://git-wip-us.apache.org/repos/asf/spark/blob/8e5475be/mesos/src/main/resources/META-INF/services/org.apache.spark.scheduler.ExternalClusterManager ---------------------------------------------------------------------- diff --git a/mesos/src/main/resources/META-INF/services/org.apache.spark.scheduler.ExternalClusterManager b/mesos/src/main/resources/META-INF/services/org.apache.spark.scheduler.ExternalClusterManager new file mode 100644 index 0000000..12b6d5b --- /dev/null +++ b/mesos/src/main/resources/META-INF/services/org.apache.spark.scheduler.ExternalClusterManager @@ -0,0 +1 @@ +org.apache.spark.scheduler.cluster.mesos.MesosClusterManager http://git-wip-us.apache.org/repos/asf/spark/blob/8e5475be/mesos/src/main/scala/org/apache/spark/deploy/mesos/MesosClusterDispatcher.scala ---------------------------------------------------------------------- diff --git a/mesos/src/main/scala/org/apache/spark/deploy/mesos/MesosClusterDispatcher.scala b/mesos/src/main/scala/org/apache/spark/deploy/mesos/MesosClusterDispatcher.scala new file mode 100644 index 0000000..73b6ca3 --- /dev/null +++ b/mesos/src/main/scala/org/apache/spark/deploy/mesos/MesosClusterDispatcher.scala @@ -0,0 +1,115 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.deploy.mesos + +import java.util.concurrent.CountDownLatch + +import org.apache.spark.{SecurityManager, SparkConf} +import org.apache.spark.deploy.mesos.ui.MesosClusterUI +import org.apache.spark.deploy.rest.mesos.MesosRestServer +import org.apache.spark.internal.Logging +import org.apache.spark.scheduler.cluster.mesos._ +import org.apache.spark.util.{ShutdownHookManager, Utils} + +/* + * A dispatcher that is responsible for managing and launching drivers, and is intended to be + * used for Mesos cluster mode. The dispatcher is a long-running process started by the user in + * the cluster independently of Spark applications. + * It contains a [[MesosRestServer]] that listens for requests to submit drivers and a + * [[MesosClusterScheduler]] that processes these requests by negotiating with the Mesos master + * for resources. 
+ * + * A typical new driver lifecycle is the following: + * - Driver submitted via spark-submit talking to the [[MesosRestServer]] + * - [[MesosRestServer]] queues the driver request to [[MesosClusterScheduler]] + * - [[MesosClusterScheduler]] gets resource offers and launches the drivers that are in queue + * + * This dispatcher supports both Mesos fine-grain or coarse-grain mode as the mode is configurable + * per driver launched. + * This class is needed since Mesos doesn't manage frameworks, so the dispatcher acts as + * a daemon to launch drivers as Mesos frameworks upon request. The dispatcher is also started and + * stopped by sbin/start-mesos-dispatcher and sbin/stop-mesos-dispatcher respectively. + */ +private[mesos] class MesosClusterDispatcher( + args: MesosClusterDispatcherArguments, + conf: SparkConf) + extends Logging { + + private val publicAddress = Option(conf.getenv("SPARK_PUBLIC_DNS")).getOrElse(args.host) + private val recoveryMode = conf.get("spark.deploy.recoveryMode", "NONE").toUpperCase() + logInfo("Recovery mode in Mesos dispatcher set to: " + recoveryMode) + + private val engineFactory = recoveryMode match { + case "NONE" => new BlackHoleMesosClusterPersistenceEngineFactory + case "ZOOKEEPER" => new ZookeeperMesosClusterPersistenceEngineFactory(conf) + case _ => throw new IllegalArgumentException("Unsupported recovery mode: " + recoveryMode) + } + + private val scheduler = new MesosClusterScheduler(engineFactory, conf) + + private val server = new MesosRestServer(args.host, args.port, conf, scheduler) + private val webUi = new MesosClusterUI( + new SecurityManager(conf), + args.webUiPort, + conf, + publicAddress, + scheduler) + + private val shutdownLatch = new CountDownLatch(1) + + def start(): Unit = { + webUi.bind() + scheduler.frameworkUrl = conf.get("spark.mesos.dispatcher.webui.url", webUi.activeWebUiUrl) + scheduler.start() + server.start() + } + + def awaitShutdown(): Unit = { + shutdownLatch.await() + } + + def stop(): Unit = { + webUi.stop() + server.stop() + scheduler.stop() + shutdownLatch.countDown() + } +} + +private[mesos] object MesosClusterDispatcher extends Logging { + def main(args: Array[String]) { + Utils.initDaemon(log) + val conf = new SparkConf + val dispatcherArgs = new MesosClusterDispatcherArguments(args, conf) + conf.setMaster(dispatcherArgs.masterUrl) + conf.setAppName(dispatcherArgs.name) + dispatcherArgs.zookeeperUrl.foreach { z => + conf.set("spark.deploy.recoveryMode", "ZOOKEEPER") + conf.set("spark.deploy.zookeeper.url", z) + } + val dispatcher = new MesosClusterDispatcher(dispatcherArgs, conf) + dispatcher.start() + logDebug("Adding shutdown hook") // force eager creation of logger + ShutdownHookManager.addShutdownHook { () => + logInfo("Shutdown hook is shutting down dispatcher") + dispatcher.stop() + dispatcher.awaitShutdown() + } + dispatcher.awaitShutdown() + } +} http://git-wip-us.apache.org/repos/asf/spark/blob/8e5475be/mesos/src/main/scala/org/apache/spark/deploy/mesos/MesosClusterDispatcherArguments.scala ---------------------------------------------------------------------- diff --git a/mesos/src/main/scala/org/apache/spark/deploy/mesos/MesosClusterDispatcherArguments.scala b/mesos/src/main/scala/org/apache/spark/deploy/mesos/MesosClusterDispatcherArguments.scala new file mode 100644 index 0000000..11e1344 --- /dev/null +++ b/mesos/src/main/scala/org/apache/spark/deploy/mesos/MesosClusterDispatcherArguments.scala @@ -0,0 +1,109 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor 
license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.deploy.mesos + +import scala.annotation.tailrec + +import org.apache.spark.SparkConf +import org.apache.spark.util.{IntParam, Utils} + + +private[mesos] class MesosClusterDispatcherArguments(args: Array[String], conf: SparkConf) { + var host = Utils.localHostName() + var port = 7077 + var name = "Spark Cluster" + var webUiPort = 8081 + var masterUrl: String = _ + var zookeeperUrl: Option[String] = None + var propertiesFile: String = _ + + parse(args.toList) + + propertiesFile = Utils.loadDefaultSparkProperties(conf, propertiesFile) + + @tailrec + private def parse(args: List[String]): Unit = args match { + case ("--host" | "-h") :: value :: tail => + Utils.checkHost(value, "Please use hostname " + value) + host = value + parse(tail) + + case ("--port" | "-p") :: IntParam(value) :: tail => + port = value + parse(tail) + + case ("--webui-port") :: IntParam(value) :: tail => + webUiPort = value + parse(tail) + + case ("--zk" | "-z") :: value :: tail => + zookeeperUrl = Some(value) + parse(tail) + + case ("--master" | "-m") :: value :: tail => + if (!value.startsWith("mesos://")) { + // scalastyle:off println + System.err.println("Cluster dispatcher only supports mesos (uri begins with mesos://)") + // scalastyle:on println + System.exit(1) + } + masterUrl = value.stripPrefix("mesos://") + parse(tail) + + case ("--name") :: value :: tail => + name = value + parse(tail) + + case ("--properties-file") :: value :: tail => + propertiesFile = value + parse(tail) + + case ("--help") :: tail => + printUsageAndExit(0) + + case Nil => + if (masterUrl == null) { + // scalastyle:off println + System.err.println("--master is required") + // scalastyle:on println + printUsageAndExit(1) + } + + case _ => + printUsageAndExit(1) + } + + private def printUsageAndExit(exitCode: Int): Unit = { + // scalastyle:off println + System.err.println( + "Usage: MesosClusterDispatcher [options]\n" + + "\n" + + "Options:\n" + + " -h HOST, --host HOST Hostname to listen on\n" + + " -p PORT, --port PORT Port to listen on (default: 7077)\n" + + " --webui-port WEBUI_PORT WebUI Port to listen on (default: 8081)\n" + + " --name NAME Framework name to show in Mesos UI\n" + + " -m --master MASTER URI for connecting to Mesos master\n" + + " -z --zk ZOOKEEPER Comma delimited URLs for connecting to \n" + + " Zookeeper for persistence\n" + + " --properties-file FILE Path to a custom Spark properties file.\n" + + " Default is conf/spark-defaults.conf.") + // scalastyle:on println + System.exit(exitCode) + } +} http://git-wip-us.apache.org/repos/asf/spark/blob/8e5475be/mesos/src/main/scala/org/apache/spark/deploy/mesos/MesosDriverDescription.scala ---------------------------------------------------------------------- diff --git 
a/mesos/src/main/scala/org/apache/spark/deploy/mesos/MesosDriverDescription.scala b/mesos/src/main/scala/org/apache/spark/deploy/mesos/MesosDriverDescription.scala new file mode 100644 index 0000000..d4c7022 --- /dev/null +++ b/mesos/src/main/scala/org/apache/spark/deploy/mesos/MesosDriverDescription.scala @@ -0,0 +1,70 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.deploy.mesos + +import java.util.Date + +import org.apache.spark.SparkConf +import org.apache.spark.deploy.Command +import org.apache.spark.scheduler.cluster.mesos.MesosClusterRetryState + +/** + * Describes a Spark driver that is submitted from the + * [[org.apache.spark.deploy.rest.mesos.MesosRestServer]], to be launched by + * [[org.apache.spark.scheduler.cluster.mesos.MesosClusterScheduler]]. + * @param jarUrl URL to the application jar + * @param mem Amount of memory for the driver + * @param cores Number of cores for the driver + * @param supervise Supervise the driver for long running app + * @param command The command to launch the driver. 
+ * @param schedulerProperties Extra properties to pass the Mesos scheduler + */ +private[spark] class MesosDriverDescription( + val name: String, + val jarUrl: String, + val mem: Int, + val cores: Double, + val supervise: Boolean, + val command: Command, + schedulerProperties: Map[String, String], + val submissionId: String, + val submissionDate: Date, + val retryState: Option[MesosClusterRetryState] = None) + extends Serializable { + + val conf = new SparkConf(false) + schedulerProperties.foreach {case (k, v) => conf.set(k, v)} + + def copy( + name: String = name, + jarUrl: String = jarUrl, + mem: Int = mem, + cores: Double = cores, + supervise: Boolean = supervise, + command: Command = command, + schedulerProperties: SparkConf = conf, + submissionId: String = submissionId, + submissionDate: Date = submissionDate, + retryState: Option[MesosClusterRetryState] = retryState): MesosDriverDescription = { + + new MesosDriverDescription(name, jarUrl, mem, cores, supervise, command, conf.getAll.toMap, + submissionId, submissionDate, retryState) + } + + override def toString: String = s"MesosDriverDescription (${command.mainClass})" +} http://git-wip-us.apache.org/repos/asf/spark/blob/8e5475be/mesos/src/main/scala/org/apache/spark/deploy/mesos/MesosExternalShuffleService.scala ---------------------------------------------------------------------- diff --git a/mesos/src/main/scala/org/apache/spark/deploy/mesos/MesosExternalShuffleService.scala b/mesos/src/main/scala/org/apache/spark/deploy/mesos/MesosExternalShuffleService.scala new file mode 100644 index 0000000..6b297c4 --- /dev/null +++ b/mesos/src/main/scala/org/apache/spark/deploy/mesos/MesosExternalShuffleService.scala @@ -0,0 +1,130 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.deploy.mesos + +import java.nio.ByteBuffer +import java.util.concurrent.{ConcurrentHashMap, TimeUnit} + +import scala.collection.JavaConverters._ + +import org.apache.spark.{SecurityManager, SparkConf} +import org.apache.spark.deploy.ExternalShuffleService +import org.apache.spark.internal.Logging +import org.apache.spark.network.client.{RpcResponseCallback, TransportClient} +import org.apache.spark.network.shuffle.ExternalShuffleBlockHandler +import org.apache.spark.network.shuffle.protocol.BlockTransferMessage +import org.apache.spark.network.shuffle.protocol.mesos.{RegisterDriver, ShuffleServiceHeartbeat} +import org.apache.spark.network.util.TransportConf +import org.apache.spark.util.ThreadUtils + +/** + * An RPC endpoint that receives registration requests from Spark drivers running on Mesos. + * It detects driver termination and calls the cleanup callback to [[ExternalShuffleService]]. 
+ */ +private[mesos] class MesosExternalShuffleBlockHandler( + transportConf: TransportConf, + cleanerIntervalS: Long) + extends ExternalShuffleBlockHandler(transportConf, null) with Logging { + + ThreadUtils.newDaemonSingleThreadScheduledExecutor("shuffle-cleaner-watcher") + .scheduleAtFixedRate(new CleanerThread(), 0, cleanerIntervalS, TimeUnit.SECONDS) + + // Stores a map of app id to app state (timeout value and last heartbeat) + private val connectedApps = new ConcurrentHashMap[String, AppState]() + + protected override def handleMessage( + message: BlockTransferMessage, + client: TransportClient, + callback: RpcResponseCallback): Unit = { + message match { + case RegisterDriverParam(appId, appState) => + val address = client.getSocketAddress + val timeout = appState.heartbeatTimeout + logInfo(s"Received registration request from app $appId (remote address $address, " + + s"heartbeat timeout $timeout ms).") + if (connectedApps.containsKey(appId)) { + logWarning(s"Received a registration request from app $appId, but it was already " + + s"registered") + } + connectedApps.put(appId, appState) + callback.onSuccess(ByteBuffer.allocate(0)) + case Heartbeat(appId) => + val address = client.getSocketAddress + Option(connectedApps.get(appId)) match { + case Some(existingAppState) => + logTrace(s"Received ShuffleServiceHeartbeat from app '$appId' (remote " + + s"address $address).") + existingAppState.lastHeartbeat = System.nanoTime() + case None => + logWarning(s"Received ShuffleServiceHeartbeat from an unknown app (remote " + + s"address $address, appId '$appId').") + } + case _ => super.handleMessage(message, client, callback) + } + } + + /** An extractor object for matching [[RegisterDriver]] message. */ + private object RegisterDriverParam { + def unapply(r: RegisterDriver): Option[(String, AppState)] = + Some((r.getAppId, new AppState(r.getHeartbeatTimeoutMs, System.nanoTime()))) + } + + private object Heartbeat { + def unapply(h: ShuffleServiceHeartbeat): Option[String] = Some(h.getAppId) + } + + private class AppState(val heartbeatTimeout: Long, @volatile var lastHeartbeat: Long) + + private class CleanerThread extends Runnable { + override def run(): Unit = { + val now = System.nanoTime() + connectedApps.asScala.foreach { case (appId, appState) => + if (now - appState.lastHeartbeat > appState.heartbeatTimeout * 1000 * 1000) { + logInfo(s"Application $appId timed out. Removing shuffle files.") + connectedApps.remove(appId) + applicationRemoved(appId, true) + } + } + } + } +} + +/** + * A wrapper of [[ExternalShuffleService]] that provides an additional endpoint for drivers + * to associate with. This allows the shuffle service to detect when a driver is terminated + * and can clean up the associated shuffle files. 
+ */ +private[mesos] class MesosExternalShuffleService(conf: SparkConf, securityManager: SecurityManager) + extends ExternalShuffleService(conf, securityManager) { + + protected override def newShuffleBlockHandler( + conf: TransportConf): ExternalShuffleBlockHandler = { + val cleanerIntervalS = this.conf.getTimeAsSeconds("spark.shuffle.cleaner.interval", "30s") + new MesosExternalShuffleBlockHandler(conf, cleanerIntervalS) + } +} + +private[spark] object MesosExternalShuffleService extends Logging { + + def main(args: Array[String]): Unit = { + ExternalShuffleService.main(args, + (conf: SparkConf, sm: SecurityManager) => new MesosExternalShuffleService(conf, sm)) + } +} + + http://git-wip-us.apache.org/repos/asf/spark/blob/8e5475be/mesos/src/main/scala/org/apache/spark/deploy/mesos/ui/DriverPage.scala ---------------------------------------------------------------------- diff --git a/mesos/src/main/scala/org/apache/spark/deploy/mesos/ui/DriverPage.scala b/mesos/src/main/scala/org/apache/spark/deploy/mesos/ui/DriverPage.scala new file mode 100644 index 0000000..cd98110 --- /dev/null +++ b/mesos/src/main/scala/org/apache/spark/deploy/mesos/ui/DriverPage.scala @@ -0,0 +1,179 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.deploy.mesos.ui + +import javax.servlet.http.HttpServletRequest + +import scala.xml.Node + +import org.apache.spark.deploy.Command +import org.apache.spark.deploy.mesos.MesosDriverDescription +import org.apache.spark.scheduler.cluster.mesos.{MesosClusterRetryState, MesosClusterSubmissionState} +import org.apache.spark.ui.{UIUtils, WebUIPage} + +private[ui] class DriverPage(parent: MesosClusterUI) extends WebUIPage("driver") { + + override def render(request: HttpServletRequest): Seq[Node] = { + val driverId = request.getParameter("id") + require(driverId != null && driverId.nonEmpty, "Missing id parameter") + + val state = parent.scheduler.getDriverState(driverId) + if (state.isEmpty) { + val content = + <div> + <p>Cannot find driver {driverId}</p> + </div> + return UIUtils.basicSparkPage(content, s"Details for Job $driverId") + } + val driverState = state.get + val driverHeaders = Seq("Driver property", "Value") + val schedulerHeaders = Seq("Scheduler property", "Value") + val commandEnvHeaders = Seq("Command environment variable", "Value") + val launchedHeaders = Seq("Launched property", "Value") + val commandHeaders = Seq("Command property", "Value") + val retryHeaders = Seq("Last failed status", "Next retry time", "Retry count") + val driverDescription = Iterable.apply(driverState.description) + val submissionState = Iterable.apply(driverState.submissionState) + val command = Iterable.apply(driverState.description.command) + val schedulerProperties = Iterable.apply(driverState.description.conf.getAll.toMap) + val commandEnv = Iterable.apply(driverState.description.command.environment) + val driverTable = + UIUtils.listingTable(driverHeaders, driverRow, driverDescription) + val commandTable = + UIUtils.listingTable(commandHeaders, commandRow, command) + val commandEnvTable = + UIUtils.listingTable(commandEnvHeaders, propertiesRow, commandEnv) + val schedulerTable = + UIUtils.listingTable(schedulerHeaders, propertiesRow, schedulerProperties) + val launchedTable = + UIUtils.listingTable(launchedHeaders, launchedRow, submissionState) + val retryTable = + UIUtils.listingTable( + retryHeaders, retryRow, Iterable.apply(driverState.description.retryState)) + val content = + <p>Driver state information for driver id {driverId}</p> + <a href={UIUtils.prependBaseUri("/")}>Back to Drivers</a> + <div class="row-fluid"> + <div class="span12"> + <h4>Driver state: {driverState.state}</h4> + <h4>Driver properties</h4> + {driverTable} + <h4>Driver command</h4> + {commandTable} + <h4>Driver command environment</h4> + {commandEnvTable} + <h4>Scheduler properties</h4> + {schedulerTable} + <h4>Launched state</h4> + {launchedTable} + <h4>Retry state</h4> + {retryTable} + </div> + </div>; + + UIUtils.basicSparkPage(content, s"Details for Job $driverId") + } + + private def launchedRow(submissionState: Option[MesosClusterSubmissionState]): Seq[Node] = { + submissionState.map { state => + <tr> + <td>Mesos Slave ID</td> + <td>{state.slaveId.getValue}</td> + </tr> + <tr> + <td>Mesos Task ID</td> + <td>{state.taskId.getValue}</td> + </tr> + <tr> + <td>Launch Time</td> + <td>{state.startDate}</td> + </tr> + <tr> + <td>Finish Time</td> + <td>{state.finishDate.map(_.toString).getOrElse("")}</td> + </tr> + <tr> + <td>Last Task Status</td> + <td>{state.mesosTaskStatus.map(_.toString).getOrElse("")}</td> + </tr> + }.getOrElse(Seq[Node]()) + } + + private def propertiesRow(properties: collection.Map[String, String]): Seq[Node] = { + properties.map { case (k, v) => + <tr> + 
<td>{k}</td><td>{v}</td> + </tr> + }.toSeq + } + + private def commandRow(command: Command): Seq[Node] = { + <tr> + <td>Main class</td><td>{command.mainClass}</td> + </tr> + <tr> + <td>Arguments</td><td>{command.arguments.mkString(" ")}</td> + </tr> + <tr> + <td>Class path entries</td><td>{command.classPathEntries.mkString(" ")}</td> + </tr> + <tr> + <td>Java options</td><td>{command.javaOpts.mkString((" "))}</td> + </tr> + <tr> + <td>Library path entries</td><td>{command.libraryPathEntries.mkString((" "))}</td> + </tr> + } + + private def driverRow(driver: MesosDriverDescription): Seq[Node] = { + <tr> + <td>Name</td><td>{driver.name}</td> + </tr> + <tr> + <td>Id</td><td>{driver.submissionId}</td> + </tr> + <tr> + <td>Cores</td><td>{driver.cores}</td> + </tr> + <tr> + <td>Memory</td><td>{driver.mem}</td> + </tr> + <tr> + <td>Submitted</td><td>{driver.submissionDate}</td> + </tr> + <tr> + <td>Supervise</td><td>{driver.supervise}</td> + </tr> + } + + private def retryRow(retryState: Option[MesosClusterRetryState]): Seq[Node] = { + retryState.map { state => + <tr> + <td> + {state.lastFailureStatus} + </td> + <td> + {state.nextRetry} + </td> + <td> + {state.retries} + </td> + </tr> + }.getOrElse(Seq[Node]()) + } +} http://git-wip-us.apache.org/repos/asf/spark/blob/8e5475be/mesos/src/main/scala/org/apache/spark/deploy/mesos/ui/MesosClusterPage.scala ---------------------------------------------------------------------- diff --git a/mesos/src/main/scala/org/apache/spark/deploy/mesos/ui/MesosClusterPage.scala b/mesos/src/main/scala/org/apache/spark/deploy/mesos/ui/MesosClusterPage.scala new file mode 100644 index 0000000..8dcbdaa --- /dev/null +++ b/mesos/src/main/scala/org/apache/spark/deploy/mesos/ui/MesosClusterPage.scala @@ -0,0 +1,135 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.deploy.mesos.ui + +import javax.servlet.http.HttpServletRequest + +import scala.xml.Node + +import org.apache.mesos.Protos.TaskStatus + +import org.apache.spark.deploy.mesos.MesosDriverDescription +import org.apache.spark.scheduler.cluster.mesos.MesosClusterSubmissionState +import org.apache.spark.ui.{UIUtils, WebUIPage} + +private[mesos] class MesosClusterPage(parent: MesosClusterUI) extends WebUIPage("") { + private val historyServerURL = parent.conf.getOption("spark.mesos.dispatcher.historyServer.url") + + def render(request: HttpServletRequest): Seq[Node] = { + val state = parent.scheduler.getSchedulerState() + + val driverHeader = Seq("Driver ID") + val historyHeader = historyServerURL.map(url => Seq("History")).getOrElse(Nil) + val submissionHeader = Seq("Submit Date", "Main Class", "Driver Resources") + + val queuedHeaders = driverHeader ++ submissionHeader + val driverHeaders = driverHeader ++ historyHeader ++ submissionHeader ++ + Seq("Start Date", "Mesos Slave ID", "State") + val retryHeaders = Seq("Driver ID", "Submit Date", "Description") ++ + Seq("Last Failed Status", "Next Retry Time", "Attempt Count") + val queuedTable = UIUtils.listingTable(queuedHeaders, queuedRow, state.queuedDrivers) + val launchedTable = UIUtils.listingTable(driverHeaders, driverRow, state.launchedDrivers) + val finishedTable = UIUtils.listingTable(driverHeaders, driverRow, state.finishedDrivers) + val retryTable = UIUtils.listingTable(retryHeaders, retryRow, state.pendingRetryDrivers) + val content = + <p>Mesos Framework ID: {state.frameworkId}</p> + <div class="row-fluid"> + <div class="span12"> + <h4>Queued Drivers:</h4> + {queuedTable} + <h4>Launched Drivers:</h4> + {launchedTable} + <h4>Finished Drivers:</h4> + {finishedTable} + <h4>Supervise drivers waiting for retry:</h4> + {retryTable} + </div> + </div>; + UIUtils.basicSparkPage(content, "Spark Drivers for Mesos cluster") + } + + private def queuedRow(submission: MesosDriverDescription): Seq[Node] = { + val id = submission.submissionId + <tr> + <td><a href={s"driver?id=$id"}>{id}</a></td> + <td>{submission.submissionDate}</td> + <td>{submission.command.mainClass}</td> + <td>cpus: {submission.cores}, mem: {submission.mem}</td> + </tr> + } + + private def driverRow(state: MesosClusterSubmissionState): Seq[Node] = { + val id = state.driverDescription.submissionId + + val historyCol = if (historyServerURL.isDefined) { + <td> + <a href={s"${historyServerURL.get}/history/${state.frameworkId}"}> + {state.frameworkId} + </a> + </td> + } else Nil + + <tr> + <td><a href={s"driver?id=$id"}>{id}</a></td> + {historyCol} + <td>{state.driverDescription.submissionDate}</td> + <td>{state.driverDescription.command.mainClass}</td> + <td>cpus: {state.driverDescription.cores}, mem: {state.driverDescription.mem}</td> + <td>{state.startDate}</td> + <td>{state.slaveId.getValue}</td> + <td>{stateString(state.mesosTaskStatus)}</td> + </tr> + } + + private def retryRow(submission: MesosDriverDescription): Seq[Node] = { + val id = submission.submissionId + <tr> + <td><a href={s"driver?id=$id"}>{id}</a></td> + <td>{submission.submissionDate}</td> + <td>{submission.command.mainClass}</td> + <td>{submission.retryState.get.lastFailureStatus}</td> + <td>{submission.retryState.get.nextRetry}</td> + <td>{submission.retryState.get.retries}</td> + </tr> + } + + private def stateString(status: Option[TaskStatus]): String = { + if (status.isEmpty) { + return "" + } + val sb = new StringBuilder + val s = status.get + sb.append(s"State: ${s.getState}") 
+ if (status.get.hasMessage) { + sb.append(s", Message: ${s.getMessage}") + } + if (status.get.hasHealthy) { + sb.append(s", Healthy: ${s.getHealthy}") + } + if (status.get.hasSource) { + sb.append(s", Source: ${s.getSource}") + } + if (status.get.hasReason) { + sb.append(s", Reason: ${s.getReason}") + } + if (status.get.hasTimestamp) { + sb.append(s", Time: ${s.getTimestamp}") + } + sb.toString() + } +} http://git-wip-us.apache.org/repos/asf/spark/blob/8e5475be/mesos/src/main/scala/org/apache/spark/deploy/mesos/ui/MesosClusterUI.scala ---------------------------------------------------------------------- diff --git a/mesos/src/main/scala/org/apache/spark/deploy/mesos/ui/MesosClusterUI.scala b/mesos/src/main/scala/org/apache/spark/deploy/mesos/ui/MesosClusterUI.scala new file mode 100644 index 0000000..6049789 --- /dev/null +++ b/mesos/src/main/scala/org/apache/spark/deploy/mesos/ui/MesosClusterUI.scala @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.deploy.mesos.ui + +import org.apache.spark.{SecurityManager, SparkConf} +import org.apache.spark.scheduler.cluster.mesos.MesosClusterScheduler +import org.apache.spark.ui.{SparkUI, WebUI} +import org.apache.spark.ui.JettyUtils._ + +/** + * UI that displays driver results from the [[org.apache.spark.deploy.mesos.MesosClusterDispatcher]] + */ +private[spark] class MesosClusterUI( + securityManager: SecurityManager, + port: Int, + val conf: SparkConf, + dispatcherPublicAddress: String, + val scheduler: MesosClusterScheduler) + extends WebUI(securityManager, securityManager.getSSLOptions("mesos"), port, conf) { + + initialize() + + def activeWebUiUrl: String = "http://" + dispatcherPublicAddress + ":" + boundPort + + override def initialize() { + attachPage(new MesosClusterPage(this)) + attachPage(new DriverPage(this)) + attachHandler(createStaticHandler(MesosClusterUI.STATIC_RESOURCE_DIR, "/static")) + } +} + +private object MesosClusterUI { + val STATIC_RESOURCE_DIR = SparkUI.STATIC_RESOURCE_DIR +} http://git-wip-us.apache.org/repos/asf/spark/blob/8e5475be/mesos/src/main/scala/org/apache/spark/deploy/rest/mesos/MesosRestServer.scala ---------------------------------------------------------------------- diff --git a/mesos/src/main/scala/org/apache/spark/deploy/rest/mesos/MesosRestServer.scala b/mesos/src/main/scala/org/apache/spark/deploy/rest/mesos/MesosRestServer.scala new file mode 100644 index 0000000..3b96488 --- /dev/null +++ b/mesos/src/main/scala/org/apache/spark/deploy/rest/mesos/MesosRestServer.scala @@ -0,0 +1,157 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.deploy.rest.mesos + +import java.io.File +import java.text.SimpleDateFormat +import java.util.Date +import java.util.concurrent.atomic.AtomicLong +import javax.servlet.http.HttpServletResponse + +import org.apache.spark.{SPARK_VERSION => sparkVersion, SparkConf} +import org.apache.spark.deploy.Command +import org.apache.spark.deploy.mesos.MesosDriverDescription +import org.apache.spark.deploy.rest._ +import org.apache.spark.scheduler.cluster.mesos.MesosClusterScheduler +import org.apache.spark.util.Utils + +/** + * A server that responds to requests submitted by the [[RestSubmissionClient]]. + * All requests are forwarded to + * [[org.apache.spark.scheduler.cluster.mesos.MesosClusterScheduler]]. + * This is intended to be used in Mesos cluster mode only. + * For more details about the REST submission please refer to [[RestSubmissionServer]] javadocs. + */ +private[spark] class MesosRestServer( + host: String, + requestedPort: Int, + masterConf: SparkConf, + scheduler: MesosClusterScheduler) + extends RestSubmissionServer(host, requestedPort, masterConf) { + + protected override val submitRequestServlet = + new MesosSubmitRequestServlet(scheduler, masterConf) + protected override val killRequestServlet = + new MesosKillRequestServlet(scheduler, masterConf) + protected override val statusRequestServlet = + new MesosStatusRequestServlet(scheduler, masterConf) +} + +private[mesos] class MesosSubmitRequestServlet( + scheduler: MesosClusterScheduler, + conf: SparkConf) + extends SubmitRequestServlet { + + private val DEFAULT_SUPERVISE = false + private val DEFAULT_MEMORY = Utils.DEFAULT_DRIVER_MEM_MB // mb + private val DEFAULT_CORES = 1.0 + + private val nextDriverNumber = new AtomicLong(0) + private def createDateFormat = new SimpleDateFormat("yyyyMMddHHmmss") // For application IDs + private def newDriverId(submitDate: Date): String = { + "driver-%s-%04d".format( + createDateFormat.format(submitDate), nextDriverNumber.incrementAndGet()) + } + + /** + * Build a driver description from the fields specified in the submit request. + * + * This involves constructing a command that launches a mesos framework for the job. + * This does not currently consider fields used by python applications since python + * is not supported in mesos cluster mode yet. 
+ */ + private def buildDriverDescription(request: CreateSubmissionRequest): MesosDriverDescription = { + // Required fields, including the main class because python is not yet supported + val appResource = Option(request.appResource).getOrElse { + throw new SubmitRestMissingFieldException("Application jar is missing.") + } + val mainClass = Option(request.mainClass).getOrElse { + throw new SubmitRestMissingFieldException("Main class is missing.") + } + + // Optional fields + val sparkProperties = request.sparkProperties + val driverExtraJavaOptions = sparkProperties.get("spark.driver.extraJavaOptions") + val driverExtraClassPath = sparkProperties.get("spark.driver.extraClassPath") + val driverExtraLibraryPath = sparkProperties.get("spark.driver.extraLibraryPath") + val superviseDriver = sparkProperties.get("spark.driver.supervise") + val driverMemory = sparkProperties.get("spark.driver.memory") + val driverCores = sparkProperties.get("spark.driver.cores") + val appArgs = request.appArgs + val environmentVariables = request.environmentVariables + val name = request.sparkProperties.getOrElse("spark.app.name", mainClass) + + // Construct driver description + val conf = new SparkConf(false).setAll(sparkProperties) + val extraClassPath = driverExtraClassPath.toSeq.flatMap(_.split(File.pathSeparator)) + val extraLibraryPath = driverExtraLibraryPath.toSeq.flatMap(_.split(File.pathSeparator)) + val extraJavaOpts = driverExtraJavaOptions.map(Utils.splitCommandString).getOrElse(Seq.empty) + val sparkJavaOpts = Utils.sparkJavaOpts(conf) + val javaOpts = sparkJavaOpts ++ extraJavaOpts + val command = new Command( + mainClass, appArgs, environmentVariables, extraClassPath, extraLibraryPath, javaOpts) + val actualSuperviseDriver = superviseDriver.map(_.toBoolean).getOrElse(DEFAULT_SUPERVISE) + val actualDriverMemory = driverMemory.map(Utils.memoryStringToMb).getOrElse(DEFAULT_MEMORY) + val actualDriverCores = driverCores.map(_.toDouble).getOrElse(DEFAULT_CORES) + val submitDate = new Date() + val submissionId = newDriverId(submitDate) + + new MesosDriverDescription( + name, appResource, actualDriverMemory, actualDriverCores, actualSuperviseDriver, + command, request.sparkProperties, submissionId, submitDate) + } + + protected override def handleSubmit( + requestMessageJson: String, + requestMessage: SubmitRestProtocolMessage, + responseServlet: HttpServletResponse): SubmitRestProtocolResponse = { + requestMessage match { + case submitRequest: CreateSubmissionRequest => + val driverDescription = buildDriverDescription(submitRequest) + val s = scheduler.submitDriver(driverDescription) + s.serverSparkVersion = sparkVersion + val unknownFields = findUnknownFields(requestMessageJson, requestMessage) + if (unknownFields.nonEmpty) { + // If there are fields that the server does not know about, warn the client + s.unknownFields = unknownFields + } + s + case unexpected => + responseServlet.setStatus(HttpServletResponse.SC_BAD_REQUEST) + handleError(s"Received message of unexpected type ${unexpected.messageType}.") + } + } +} + +private[mesos] class MesosKillRequestServlet(scheduler: MesosClusterScheduler, conf: SparkConf) + extends KillRequestServlet { + protected override def handleKill(submissionId: String): KillSubmissionResponse = { + val k = scheduler.killDriver(submissionId) + k.serverSparkVersion = sparkVersion + k + } +} + +private[mesos] class MesosStatusRequestServlet(scheduler: MesosClusterScheduler, conf: SparkConf) + extends StatusRequestServlet { + protected override def 
handleStatus(submissionId: String): SubmissionStatusResponse = { + val d = scheduler.getDriverStatus(submissionId) + d.serverSparkVersion = sparkVersion + d + } +} http://git-wip-us.apache.org/repos/asf/spark/blob/8e5475be/mesos/src/main/scala/org/apache/spark/executor/MesosExecutorBackend.scala ---------------------------------------------------------------------- diff --git a/mesos/src/main/scala/org/apache/spark/executor/MesosExecutorBackend.scala b/mesos/src/main/scala/org/apache/spark/executor/MesosExecutorBackend.scala new file mode 100644 index 0000000..1937bd3 --- /dev/null +++ b/mesos/src/main/scala/org/apache/spark/executor/MesosExecutorBackend.scala @@ -0,0 +1,131 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.executor + +import java.nio.ByteBuffer + +import scala.collection.JavaConverters._ + +import org.apache.mesos.{Executor => MesosExecutor, ExecutorDriver, MesosExecutorDriver} +import org.apache.mesos.Protos.{TaskStatus => MesosTaskStatus, _} +import org.apache.mesos.protobuf.ByteString + +import org.apache.spark.{SparkConf, SparkEnv, TaskState} +import org.apache.spark.TaskState +import org.apache.spark.deploy.SparkHadoopUtil +import org.apache.spark.internal.Logging +import org.apache.spark.scheduler.cluster.mesos.{MesosSchedulerUtils, MesosTaskLaunchData} +import org.apache.spark.util.Utils + +private[spark] class MesosExecutorBackend + extends MesosExecutor + with MesosSchedulerUtils // TODO: fix + with ExecutorBackend + with Logging { + + var executor: Executor = null + var driver: ExecutorDriver = null + + override def statusUpdate(taskId: Long, state: TaskState.TaskState, data: ByteBuffer) { + val mesosTaskId = TaskID.newBuilder().setValue(taskId.toString).build() + driver.sendStatusUpdate(MesosTaskStatus.newBuilder() + .setTaskId(mesosTaskId) + .setState(taskStateToMesos(state)) + .setData(ByteString.copyFrom(data)) + .build()) + } + + override def registered( + driver: ExecutorDriver, + executorInfo: ExecutorInfo, + frameworkInfo: FrameworkInfo, + slaveInfo: SlaveInfo) { + + // Get num cores for this task from ExecutorInfo, created in MesosSchedulerBackend. + val cpusPerTask = executorInfo.getResourcesList.asScala + .find(_.getName == "cpus") + .map(_.getScalar.getValue.toInt) + .getOrElse(0) + val executorId = executorInfo.getExecutorId.getValue + + logInfo(s"Registered with Mesos as executor ID $executorId with $cpusPerTask cpus") + this.driver = driver + // Set a context class loader to be picked up by the serializer. Without this call + // the serializer would default to the null class loader, and fail to find Spark classes + // See SPARK-10986. 
+ Thread.currentThread().setContextClassLoader(this.getClass.getClassLoader) + + val properties = Utils.deserialize[Array[(String, String)]](executorInfo.getData.toByteArray) ++ + Seq[(String, String)](("spark.app.id", frameworkInfo.getId.getValue)) + val conf = new SparkConf(loadDefaults = true).setAll(properties) + val port = conf.getInt("spark.executor.port", 0) + val env = SparkEnv.createExecutorEnv( + conf, executorId, slaveInfo.getHostname, port, cpusPerTask, isLocal = false) + + executor = new Executor( + executorId, + slaveInfo.getHostname, + env) + } + + override def launchTask(d: ExecutorDriver, taskInfo: TaskInfo) { + val taskId = taskInfo.getTaskId.getValue.toLong + val taskData = MesosTaskLaunchData.fromByteString(taskInfo.getData) + if (executor == null) { + logError("Received launchTask but executor was null") + } else { + SparkHadoopUtil.get.runAsSparkUser { () => + executor.launchTask(this, taskId = taskId, attemptNumber = taskData.attemptNumber, + taskInfo.getName, taskData.serializedTask) + } + } + } + + override def error(d: ExecutorDriver, message: String) { + logError("Error from Mesos: " + message) + } + + override def killTask(d: ExecutorDriver, t: TaskID) { + if (executor == null) { + logError("Received KillTask but executor was null") + } else { + // TODO: Determine the 'interruptOnCancel' property set for the given job. + executor.killTask(t.getValue.toLong, interruptThread = false) + } + } + + override def reregistered(d: ExecutorDriver, p2: SlaveInfo) {} + + override def disconnected(d: ExecutorDriver) {} + + override def frameworkMessage(d: ExecutorDriver, data: Array[Byte]) {} + + override def shutdown(d: ExecutorDriver) {} +} + +/** + * Entry point for Mesos executor. + */ +private[spark] object MesosExecutorBackend extends Logging { + def main(args: Array[String]) { + Utils.initDaemon(log) + // Create a new Executor and start it running + val runner = new MesosExecutorBackend() + new MesosExecutorDriver(runner).run() + } +} http://git-wip-us.apache.org/repos/asf/spark/blob/8e5475be/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterManager.scala ---------------------------------------------------------------------- diff --git a/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterManager.scala b/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterManager.scala new file mode 100644 index 0000000..a849c4a --- /dev/null +++ b/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterManager.scala @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.scheduler.cluster.mesos + +import org.apache.spark.{SparkContext, SparkException} +import org.apache.spark.scheduler.{ExternalClusterManager, SchedulerBackend, TaskScheduler, TaskSchedulerImpl} + +/** + * Cluster Manager for creation of Yarn scheduler and backend + */ +private[spark] class MesosClusterManager extends ExternalClusterManager { + private val MESOS_REGEX = """mesos://(.*)""".r + + override def canCreate(masterURL: String): Boolean = { + masterURL.startsWith("mesos") + } + + override def createTaskScheduler(sc: SparkContext, masterURL: String): TaskScheduler = { + new TaskSchedulerImpl(sc) + } + + override def createSchedulerBackend(sc: SparkContext, + masterURL: String, + scheduler: TaskScheduler): SchedulerBackend = { + val mesosUrl = MESOS_REGEX.findFirstMatchIn(masterURL).get.group(1) + val coarse = sc.conf.getBoolean("spark.mesos.coarse", defaultValue = true) + if (coarse) { + new MesosCoarseGrainedSchedulerBackend( + scheduler.asInstanceOf[TaskSchedulerImpl], + sc, + mesosUrl, + sc.env.securityManager) + } else { + new MesosFineGrainedSchedulerBackend( + scheduler.asInstanceOf[TaskSchedulerImpl], + sc, + mesosUrl) + } + } + + override def initialize(scheduler: TaskScheduler, backend: SchedulerBackend): Unit = { + scheduler.asInstanceOf[TaskSchedulerImpl].initialize(backend) + } +} + http://git-wip-us.apache.org/repos/asf/spark/blob/8e5475be/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterPersistenceEngine.scala ---------------------------------------------------------------------- diff --git a/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterPersistenceEngine.scala b/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterPersistenceEngine.scala new file mode 100644 index 0000000..61ab3e8 --- /dev/null +++ b/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterPersistenceEngine.scala @@ -0,0 +1,134 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.scheduler.cluster.mesos + +import scala.collection.JavaConverters._ + +import org.apache.curator.framework.CuratorFramework +import org.apache.zookeeper.CreateMode +import org.apache.zookeeper.KeeperException.NoNodeException + +import org.apache.spark.SparkConf +import org.apache.spark.deploy.SparkCuratorUtil +import org.apache.spark.internal.Logging +import org.apache.spark.util.Utils + +/** + * Persistence engine factory that is responsible for creating new persistence engines + * to store Mesos cluster mode state. 
+ */ +private[spark] abstract class MesosClusterPersistenceEngineFactory(conf: SparkConf) { + def createEngine(path: String): MesosClusterPersistenceEngine +} + +/** + * Mesos cluster persistence engine is responsible for persisting Mesos cluster mode + * specific state, so that on failover all the state can be recovered and the scheduler + * can resume managing the drivers. + */ +private[spark] trait MesosClusterPersistenceEngine { + def persist(name: String, obj: Object): Unit + def expunge(name: String): Unit + def fetch[T](name: String): Option[T] + def fetchAll[T](): Iterable[T] +} + +/** + * Zookeeper backed persistence engine factory. + * All Zk engines created from this factory shares the same Zookeeper client, so + * all of them reuses the same connection pool. + */ +private[spark] class ZookeeperMesosClusterPersistenceEngineFactory(conf: SparkConf) + extends MesosClusterPersistenceEngineFactory(conf) with Logging { + + lazy val zk = SparkCuratorUtil.newClient(conf) + + def createEngine(path: String): MesosClusterPersistenceEngine = { + new ZookeeperMesosClusterPersistenceEngine(path, zk, conf) + } +} + +/** + * Black hole persistence engine factory that creates black hole + * persistence engines, which stores nothing. + */ +private[spark] class BlackHoleMesosClusterPersistenceEngineFactory + extends MesosClusterPersistenceEngineFactory(null) { + def createEngine(path: String): MesosClusterPersistenceEngine = { + new BlackHoleMesosClusterPersistenceEngine + } +} + +/** + * Black hole persistence engine that stores nothing. + */ +private[spark] class BlackHoleMesosClusterPersistenceEngine extends MesosClusterPersistenceEngine { + override def persist(name: String, obj: Object): Unit = {} + override def fetch[T](name: String): Option[T] = None + override def expunge(name: String): Unit = {} + override def fetchAll[T](): Iterable[T] = Iterable.empty[T] +} + +/** + * Zookeeper based Mesos cluster persistence engine, that stores cluster mode state + * into Zookeeper. Each engine object is operating under one folder in Zookeeper, but + * reuses a shared Zookeeper client. + */ +private[spark] class ZookeeperMesosClusterPersistenceEngine( + baseDir: String, + zk: CuratorFramework, + conf: SparkConf) + extends MesosClusterPersistenceEngine with Logging { + private val WORKING_DIR = + conf.get("spark.deploy.zookeeper.dir", "/spark_mesos_dispatcher") + "/" + baseDir + + SparkCuratorUtil.mkdir(zk, WORKING_DIR) + + def path(name: String): String = { + WORKING_DIR + "/" + name + } + + override def expunge(name: String): Unit = { + zk.delete().forPath(path(name)) + } + + override def persist(name: String, obj: Object): Unit = { + val serialized = Utils.serialize(obj) + val zkPath = path(name) + zk.create().withMode(CreateMode.PERSISTENT).forPath(zkPath, serialized) + } + + override def fetch[T](name: String): Option[T] = { + val zkPath = path(name) + + try { + val fileData = zk.getData().forPath(zkPath) + Some(Utils.deserialize[T](fileData)) + } catch { + case e: NoNodeException => None + case e: Exception => + logWarning("Exception while reading persisted file, deleting", e) + zk.delete().forPath(zkPath) + None + } + } + + override def fetchAll[T](): Iterable[T] = { + zk.getChildren.forPath(WORKING_DIR).asScala.flatMap(fetch[T]) + } +} --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
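The MesosClusterPersistenceEngine layer above is what lets a restarted dispatcher recover its queued and launched drivers. Below is a rough sketch of that flow under stated assumptions: ZooKeeper recovery is configured, the quorum address, submission id, and payload are placeholders, and the object sits in the same package only because the factory classes are private[spark]. On a real dispatcher this path is driven by MesosClusterScheduler rather than called directly.

    package org.apache.spark.scheduler.cluster.mesos

    import org.apache.spark.SparkConf

    object PersistenceEngineSketch {
      def main(args: Array[String]): Unit = {
        val conf = new SparkConf()
          .set("spark.deploy.recoveryMode", "ZOOKEEPER")
          .set("spark.deploy.zookeeper.url", "zk1:2181,zk2:2181")      // placeholder quorum
          .set("spark.deploy.zookeeper.dir", "/spark_mesos_dispatcher")

        // Each logical collection of state gets its own engine; all engines created by
        // the factory share one Curator client.
        val factory = new ZookeeperMesosClusterPersistenceEngineFactory(conf)
        val engine: MesosClusterPersistenceEngine = factory.createEngine("launchedDrivers")

        // Placeholder payload; the dispatcher persists serializable driver state here.
        engine.persist("driver-20161001000000-0001", "serialized driver state")

        // After a dispatcher restart, fetchAll() replays everything under the znode.
        engine.fetchAll[String]().foreach(state => println("recovered: " + state))
      }
    }
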
